[PATCH] abg-workers: Rework the worker queue to improve concurrent behaviour

Matthias Maennich via libabigail libabigail@sourceware.org
Wed Jan 1 00:00:00 GMT 2020


This patch refactors the abigail::workers::queue and
abigail::workers::worker implementations to avoid holding locking
primitives longer than necessary.

In particular, the queue_cond_mutex was held during the entire worker
runtime, effectively serializing the workers. Hence, use a mutex+cond
pair for each of the input and output queues, and only synchronize
around the interaction with the corresponding queue. The
tasks_todo_(mutex|cond) are meant to synchronize scheduling and
distribution of work among workers, while tasks_done_(mutex|cond) are
used for synchronizing threads when putting back the tasks to the output
queue and to hold back threads waiting for the queue and workers to
drain.

Along the way, I did some cleanup that was now possible.
 - Move entire implementation of abigail::workers::task into header.
 - Make default_notify a static member.
 - Replace the multiple constructors with one with default arguments.

	* include/abg-workers.h (workers::task): move entire
	implementation to header and drop superfluous forward declaration.
	* src/abg-workers.cc (workers::task): Likewise.
	(workers::queue::priv): Drop queue_cond_mutex, rename queue_cond
	to tasks_todo_cond, add tasks_done_cond, make default_notify
	static.
	(workers::queue::priv::priv): Add default arguments to fully
	qualified constructor, drop the remaining ones.
	(workers::queue::priv::more_tasks_to_execute): Drop method.
	(workers::queue::priv::schedule_task): Do not take a mutex to
	signal the condition variable; only lock around the queue access.
	(workers::queue::priv::do_bring_workers_down): Likewise. Also
	wait for tasks_todo to be empty.
	(workers::worker::wait_to_execute_a_task): Await tasks on
	tasks_todo with tasks_todo_(cond|mutex) and signal task
	completion to tasks_done_cond.

Signed-off-by: Matthias Maennich <maennich@google.com>
---
 include/abg-workers.h |   8 +--
 src/abg-workers.cc    | 124 ++++++++++++------------------------------
 2 files changed, 39 insertions(+), 93 deletions(-)

