]> sourceware.org Git - newlib-cygwin.git/blame - winsup/cygserver/threaded_queue.cc
* bsd_helper.h: Throughout, convert "struct thread" to "class thread".
[newlib-cygwin.git] / winsup / cygserver / threaded_queue.cc
CommitLineData
f449bfef
RC
/* threaded_queue.cc

   Copyright 2001, 2002, 2003, 2014 Red Hat Inc.

   Written by Robert Collins <rbtcollins@hotmail.com>

This file is part of Cygwin.

This software is a copyrighted work licensed under the terms of the
Cygwin license.  Please consult the file "CYGWIN_LICENSE" for
details. */
f449bfef 12
282113ba 13#ifdef __OUTSIDE_CYGWIN__
1c001dd2
CS
14#include "woutsup.h"
15
16#include <assert.h>
f449bfef
RC
17#include <errno.h>
18#include <stdio.h>
19#include <unistd.h>
f449bfef 20#include <sys/types.h>
59a2339f 21#include <stdlib.h>
f449bfef 22#include "threaded_queue.h"
1c001dd2
CS
23
24/*****************************************************************************/
25
26/* queue_request */
27
28queue_request::~queue_request ()
29{}
30
31/*****************************************************************************/
f449bfef
RC
32
33/* threaded_queue */
34
1c001dd2
CS
35threaded_queue::threaded_queue (const size_t initial_workers)
36 : _workers_count (0),
37 _running (false),
38 _submitters_head (NULL),
39 _requests_count (0),
40 _requests_head (NULL),
41 _requests_sem (NULL)
f449bfef 42{
1c001dd2
CS
43 InitializeCriticalSection (&_queue_lock);
44
45 // This semaphore's count is the number of requests on the queue.
46 // The maximum count (129792) is calculated as MAXIMUM_WAIT_OBJECTS
47 // multiplied by max. threads per process (2028?), which is (a few)
48 // more requests than could ever be pending with the current design.
49
50 _requests_sem = CreateSemaphore (NULL, // SECURITY_ATTRIBUTES
51 0, // Initial count
52 129792, // Maximum count
53 NULL); // Anonymous
54
55 if (!_requests_sem)
f449bfef 56 {
1c001dd2 57 system_printf (("failed to create the request queue semaphore, "
681bb2f7 58 "error = %u"),
1c001dd2
CS
59 GetLastError ());
60 abort ();
f449bfef 61 }
1c001dd2
CS
62
63 create_workers (initial_workers);
f449bfef
RC
64}
65
1c001dd2 66threaded_queue::~threaded_queue ()
f449bfef 67{
1c001dd2
CS
68 if (_running)
69 stop ();
70
71 debug_printf ("deleting all pending queue requests");
72 queue_request *reqptr = _requests_head;
73 while (reqptr)
f449bfef 74 {
1c001dd2
CS
75 queue_request *const ptr = reqptr;
76 reqptr = reqptr->_next;
282113ba 77 delete ptr;
f449bfef 78 }
f449bfef 79
1c001dd2
CS
80 DeleteCriticalSection (&_queue_lock);
81 if (_requests_sem)
82 (void) CloseHandle (_requests_sem);
83}
84
85/* FIXME: return success or failure rather than quitting */
86void
87threaded_queue::add_submission_loop (queue_submission_loop *const submitter)
88{
89 assert (this);
90 assert (submitter);
91 assert (submitter->_queue == this);
92 assert (!submitter->_next);
93
94 submitter->_next =
95 TInterlockedExchangePointer (&_submitters_head, submitter);
96
97 if (_running)
98 submitter->start ();
99}
100
101bool
102threaded_queue::start ()
103{
104 EnterCriticalSection (&_queue_lock);
105 const bool was_running = _running;
106 _running = true;
107 queue_submission_loop *loopptr = _submitters_head;
108 LeaveCriticalSection (&_queue_lock);
109
110 if (!was_running)
f449bfef 111 {
1c001dd2
CS
112 debug_printf ("starting all queue submission loops");
113
114 while (loopptr)
f449bfef 115 {
1c001dd2
CS
116 queue_submission_loop *const ptr = loopptr;
117 loopptr = loopptr->_next;
118 ptr->start ();
f449bfef 119 }
f449bfef 120 }
1c001dd2
CS
121
122 return was_running;
f449bfef
RC
123}
124
1c001dd2
CS
125bool
126threaded_queue::stop ()
f449bfef 127{
1c001dd2
CS
128 EnterCriticalSection (&_queue_lock);
129 const bool was_running = _running;
130 _running = false;
131 queue_submission_loop *loopptr = _submitters_head;
132 LeaveCriticalSection (&_queue_lock);
133
134 if (was_running)
f449bfef 135 {
1c001dd2
CS
136 debug_printf ("stopping all queue submission loops");
137 while (loopptr)
138 {
139 queue_submission_loop *const ptr = loopptr;
140 loopptr = loopptr->_next;
141 ptr->stop ();
142 }
143
144 ReleaseSemaphore (_requests_sem, _workers_count, NULL);
145 while (_workers_count)
146 {
147 debug_printf (("waiting for worker threads to terminate: "
681bb2f7 148 "%u still running"),
1c001dd2
CS
149 _workers_count);
150 Sleep (1000);
151 }
152 debug_printf ("all worker threads have terminated");
f449bfef 153 }
1c001dd2
CS
154
155 return was_running;
f449bfef
RC
156}
157
158/* FIXME: return success or failure */
159void
1c001dd2 160threaded_queue::add (queue_request *const therequest)
f449bfef 161{
1c001dd2
CS
162 assert (this);
163 assert (therequest);
164 assert (!therequest->_next);
165
166 if (!_workers_count)
f449bfef 167 {
1c001dd2
CS
168 system_printf ("warning: no worker threads to handle request!");
169 // FIXME: And then what?
f449bfef 170 }
1c001dd2
CS
171
172 EnterCriticalSection (&_queue_lock);
173 if (!_requests_head)
174 _requests_head = therequest;
f449bfef
RC
175 else
176 {
1c001dd2
CS
177 /* Add to the queue end. */
178 queue_request *reqptr = _requests_head;
179 for (; reqptr->_next; reqptr = reqptr->_next)
180 {}
181 assert (reqptr);
182 assert (!reqptr->_next);
183 reqptr->_next = therequest;
f449bfef 184 }
1c001dd2
CS
185
186 _requests_count += 1;
187 assert (_requests_count > 0);
188 LeaveCriticalSection (&_queue_lock);
189
190 (void) ReleaseSemaphore (_requests_sem, 1, NULL);
f449bfef
RC
191}
192
1c001dd2
CS
193/*static*/ DWORD WINAPI
194threaded_queue::start_routine (const LPVOID lpParam)
195{
196 class threaded_queue *const queue = (class threaded_queue *) lpParam;
197 assert (queue);
198
199 queue->worker_loop ();
200
201 const long count = InterlockedDecrement (&queue->_workers_count);
202 assert (count >= 0);
203
204 if (queue->_running)
205 debug_printf ("worker loop has exited; thread about to terminate");
206
207 return 0;
208}
209
210/* Called from the constructor: so no need to be thread-safe until the
211 * worker threads start to be created; thus the interlocked increment
212 * of the `_workers_count' field.
213 */
214
f449bfef 215void
1c001dd2 216threaded_queue::create_workers (const size_t initial_workers)
f449bfef 217{
1c001dd2
CS
218 assert (initial_workers > 0);
219
220 for (unsigned int i = 0; i != initial_workers; i++)
221 {
222 const long count = InterlockedIncrement (&_workers_count);
223 assert (count > 0);
224
225 DWORD tid;
226 const HANDLE hThread =
227 CreateThread (NULL, 0, start_routine, this, 0, &tid);
228
229 if (!hThread)
230 {
681bb2f7 231 system_printf ("failed to create thread, error = %u",
1c001dd2
CS
232 GetLastError ());
233 abort ();
234 }
235
236 (void) CloseHandle (hThread);
237 }
f449bfef
RC
238}
239
1c001dd2
CS
240void
241threaded_queue::worker_loop ()
f449bfef 242{
1c001dd2 243 while (true)
f449bfef 244 {
1c001dd2
CS
245 const DWORD rc = WaitForSingleObject (_requests_sem, INFINITE);
246 if (rc == WAIT_FAILED)
247 {
681bb2f7 248 system_printf ("wait for request semaphore failed, error = %u",
1c001dd2
CS
249 GetLastError ());
250 return;
251 }
252 assert (rc == WAIT_OBJECT_0);
253
254 EnterCriticalSection (&_queue_lock);
255 if (!_running)
256 {
257 LeaveCriticalSection (&_queue_lock);
258 return;
259 }
260
261 assert (_requests_head);
262 queue_request *const reqptr = _requests_head;
263 _requests_head = reqptr->_next;
264
265 _requests_count -= 1;
266 assert (_requests_count >= 0);
267 LeaveCriticalSection (&_queue_lock);
268
269 assert (reqptr);
270 reqptr->process ();
282113ba 271 delete reqptr;
1c001dd2
CS
272 }
273}
274
275/*****************************************************************************/
276
277/* queue_submission_loop */
278
279queue_submission_loop::queue_submission_loop (threaded_queue *const queue,
280 const bool ninterruptible)
281 : _running (false),
282 _interrupt_event (NULL),
283 _queue (queue),
284 _interruptible (ninterruptible),
285 _hThread (NULL),
286 _tid (0),
287 _next (NULL)
288{
289 if (_interruptible)
290 {
291 // verbose: debug_printf ("creating an interruptible processing thread");
292
293 _interrupt_event = CreateEvent (NULL, // SECURITY_ATTRIBUTES
294 FALSE, // Auto-reset
295 FALSE, // Initially non-signalled
296 NULL); // Anonymous
297
298 if (!_interrupt_event)
299 {
681bb2f7 300 system_printf ("failed to create interrupt event, error = %u",
1c001dd2
CS
301 GetLastError ());
302 abort ();
303 }
f449bfef
RC
304 }
305}
306
1c001dd2 307queue_submission_loop::~queue_submission_loop ()
f449bfef 308{
1c001dd2 309 if (_running)
f449bfef 310 stop ();
1c001dd2
CS
311 if (_interrupt_event)
312 (void) CloseHandle (_interrupt_event);
313 if (_hThread)
314 (void) CloseHandle (_hThread);
f449bfef
RC
315}
316
317bool
1c001dd2 318queue_submission_loop::start ()
f449bfef 319{
1c001dd2
CS
320 assert (this);
321 assert (!_hThread);
322
323 const bool was_running = _running;
324
325 if (!was_running)
f449bfef 326 {
1c001dd2
CS
327 _running = true;
328
329 _hThread = CreateThread (NULL, 0, start_routine, this, 0, &_tid);
330 if (!_hThread)
331 {
681bb2f7 332 system_printf ("failed to create thread, error = %u",
1c001dd2
CS
333 GetLastError ());
334 abort ();
335 }
f449bfef 336 }
1c001dd2
CS
337
338 return was_running;
f449bfef
RC
339}
340
1c001dd2
CS
341bool
342queue_submission_loop::stop ()
f449bfef 343{
1c001dd2
CS
344 assert (this);
345 assert (_hThread && _hThread != INVALID_HANDLE_VALUE);
346
347 const bool was_running = _running;
348
349 if (_running)
f449bfef 350 {
1c001dd2
CS
351 _running = false;
352
353 if (_interruptible)
f449bfef 354 {
1c001dd2
CS
355 assert (_interrupt_event
356 && _interrupt_event != INVALID_HANDLE_VALUE);
357
358 SetEvent (_interrupt_event);
359
360 if (WaitForSingleObject (_hThread, 1000) == WAIT_TIMEOUT)
361 {
681bb2f7 362 system_printf (("request loop thread %u failed to shutdown "
1c001dd2
CS
363 "when asked politely: about to get heavy"),
364 _tid);
365
366 if (!TerminateThread (_hThread, 0))
367 {
681bb2f7
CV
368 system_printf (("failed to kill request loop thread %u"
369 ", error = %u"),
1c001dd2
CS
370 _tid, GetLastError ());
371 abort ();
372 }
373 }
f449bfef
RC
374 }
375 else
f449bfef 376 {
1c001dd2
CS
377 // FIXME: could wait to see if the request loop notices that
378 // the submission loop is no longer running and shuts down
379 // voluntarily.
380
681bb2f7 381 debug_printf ("killing request loop thread %u", _tid);
1c001dd2
CS
382
383 if (!TerminateThread (_hThread, 0))
681bb2f7
CV
384 system_printf (("failed to kill request loop thread %u"
385 ", error = %u"),
1c001dd2 386 _tid, GetLastError ());
f449bfef 387 }
f449bfef 388 }
f449bfef 389
1c001dd2 390 return was_running;
f449bfef
RC
391}
392
1c001dd2
CS
393/*static*/ DWORD WINAPI
394queue_submission_loop::start_routine (const LPVOID lpParam)
f449bfef 395{
1c001dd2
CS
396 class queue_submission_loop *const submission_loop =
397 (class queue_submission_loop *) lpParam;
398 assert (submission_loop);
399
400 submission_loop->request_loop ();
401
402 debug_printf ("submission loop has exited; thread about to terminate");
403
404 submission_loop->stop ();
405
406 return 0;
f449bfef 407}
1c001dd2
CS
408
409/*****************************************************************************/
282113ba 410#endif /* __OUTSIDE_CYGWIN__ */
This page took 0.262911 seconds and 5 git commands to generate.