libabigail
abg-workers.cc
Go to the documentation of this file.
1 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
2 // -*- Mode: C++ -*-
3 //
4 // Copyright (C) 2013-2023 Red Hat, Inc.
5 //
6 // Author: Dodji Seketeli
7 
8 /// @file
9 ///
10 /// This file implements the worker threads (or thread pool) design
11 /// pattern. It aims at performing a set of tasks in parallel, using
12 /// the multi-threading capabilities of the underlying processor(s).
13 
14 #include <assert.h>
15 #include <unistd.h>
16 #include <pthread.h>
17 #include <queue>
18 #include <vector>
19 #include <iostream>
20 
21 #include "abg-fwd.h"
22 #include "abg-internal.h"
23 // <headers defining libabigail's API go under here>
24 ABG_BEGIN_EXPORT_DECLARATIONS
25 
26 #include "abg-workers.h"
27 
28 ABG_END_EXPORT_DECLARATIONS
29 // </headers defining libabigail's API>
30 
31 namespace abigail
32 {
33 
34 namespace workers
35 {
36 
37 /// @defgroup thread_pool Worker Threads
38 /// @{
39 ///
40 /// \brief Libabigail's implementation of Thread Pools.
41 ///
42 /// The main interface of this pattern is a @ref queue of @ref tasks
43 /// to be performed. Associated to that queue are a set of worker
44 /// threads (these are native posix threads) that sits there, idle,
45 /// until at least one @ref task is added to the queue.
46 ///
47 /// When a @ref task is added to the @ref queue, one thread is woken
48 /// up, picks the @ref task, removes it from the @ref queue, and
49 /// executes the instructions it carries. We say the worker thread
50 /// performs the @ref task.
51 ///
52 /// When the worker thread is done performing the @ref task, the
53 /// performed @ref task is added to another queue, named as the "done
54 /// queue". Then the thread looks at the @ref queue of tasks to be
55 /// performed again, and if there is at least one task in that queue,
56 /// the same process as above is done. Otherwise, the thread blocks,
57 /// waiting for a new task to be added to the queue.
58 ///
59 /// By default, the number of worker threads is equal to the number of
60 /// execution threads advertised by the underlying processor.
61 ///
62 /// Note that the user of the queue can either wait for all the tasks
63 /// to be performed by the pool of threads,and them stop them, get the
64 /// vector of done tasks and proceed to whatever computation she may
65 /// need next.
66 ///
67 /// Or she can choose to be asynchronously notified whenever a task is
68 /// performed and added to the "done queue".
69 ///
70 ///@}
71 
72 /// @return The number of hardware threads of executions advertised by
73 /// the underlying processor.
74 size_t
76 {return sysconf(_SC_NPROCESSORS_ONLN);}
77 
78 /// The abstraction of a worker thread.
79 ///
80 /// This is an implementation detail of the @ref queue public
81 /// interface type of this worker thread design pattern.
82 struct worker
83 {
84  pthread_t tid;
85 
86  worker()
87  : tid()
88  {}
89 
90  static queue::priv*
91  wait_to_execute_a_task(queue::priv*);
92 }; // end struct worker
93 
94 // </worker declarations>
95 
96 // <queue stuff>
97 
98 /// The private data structure of the task queue.
99 struct queue::priv
100 {
101  // A boolean to say if the user wants to shutdown the worker
102  // threads. guarded by tasks_todo_mutex.
103  // TODO: once we have std::atomic<bool>, use it and reconsider the
104  // synchronization around its reads and writes
105  bool bring_workers_down;
106  // The number of worker threads.
107  size_t num_workers;
108  // A mutex that protects the todo tasks queue from being accessed in
109  // read/write by two threads at the same time.
110  pthread_mutex_t tasks_todo_mutex;
111  // The queue condition variable. This condition is used to make the
112  // worker threads sleep until a new task is added to the queue of
113  // todo tasks. Whenever a new task is added to that queue, a signal
114  // is sent to all a thread sleeping on this condition variable.
115  pthread_cond_t tasks_todo_cond;
116  // A mutex that protects the done tasks queue from being accessed in
117  // read/write by two threads at the same time.
118  pthread_mutex_t tasks_done_mutex;
119  // A condition to be signalled whenever there is a task done. That is being
120  // used to wait for tasks completed when bringing the workers down.
121  pthread_cond_t tasks_done_cond;
122  // The todo task queue itself.
123  std::queue<task_sptr> tasks_todo;
124  // The done task queue itself.
125  std::vector<task_sptr> tasks_done;
126  // This functor is invoked to notify the user of this queue that a
127  // task has been completed and has been added to the done tasks
128  // vector. We call it a notifier. This notifier is the default
129  // notifier of the work queue; the one that is used when the user
130  // has specified no notifier. It basically does nothing.
131  static task_done_notify default_notify;
132  // This is a reference to the the notifier that is actually used in
133  // the queue. It's either the one specified by the user or the
134  // default one.
135  task_done_notify& notify;
136  // A vector of the worker threads.
137  std::vector<worker> workers;
138 
139  /// A constructor of @ref queue::priv.
140  ///
141  /// @param nb_workers the number of worker threads to have in the
142  /// thread pool.
143  ///
144  /// @param task_done_notify a functor object that is invoked by the
145  /// worker thread which has performed the task, right after it's
146  /// added that task to the vector of the done tasks.
147  priv(size_t nb_workers = get_number_of_threads(),
148  task_done_notify& n = default_notify)
149  : bring_workers_down(),
150  num_workers(nb_workers),
151  tasks_todo_mutex(),
152  tasks_todo_cond(),
153  tasks_done_mutex(),
154  tasks_done_cond(),
155  notify(n)
156  {create_workers();}
157 
158  /// Create the worker threads pool and have all threads sit idle,
159  /// waiting for a task to be added to the todo queue.
160  void
161  create_workers()
162  {
163  for (unsigned i = 0; i < num_workers; ++i)
164  {
165  worker w;
166  ABG_ASSERT(pthread_create(&w.tid,
167  /*attr=*/0,
168  (void*(*)(void*))&worker::wait_to_execute_a_task,
169  this) == 0);
170  workers.push_back(w);
171  }
172  }
173 
174  /// Submit a task to the queue of tasks to be performed.
175  ///
176  /// This wakes up one thread from the pool which immediatly starts
177  /// performing the task. When it's done with the task, it goes back
178  /// to be suspended, waiting for a new task to be scheduled.
179  ///
180  /// @param t the task to schedule. Note that a nil task won't be
181  /// scheduled. If the queue is empty, the task @p t won't be
182  /// scheduled either.
183  ///
184  /// @return true iff the task @p t was successfully scheduled.
185  bool
186  schedule_task(const task_sptr& t)
187  {
188  if (workers.empty() || !t)
189  return false;
190 
191  pthread_mutex_lock(&tasks_todo_mutex);
192  tasks_todo.push(t);
193  pthread_mutex_unlock(&tasks_todo_mutex);
194  pthread_cond_signal(&tasks_todo_cond);
195  return true;
196  }
197 
198  /// Submit a vector of task to the queue of tasks to be performed.
199  ///
200  /// This wakes up threads of the pool which immediatly start
201  /// performing the tasks. When they are done with the task, they go
202  /// back to be suspended, waiting for new tasks to be scheduled.
203  ///
204  /// @param tasks the tasks to schedule.
205  bool
206  schedule_tasks(const tasks_type& tasks)
207  {
208  bool is_ok= true;
209  for (tasks_type::const_iterator t = tasks.begin(); t != tasks.end(); ++t)
210  is_ok &= schedule_task(*t);
211  return is_ok;
212  }
213 
214  /// Signal all the threads (of the pool) which are suspended and
215  /// waiting to perform a task, so that they wake up and end up their
216  /// execution. If there is no task to perform, they just end their
217  /// execution. If there are tasks to perform, they finish them and
218  /// then end their execution.
219  ///
220  /// This function then joins all the tasks of the pool, waiting for
221  /// them to finish, and then it returns. In other words, this
222  /// function suspends the thread of the caller, waiting for the
223  /// worker threads to finish their tasks, and end their execution.
224  ///
225  /// If the user code wants to work with the thread pool again,
226  /// she'll need to create them again, using the member function
227  /// create_workers().
228  void
229  do_bring_workers_down()
230  {
231  if (workers.empty())
232  return;
233 
234  // Wait for the todo list to be empty to make sure all tasks got picked up
235  pthread_mutex_lock(&tasks_todo_mutex);
236  while (!tasks_todo.empty())
237  pthread_cond_wait(&tasks_done_cond, &tasks_todo_mutex);
238 
239  bring_workers_down = true;
240  pthread_mutex_unlock(&tasks_todo_mutex);
241 
242  // Now that the task queue is empty, drain the workers by waking them up,
243  // letting them finish their final task before termination.
244  ABG_ASSERT(pthread_cond_broadcast(&tasks_todo_cond) == 0);
245 
246  for (std::vector<worker>::const_iterator i = workers.begin();
247  i != workers.end();
248  ++i)
249  ABG_ASSERT(pthread_join(i->tid, /*thread_return=*/0) == 0);
250  workers.clear();
251  }
252 
253  /// Destructors of @ref queue::priv type.
254  ~priv()
255  {do_bring_workers_down();}
256 
257 }; //end struct queue::priv
258 
259 // default initialize the default notifier.
260 queue::task_done_notify queue::priv::default_notify;
261 
262 /// Default constructor of the @ref queue type.
263 ///
264 /// By default the queue is created with a number of worker threaders
265 /// which is equals to the number of simultaneous execution threads
266 /// supported by the underlying processor.
268  : p_(new priv())
269 {}
270 
271 /// Constructor of the @ref queue type.
272 ///
273 /// @param number_of_workers the number of worker threads to have in
274 /// the pool.
275 queue::queue(unsigned number_of_workers)
276  : p_(new priv(number_of_workers))
277 {}
278 
279 /// Constructor of the @ref queue type.
280 ///
281 /// @param number_of_workers the number of worker threads to have in
282 /// the pool.
283 ///
284 /// @param the notifier to invoke when a task is done doing its job.
285 /// Users should create a type that inherit this @ref task_done_notify
286 /// class and overload its virtual task_done_notify::operator()
287 /// operator function. Note that the code of that
288 /// task_done_notify::operator() is assured to run in *sequence*, with
289 /// respect to the code of other task_done_notify::operator() from
290 /// other tasks.
291 queue::queue(unsigned number_of_workers,
292  task_done_notify& notifier)
293  : p_(new priv(number_of_workers, notifier))
294 {}
295 
296 /// Getter of the size of the queue. This gives the number of task
297 /// still present in the queue.
298 ///
299 /// @return the number of task still present in the queue.
300 size_t
302 {return p_->tasks_todo.size();}
303 
304 /// Submit a task to the queue of tasks to be performed.
305 ///
306 /// This wakes up one thread from the pool which immediatly starts
307 /// performing the task. When it's done with the task, it goes back
308 /// to be suspended, waiting for a new task to be scheduled.
309 ///
310 /// @param t the task to schedule. Note that if the queue is empty or
311 /// if the task is nil, the task is not scheduled.
312 ///
313 /// @return true iff the task was successfully scheduled.
314 bool
315 queue::schedule_task(const task_sptr& t)
316 {return p_->schedule_task(t);}
317 
318 /// Submit a vector of tasks to the queue of tasks to be performed.
319 ///
320 /// This wakes up one or more threads from the pool which immediatly
321 /// start performing the tasks. When the threads are done with the
322 /// tasks, they goes back to be suspended, waiting for a new task to
323 /// be scheduled.
324 ///
325 /// @param tasks the tasks to schedule.
326 bool
328 {return p_->schedule_tasks(tasks);}
329 
330 /// Suspends the current thread until all worker threads finish
331 /// performing the tasks they are executing.
332 ///
333 /// If the worker threads were suspended waiting for a new task to
334 /// perform, they are woken up and their execution ends.
335 ///
336 /// The execution of the current thread is resumed when all the
337 /// threads of the pool have finished their execution and are
338 /// terminated.
339 void
341 {p_->do_bring_workers_down();}
342 
343 /// Getter of the vector of tasks that got performed.
344 ///
345 /// @return the vector of tasks that got performed.
346 std::vector<task_sptr>&
348 {return p_->tasks_done;}
349 
350 /// Destructor for the @ref queue type.
352 {}
353 
354 /// The default function invocation operator of the @ref queue type.
355 ///
356 /// This does nothing.
357 void
358 queue::task_done_notify::operator()(const task_sptr&/*task_done*/)
359 {
360 }
361 
362 // </queue stuff>
363 
364 // <worker definitions>
365 
366 /// Wait to be woken up by a thread condition signal, then look if
367 /// there is a task to be executed. If there is, then pick one (in a
368 /// FIFO manner), execute it, and put the executed task into the set
369 /// of done tasks.
370 ///
371 /// @param t the private data of the "task queue" type to consider.
372 ///
373 /// @param return the same private data of the task queue type we got
374 /// in argument.
375 queue::priv*
376 worker::wait_to_execute_a_task(queue::priv* p)
377 {
378  while (true)
379  {
380  pthread_mutex_lock(&p->tasks_todo_mutex);
381  // If there is no more tasks to perform and the queue is not to
382  // be brought down then wait (sleep) for new tasks to come up.
383  while (p->tasks_todo.empty() && !p->bring_workers_down)
384  pthread_cond_wait(&p->tasks_todo_cond, &p->tasks_todo_mutex);
385 
386  // We were woken up. So maybe there are tasks to perform? If
387  // so, get a task from the queue ...
388  task_sptr t;
389  if (!p->tasks_todo.empty())
390  {
391  t = p->tasks_todo.front();
392  p->tasks_todo.pop();
393  }
394  pthread_mutex_unlock(&p->tasks_todo_mutex);
395 
396  // If we've got a task to perform then perform it and when it's
397  // done then add to the set of tasks that are done.
398  if (t)
399  {
400  t->perform();
401 
402  // Add the task to the vector of tasks that are done and
403  // notify listeners about the fact that the task is done.
404  //
405  // Note that this (including the notification) is not
406  // happening in parallel. So the code performed by the
407  // notifier during the notification is running sequentially,
408  // not in parallel with any other task that was just done
409  // and that is notifying its listeners.
410  pthread_mutex_lock(&p->tasks_done_mutex);
411  p->tasks_done.push_back(t);
412  p->notify(t);
413  pthread_mutex_unlock(&p->tasks_done_mutex);
414  pthread_cond_signal(&p->tasks_done_cond);
415  }
416 
417  // ensure we access bring_workers_down always guarded
418  bool drop_out = false;
419  pthread_mutex_lock(&p->tasks_todo_mutex);
420  drop_out = p->bring_workers_down;
421  pthread_mutex_unlock(&p->tasks_todo_mutex);
422  if (drop_out)
423  break;
424  }
425 
426  return p;
427 }
428 // </worker definitions>
429 } //end namespace workers
430 } //end namespace abigail
#define ABG_ASSERT(cond)
This is a wrapper around the 'assert' glibc call. It allows for its argument to have side effects,...
Definition: abg-fwd.h:1714
This file declares an interface for the worker threads (or thread pool) design pattern....
tasks_type & get_completed_tasks() const
Getter of the vector of tasks that got performed.
Definition: abg-workers.cc:347
~queue()
Destructor for the queue type.
Definition: abg-workers.cc:351
void wait_for_workers_to_complete()
Suspends the current thread until all worker threads finish performing the tasks they are executing.
Definition: abg-workers.cc:340
std::vector< task_sptr > tasks_type
A convenience typedef for a vector of task_sptr.
Definition: abg-workers.h:70
size_t get_size() const
Getter of the size of the queue. This gives the number of task still present in the queue.
Definition: abg-workers.cc:301
bool schedule_tasks(const tasks_type &)
Submit a vector of tasks to the queue of tasks to be performed.
Definition: abg-workers.cc:327
queue()
Default constructor of the queue type.
Definition: abg-workers.cc:267
bool schedule_task(const task_sptr &)
Submit a task to the queue of tasks to be performed.
Definition: abg-workers.cc:315
size_t get_number_of_threads()
Definition: abg-workers.cc:75
Toplevel namespace for libabigail.
This functor is to notify listeners that a given task scheduled for execution has been fully executed...
Definition: abg-workers.h:95
virtual void operator()(const task_sptr &task_done)
The default function invocation operator of the queue type.
Definition: abg-workers.cc:358