]>
sourceware.org Git - newlib-cygwin.git/blob - winsup/cygserver/threaded_queue.cc
3 Copyright 2001, 2002, 2003 Red Hat Inc.
5 Written by Robert Collins <rbtcollins@hotmail.com>
7 This file is part of Cygwin.
9 This software is a copyrighted work licensed under the terms of the
10 Cygwin license. Please consult the file "CYGWIN_LICENSE" for
13 #ifdef __OUTSIDE_CYGWIN__
20 #include <sys/types.h>
22 #include "threaded_queue.h"
24 /*****************************************************************************/
28 queue_request::~queue_request ()
31 /*****************************************************************************/
35 threaded_queue::threaded_queue (const size_t initial_workers
)
38 _submitters_head (NULL
),
40 _requests_head (NULL
),
43 InitializeCriticalSection (&_queue_lock
);
45 // This semaphore's count is the number of requests on the queue.
46 // The maximum count (129792) is calculated as MAXIMUM_WAIT_OBJECTS
47 // multiplied by max. threads per process (2028?), which is (a few)
48 // more requests than could ever be pending with the current design.
50 _requests_sem
= CreateSemaphore (NULL
, // SECURITY_ATTRIBUTES
52 129792, // Maximum count
57 system_printf (("failed to create the request queue semaphore, "
63 create_workers (initial_workers
);
66 threaded_queue::~threaded_queue ()
71 debug_printf ("deleting all pending queue requests");
72 queue_request
*reqptr
= _requests_head
;
75 queue_request
*const ptr
= reqptr
;
76 reqptr
= reqptr
->_next
;
80 DeleteCriticalSection (&_queue_lock
);
82 (void) CloseHandle (_requests_sem
);
85 /* FIXME: return success or failure rather than quitting */
87 threaded_queue::add_submission_loop (queue_submission_loop
*const submitter
)
91 assert (submitter
->_queue
== this);
92 assert (!submitter
->_next
);
95 TInterlockedExchangePointer (&_submitters_head
, submitter
);
102 threaded_queue::start ()
104 EnterCriticalSection (&_queue_lock
);
105 const bool was_running
= _running
;
107 queue_submission_loop
*loopptr
= _submitters_head
;
108 LeaveCriticalSection (&_queue_lock
);
112 debug_printf ("starting all queue submission loops");
116 queue_submission_loop
*const ptr
= loopptr
;
117 loopptr
= loopptr
->_next
;
126 threaded_queue::stop ()
128 EnterCriticalSection (&_queue_lock
);
129 const bool was_running
= _running
;
131 queue_submission_loop
*loopptr
= _submitters_head
;
132 LeaveCriticalSection (&_queue_lock
);
136 debug_printf ("stopping all queue submission loops");
139 queue_submission_loop
*const ptr
= loopptr
;
140 loopptr
= loopptr
->_next
;
144 ReleaseSemaphore (_requests_sem
, _workers_count
, NULL
);
145 while (_workers_count
)
147 debug_printf (("waiting for worker threads to terminate: "
148 "%lu still running"),
152 debug_printf ("all worker threads have terminated");
158 /* FIXME: return success or failure */
160 threaded_queue::add (queue_request
*const therequest
)
164 assert (!therequest
->_next
);
168 system_printf ("warning: no worker threads to handle request!");
169 // FIXME: And then what?
172 EnterCriticalSection (&_queue_lock
);
174 _requests_head
= therequest
;
177 /* Add to the queue end. */
178 queue_request
*reqptr
= _requests_head
;
179 for (; reqptr
->_next
; reqptr
= reqptr
->_next
)
182 assert (!reqptr
->_next
);
183 reqptr
->_next
= therequest
;
186 _requests_count
+= 1;
187 assert (_requests_count
> 0);
188 LeaveCriticalSection (&_queue_lock
);
190 (void) ReleaseSemaphore (_requests_sem
, 1, NULL
);
193 /*static*/ DWORD WINAPI
194 threaded_queue::start_routine (const LPVOID lpParam
)
196 class threaded_queue
*const queue
= (class threaded_queue
*) lpParam
;
199 queue
->worker_loop ();
201 const long count
= InterlockedDecrement (&queue
->_workers_count
);
205 debug_printf ("worker loop has exited; thread about to terminate");
210 /* Called from the constructor: so no need to be thread-safe until the
211 * worker threads start to be created; thus the interlocked increment
212 * of the `_workers_count' field.
216 threaded_queue::create_workers (const size_t initial_workers
)
218 assert (initial_workers
> 0);
220 for (unsigned int i
= 0; i
!= initial_workers
; i
++)
222 const long count
= InterlockedIncrement (&_workers_count
);
226 const HANDLE hThread
=
227 CreateThread (NULL
, 0, start_routine
, this, 0, &tid
);
231 system_printf ("failed to create thread, error = %lu",
236 (void) CloseHandle (hThread
);
241 threaded_queue::worker_loop ()
245 const DWORD rc
= WaitForSingleObject (_requests_sem
, INFINITE
);
246 if (rc
== WAIT_FAILED
)
248 system_printf ("wait for request semaphore failed, error = %lu",
252 assert (rc
== WAIT_OBJECT_0
);
254 EnterCriticalSection (&_queue_lock
);
257 LeaveCriticalSection (&_queue_lock
);
261 assert (_requests_head
);
262 queue_request
*const reqptr
= _requests_head
;
263 _requests_head
= reqptr
->_next
;
265 _requests_count
-= 1;
266 assert (_requests_count
>= 0);
267 LeaveCriticalSection (&_queue_lock
);
275 /*****************************************************************************/
277 /* queue_submission_loop */
279 queue_submission_loop::queue_submission_loop (threaded_queue
*const queue
,
280 const bool ninterruptible
)
282 _interrupt_event (NULL
),
284 _interruptible (ninterruptible
),
291 // verbose: debug_printf ("creating an interruptible processing thread");
293 _interrupt_event
= CreateEvent (NULL
, // SECURITY_ATTRIBUTES
295 FALSE
, // Initially non-signalled
298 if (!_interrupt_event
)
300 system_printf ("failed to create interrupt event, error = %lu",
307 queue_submission_loop::~queue_submission_loop ()
311 if (_interrupt_event
)
312 (void) CloseHandle (_interrupt_event
);
314 (void) CloseHandle (_hThread
);
318 queue_submission_loop::start ()
323 const bool was_running
= _running
;
329 _hThread
= CreateThread (NULL
, 0, start_routine
, this, 0, &_tid
);
332 system_printf ("failed to create thread, error = %lu",
342 queue_submission_loop::stop ()
345 assert (_hThread
&& _hThread
!= INVALID_HANDLE_VALUE
);
347 const bool was_running
= _running
;
355 assert (_interrupt_event
356 && _interrupt_event
!= INVALID_HANDLE_VALUE
);
358 SetEvent (_interrupt_event
);
360 if (WaitForSingleObject (_hThread
, 1000) == WAIT_TIMEOUT
)
362 system_printf (("request loop thread %lu failed to shutdown "
363 "when asked politely: about to get heavy"),
366 if (!TerminateThread (_hThread
, 0))
368 system_printf (("failed to kill request loop thread %lu"
370 _tid
, GetLastError ());
377 // FIXME: could wait to see if the request loop notices that
378 // the submission loop is no longer running and shuts down
381 debug_printf ("killing request loop thread %lu", _tid
);
383 if (!TerminateThread (_hThread
, 0))
384 system_printf (("failed to kill request loop thread %lu"
386 _tid
, GetLastError ());
393 /*static*/ DWORD WINAPI
394 queue_submission_loop::start_routine (const LPVOID lpParam
)
396 class queue_submission_loop
*const submission_loop
=
397 (class queue_submission_loop
*) lpParam
;
398 assert (submission_loop
);
400 submission_loop
->request_loop ();
402 debug_printf ("submission loop has exited; thread about to terminate");
404 submission_loop
->stop ();
409 /*****************************************************************************/
410 #endif /* __OUTSIDE_CYGWIN__ */
This page took 0.055635 seconds and 5 git commands to generate.