diff --git a/include/abg-workers.h b/include/abg-workers.h
index 2892837b631e..ccd547a84270 100644
--- a/include/abg-workers.h
+++ b/include/abg-workers.h
@@ -45,9 +45,6 @@ namespace abigail
 namespace workers
 {
 
-class task;
-typedef shared_ptr<task> task_sptr;
-
 size_t get_number_of_threads();
 
 /// This represents a task to be performed.
@@ -60,13 +57,14 @@ size_t get_number_of_threads();
 class task
 {
 public:
-  task();
   virtual void
   perform() = 0;
 
-  virtual ~task();
+  virtual ~task(){};
 }; // end class task.
 
+typedef shared_ptr<task> task_sptr;
+
 /// This represents a queue of tasks to be performed.
 ///
 /// Tasks are performed by a number of worker threads.
diff --git a/src/abg-workers.cc b/src/abg-workers.cc
index 4771a2050f0d..8af97bafd355 100644
--- a/src/abg-workers.cc
+++ b/src/abg-workers.cc
@@ -90,23 +90,6 @@ size_t
 get_number_of_threads()
 {return sysconf(_SC_NPROCESSORS_ONLN);}
 
-// <task stuff>
-
-/// The default constructor of the @ref task type
-task::task()
-{}
-
-/// Destructor of the @ref task type.
-task::~task()
-{}
-// </task stuff>
-
-// <worker declarations>
-struct worker;
-
-///  A convenience typedef for a shared_ptr to the @ref worker type.
-typedef shared_ptr<worker> worker_sptr;
-
 /// The abstraction of a worker thread.
 ///
 /// This is an implementation detail of the @ref queue public
@@ -135,63 +118,39 @@ struct queue::priv
   bool				bring_workers_down;
   // The number of worker threads.
   size_t			num_workers;
-  // The mutex associated to the queue condition variable below.
-  pthread_mutex_t		queue_cond_mutex;
+  // A mutex that protects the todo tasks queue from being accessed in
+  // read/write by two threads at the same time.
+  pthread_mutex_t		tasks_todo_mutex;
   // The queue condition variable.  This condition is used to make the
   // worker threads sleep until a new task is added to the queue of
   // todo tasks.  Whenever a new task is added to that queue, a signal
-  // is sent to all the threads sleeping on this condition variable,
-  // and only one of them wakes up and takes the mutex
-  // queue_cond_mutex above.
-  pthread_cond_t		queue_cond;
-  // A mutex that protects the todo tasks queue from being accessed in
-  // read/write by two threads at the same time.
-  mutable pthread_mutex_t	tasks_todo_mutex;
+  // is sent to one of the threads sleeping on this condition variable.
+  pthread_cond_t		tasks_todo_cond;
   // A mutex that protects the done tasks queue from being accessed in
   // read/write by two threads at the same time.
   pthread_mutex_t		tasks_done_mutex;
+  // A condition to be signalled whenever there is a task done. That is being
+  // used to wait for tasks completed when bringing the workers down.
+  pthread_cond_t		tasks_done_cond;
+
   // The todo task queue itself.
-  std::queue<task_sptr>	tasks_todo;
+  std::queue<task_sptr>		tasks_todo;
   // The done task queue itself.
   std::vector<task_sptr>	tasks_done;
+
   // This functor is invoked to notify the user of this queue that a
   // task has been completed and has been added to the done tasks
   // vector.  We call it a notifier.  This notifier is the default
   // notifier of the work queue; the one that is used when the user
   // has specified no notifier.  It basically does nothing.
-  task_done_notify	default_notify;
+  static task_done_notify	default_notify;
   // This is a reference to the the notifier that is actually used in
   // the queue.  It's either the one specified by the user or the
   // default one.
-  task_done_notify&	notify;
+  task_done_notify&		notify;
   // A vector of the worker threads.
   std::vector<worker>		workers;
 
-  /// The default constructor of @ref queue::priv.
-  priv()
-    : bring_workers_down(),
-      num_workers(get_number_of_threads()),
-      queue_cond_mutex(),
-      queue_cond(),
-      tasks_todo_mutex(),
-      tasks_done_mutex(),
-      notify(default_notify)
-  {create_workers();}
-
-  /// A constructor of @ref queue::priv.
-  ///
-  /// @param nb_workers the number of worker threads to have in the
-  /// thread pool.
-  priv(size_t nb_workers)
-    : bring_workers_down(),
-      num_workers(nb_workers),
-      queue_cond_mutex(),
-      queue_cond(),
-      tasks_todo_mutex(),
-      tasks_done_mutex(),
-      notify(default_notify)
-  {create_workers();}
-
   /// A constructor of @ref queue::priv.
   ///
   /// @param nb_workers the number of worker threads to have in the
@@ -200,28 +159,17 @@ struct queue::priv
   /// @param task_done_notify a functor object that is invoked by the
   /// worker thread which has performed the task, right after it's
   /// added that task to the vector of the done tasks.
-  priv(size_t nb_workers, task_done_notify& n)
+  priv(size_t nb_workers = get_number_of_threads(),
+	      task_done_notify& n = default_notify)
     : bring_workers_down(),
       num_workers(nb_workers),
-      queue_cond_mutex(),
-      queue_cond(),
       tasks_todo_mutex(),
+      tasks_todo_cond(),
       tasks_done_mutex(),
+      tasks_done_cond(),
       notify(n)
   {create_workers();}
 
-  /// Tests if there are more tasks to execute from the task queue.
-  ///
-  /// @return true iff there are more tasks to execute.
-  bool
-  more_tasks_to_execute() const
-  {
-    pthread_mutex_lock(&tasks_todo_mutex);
-    bool result = !tasks_todo.empty();
-    pthread_mutex_unlock(&tasks_todo_mutex);
-    return result;
-  }
-
   /// Create the worker threads pool and have all threads sit idle,
   /// waiting for a task to be added to the todo queue.
   void
@@ -258,10 +206,7 @@ struct queue::priv
     pthread_mutex_lock(&tasks_todo_mutex);
     tasks_todo.push(t);
     pthread_mutex_unlock(&tasks_todo_mutex);
-
-    pthread_mutex_lock(&queue_cond_mutex);
-    pthread_cond_signal(&queue_cond);
-    pthread_mutex_unlock(&queue_cond_mutex);
+    pthread_cond_signal(&tasks_todo_cond);
     return true;
   }
 
