]> sourceware.org Git - newlib-cygwin.git/blame - winsup/cygserver/threaded_queue.cc
Cygwin: cygserver: build with -Wimplicit-fallthrough=4 -Werror
[newlib-cygwin.git] / winsup / cygserver / threaded_queue.cc
CommitLineData
f449bfef
RC
1/* threaded_queue.cc
2
f449bfef
RC
3 Written by Robert Collins <rbtcollins@hotmail.com>
4
1c001dd2 5This file is part of Cygwin.
f449bfef 6
1c001dd2
CS
7This software is a copyrighted work licensed under the terms of the
8Cygwin license. Please consult the file "CYGWIN_LICENSE" for
9details. */
f449bfef 10
282113ba 11#ifdef __OUTSIDE_CYGWIN__
1c001dd2
CS
12#include "woutsup.h"
13
14#include <assert.h>
f449bfef
RC
15#include <errno.h>
16#include <stdio.h>
17#include <unistd.h>
f449bfef 18#include <sys/types.h>
59a2339f 19#include <stdlib.h>
f449bfef 20#include "threaded_queue.h"
1c001dd2
CS
21
22/*****************************************************************************/
23
24/* queue_request */
25
26queue_request::~queue_request ()
27{}
28
29/*****************************************************************************/
f449bfef
RC
30
31/* threaded_queue */
32
1c001dd2
CS
33threaded_queue::threaded_queue (const size_t initial_workers)
34 : _workers_count (0),
0b73dba4 35 _workers_busy (0),
1c001dd2
CS
36 _running (false),
37 _submitters_head (NULL),
38 _requests_count (0),
39 _requests_head (NULL),
40 _requests_sem (NULL)
f449bfef 41{
1c001dd2
CS
42 InitializeCriticalSection (&_queue_lock);
43
44 // This semaphore's count is the number of requests on the queue.
45 // The maximum count (129792) is calculated as MAXIMUM_WAIT_OBJECTS
46 // multiplied by max. threads per process (2028?), which is (a few)
47 // more requests than could ever be pending with the current design.
48
49 _requests_sem = CreateSemaphore (NULL, // SECURITY_ATTRIBUTES
50 0, // Initial count
51 129792, // Maximum count
52 NULL); // Anonymous
53
54 if (!_requests_sem)
f449bfef 55 {
1c001dd2 56 system_printf (("failed to create the request queue semaphore, "
681bb2f7 57 "error = %u"),
1c001dd2
CS
58 GetLastError ());
59 abort ();
f449bfef 60 }
1c001dd2
CS
61
62 create_workers (initial_workers);
f449bfef
RC
63}
64
1c001dd2 65threaded_queue::~threaded_queue ()
f449bfef 66{
1c001dd2
CS
67 if (_running)
68 stop ();
69
70 debug_printf ("deleting all pending queue requests");
71 queue_request *reqptr = _requests_head;
72 while (reqptr)
f449bfef 73 {
1c001dd2
CS
74 queue_request *const ptr = reqptr;
75 reqptr = reqptr->_next;
282113ba 76 delete ptr;
f449bfef 77 }
f449bfef 78
1c001dd2
CS
79 DeleteCriticalSection (&_queue_lock);
80 if (_requests_sem)
81 (void) CloseHandle (_requests_sem);
82}
83
84/* FIXME: return success or failure rather than quitting */
85void
86threaded_queue::add_submission_loop (queue_submission_loop *const submitter)
87{
1c001dd2
CS
88 assert (submitter);
89 assert (submitter->_queue == this);
90 assert (!submitter->_next);
91
92 submitter->_next =
93 TInterlockedExchangePointer (&_submitters_head, submitter);
94
95 if (_running)
96 submitter->start ();
97}
98
99bool
100threaded_queue::start ()
101{
102 EnterCriticalSection (&_queue_lock);
103 const bool was_running = _running;
104 _running = true;
105 queue_submission_loop *loopptr = _submitters_head;
106 LeaveCriticalSection (&_queue_lock);
107
108 if (!was_running)
f449bfef 109 {
1c001dd2
CS
110 debug_printf ("starting all queue submission loops");
111
112 while (loopptr)
f449bfef 113 {
1c001dd2
CS
114 queue_submission_loop *const ptr = loopptr;
115 loopptr = loopptr->_next;
116 ptr->start ();
f449bfef 117 }
f449bfef 118 }
1c001dd2
CS
119
120 return was_running;
f449bfef
RC
121}
122
1c001dd2
CS
123bool
124threaded_queue::stop ()
f449bfef 125{
1c001dd2
CS
126 EnterCriticalSection (&_queue_lock);
127 const bool was_running = _running;
128 _running = false;
129 queue_submission_loop *loopptr = _submitters_head;
130 LeaveCriticalSection (&_queue_lock);
131
132 if (was_running)
f449bfef 133 {
1c001dd2
CS
134 debug_printf ("stopping all queue submission loops");
135 while (loopptr)
136 {
137 queue_submission_loop *const ptr = loopptr;
138 loopptr = loopptr->_next;
139 ptr->stop ();
140 }
141
142 ReleaseSemaphore (_requests_sem, _workers_count, NULL);
143 while (_workers_count)
144 {
145 debug_printf (("waiting for worker threads to terminate: "
681bb2f7 146 "%u still running"),
1c001dd2
CS
147 _workers_count);
148 Sleep (1000);
149 }
150 debug_printf ("all worker threads have terminated");
f449bfef 151 }
1c001dd2
CS
152
153 return was_running;
f449bfef
RC
154}
155
156/* FIXME: return success or failure */
157void
1c001dd2 158threaded_queue::add (queue_request *const therequest)
f449bfef 159{
1c001dd2
CS
160 assert (therequest);
161 assert (!therequest->_next);
162
1c001dd2
CS
163 EnterCriticalSection (&_queue_lock);
164 if (!_requests_head)
165 _requests_head = therequest;
f449bfef
RC
166 else
167 {
1c001dd2
CS
168 /* Add to the queue end. */
169 queue_request *reqptr = _requests_head;
170 for (; reqptr->_next; reqptr = reqptr->_next)
171 {}
172 assert (reqptr);
173 assert (!reqptr->_next);
174 reqptr->_next = therequest;
f449bfef 175 }
1c001dd2
CS
176
177 _requests_count += 1;
178 assert (_requests_count > 0);
179 LeaveCriticalSection (&_queue_lock);
180
181 (void) ReleaseSemaphore (_requests_sem, 1, NULL);
0b73dba4
CV
182
183 if (_workers_busy >= _workers_count)
184 {
185 create_workers (1);
186 system_printf ("All threads busy, added one (now %u)", _workers_count);
187 }
f449bfef
RC
188}
189
1c001dd2
CS
190/*static*/ DWORD WINAPI
191threaded_queue::start_routine (const LPVOID lpParam)
192{
193 class threaded_queue *const queue = (class threaded_queue *) lpParam;
194 assert (queue);
195
196 queue->worker_loop ();
197
198 const long count = InterlockedDecrement (&queue->_workers_count);
199 assert (count >= 0);
200
201 if (queue->_running)
202 debug_printf ("worker loop has exited; thread about to terminate");
203
204 return 0;
205}
206
f449bfef 207void
1c001dd2 208threaded_queue::create_workers (const size_t initial_workers)
f449bfef 209{
1c001dd2
CS
210 assert (initial_workers > 0);
211
0b73dba4 212 for (unsigned int i = 0; i < initial_workers; i++)
1c001dd2
CS
213 {
214 const long count = InterlockedIncrement (&_workers_count);
215 assert (count > 0);
216
217 DWORD tid;
218 const HANDLE hThread =
219 CreateThread (NULL, 0, start_routine, this, 0, &tid);
220
221 if (!hThread)
222 {
681bb2f7 223 system_printf ("failed to create thread, error = %u",
1c001dd2
CS
224 GetLastError ());
225 abort ();
226 }
227
228 (void) CloseHandle (hThread);
229 }
f449bfef
RC
230}
231
1c001dd2
CS
232void
233threaded_queue::worker_loop ()
f449bfef 234{
1c001dd2 235 while (true)
f449bfef 236 {
1c001dd2
CS
237 const DWORD rc = WaitForSingleObject (_requests_sem, INFINITE);
238 if (rc == WAIT_FAILED)
239 {
681bb2f7 240 system_printf ("wait for request semaphore failed, error = %u",
1c001dd2
CS
241 GetLastError ());
242 return;
243 }
244 assert (rc == WAIT_OBJECT_0);
245
246 EnterCriticalSection (&_queue_lock);
247 if (!_running)
248 {
249 LeaveCriticalSection (&_queue_lock);
250 return;
251 }
252
253 assert (_requests_head);
254 queue_request *const reqptr = _requests_head;
255 _requests_head = reqptr->_next;
256
257 _requests_count -= 1;
258 assert (_requests_count >= 0);
259 LeaveCriticalSection (&_queue_lock);
260
261 assert (reqptr);
0b73dba4 262 InterlockedIncrement (&_workers_busy);
1c001dd2 263 reqptr->process ();
0b73dba4 264 InterlockedDecrement (&_workers_busy);
282113ba 265 delete reqptr;
1c001dd2
CS
266 }
267}
268
269/*****************************************************************************/
270
271/* queue_submission_loop */
272
273queue_submission_loop::queue_submission_loop (threaded_queue *const queue,
274 const bool ninterruptible)
275 : _running (false),
276 _interrupt_event (NULL),
277 _queue (queue),
278 _interruptible (ninterruptible),
279 _hThread (NULL),
280 _tid (0),
281 _next (NULL)
282{
283 if (_interruptible)
284 {
285 // verbose: debug_printf ("creating an interruptible processing thread");
286
287 _interrupt_event = CreateEvent (NULL, // SECURITY_ATTRIBUTES
288 FALSE, // Auto-reset
289 FALSE, // Initially non-signalled
290 NULL); // Anonymous
291
292 if (!_interrupt_event)
293 {
681bb2f7 294 system_printf ("failed to create interrupt event, error = %u",
1c001dd2
CS
295 GetLastError ());
296 abort ();
297 }
f449bfef
RC
298 }
299}
300
1c001dd2 301queue_submission_loop::~queue_submission_loop ()
f449bfef 302{
1c001dd2 303 if (_running)
f449bfef 304 stop ();
1c001dd2
CS
305 if (_interrupt_event)
306 (void) CloseHandle (_interrupt_event);
307 if (_hThread)
308 (void) CloseHandle (_hThread);
f449bfef
RC
309}
310
311bool
1c001dd2 312queue_submission_loop::start ()
f449bfef 313{
1c001dd2
CS
314 assert (!_hThread);
315
316 const bool was_running = _running;
317
318 if (!was_running)
f449bfef 319 {
1c001dd2
CS
320 _running = true;
321
322 _hThread = CreateThread (NULL, 0, start_routine, this, 0, &_tid);
323 if (!_hThread)
324 {
681bb2f7 325 system_printf ("failed to create thread, error = %u",
1c001dd2
CS
326 GetLastError ());
327 abort ();
328 }
f449bfef 329 }
1c001dd2
CS
330
331 return was_running;
f449bfef
RC
332}
333
1c001dd2
CS
334bool
335queue_submission_loop::stop ()
f449bfef 336{
1c001dd2
CS
337 assert (_hThread && _hThread != INVALID_HANDLE_VALUE);
338
339 const bool was_running = _running;
340
341 if (_running)
f449bfef 342 {
1c001dd2
CS
343 _running = false;
344
345 if (_interruptible)
f449bfef 346 {
1c001dd2
CS
347 assert (_interrupt_event
348 && _interrupt_event != INVALID_HANDLE_VALUE);
349
350 SetEvent (_interrupt_event);
351
352 if (WaitForSingleObject (_hThread, 1000) == WAIT_TIMEOUT)
353 {
681bb2f7 354 system_printf (("request loop thread %u failed to shutdown "
1c001dd2
CS
355 "when asked politely: about to get heavy"),
356 _tid);
357
358 if (!TerminateThread (_hThread, 0))
359 {
681bb2f7
CV
360 system_printf (("failed to kill request loop thread %u"
361 ", error = %u"),
1c001dd2
CS
362 _tid, GetLastError ());
363 abort ();
364 }
365 }
f449bfef
RC
366 }
367 else
f449bfef 368 {
1c001dd2
CS
369 // FIXME: could wait to see if the request loop notices that
370 // the submission loop is no longer running and shuts down
371 // voluntarily.
372
681bb2f7 373 debug_printf ("killing request loop thread %u", _tid);
1c001dd2
CS
374
375 if (!TerminateThread (_hThread, 0))
681bb2f7
CV
376 system_printf (("failed to kill request loop thread %u"
377 ", error = %u"),
1c001dd2 378 _tid, GetLastError ());
f449bfef 379 }
f449bfef 380 }
f449bfef 381
1c001dd2 382 return was_running;
f449bfef
RC
383}
384
1c001dd2
CS
385/*static*/ DWORD WINAPI
386queue_submission_loop::start_routine (const LPVOID lpParam)
f449bfef 387{
1c001dd2
CS
388 class queue_submission_loop *const submission_loop =
389 (class queue_submission_loop *) lpParam;
390 assert (submission_loop);
391
392 submission_loop->request_loop ();
393
394 debug_printf ("submission loop has exited; thread about to terminate");
395
396 submission_loop->stop ();
397
398 return 0;
f449bfef 399}
1c001dd2
CS
400
401/*****************************************************************************/
282113ba 402#endif /* __OUTSIDE_CYGWIN__ */
This page took 0.303282 seconds and 5 git commands to generate.