[2/7] RFC: the task pool

Tom Tromey tromey@redhat.com
Mon Jun 14 18:24:00 GMT 2010


This patch adds the task pool data structure.

This is the only module in gdb that actively uses threads (aside from
some __thread markers here and there).

I think the code is fairly self-explanatory; I tried to make the
comments in the new header very complete.

This is the only module that does any locking.  I tried to make it clear
so that one can easily verify that there are no races or deadlocks.

taskpool.c includes a bunch of wrapper functions for pthread primitives.
This is how we handle building on non-threaded hosts.  It is also how we
can handle portability to non-pthread systems.

Tom

From 68310dec5ff2995a5e25cd287eba9a66c67eeeb3 Mon Sep 17 00:00:00 2001
From: Tom Tromey <tromey@redhat.com>
Date: Fri, 11 Jun 2010 14:54:57 -0600
Subject: [PATCH 2/7] add task pool

2010-06-14  Tom Tromey  <tromey@redhat.com>

	* taskpool.h: New file.
	* taskpool.c: New file.
	* Makefile.in (SFILES): Add taskpool.c.
	(HFILES_NO_SRCDIR): Add taskpool.h.
	(COMMON_OBS): Add taskpool.o.
---
 gdb/Makefile.in |    6 +-
 gdb/taskpool.c  |  609 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 gdb/taskpool.h  |   96 +++++++++
 3 files changed, 708 insertions(+), 3 deletions(-)
 create mode 100644 gdb/taskpool.c
 create mode 100644 gdb/taskpool.h

diff --git a/gdb/Makefile.in b/gdb/Makefile.in
index cbe3ed9..51fc1bd 100644
--- a/gdb/Makefile.in
+++ b/gdb/Makefile.in
@@ -695,7 +695,7 @@ SFILES = ada-exp.y ada-lang.c ada-typeprint.c ada-valprint.c ada-tasks.c \
 	stabsread.c stack.c std-regs.c symfile.c symfile-mem.c symmisc.c \
 	symtab.c \
 	target.c target-descriptions.c target-memory.c \
-	thread.c top.c tracepoint.c \
+	thread.c taskpool.c top.c tracepoint.c \
 	trad-frame.c \
 	tramp-frame.c \
 	typeprint.c \
@@ -777,7 +777,7 @@ annotate.h sim-regno.h dictionary.h dfp.h main.h frame-unwind.h	\
 remote-fileio.h i386-linux-tdep.h vax-tdep.h objc-lang.h \
 sentinel-frame.h bcache.h symfile.h windows-tdep.h linux-tdep.h \
 gdb_usleep.h jit.h xml-syscall.h ada-operator.inc microblaze-tdep.h \
-psymtab.h psympriv.h
+psymtab.h psympriv.h taskpool.h
 
 # Header files that already have srcdir in them, or which are in objdir.
 
@@ -863,7 +863,7 @@ COMMON_OBS = $(DEPFILES) $(CONFIG_OBS) $(YYOBJ) \
 	prologue-value.o memory-map.o xml-support.o xml-syscall.o \
 	target-descriptions.o target-memory.o xml-tdesc.o xml-builtin.o \
 	inferior.o osdata.o gdb_usleep.o record.o gcore.o \
-	jit.o progspace.o
+	jit.o progspace.o taskpool.o
 
 # Definitions for the syscall's XML files and dir
 XML_SYSCALLS_DIR = syscalls/