@@ -301,13 +246,17 @@ struct queue::priv
     if (workers.empty())
       return;
 
-    // Acquire the mutex that protects the queue condition variable
-    // (queue_cond) and wake up all the workers that are sleeping on
-    // the condition.
-    pthread_mutex_lock(&queue_cond_mutex);
+    // Wait for the todo list to be empty to make sure all tasks got picked up
+    pthread_mutex_lock(&tasks_todo_mutex);
+    while (!tasks_todo.empty()) {
+	pthread_cond_wait(&tasks_done_cond, &tasks_todo_mutex);
+    }
+    pthread_mutex_unlock(&tasks_todo_mutex);
+
+    // Now that the task queue is empty, drain the workers by waking them up,
+    // letting them finish their final task before termination.
     bring_workers_down = true;
-    ABG_ASSERT(pthread_cond_broadcast(&queue_cond) == 0);
-    pthread_mutex_unlock(&queue_cond_mutex);
+    ABG_ASSERT(pthread_cond_broadcast(&tasks_todo_cond) == 0);
 
     for (std::vector<worker>::const_iterator i = workers.begin();
 	 i != workers.end();
@@ -322,6 +271,9 @@ struct queue::priv
 
 }; //end struct queue::priv
 
+// default initialize the default notifier.
+queue::task_done_notify queue::priv::default_notify;
+
 /// Default constructor of the @ref queue type.
 ///
 /// By default the queue is created with a number of worker threaders
@@ -438,20 +390,17 @@ queue::task_done_notify::operator()(const task_sptr&/*task_done*/)
 queue::priv*
 worker::wait_to_execute_a_task(queue::priv* p)
 {
-
-  pthread_mutex_lock(&p->queue_cond_mutex);
-
   do
     {
+      pthread_mutex_lock(&p->tasks_todo_mutex);
       // If there is no more tasks to perform and the queue is not to
       // be brought down then wait (sleep) for new tasks to come up.
-      while (!p->more_tasks_to_execute() && !p->bring_workers_down)
-	pthread_cond_wait(&p->queue_cond, &p->queue_cond_mutex);
+      while (p->tasks_todo.empty() && !p->bring_workers_down)
+	pthread_cond_wait(&p->tasks_todo_cond, &p->tasks_todo_mutex);
 
       // We were woken up.  So maybe there are tasks to perform?  If
       // so, get a task from the queue ...
       task_sptr t;
-      pthread_mutex_lock(&p->tasks_todo_mutex);
       if (!p->tasks_todo.empty())
 	{
 	  t = p->tasks_todo.front();
@@ -477,11 +426,10 @@ worker::wait_to_execute_a_task(queue::priv* p)
 	  p->tasks_done.push_back(t);
 	  p->notify(t);
 	  pthread_mutex_unlock(&p->tasks_done_mutex);
+	  pthread_cond_signal(&p->tasks_done_cond);
 	}
     }
-  while (!p->bring_workers_down || p->more_tasks_to_execute());
-
-  pthread_mutex_unlock(&p->queue_cond_mutex);
+  while (!p->bring_workers_down);
 
   return p;
 }
-- 
2.25.0.341.g760bfbb309-goog



More information about the Libabigail mailing list