]>
Commit | Line | Data |
---|---|---|
f449bfef RC |
1 | /* threaded_queue.cc |
2 | ||
681bb2f7 | 3 | Copyright 2001, 2002, 2003, 2014 Red Hat Inc. |
f449bfef RC |
4 | |
5 | Written by Robert Collins <rbtcollins@hotmail.com> | |
6 | ||
1c001dd2 | 7 | This file is part of Cygwin. |
f449bfef | 8 | |
1c001dd2 CS |
9 | This software is a copyrighted work licensed under the terms of the |
10 | Cygwin license. Please consult the file "CYGWIN_LICENSE" for | |
11 | details. */ | |
f449bfef | 12 | |
282113ba | 13 | #ifdef __OUTSIDE_CYGWIN__ |
1c001dd2 CS |
14 | #include "woutsup.h" |
15 | ||
16 | #include <assert.h> | |
f449bfef RC |
17 | #include <errno.h> |
18 | #include <stdio.h> | |
19 | #include <unistd.h> | |
f449bfef | 20 | #include <sys/types.h> |
59a2339f | 21 | #include <stdlib.h> |
f449bfef | 22 | #include "threaded_queue.h" |
1c001dd2 CS |
23 | |
24 | /*****************************************************************************/ | |
25 | ||
26 | /* queue_request */ | |
27 | ||
28 | queue_request::~queue_request () | |
29 | {} | |
30 | ||
31 | /*****************************************************************************/ | |
f449bfef RC |
32 | |
33 | /* threaded_queue */ | |
34 | ||
1c001dd2 CS |
35 | threaded_queue::threaded_queue (const size_t initial_workers) |
36 | : _workers_count (0), | |
37 | _running (false), | |
38 | _submitters_head (NULL), | |
39 | _requests_count (0), | |
40 | _requests_head (NULL), | |
41 | _requests_sem (NULL) | |
f449bfef | 42 | { |
1c001dd2 CS |
43 | InitializeCriticalSection (&_queue_lock); |
44 | ||
45 | // This semaphore's count is the number of requests on the queue. | |
46 | // The maximum count (129792) is calculated as MAXIMUM_WAIT_OBJECTS | |
47 | // multiplied by max. threads per process (2028?), which is (a few) | |
48 | // more requests than could ever be pending with the current design. | |
49 | ||
50 | _requests_sem = CreateSemaphore (NULL, // SECURITY_ATTRIBUTES | |
51 | 0, // Initial count | |
52 | 129792, // Maximum count | |
53 | NULL); // Anonymous | |
54 | ||
55 | if (!_requests_sem) | |
f449bfef | 56 | { |
1c001dd2 | 57 | system_printf (("failed to create the request queue semaphore, " |
681bb2f7 | 58 | "error = %u"), |
1c001dd2 CS |
59 | GetLastError ()); |
60 | abort (); | |
f449bfef | 61 | } |
1c001dd2 CS |
62 | |
63 | create_workers (initial_workers); | |
f449bfef RC |
64 | } |
65 | ||
1c001dd2 | 66 | threaded_queue::~threaded_queue () |
f449bfef | 67 | { |
1c001dd2 CS |
68 | if (_running) |
69 | stop (); | |
70 | ||
71 | debug_printf ("deleting all pending queue requests"); | |
72 | queue_request *reqptr = _requests_head; | |
73 | while (reqptr) | |
f449bfef | 74 | { |
1c001dd2 CS |
75 | queue_request *const ptr = reqptr; |
76 | reqptr = reqptr->_next; | |
282113ba | 77 | delete ptr; |
f449bfef | 78 | } |
f449bfef | 79 | |
1c001dd2 CS |
80 | DeleteCriticalSection (&_queue_lock); |
81 | if (_requests_sem) | |
82 | (void) CloseHandle (_requests_sem); | |
83 | } | |
84 | ||
85 | /* FIXME: return success or failure rather than quitting */ | |
86 | void | |
87 | threaded_queue::add_submission_loop (queue_submission_loop *const submitter) | |
88 | { | |
89 | assert (this); | |
90 | assert (submitter); | |
91 | assert (submitter->_queue == this); | |
92 | assert (!submitter->_next); | |
93 | ||
94 | submitter->_next = | |
95 | TInterlockedExchangePointer (&_submitters_head, submitter); | |
96 | ||
97 | if (_running) | |
98 | submitter->start (); | |
99 | } | |
100 | ||
101 | bool | |
102 | threaded_queue::start () | |
103 | { | |
104 | EnterCriticalSection (&_queue_lock); | |
105 | const bool was_running = _running; | |
106 | _running = true; | |
107 | queue_submission_loop *loopptr = _submitters_head; | |
108 | LeaveCriticalSection (&_queue_lock); | |
109 | ||
110 | if (!was_running) | |
f449bfef | 111 | { |
1c001dd2 CS |
112 | debug_printf ("starting all queue submission loops"); |
113 | ||
114 | while (loopptr) | |
f449bfef | 115 | { |
1c001dd2 CS |
116 | queue_submission_loop *const ptr = loopptr; |
117 | loopptr = loopptr->_next; | |
118 | ptr->start (); | |
f449bfef | 119 | } |
f449bfef | 120 | } |
1c001dd2 CS |
121 | |
122 | return was_running; | |
f449bfef RC |
123 | } |
124 | ||
1c001dd2 CS |
125 | bool |
126 | threaded_queue::stop () | |
f449bfef | 127 | { |
1c001dd2 CS |
128 | EnterCriticalSection (&_queue_lock); |
129 | const bool was_running = _running; | |
130 | _running = false; | |
131 | queue_submission_loop *loopptr = _submitters_head; | |
132 | LeaveCriticalSection (&_queue_lock); | |
133 | ||
134 | if (was_running) | |
f449bfef | 135 | { |
1c001dd2 CS |
136 | debug_printf ("stopping all queue submission loops"); |
137 | while (loopptr) | |
138 | { | |
139 | queue_submission_loop *const ptr = loopptr; | |
140 | loopptr = loopptr->_next; | |
141 | ptr->stop (); | |
142 | } | |
143 | ||
144 | ReleaseSemaphore (_requests_sem, _workers_count, NULL); | |
145 | while (_workers_count) | |
146 | { | |
147 | debug_printf (("waiting for worker threads to terminate: " | |
681bb2f7 | 148 | "%u still running"), |
1c001dd2 CS |
149 | _workers_count); |
150 | Sleep (1000); | |
151 | } | |
152 | debug_printf ("all worker threads have terminated"); | |
f449bfef | 153 | } |
1c001dd2 CS |
154 | |
155 | return was_running; | |
f449bfef RC |
156 | } |
157 | ||
158 | /* FIXME: return success or failure */ | |
159 | void | |
1c001dd2 | 160 | threaded_queue::add (queue_request *const therequest) |
f449bfef | 161 | { |
1c001dd2 CS |
162 | assert (this); |
163 | assert (therequest); | |
164 | assert (!therequest->_next); | |
165 | ||
166 | if (!_workers_count) | |
f449bfef | 167 | { |
1c001dd2 CS |
168 | system_printf ("warning: no worker threads to handle request!"); |
169 | // FIXME: And then what? | |
f449bfef | 170 | } |
1c001dd2 CS |
171 | |
172 | EnterCriticalSection (&_queue_lock); | |
173 | if (!_requests_head) | |
174 | _requests_head = therequest; | |
f449bfef RC |
175 | else |
176 | { | |
1c001dd2 CS |
177 | /* Add to the queue end. */ |
178 | queue_request *reqptr = _requests_head; | |
179 | for (; reqptr->_next; reqptr = reqptr->_next) | |
180 | {} | |
181 | assert (reqptr); | |
182 | assert (!reqptr->_next); | |
183 | reqptr->_next = therequest; | |
f449bfef | 184 | } |
1c001dd2 CS |
185 | |
186 | _requests_count += 1; | |
187 | assert (_requests_count > 0); | |
188 | LeaveCriticalSection (&_queue_lock); | |
189 | ||
190 | (void) ReleaseSemaphore (_requests_sem, 1, NULL); | |
f449bfef RC |
191 | } |
192 | ||
1c001dd2 CS |
193 | /*static*/ DWORD WINAPI |
194 | threaded_queue::start_routine (const LPVOID lpParam) | |
195 | { | |
196 | class threaded_queue *const queue = (class threaded_queue *) lpParam; | |
197 | assert (queue); | |
198 | ||
199 | queue->worker_loop (); | |
200 | ||
201 | const long count = InterlockedDecrement (&queue->_workers_count); | |
202 | assert (count >= 0); | |
203 | ||
204 | if (queue->_running) | |
205 | debug_printf ("worker loop has exited; thread about to terminate"); | |
206 | ||
207 | return 0; | |
208 | } | |
209 | ||
210 | /* Called from the constructor: so no need to be thread-safe until the | |
211 | * worker threads start to be created; thus the interlocked increment | |
212 | * of the `_workers_count' field. | |
213 | */ | |
214 | ||
f449bfef | 215 | void |
1c001dd2 | 216 | threaded_queue::create_workers (const size_t initial_workers) |
f449bfef | 217 | { |
1c001dd2 CS |
218 | assert (initial_workers > 0); |
219 | ||
220 | for (unsigned int i = 0; i != initial_workers; i++) | |
221 | { | |
222 | const long count = InterlockedIncrement (&_workers_count); | |
223 | assert (count > 0); | |
224 | ||
225 | DWORD tid; | |
226 | const HANDLE hThread = | |
227 | CreateThread (NULL, 0, start_routine, this, 0, &tid); | |
228 | ||
229 | if (!hThread) | |
230 | { | |
681bb2f7 | 231 | system_printf ("failed to create thread, error = %u", |
1c001dd2 CS |
232 | GetLastError ()); |
233 | abort (); | |
234 | } | |
235 | ||
236 | (void) CloseHandle (hThread); | |
237 | } | |
f449bfef RC |
238 | } |
239 | ||
1c001dd2 CS |
240 | void |
241 | threaded_queue::worker_loop () | |
f449bfef | 242 | { |
1c001dd2 | 243 | while (true) |
f449bfef | 244 | { |
1c001dd2 CS |
245 | const DWORD rc = WaitForSingleObject (_requests_sem, INFINITE); |
246 | if (rc == WAIT_FAILED) | |
247 | { | |
681bb2f7 | 248 | system_printf ("wait for request semaphore failed, error = %u", |
1c001dd2 CS |
249 | GetLastError ()); |
250 | return; | |
251 | } | |
252 | assert (rc == WAIT_OBJECT_0); | |
253 | ||
254 | EnterCriticalSection (&_queue_lock); | |
255 | if (!_running) | |
256 | { | |
257 | LeaveCriticalSection (&_queue_lock); | |
258 | return; | |
259 | } | |
260 | ||
261 | assert (_requests_head); | |
262 | queue_request *const reqptr = _requests_head; | |
263 | _requests_head = reqptr->_next; | |
264 | ||
265 | _requests_count -= 1; | |
266 | assert (_requests_count >= 0); | |
267 | LeaveCriticalSection (&_queue_lock); | |
268 | ||
269 | assert (reqptr); | |
270 | reqptr->process (); | |
282113ba | 271 | delete reqptr; |
1c001dd2 CS |
272 | } |
273 | } | |
274 | ||
275 | /*****************************************************************************/ | |
276 | ||
277 | /* queue_submission_loop */ | |
278 | ||
279 | queue_submission_loop::queue_submission_loop (threaded_queue *const queue, | |
280 | const bool ninterruptible) | |
281 | : _running (false), | |
282 | _interrupt_event (NULL), | |
283 | _queue (queue), | |
284 | _interruptible (ninterruptible), | |
285 | _hThread (NULL), | |
286 | _tid (0), | |
287 | _next (NULL) | |
288 | { | |
289 | if (_interruptible) | |
290 | { | |
291 | // verbose: debug_printf ("creating an interruptible processing thread"); | |
292 | ||
293 | _interrupt_event = CreateEvent (NULL, // SECURITY_ATTRIBUTES | |
294 | FALSE, // Auto-reset | |
295 | FALSE, // Initially non-signalled | |
296 | NULL); // Anonymous | |
297 | ||
298 | if (!_interrupt_event) | |
299 | { | |
681bb2f7 | 300 | system_printf ("failed to create interrupt event, error = %u", |
1c001dd2 CS |
301 | GetLastError ()); |
302 | abort (); | |
303 | } | |
f449bfef RC |
304 | } |
305 | } | |
306 | ||
1c001dd2 | 307 | queue_submission_loop::~queue_submission_loop () |
f449bfef | 308 | { |
1c001dd2 | 309 | if (_running) |
f449bfef | 310 | stop (); |
1c001dd2 CS |
311 | if (_interrupt_event) |
312 | (void) CloseHandle (_interrupt_event); | |
313 | if (_hThread) | |
314 | (void) CloseHandle (_hThread); | |
f449bfef RC |
315 | } |
316 | ||
317 | bool | |
1c001dd2 | 318 | queue_submission_loop::start () |
f449bfef | 319 | { |
1c001dd2 CS |
320 | assert (this); |
321 | assert (!_hThread); | |
322 | ||
323 | const bool was_running = _running; | |
324 | ||
325 | if (!was_running) | |
f449bfef | 326 | { |
1c001dd2 CS |
327 | _running = true; |
328 | ||
329 | _hThread = CreateThread (NULL, 0, start_routine, this, 0, &_tid); | |
330 | if (!_hThread) | |
331 | { | |
681bb2f7 | 332 | system_printf ("failed to create thread, error = %u", |
1c001dd2 CS |
333 | GetLastError ()); |
334 | abort (); | |
335 | } | |
f449bfef | 336 | } |
1c001dd2 CS |
337 | |
338 | return was_running; | |
f449bfef RC |
339 | } |
340 | ||
1c001dd2 CS |
341 | bool |
342 | queue_submission_loop::stop () | |
f449bfef | 343 | { |
1c001dd2 CS |
344 | assert (this); |
345 | assert (_hThread && _hThread != INVALID_HANDLE_VALUE); | |
346 | ||
347 | const bool was_running = _running; | |
348 | ||
349 | if (_running) | |
f449bfef | 350 | { |
1c001dd2 CS |
351 | _running = false; |
352 | ||
353 | if (_interruptible) | |
f449bfef | 354 | { |
1c001dd2 CS |
355 | assert (_interrupt_event |
356 | && _interrupt_event != INVALID_HANDLE_VALUE); | |
357 | ||
358 | SetEvent (_interrupt_event); | |
359 | ||
360 | if (WaitForSingleObject (_hThread, 1000) == WAIT_TIMEOUT) | |
361 | { | |
681bb2f7 | 362 | system_printf (("request loop thread %u failed to shutdown " |
1c001dd2 CS |
363 | "when asked politely: about to get heavy"), |
364 | _tid); | |
365 | ||
366 | if (!TerminateThread (_hThread, 0)) | |
367 | { | |
681bb2f7 CV |
368 | system_printf (("failed to kill request loop thread %u" |
369 | ", error = %u"), | |
1c001dd2 CS |
370 | _tid, GetLastError ()); |
371 | abort (); | |
372 | } | |
373 | } | |
f449bfef RC |
374 | } |
375 | else | |
f449bfef | 376 | { |
1c001dd2 CS |
377 | // FIXME: could wait to see if the request loop notices that |
378 | // the submission loop is no longer running and shuts down | |
379 | // voluntarily. | |
380 | ||
681bb2f7 | 381 | debug_printf ("killing request loop thread %u", _tid); |
1c001dd2 CS |
382 | |
383 | if (!TerminateThread (_hThread, 0)) | |
681bb2f7 CV |
384 | system_printf (("failed to kill request loop thread %u" |
385 | ", error = %u"), | |
1c001dd2 | 386 | _tid, GetLastError ()); |
f449bfef | 387 | } |
f449bfef | 388 | } |
f449bfef | 389 | |
1c001dd2 | 390 | return was_running; |
f449bfef RC |
391 | } |
392 | ||
1c001dd2 CS |
393 | /*static*/ DWORD WINAPI |
394 | queue_submission_loop::start_routine (const LPVOID lpParam) | |
f449bfef | 395 | { |
1c001dd2 CS |
396 | class queue_submission_loop *const submission_loop = |
397 | (class queue_submission_loop *) lpParam; | |
398 | assert (submission_loop); | |
399 | ||
400 | submission_loop->request_loop (); | |
401 | ||
402 | debug_printf ("submission loop has exited; thread about to terminate"); | |
403 | ||
404 | submission_loop->stop (); | |
405 | ||
406 | return 0; | |
f449bfef | 407 | } |
1c001dd2 CS |
408 | |
409 | /*****************************************************************************/ | |
282113ba | 410 | #endif /* __OUTSIDE_CYGWIN__ */ |