diff --git a/gdb/taskpool.c b/gdb/taskpool.c
new file mode 100644
index 0000000..62156b0
--- /dev/null
+++ b/gdb/taskpool.c
@@ -0,0 +1,609 @@
+/* Task pool implementation.
+
+   Copyright (C) 2009, 2010 Free Software Foundation, Inc.
+
+   This file is part of GDB.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program.  If not, see <http://www.gnu.org/licenses/>.  */
+
+#include "defs.h"
+#include "exceptions.h"
+#include "vec.h"
+#include "taskpool.h"
+#include "bfd.h"
+
+#ifdef HAVE_PTHREAD_H
+#include <pthread.h>
+#endif
+#include <time.h>
+#include <sys/time.h>
+#include <signal.h>
+
+/* To port the threading code to a new host, you must define types and
+   functions for the taskpool implementation to use.  You can use the
+   pthreads code as a guide.
+  
+   We use pthread_create as a sentinel for all the pthread
+   functionality.  */
+#ifdef HAVE_PTHREAD_CREATE
+
+/* The type of a mutex.  */
+typedef pthread_mutex_t gdb_mutex;
+
+/* The type of a condition variable.  */
+typedef pthread_cond_t gdb_condition;
+
+/* Initialize a mutex.  */
+static inline void
+gdb_mutex_init (gdb_mutex *m)
+{
+  pthread_mutex_init (m, NULL);
+}
+
+/* Destroy a mutex.  */
+static inline void
+gdb_mutex_destroy (gdb_mutex *m)
+{
+  pthread_mutex_destroy (m);
+}
+
+/* Acquire a mutex.  */
+static inline void
+gdb_mutex_lock (gdb_mutex *m)
+{
+  pthread_mutex_lock (m);
+}
+
+/* Attempt to acquire the lock M, giving up after N_SECONDS seconds.
+   Return 0 if the lock was acquired, and nonzero otherwise.  */
+static inline int
+gdb_mutex_timed_lock (gdb_mutex *m, unsigned int n_seconds)
+{
+  struct timeval now;
+  struct timespec deadline;
+
+  /* The deadline handed to pthread_mutex_timedlock is built from the
+     current time of day plus the requested delay.  */
+  gettimeofday (&now, NULL);
+  deadline.tv_nsec = now.tv_usec * 1000;
+  deadline.tv_sec = now.tv_sec + n_seconds;
+
+  return pthread_mutex_timedlock (m, &deadline);
+}
+
+/* Release a mutex.  */
+static inline void
+gdb_mutex_unlock (gdb_mutex *m)
+{
+  pthread_mutex_unlock (m);
+}
+
+/* Initialize the condition variable C.  */
+static inline void
+gdb_cond_init (gdb_condition *c)
+{
+  pthread_cond_init (c, NULL);
+}
+
+/* Wait for the condition variable C to be signaled.  M is the mutex
+   associated with C, and the caller must already hold it.  Give up
+   after N_SECONDS seconds.  Return 0 on success, or nonzero if the
+   condition was never triggered.  */
+static inline int
+gdb_cond_timed_wait (gdb_condition *c, gdb_mutex *m, unsigned int n_seconds)
+{
+  struct timeval now;
+  struct timespec deadline;
+
+  /* The deadline handed to pthread_cond_timedwait is built from the
+     current time of day plus the requested delay.  */
+  gettimeofday (&now, NULL);
+  deadline.tv_nsec = now.tv_usec * 1000;
+  deadline.tv_sec = now.tv_sec + n_seconds;
+
+  return pthread_cond_timedwait (c, m, &deadline);
+}
+
+/* Signal the condition variable.  */
+static inline void
+gdb_cond_signal (gdb_condition *c)
+{
+  pthread_cond_signal (c);
+}
+
+/* Start a new thread.  A new thread should be run in the background
+   -- it should not prevent gdb from exiting.  Also, care must be
+   taken to arrange for signals to be delivered to gdb's main thread.
+   FUNC is called in the new thread, with DATUM as its argument.
+
+   The thread is created detached, so it is never joined.
+
+   NOTE(review): the return values of the pthread calls, including
+   pthread_create, are ignored, so the caller cannot tell whether a
+   thread was actually started.  */
+static inline void
+gdb_create_new_thread (void *(func) (void *), void *datum)
+{
+  pthread_attr_t attr;
+  pthread_t tem;
+  sigset_t block, old;
+
+  /* Temporarily block every signal in the calling thread.  This is
+     used to prevent the worker threads from accepting any signal that
+     must be delivered to the main thread: the new thread is created
+     while everything is blocked (presumably inheriting that mask, per
+     POSIX), and the caller's previous mask, saved in OLD, is restored
+     at the end of this function.  */
+  sigfillset (&block);
+  pthread_sigmask (SIG_BLOCK, &block, &old);
+
+  pthread_attr_init (&attr);
+  pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
+
+  pthread_create (&tem, &attr, func, datum);
+
+  pthread_attr_destroy (&attr);
+  pthread_sigmask (SIG_SETMASK, &old, NULL);
+}
+
+/* Return the number of processors on this system, or -1 if it cannot
+   be determined.  A return value of 0 tells gdb that no threads are
+   available at all, so this implementation never returns 0.  */
+static inline int
+gdb_n_processors (void)
+{
+#ifdef _SC_NPROCESSORS_ONLN
+  int count = (int) sysconf (_SC_NPROCESSORS_ONLN);
+
+  /* Zero is reserved to mean "no threads"; report it as unknown.  */
+  return count == 0 ? -1 : count;
+#else
+  return -1;
+#endif
+}
+
+/* Return the current thread's ID.  Should never return NULL.  */
+static inline void *
+gdb_self (void)
+{
+  return (void *) /* FIXME */ pthread_self ();
+}
+
+#else /* HAVE_PTHREAD_CREATE */
+
+/* Threadless implementation.  */
+
+typedef int gdb_mutex;
+typedef int gdb_condition;
+
+static inline void
+gdb_mutex_init (gdb_mutex *m)
+{
+  *m = 0;
+}
+
+static inline void
+gdb_mutex_destroy (gdb_mutex *m)
+{
+}
+
+static inline void
+gdb_mutex_lock (gdb_mutex *m)
+{
+}
+
+static inline int
+gdb_mutex_timed_lock (gdb_mutex *m, unsigned int n_seconds)
+{
+  return 0;
+}
+
+static inline void
+gdb_mutex_unlock (gdb_mutex *m)
+{
+}
+
+static inline void
+gdb_cond_init (gdb_condition *c)
+{
+  *c = 0;
+}
+
+static inline int
+gdb_cond_timed_wait (gdb_condition *c, gdb_mutex *m, unsigned int n_seconds)
+{
+  return 1;
+}
+
+static inline void
+gdb_cond_signal (gdb_condition *c)
+{
+}
+
+static inline void
+gdb_create_new_thread (void *(func) (void *), void *datum)
+{
+  internal_error (__FILE__, __LINE__,
+		  _("should not be possible to start a new thread"));
+}
+
+static inline int
+gdb_n_processors (void)
+{
+  return 0;
+}
+
+static inline void *
+gdb_self (void)
+{
+  return "hi bob";
+}
+
+#endif /* HAVE_PTHREAD_CREATE */
+
+
+
+/* Thread functions for BFD.  */
+
+static void *
+gdb_bfd_self (void)
+{
+  return gdb_self ();
+}
+
+static void *
+gdb_bfd_create_mutex (void)
+{
+  gdb_mutex *m = XNEW (gdb_mutex);
+  gdb_mutex_init (m);
+  return m;
+}
+
+static void
+gdb_bfd_lock_mutex (void *m)
+{
+  gdb_mutex_lock (m);
+}
+
+static void
+gdb_bfd_unlock_mutex (void *m)
+{
+  gdb_mutex_unlock (m);
+}
+
+static struct bfd_thread_info gdb_bfd_thread_info =
+{
+  gdb_bfd_self,
+  gdb_bfd_create_mutex,
+  gdb_bfd_lock_mutex,
+  gdb_bfd_unlock_mutex
+};
+
+
+
+typedef struct task *task_p;
+
+DEF_VEC_P (task_p);
+
+/* A task pool.  This holds a number of threads, plus a task queue.  */
+struct task_pool
+{
+  /* The task queue.  create_task keeps it sorted by increasing
+     priority, and worker threads take tasks with VEC_pop.  */
+  VEC (task_p) *queue;
+
+  /* The queue lock.  This must be held when modifying any element of
+     the task pool.  */
+  gdb_mutex lock;
+
+  /* The queue condition.  This is signaled when a new task is pushed
+     on the queue.  */
+  gdb_condition condition;
+
+  /* The number of worker threads currently running.  Incremented by
+     create_task when it starts a thread, and decremented by a worker
+     thread just before it exits.  */
+  int worker_count;
+  /* The number of worker threads waiting for a new task.  */
+  int waiters;
+  /* The maximum number of worker threads.  If this is zero, no
+     threads are started and create_task runs each task itself.  */
+  int max_workers;
+};
+
+/* A single task in a task pool.  */
+struct task
+{
+  /* The task's message, printed when the main thread has to wait for
+     the task.  It is released with xfree by free_task.  */
+  char *message;
+
+  /* The task's priority.  The pool only compares priorities with one
+     another when sorting its queue.  */
+  unsigned long priority;
+
+  /* The function that implements this task.  It either returns a
+     result, or throws an exception.  */
+  void *(*function) (void *arg);
+  /* The user's data that is passed to the function.  If this needs
+     some form of destruction, then FUNCTION is responsible for
+     destroying it.  */
+  void *user_data;
+
+  /* If zero, the task has not been run.  If greater than zero, the
+     task completed successfully, and the result is in RESULT.  If
+     less than zero, then the task failed and the exception is in
+     EXCEPTION.  cancel_task also sets this to 1 for a task that has
+     not run, so that the task is skipped rather than run.  */
+  int completed;
+  /* The task's result.  This is NULL until FUNCTION returns.  */
+  void *result;
+  /* Any exception that was thrown.  This is only set when COMPLETED
+     is negative; its message is then an xstrdup'd copy.  */
+  struct gdb_exception exception;
+
+  /* The task lock.  This must be acquired before modifying any field
+     of this task.  It is held for as long as the task is running.  */
+  gdb_mutex lock;
+};
+
+/* Acquire TASK's lock.  A worker thread (MAIN_THREAD false) simply
+   blocks until the lock is available.  The main thread (MAIN_THREAD
+   true) first waits quietly for one second; if the lock is still not
+   available, it prints the task's message and then blocks.  Return 1
+   if the message was printed, 0 otherwise.  */
+static int
+acquire_task_lock (struct task *task, int main_thread)
+{
+  if (main_thread)
+    {
+      /* A task that finishes within a second is not worth
+	 mentioning.  */
+      if (gdb_mutex_timed_lock (&task->lock, 1) == 0)
+	return 0;
+
+      /* The task is taking "too long"; tell the user what we are
+	 waiting for.  */
+      puts_unfiltered (task->message);
+      gdb_flush (gdb_stdout);
+      gdb_mutex_lock (&task->lock);
+      return 1;
+    }
+
+  gdb_mutex_lock (&task->lock);
+  return 0;
+}
+
+/* Run TASK, unless it has already completed, and record the outcome
+   in its COMPLETED, RESULT and EXCEPTION fields.  Return 1 if TASK
+   had already completed when this was called, meaning that the caller
+   should now destroy it; return 0 if this call ran the task.  If
+   MAIN_THREAD is true, possibly print the message to the user;
+   whenever the message was printed, "done" is printed once the task
+   has finished.  The task lock is held for the whole call and
+   released before returning.  */
+static int
+run_task (struct task *task, int main_thread)
+{
+  int result, message_printed;
+
+  message_printed = acquire_task_lock (task, main_thread);
+  if (!task->completed)
+    {
+      volatile struct gdb_exception except;
+
+      /* If we are running the task in the main thread, print the
+	 message unconditionally.  The lock can only have been slow to
+	 acquire if some other thread was running the task, in which
+	 case we would not be here; hence the assertion.  */
+      if (main_thread)
+	{
+	  gdb_assert (!message_printed);
+
+	  puts_unfiltered (task->message);
+	  gdb_flush (gdb_stdout);
+	  message_printed = 1;
+	}
+
+      TRY_CATCH (except, RETURN_MASK_ALL)
+	{
+	  task->result = task->function (task->user_data);
+	}
+      if (except.reason < 0)
+	{
+	  task->completed = -1;
+	  task->exception = except;
+	  task->exception.message = xstrdup (except.message);
+	}
+      else
+	task->completed = 1;
+      result = 0;
+    }
+  else
+    result = 1;
+
+  if (message_printed)
+    {
+      puts_unfiltered ("done\n");
+      gdb_flush (gdb_stdout);
+    }
+
+  gdb_mutex_unlock (&task->lock);
+  return result;
+}
+
+/* Destroy TASK: release its message, destroy its lock, and free the
+   task itself.  */
+static void
+free_task (struct task *task)
+{
+  xfree (task->message);
+  gdb_mutex_destroy (&task->lock);
+  xfree (task);
+}
+
+/* The body of a worker thread.  P is the task pool to serve.  The
+   thread repeatedly takes a task from the pool's queue and runs it,
+   and exits once the queue is empty after a timed-out wait.  */
+static void *
+worker_thread (void *p)
+{
+  struct task_pool *pool = p;
+
+  for (;;)
+    {
+      struct task *job = NULL;
+      int timed_out = 0;
+
+      gdb_mutex_lock (&pool->lock);
+
+      /* Wait up to 15 seconds for a new job.  A spurious wakeup just
+	 restarts the full wait; this doesn't seem very important.  */
+      while (!timed_out && VEC_empty (task_p, pool->queue))
+	{
+	  ++pool->waiters;
+	  timed_out = gdb_cond_timed_wait (&pool->condition, &pool->lock, 15);
+	  --pool->waiters;
+	}
+
+      /* Either take a job, or note that this worker is going away.
+	 Both must happen while the pool lock is still held.  */
+      if (!VEC_empty (task_p, pool->queue))
+	job = VEC_pop (task_p, pool->queue);
+      else
+	--pool->worker_count;
+
+      gdb_mutex_unlock (&pool->lock);
+
+      /* No job means that we timed out waiting for one, so exit.  */
+      if (job == NULL)
+	break;
+
+      /* A task that had already completed (or was cancelled) before
+	 we got to it is ours to destroy.  */
+      if (run_task (job, 0))
+	free_task (job);
+    }
+
+  clear_exception_cache ();
+
+  bfd_thread_exit ();
+
+  /* The result is ignored.  */
+  return NULL;
+}
+
+/* A qsort comparison function for arrays of task pointers.  Orders
+   the tasks by increasing priority.  */
+static int
+compare_priorities (const void *a, const void *b)
+{
+  const struct task *left = *(const struct task * const *) a;
+  const struct task *right = *(const struct task * const *) b;
+
+  if (left->priority < right->priority)
+    return -1;
+  if (left->priority > right->priority)
+    return 1;
+  return 0;
+}
+
+/* Create a task that calls FUNCTION with USER_DATA, add it to the
+   task pool POOL, and return it.  MESSAGE and PRIORITY are stored in
+   the task.  If POOL has no worker threads, the task is run before
+   this function returns.  */
+struct task *
+create_task (struct task_pool *pool, char *message,
+	     unsigned long priority,
+	     void *(*function) (void *), void *user_data)
+{
+  struct task *task = xmalloc (sizeof (struct task));
+
+  task->message = message;
+  task->priority = priority;
+  task->function = function;
+  task->user_data = user_data;
+  task->completed = 0;
+  task->result = NULL;
+  gdb_mutex_init (&task->lock);
+
+  /* Everything below touches the pool, so hold its lock.  */
+  gdb_mutex_lock (&pool->lock);
+
+  if (pool->max_workers != 0)
+    {
+      /* Queue the task, then restore priority order.  */
+      VEC_safe_push (task_p, pool->queue, task);
+      qsort (VEC_address (task_p, pool->queue),
+	     VEC_length (task_p, pool->queue),
+	     sizeof (task_p),
+	     compare_priorities);
+
+      /* Wake an existing worker if one is waiting, or if the pool is
+	 already at its thread limit.  Only otherwise is a new worker
+	 thread started.  */
+      if (pool->waiters != 0 || pool->worker_count >= pool->max_workers)
+	gdb_cond_signal (&pool->condition);
+      else
+	{
+	  gdb_create_new_thread (worker_thread, pool);
+	  ++pool->worker_count;
+	}
+    }
+  else
+    {
+      /* No threads, so run the task right away.  We pretend that this
+	 is not the main thread, because we don't want the message to
+	 be printed.  */
+      run_task (task, 0);
+    }
+
+  gdb_mutex_unlock (&pool->lock);
+
+  return task;
+}
+
+/* Return TASK's answer, or throw an exception.
+
+   If TASK has already completed, its recorded outcome is used and
+   TASK is destroyed.  If a worker thread is running TASK right now,
+   this waits for it, printing the task's message if the wait exceeds
+   one second.  If TASK has not been run at all, it is run here, in
+   the calling thread, after printing its message.  In that last case
+   TASK is left alone afterward; the worker thread that eventually
+   dequeues it sees that it has completed and destroys it.
+
+   If the task's function threw, an error is thrown with the same
+   error code and message.
+
+   This deliberately does not go through run_task.  run_task releases
+   the task lock before returning, and once the lock is released a
+   worker thread may destroy a task that was run here.  So the outcome
+   is copied out while the lock is still held, and TASK is touched
+   after unlocking only when this thread is the one destroying it.  */
+void *
+get_task_answer (struct task *task)
+{
+  int message_printed, destroy;
+  void *result;
+
+  message_printed = acquire_task_lock (task, 1);
+
+  /* A task that has already completed is ours to destroy.  */
+  destroy = task->completed != 0;
+
+  if (!destroy)
+    {
+      volatile struct gdb_exception except;
+
+      /* We are running the task in the main thread, so print the
+	 message unconditionally.  */
+      gdb_assert (!message_printed);
+
+      puts_unfiltered (task->message);
+      gdb_flush (gdb_stdout);
+      message_printed = 1;
+
+      TRY_CATCH (except, RETURN_MASK_ALL)
+	{
+	  task->result = task->function (task->user_data);
+	}
+      if (except.reason < 0)
+	{
+	  task->completed = -1;
+	  task->exception = except;
+	  task->exception.message = xstrdup (except.message);
+	}
+      else
+	task->completed = 1;
+    }
+
+  if (message_printed)
+    {
+      puts_unfiltered ("done\n");
+      gdb_flush (gdb_stdout);
+    }
+
+  if (task->completed < 0)
+    {
+      /* Copy the exception before dropping the lock.  */
+      struct gdb_exception exc = task->exception;
+
+      gdb_mutex_unlock (&task->lock);
+      if (destroy)
+	free_task (task);
+      /* We do this in a funny way to avoid leaking the message.  */
+      make_cleanup (xfree, (char *) exc.message);
+      throw_error (exc.error, "%s", exc.message);
+    }
+
+  /* Likewise, fetch the result before dropping the lock.  */
+  result = task->result;
+  gdb_mutex_unlock (&task->lock);
+  if (destroy)
+    free_task (task);
+  return result;
+}
+
+/* Cancel TASK.  If TASK has not run yet, mark it as completed so that
+   the worker thread that eventually dequeues it skips it and destroys
+   it.  If TASK has already completed, destroy it here.  Acquiring the
+   task lock blocks while TASK is running, so in that case this waits
+   for it to finish.  */
+void
+cancel_task (struct task *task)
+{
+  int state;
+
+  gdb_mutex_lock (&task->lock);
+  state = task->completed;
+  if (state == 0)
+    task->completed = 1;
+  gdb_mutex_unlock (&task->lock);
+
+  /* A task that had not run now belongs to whichever worker thread
+     pops it from the queue; it must not be touched again here.  */
+  if (state == 0)
+    return;
+
+  /* When the task's function throws, run_task stores an xstrdup'd
+     copy of the message in the task, and free_task does not release
+     it.  Nobody will ask for this task's answer now, so free the copy
+     here rather than leaking it.  */
+  if (state < 0)
+    xfree ((char *) task->exception.message);
+
+  free_task (task);
+}
+
+/* Create and return a new task pool that runs at most MAX_WORKERS
+   worker threads.  If MAX_WORKERS is -1, choose a limit from the
+   processor count: twice the number of processors, or 5 if that
+   number is unknown.  A limit of 0 yields a pool that never starts a
+   thread.  */
+struct task_pool *
+create_task_pool (int max_workers)
+{
+  struct task_pool *pool = xmalloc (sizeof (struct task_pool));
+
+  memset (pool, 0, sizeof (struct task_pool));
+  gdb_mutex_init (&pool->lock);
+  gdb_cond_init (&pool->condition);
+
+  if (max_workers == -1)
+    {
+      int n_cpus = gdb_n_processors ();
+
+      /* A negative count means "unknown".  Zero means that threads
+	 are unavailable, and doubling leaves it at zero.  */
+      max_workers = n_cpus < 0 ? 5 : 2 * n_cpus;
+    }
+  pool->max_workers = max_workers;
+
+  /* BFD's thread hooks are only installed if worker threads can
+     exist.  */
+  if (max_workers != 0)
+    bfd_init_threads (&gdb_bfd_thread_info);
+
+  return pool;
+}
diff --git a/gdb/taskpool.h b/gdb/taskpool.h
new file mode 100644
index 0000000..c1d2476
--- /dev/null
+++ b/gdb/taskpool.h
@@ -0,0 +1,96 @@
+/* Task pool implementation.
+
+   Copyright (C) 2009, 2010 Free Software Foundation, Inc.
+
+   This file is part of GDB.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program.  If not, see <http://www.gnu.org/licenses/>.  */
+
+#ifndef TASKPOOL_H
+#define TASKPOOL_H
+
+struct task_pool;
+struct task;
+
+/* The GDB task pool provides a simple way to hide work in the
+   background.
+
+   A task pool manages a number of threads, with the maximum number
+   set at the time the pool is created.  It holds a queue of tasks,
+   sorted by priority.  A worker thread simply loops, pulling the next
+   task off the queue and running it.
+
+   There is currently no way to destroy a task pool.
+
+   The threads associated with a task pool will exit if no jobs are
+   available for a certain amount of time.  They will be restarted as
+   needed.
+
+   Task pools are designed to work even if threads are not available
+   on the host.  See get_task_answer.
+   
+   If MAX_WORKERS is -1, then create_task_pool tries to choose a
+   number based on the number of available processors.  */
+struct task_pool *create_task_pool (int max_workers);
+   
+/* A task is simply a user-provided function with some user-provided
+   data.  When run it can either throw an exception, or return a
+   result.
+
+   A task has an associated message for the user.  This must be
+   translated by the caller.  It should end with "...".  It is only
+   printed if the task pool determines that the user is waiting for
+   the task to complete.  When the task completes, "done" is printed
+   after the message.  The message should be xmalloc()d.
+
+   A task has a priority, also provided by the user.  Priorities must
+   be comparable within a given pool but are not otherwise meaningful
+   to the pool code.
+
+   Every task is associated with a pool.
+
+   It is up to the task's creator to ensure that the task is in fact
+   thread-safe.  Be warned!  In GDB this can be tricky due to all the
+   lurking global variables.
+
+   A task is not destroyed until either cancel_task or get_task_answer
+   has been called on it.  */
+struct task *create_task (struct task_pool *pool,
+			  char *message,
+			  unsigned long priority,
+			  void *(*function) (void *),
+			  void *user_data);
+
+/* get_task_answer returns the result of a task.  This may only be
+   called once per task, and cannot be called if cancel_task has
+   already been called.
+
+   If the task's function returned an answer, this will return it.
+   Otherwise, if the task's function threw an exception, this function
+   will throw the same exception.
+
+   If the task was not yet processed by some worker thread, it will be
+   processed immediately in the thread that calls get_task_answer.
+   This also enables task pools to work even when threads are
+   unavailable.  */
+void *get_task_answer (struct task *task);
+
+/* Cancel a task.  This may only be called once per task, and cannot
+   be called if get_task_answer has already been called.
+   
+   If the task is already running, this waits until the task has
+   completed.  */
+void cancel_task (struct task *task);
+
+#endif /* TASKPOOL_H */
-- 
1.6.2.5



More information about the Gdb-patches mailing list