libabigail
Loading...
Searching...
No Matches
abg-workers.cc
Go to the documentation of this file.
1// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
2// -*- Mode: C++ -*-
3//
4// Copyright (C) 2013-2024 Red Hat, Inc.
5//
6// Author: Dodji Seketeli
7
8/// @file
9///
10/// This file implements the worker threads (or thread pool) design
11/// pattern. It aims at performing a set of tasks in parallel, using
12/// the multi-threading capabilities of the underlying processor(s).
13
14#include <assert.h>
15#include <unistd.h>
16#include <pthread.h>
17#include <queue>
18#include <vector>
19#include <iostream>
20
21#include "abg-fwd.h"
22#include "abg-internal.h"
23// <headers defining libabigail's API go under here>
24ABG_BEGIN_EXPORT_DECLARATIONS
25
26#include "abg-workers.h"
27
28ABG_END_EXPORT_DECLARATIONS
29// </headers defining libabigail's API>
30
31namespace abigail
32{
33
34namespace workers
35{
36
37/// @defgroup thread_pool Worker Threads
38/// @{
39///
40/// \brief Libabigail's implementation of Thread Pools.
41///
42/// The main interface of this pattern is a @ref queue of @ref tasks
43/// to be performed. Associated to that queue are a set of worker
44/// threads (these are native posix threads) that sits there, idle,
45/// until at least one @ref task is added to the queue.
46///
47/// When a @ref task is added to the @ref queue, one thread is woken
48/// up, picks the @ref task, removes it from the @ref queue, and
49/// executes the instructions it carries. We say the worker thread
50/// performs the @ref task.
51///
52/// When the worker thread is done performing the @ref task, the
53/// performed @ref task is added to another queue, named as the "done
54/// queue". Then the thread looks at the @ref queue of tasks to be
55/// performed again, and if there is at least one task in that queue,
56/// the same process as above is done. Otherwise, the thread blocks,
57/// waiting for a new task to be added to the queue.
58///
59/// By default, the number of worker threads is equal to the number of
60/// execution threads advertised by the underlying processor.
61///
62/// Note that the user of the queue can either wait for all the tasks
63/// to be performed by the pool of threads,and them stop them, get the
64/// vector of done tasks and proceed to whatever computation she may
65/// need next.
66///
67/// Or she can choose to be asynchronously notified whenever a task is
68/// performed and added to the "done queue".
69///
70///@}
71
72/// @return The number of hardware threads of executions advertised by
73/// the underlying processor.
74size_t
76{return sysconf(_SC_NPROCESSORS_ONLN);}
77
78/// The abstraction of a worker thread.
79///
80/// This is an implementation detail of the @ref queue public
81/// interface type of this worker thread design pattern.
82struct worker
83{
84 pthread_t tid;
85
86 worker()
87 : tid()
88 {}
89
90 static queue::priv*
91 wait_to_execute_a_task(queue::priv*);
92}; // end struct worker
93
94// </worker declarations>
95
96// <queue stuff>
97
98/// The private data structure of the task queue.
99struct queue::priv
100{
101 // A boolean to say if the user wants to shutdown the worker
102 // threads. guarded by tasks_todo_mutex.
103 // TODO: once we have std::atomic<bool>, use it and reconsider the
104 // synchronization around its reads and writes
105 bool bring_workers_down;
106 // The number of worker threads.
107 size_t num_workers;
108 // A mutex that protects the todo tasks queue from being accessed in
109 // read/write by two threads at the same time.
110 pthread_mutex_t tasks_todo_mutex;
111 // The queue condition variable. This condition is used to make the
112 // worker threads sleep until a new task is added to the queue of
113 // todo tasks. Whenever a new task is added to that queue, a signal
114 // is sent to all a thread sleeping on this condition variable.
115 pthread_cond_t tasks_todo_cond;
116 // A mutex that protects the done tasks queue from being accessed in
117 // read/write by two threads at the same time.
118 pthread_mutex_t tasks_done_mutex;
119 // A condition to be signalled whenever there is a task done. That is being
120 // used to wait for tasks completed when bringing the workers down.
121 pthread_cond_t tasks_done_cond;
122 // The todo task queue itself.
123 std::queue<task_sptr> tasks_todo;
124 // The done task queue itself.
125 std::vector<task_sptr> tasks_done;
126 // This functor is invoked to notify the user of this queue that a
127 // task has been completed and has been added to the done tasks
128 // vector. We call it a notifier. This notifier is the default
129 // notifier of the work queue; the one that is used when the user
130 // has specified no notifier. It basically does nothing.
131 static task_done_notify default_notify;
132 // This is a reference to the the notifier that is actually used in
133 // the queue. It's either the one specified by the user or the
134 // default one.
135 task_done_notify& notify;
136 // A vector of the worker threads.
137 std::vector<worker> workers;
138
139 /// A constructor of @ref queue::priv.
140 ///
141 /// @param nb_workers the number of worker threads to have in the
142 /// thread pool.
143 ///
144 /// @param task_done_notify a functor object that is invoked by the
145 /// worker thread which has performed the task, right after it's
146 /// added that task to the vector of the done tasks.
147 priv(size_t nb_workers = get_number_of_threads(),
148 task_done_notify& n = default_notify)
149 : bring_workers_down(),
150 num_workers(nb_workers),
151 tasks_todo_mutex(),
152 tasks_todo_cond(),
153 tasks_done_mutex(),
154 tasks_done_cond(),
155 notify(n)
156 {create_workers();}
157
158 /// Create the worker threads pool and have all threads sit idle,
159 /// waiting for a task to be added to the todo queue.
160 void
161 create_workers()
162 {
163 for (unsigned i = 0; i < num_workers; ++i)
164 {
165 worker w;
166 ABG_ASSERT(pthread_create(&w.tid,
167 /*attr=*/0,
168 (void*(*)(void*))&worker::wait_to_execute_a_task,
169 this) == 0);
170 workers.push_back(w);
171 }
172 }
173
174 /// Submit a task to the queue of tasks to be performed.
175 ///
176 /// This wakes up one thread from the pool which immediatly starts
177 /// performing the task. When it's done with the task, it goes back
178 /// to be suspended, waiting for a new task to be scheduled.
179 ///
180 /// @param t the task to schedule. Note that a nil task won't be
181 /// scheduled. If the queue is empty, the task @p t won't be
182 /// scheduled either.
183 ///
184 /// @return true iff the task @p t was successfully scheduled.
185 bool
186 schedule_task(const task_sptr& t)
187 {
188 if (workers.empty() || !t)
189 return false;
190
191 pthread_mutex_lock(&tasks_todo_mutex);
192 tasks_todo.push(t);
193 pthread_mutex_unlock(&tasks_todo_mutex);
194 pthread_cond_signal(&tasks_todo_cond);
195 return true;
196 }
197
198 /// Submit a vector of task to the queue of tasks to be performed.
199 ///
200 /// This wakes up threads of the pool which immediatly start
201 /// performing the tasks. When they are done with the task, they go
202 /// back to be suspended, waiting for new tasks to be scheduled.
203 ///
204 /// @param tasks the tasks to schedule.
205 bool
206 schedule_tasks(const tasks_type& tasks)
207 {
208 bool is_ok= true;
209 for (tasks_type::const_iterator t = tasks.begin(); t != tasks.end(); ++t)
210 is_ok &= schedule_task(*t);
211 return is_ok;
212 }
213
214 /// Signal all the threads (of the pool) which are suspended and
215 /// waiting to perform a task, so that they wake up and end up their
216 /// execution. If there is no task to perform, they just end their
217 /// execution. If there are tasks to perform, they finish them and
218 /// then end their execution.
219 ///
220 /// This function then joins all the tasks of the pool, waiting for
221 /// them to finish, and then it returns. In other words, this
222 /// function suspends the thread of the caller, waiting for the
223 /// worker threads to finish their tasks, and end their execution.
224 ///
225 /// If the user code wants to work with the thread pool again,
226 /// she'll need to create them again, using the member function
227 /// create_workers().
228 void
229 do_bring_workers_down()
230 {
231 if (workers.empty())
232 return;
233
234 // Wait for the todo list to be empty to make sure all tasks got picked up
235 pthread_mutex_lock(&tasks_todo_mutex);
236 while (!tasks_todo.empty())
237 pthread_cond_wait(&tasks_done_cond, &tasks_todo_mutex);
238
239 bring_workers_down = true;
240 pthread_mutex_unlock(&tasks_todo_mutex);
241
242 // Now that the task queue is empty, drain the workers by waking them up,
243 // letting them finish their final task before termination.
244 ABG_ASSERT(pthread_cond_broadcast(&tasks_todo_cond) == 0);
245
246 for (std::vector<worker>::const_iterator i = workers.begin();
247 i != workers.end();
248 ++i)
249 ABG_ASSERT(pthread_join(i->tid, /*thread_return=*/0) == 0);
250 workers.clear();
251 }
252
253 /// Destructors of @ref queue::priv type.
254 ~priv()
255 {do_bring_workers_down();}
256
257}; //end struct queue::priv
258
259// default initialize the default notifier.
260queue::task_done_notify queue::priv::default_notify;
261
262/// Default constructor of the @ref queue type.
263///
264/// By default the queue is created with a number of worker threaders
265/// which is equals to the number of simultaneous execution threads
266/// supported by the underlying processor.
268 : p_(new priv())
269{}
270
271/// Constructor of the @ref queue type.
272///
273/// @param number_of_workers the number of worker threads to have in
274/// the pool.
275queue::queue(unsigned number_of_workers)
276 : p_(new priv(number_of_workers))
277{}
278
279/// Constructor of the @ref queue type.
280///
281/// @param number_of_workers the number of worker threads to have in
282/// the pool.
283///
284/// @param the notifier to invoke when a task is done doing its job.
285/// Users should create a type that inherit this @ref task_done_notify
286/// class and overload its virtual task_done_notify::operator()
287/// operator function. Note that the code of that
288/// task_done_notify::operator() is assured to run in *sequence*, with
289/// respect to the code of other task_done_notify::operator() from
290/// other tasks.
291queue::queue(unsigned number_of_workers,
292 task_done_notify& notifier)
293 : p_(new priv(number_of_workers, notifier))
294{}
295
296/// Getter of the size of the queue. This gives the number of task
297/// still present in the queue.
298///
299/// @return the number of task still present in the queue.
300size_t
302{return p_->tasks_todo.size();}
303
304/// Submit a task to the queue of tasks to be performed.
305///
306/// This wakes up one thread from the pool which immediatly starts
307/// performing the task. When it's done with the task, it goes back
308/// to be suspended, waiting for a new task to be scheduled.
309///
310/// @param t the task to schedule. Note that if the queue is empty or
311/// if the task is nil, the task is not scheduled.
312///
313/// @return true iff the task was successfully scheduled.
314bool
315queue::schedule_task(const task_sptr& t)
316{return p_->schedule_task(t);}
317
318/// Submit a vector of tasks to the queue of tasks to be performed.
319///
320/// This wakes up one or more threads from the pool which immediatly
321/// start performing the tasks. When the threads are done with the
322/// tasks, they goes back to be suspended, waiting for a new task to
323/// be scheduled.
324///
325/// @param tasks the tasks to schedule.
326bool
328{return p_->schedule_tasks(tasks);}
329
330/// Suspends the current thread until all worker threads finish
331/// performing the tasks they are executing.
332///
333/// If the worker threads were suspended waiting for a new task to
334/// perform, they are woken up and their execution ends.
335///
336/// The execution of the current thread is resumed when all the
337/// threads of the pool have finished their execution and are
338/// terminated.
339void
341{p_->do_bring_workers_down();}
342
343/// Getter of the vector of tasks that got performed.
344///
345/// @return the vector of tasks that got performed.
346std::vector<task_sptr>&
348{return p_->tasks_done;}
349
350/// Destructor for the @ref queue type.
353
354/// The default function invocation operator of the @ref queue type.
355///
356/// This does nothing.
357void
358queue::task_done_notify::operator()(const task_sptr&/*task_done*/)
359{
360}
361
362// </queue stuff>
363
364// <worker definitions>
365
366/// Wait to be woken up by a thread condition signal, then look if
367/// there is a task to be executed. If there is, then pick one (in a
368/// FIFO manner), execute it, and put the executed task into the set
369/// of done tasks.
370///
371/// @param t the private data of the "task queue" type to consider.
372///
373/// @param return the same private data of the task queue type we got
374/// in argument.
375queue::priv*
376worker::wait_to_execute_a_task(queue::priv* p)
377{
378 while (true)
379 {
380 pthread_mutex_lock(&p->tasks_todo_mutex);
381 // If there is no more tasks to perform and the queue is not to
382 // be brought down then wait (sleep) for new tasks to come up.
383 while (p->tasks_todo.empty() && !p->bring_workers_down)
384 pthread_cond_wait(&p->tasks_todo_cond, &p->tasks_todo_mutex);
385
386 // We were woken up. So maybe there are tasks to perform? If
387 // so, get a task from the queue ...
388 task_sptr t;
389 if (!p->tasks_todo.empty())
390 {
391 t = p->tasks_todo.front();
392 p->tasks_todo.pop();
393 }
394 pthread_mutex_unlock(&p->tasks_todo_mutex);
395
396 // If we've got a task to perform then perform it and when it's
397 // done then add to the set of tasks that are done.
398 if (t)
399 {
400 t->perform();
401
402 // Add the task to the vector of tasks that are done and
403 // notify listeners about the fact that the task is done.
404 //
405 // Note that this (including the notification) is not
406 // happening in parallel. So the code performed by the
407 // notifier during the notification is running sequentially,
408 // not in parallel with any other task that was just done
409 // and that is notifying its listeners.
410 pthread_mutex_lock(&p->tasks_done_mutex);
411 p->tasks_done.push_back(t);
412 p->notify(t);
413 pthread_mutex_unlock(&p->tasks_done_mutex);
414 pthread_cond_signal(&p->tasks_done_cond);
415 }
416
417 // ensure we access bring_workers_down always guarded
418 bool drop_out = false;
419 pthread_mutex_lock(&p->tasks_todo_mutex);
420 drop_out = p->bring_workers_down;
421 pthread_mutex_unlock(&p->tasks_todo_mutex);
422 if (drop_out)
423 break;
424 }
425
426 return p;
427}
428// </worker definitions>
429} //end namespace workers
430} //end namespace abigail
#define ABG_ASSERT(cond)
This is a wrapper around the 'assert' glibc call. It allows for its argument to have side effects,...
Definition abg-fwd.h:1718
This file declares an interface for the worker threads (or thread pool) design pattern....
tasks_type & get_completed_tasks() const
Getter of the vector of tasks that got performed.
~queue()
Destructor for the queue type.
void wait_for_workers_to_complete()
Suspends the current thread until all worker threads finish performing the tasks they are executing.
std::vector< task_sptr > tasks_type
A convenience typedef for a vector of task_sptr.
Definition abg-workers.h:73
size_t get_size() const
Getter of the size of the queue. This gives the number of task still present in the queue.
bool schedule_tasks(const tasks_type &)
Submit a vector of tasks to the queue of tasks to be performed.
queue()
Default constructor of the queue type.
bool schedule_task(const task_sptr &)
Submit a task to the queue of tasks to be performed.
size_t get_number_of_threads()
Toplevel namespace for libabigail.
This functor is to notify listeners that a given task scheduled for execution has been fully executed...
Definition abg-workers.h:95
virtual void operator()(const task_sptr &task_done)
The default function invocation operator of the queue type.