}
};
-/* Info needed by all readers of a FIFO, stored in named shared memory. */
+/* Info needed by all fhandlers for a given FIFO, stored in named
+ shared memory. This is mostly for readers, but writers need access
+ in order to update the count of open writers. */
class fifo_shmem_t
{
- LONG _nreaders;
+ LONG _nreaders, _nwriters;
fifo_reader_id_t _owner, _prev_owner, _pending_owner;
- af_unix_spinlock_t _owner_lock, _reading_lock, _reader_opening_lock, _nreaders_lock;
+ af_unix_spinlock_t _owner_lock, _reading_lock, _nreaders_lock, _nwriters_lock;
/* Info about shared memory block used for temporary storage of the
owner's fc_handler list. */
int nreaders () const { return (int) _nreaders; }
int inc_nreaders () { return (int) InterlockedIncrement (&_nreaders); }
int dec_nreaders () { return (int) InterlockedDecrement (&_nreaders); }
+ int nwriters () const { return (int) _nwriters; }
+ int inc_nwriters () { return (int) InterlockedIncrement (&_nwriters); }
+ int dec_nwriters () { return (int) InterlockedDecrement (&_nwriters); }
fifo_reader_id_t get_owner () const { return _owner; }
void set_owner (fifo_reader_id_t fr_id) { _owner = fr_id; }
void owner_unlock () { _owner_lock.unlock (); }
void reading_lock () { _reading_lock.lock (); }
void reading_unlock () { _reading_lock.unlock (); }
- void reader_opening_lock () { _reader_opening_lock.lock (); }
- void reader_opening_unlock () { _reader_opening_lock.unlock (); }
void nreaders_lock () { _nreaders_lock.lock (); }
void nreaders_unlock () { _nreaders_lock.unlock (); }
+ void nwriters_lock () { _nwriters_lock.lock (); }
+ void nwriters_unlock () { _nwriters_lock.unlock (); }
int get_shared_nhandlers () const { return (int) _sh_nhandlers; }
void set_shared_nhandlers (int n) { InterlockedExchange (&_sh_nhandlers, n); }
HANDLE owner_needed_evt; /* The owner is closing. */
HANDLE owner_found_evt; /* A new owner has taken over. */
HANDLE update_needed_evt; /* shared_fc_handler needs updating. */
- HANDLE check_write_ready_evt; /* write_ready needs to be checked. */
- HANDLE write_ready_ok_evt; /* check_write_ready is done. */
/* Handles to non-shared events needed for fifo_reader_threads. */
HANDLE cancel_evt; /* Signal thread to terminate. */
void record_connection (fifo_client_handler&,
fifo_client_connect_state = fc_connected);
- int create_shmem ();
+ int create_shmem (bool only_open = false);
int reopen_shmem ();
int create_shared_fc_handler ();
int reopen_shared_fc_handler ();
int nreaders () const { return shmem->nreaders (); }
int inc_nreaders () { return shmem->inc_nreaders (); }
int dec_nreaders () { return shmem->dec_nreaders (); }
+ int nwriters () const { return shmem->nwriters (); }
+ int inc_nwriters () { return shmem->inc_nwriters (); }
+ int dec_nwriters () { return shmem->dec_nwriters (); }
void nreaders_lock () { shmem->nreaders_lock (); }
void nreaders_unlock () { shmem->nreaders_unlock (); }
+ void nwriters_lock () { shmem->nwriters_lock (); }
+ void nwriters_unlock () { shmem->nwriters_unlock (); }
fifo_reader_id_t get_prev_owner () const { return shmem->get_prev_owner (); }
void set_prev_owner (fifo_reader_id_t fr_id)
{ return shmem->shared_fc_handler_updated (); }
void shared_fc_handler_updated (bool val)
{ shmem->shared_fc_handler_updated (val); }
- void check_write_ready ();
- void reader_opening_lock () { shmem->reader_opening_lock (); }
- void reader_opening_unlock () { shmem->reader_opening_unlock (); }
public:
fhandler_fifo ();
fhandler_base (),
read_ready (NULL), write_ready (NULL), writer_opening (NULL),
owner_needed_evt (NULL), owner_found_evt (NULL), update_needed_evt (NULL),
- check_write_ready_evt (NULL), write_ready_ok_evt (NULL),
cancel_evt (NULL), thr_sync_evt (NULL), _maybe_eof (false),
fc_handler (NULL), shandlers (0), nhandlers (0),
reader (false), writer (false), duplexer (false),
return 0;
}
-/* The write_ready event gets set when a writer opens, to indicate
- that a blocking reader can open. If a second reader wants to open,
- we need to see if there are still any writers open. */
-void
-fhandler_fifo::check_write_ready ()
-{
- bool set = false;
-
- for (int i = 0; i < nhandlers && !set; i++)
- if (fc_handler[i].set_state () >= fc_connected)
- set = true;
- if (set || IsEventSignalled (writer_opening))
- SetEvent (write_ready);
- else
- ResetEvent (write_ready);
- SetEvent (write_ready_ok_evt);
-}
-
static DWORD WINAPI
fifo_reader_thread (LPVOID param)
{
IO_STATUS_BLOCK io;
bool cancel = false;
bool update = false;
- bool check = false;
status = NtFsControlFile (fc.h, conn_evt, NULL, NULL, &io,
FSCTL_PIPE_LISTEN, NULL, 0, NULL, 0);
if (status == STATUS_PENDING)
{
- HANDLE w[4] = { conn_evt, update_needed_evt,
- check_write_ready_evt, cancel_evt };
- switch (WaitForMultipleObjects (4, w, false, INFINITE))
+ HANDLE w[3] = { conn_evt, update_needed_evt, cancel_evt };
+ switch (WaitForMultipleObjects (3, w, false, INFINITE))
{
case WAIT_OBJECT_0:
status = io.Status;
update = true;
break;
case WAIT_OBJECT_0 + 2:
- status = STATUS_WAIT_2;
- check = true;
- break;
- case WAIT_OBJECT_0 + 3:
status = STATUS_THREAD_IS_TERMINATING;
cancel = true;
update = true;
break;
case STATUS_THREAD_IS_TERMINATING:
case STATUS_WAIT_1:
- case STATUS_WAIT_2:
/* Try to connect a bogus client. Otherwise fc is still
listening, and the next connection might not get recorded. */
status1 = open_pipe (ph);
NtClose (ph);
if (update && update_shared_handlers () < 0)
api_fatal ("Can't update shared handlers, %E");
- if (check)
- check_write_ready ();
fifo_client_unlock ();
if (cancel)
goto canceled;
return 0;
}
+/* Return -1 on error and 0 or 1 on success. If ONLY_OPEN is true, we
+ expect the shared memory to exist, and we only try to open it. In
+ this case, we return 0 on success.
+
+ Otherwise, we create the shared memory if it doesn't exist, and we
+ return 1 if it already existed and we successfully open it. */
int
-fhandler_fifo::create_shmem ()
+fhandler_fifo::create_shmem (bool only_open)
{
HANDLE sect;
OBJECT_ATTRIBUTES attr;
PVOID addr = NULL;
UNICODE_STRING uname;
WCHAR shmem_name[MAX_PATH];
+ bool already_exists = false;
__small_swprintf (shmem_name, L"fifo-shmem.%08x.%016X", get_dev (),
get_ino ());
RtlInitUnicodeString (&uname, shmem_name);
InitializeObjectAttributes (&attr, &uname, OBJ_INHERIT,
get_shared_parent_dir (), NULL);
- status = NtCreateSection (§, STANDARD_RIGHTS_REQUIRED | SECTION_QUERY
- | SECTION_MAP_READ | SECTION_MAP_WRITE,
- &attr, &size, PAGE_READWRITE, SEC_COMMIT, NULL);
- if (status == STATUS_OBJECT_NAME_COLLISION)
+ if (!only_open)
+ {
+ status = NtCreateSection (§, STANDARD_RIGHTS_REQUIRED | SECTION_QUERY
+ | SECTION_MAP_READ | SECTION_MAP_WRITE,
+ &attr, &size, PAGE_READWRITE, SEC_COMMIT, NULL);
+ if (status == STATUS_OBJECT_NAME_COLLISION)
+ already_exists = true;
+ }
+ if (only_open || already_exists)
status = NtOpenSection (§, STANDARD_RIGHTS_REQUIRED | SECTION_QUERY
| SECTION_MAP_READ | SECTION_MAP_WRITE, &attr);
if (!NT_SUCCESS (status))
}
shmem_handle = sect;
shmem = (fifo_shmem_t *) addr;
- return 0;
+ return already_exists ? 1 : 0;
}
/* shmem_handle must be valid when this is called. */
int
fhandler_fifo::open (int flags, mode_t)
{
- int saved_errno = 0;
+ int saved_errno = 0, shmem_res = 0;
if (flags & O_PATH)
return open_fs (flags);
writer = true;
break;
case O_RDWR:
- reader = true;
- duplexer = true;
+ reader = writer = duplexer = true;
break;
default:
set_errno (EINVAL);
goto err_close_write_ready;
}
- /* If we're reading, signal read_ready, create the shared memory,
- and start the fifo_reader_thread. */
+ /* If we're reading, create the shared memory and the shared
+ fc_handler memory, create some events, start the
+ fifo_reader_thread, signal read_ready, and wait for a writer. */
if (reader)
{
- bool first = true;
-
- SetEvent (read_ready);
- if (create_shmem () < 0)
+ /* Create/open shared memory. */
+ if ((shmem_res = create_shmem ()) < 0)
goto err_close_writer_opening;
+ else if (shmem_res == 0)
+ debug_printf ("shmem created");
+ else
+ debug_printf ("shmem existed; ok if we're not the first reader");
if (create_shared_fc_handler () < 0)
goto err_close_shmem;
- reader_opening_lock ();
- if (inc_nreaders () == 1)
- /* Reinitialize _sh_fc_handler_updated, which starts as 0. */
- shared_fc_handler_updated (true);
- else
- first = false;
npbuf[0] = 'n';
if (!(owner_needed_evt = CreateEvent (sa_buf, true, false, npbuf)))
{
debug_printf ("CreateEvent for %s failed, %E", npbuf);
__seterrno ();
- goto err_dec_nreaders;
+ goto err_close_shared_fc_handler;
}
npbuf[0] = 'f';
if (!(owner_found_evt = CreateEvent (sa_buf, true, false, npbuf)))
__seterrno ();
goto err_close_owner_found_evt;
}
- npbuf[0] = 'c';
- if (!(check_write_ready_evt = CreateEvent (sa_buf, false, false, npbuf)))
- {
- debug_printf ("CreateEvent for %s failed, %E", npbuf);
- __seterrno ();
- goto err_close_update_needed_evt;
- }
- npbuf[0] = 'k';
- if (!(write_ready_ok_evt = CreateEvent (sa_buf, false, false, npbuf)))
- {
- debug_printf ("CreateEvent for %s failed, %E", npbuf);
- __seterrno ();
- goto err_close_check_write_ready_evt;
- }
if (!(cancel_evt = create_event ()))
- goto err_close_write_ready_ok_evt;
+ goto err_close_update_needed_evt;
if (!(thr_sync_evt = create_event ()))
goto err_close_cancel_evt;
+
me.winpid = GetCurrentProcessId ();
me.fh = this;
- new cygthread (fifo_reader_thread, this, "fifo_reader", thr_sync_evt);
- /* Wait until there's an owner. */
- owner_lock ();
- while (!get_owner ())
+ nreaders_lock ();
+ if (inc_nreaders () == 1)
{
- owner_unlock ();
- yield ();
- owner_lock ();
+ /* Reinitialize _sh_fc_handler_updated, which starts as 0. */
+ shared_fc_handler_updated (true);
+ set_owner (me);
}
- owner_unlock ();
+ new cygthread (fifo_reader_thread, this, "fifo_reader", thr_sync_evt);
+ SetEvent (read_ready);
+ nreaders_unlock ();
/* If we're a duplexer, we need a handle for writing. */
if (duplexer)
HANDLE ph = NULL;
NTSTATUS status;
+ nwriters_lock ();
+ inc_nwriters ();
+ SetEvent (write_ready);
+ nwriters_unlock ();
+
while (1)
{
status = open_pipe (ph);
else
{
__seterrno_from_nt_status (status);
+ nohandle (true);
goto err_close_reader;
}
}
}
/* Not a duplexer; wait for a writer to connect if we're blocking. */
- else if (!(flags & O_NONBLOCK))
- {
- if (!first)
- {
- /* Ask the owner to update write_ready. */
- SetEvent (check_write_ready_evt);
- WaitForSingleObject (write_ready_ok_evt, INFINITE);
- }
- if (!wait (write_ready))
- goto err_close_reader;
- }
- reader_opening_unlock ();
+ else if (!wait (write_ready))
+ goto err_close_reader;
goto success;
}
- /* If we're writing, wait for read_ready, connect to the pipe, and
- signal write_ready. */
+ /* If we're writing, wait for read_ready, connect to the pipe, open
+ the shared memory, and signal write_ready. */
if (writer)
{
NTSTATUS status;
+ /* Don't let a reader see EOF at this point. */
SetEvent (writer_opening);
- if (!wait (read_ready))
- {
- ResetEvent (writer_opening);
- goto err_close_writer_opening;
- }
while (1)
{
+ if (!wait (read_ready))
+ {
+ ResetEvent (writer_opening);
+ goto err_close_writer_opening;
+ }
status = open_pipe (get_handle ());
if (NT_SUCCESS (status))
- goto writer_success;
+ goto writer_shmem;
else if (status == STATUS_OBJECT_NAME_NOT_FOUND)
{
- /* The pipe hasn't been created yet. */
+ /* The pipe hasn't been created yet or there's no longer
+ a reader open. */
yield ();
continue;
}
and/or many writers are trying to connect simultaneously */
while (1)
{
- SetEvent (writer_opening);
if (!wait (read_ready))
{
ResetEvent (writer_opening);
}
status = wait_open_pipe (get_handle ());
if (NT_SUCCESS (status))
- goto writer_success;
+ goto writer_shmem;
else if (status == STATUS_IO_TIMEOUT)
continue;
else
}
}
}
-writer_success:
+writer_shmem:
+ if (create_shmem (true) < 0)
+ goto err_close_writer_opening;
+/* writer_success: */
set_pipe_non_blocking (get_handle (), flags & O_NONBLOCK);
+ nwriters_lock ();
+ inc_nwriters ();
SetEvent (write_ready);
+ ResetEvent (writer_opening);
+ nwriters_unlock ();
success:
return 1;
err_close_reader:
saved_errno = get_errno ();
close ();
set_errno (saved_errno);
- reader_opening_unlock ();
return 0;
+/* err_close_thr_sync_evt: */
+/* NtClose (thr_sync_evt); */
err_close_cancel_evt:
NtClose (cancel_evt);
-err_close_write_ready_ok_evt:
- NtClose (write_ready_ok_evt);
-err_close_check_write_ready_evt:
- NtClose (check_write_ready_evt);
err_close_update_needed_evt:
NtClose (update_needed_evt);
err_close_owner_found_evt:
NtClose (owner_found_evt);
err_close_owner_needed_evt:
NtClose (owner_needed_evt);
-err_dec_nreaders:
- if (dec_nreaders () == 0)
- ResetEvent (read_ready);
- reader_opening_unlock ();
-/* err_close_shared_fc_handler: */
+err_close_shared_fc_handler:
NtUnmapViewOfSection (NtCurrentProcess (), shared_fc_handler);
NtClose (shared_fc_hdl);
err_close_shmem:
int
fhandler_fifo::close ()
{
+ if (writer)
+ {
+ nwriters_lock ();
+ if (dec_nwriters () == 0)
+ ResetEvent (write_ready);
+ nwriters_unlock ();
+ }
if (reader)
{
/* If we're the owner, we can't close our fc_handlers if a new
NtClose (owner_found_evt);
if (update_needed_evt)
NtClose (update_needed_evt);
- if (check_write_ready_evt)
- NtClose (check_write_ready_evt);
- if (write_ready_ok_evt)
- NtClose (write_ready_ok_evt);
if (cancel_evt)
NtClose (cancel_evt);
if (thr_sync_evt)
NtClose (thr_sync_evt);
- if (shmem)
- NtUnmapViewOfSection (NtCurrentProcess (), shmem);
- if (shmem_handle)
- NtClose (shmem_handle);
if (shared_fc_handler)
NtUnmapViewOfSection (NtCurrentProcess (), shared_fc_handler);
if (shared_fc_hdl)
NtClose (shared_fc_hdl);
}
+ if (shmem)
+ NtUnmapViewOfSection (NtCurrentProcess (), shmem);
+ if (shmem_handle)
+ NtClose (shmem_handle);
if (read_ready)
NtClose (read_ready);
if (write_ready)
__seterrno ();
goto err_close_write_ready;
}
+ if (!DuplicateHandle (GetCurrentProcess (), shmem_handle,
+ GetCurrentProcess (), &fhf->shmem_handle,
+ 0, !(flags & O_CLOEXEC), DUPLICATE_SAME_ACCESS))
+ {
+ __seterrno ();
+ goto err_close_writer_opening;
+ }
+ if (fhf->reopen_shmem () < 0)
+ goto err_close_shmem_handle;
if (reader)
{
/* Make sure the child starts unlocked. */
fhf->nhandlers = fhf->shandlers = 0;
fhf->fc_handler = NULL;
- if (!DuplicateHandle (GetCurrentProcess (), shmem_handle,
- GetCurrentProcess (), &fhf->shmem_handle,
- 0, !(flags & O_CLOEXEC), DUPLICATE_SAME_ACCESS))
- {
- __seterrno ();
- goto err_close_writer_opening;
- }
- if (fhf->reopen_shmem () < 0)
- goto err_close_shmem_handle;
if (!DuplicateHandle (GetCurrentProcess (), shared_fc_hdl,
GetCurrentProcess (), &fhf->shared_fc_hdl,
0, !(flags & O_CLOEXEC), DUPLICATE_SAME_ACCESS))
__seterrno ();
goto err_close_owner_found_evt;
}
- if (!DuplicateHandle (GetCurrentProcess (), check_write_ready_evt,
- GetCurrentProcess (), &fhf->check_write_ready_evt,
- 0, !(flags & O_CLOEXEC), DUPLICATE_SAME_ACCESS))
- {
- __seterrno ();
- goto err_close_update_needed_evt;
- }
- if (!DuplicateHandle (GetCurrentProcess (), write_ready_ok_evt,
- GetCurrentProcess (), &fhf->write_ready_ok_evt,
- 0, !(flags & O_CLOEXEC), DUPLICATE_SAME_ACCESS))
- {
- __seterrno ();
- goto err_close_check_write_ready_evt;
- }
if (!(fhf->cancel_evt = create_event ()))
- goto err_close_write_ready_ok_evt;
+ goto err_close_update_needed_evt;
if (!(fhf->thr_sync_evt = create_event ()))
goto err_close_cancel_evt;
inc_nreaders ();
fhf->me.fh = fhf;
new cygthread (fifo_reader_thread, fhf, "fifo_reader", fhf->thr_sync_evt);
}
+ if (writer)
+ inc_nwriters ();
return 0;
err_close_cancel_evt:
NtClose (fhf->cancel_evt);
-err_close_write_ready_ok_evt:
- NtClose (fhf->write_ready_ok_evt);
-err_close_check_write_ready_evt:
- NtClose (fhf->check_write_ready_evt);
err_close_update_needed_evt:
NtClose (fhf->update_needed_evt);
err_close_owner_found_evt:
fork_fixup (parent, read_ready, "read_ready");
fork_fixup (parent, write_ready, "write_ready");
fork_fixup (parent, writer_opening, "writer_opening");
+ fork_fixup (parent, shmem_handle, "shmem_handle");
+ if (reopen_shmem () < 0)
+ api_fatal ("Can't reopen shared memory during fork, %E");
if (reader)
{
/* Make sure the child starts unlocked. */
fifo_client_unlock ();
- fork_fixup (parent, shmem_handle, "shmem_handle");
- if (reopen_shmem () < 0)
- api_fatal ("Can't reopen shared memory during fork, %E");
fork_fixup (parent, shared_fc_hdl, "shared_fc_hdl");
if (reopen_shared_fc_handler () < 0)
api_fatal ("Can't reopen shared fc_handler memory during fork, %E");
fork_fixup (parent, owner_needed_evt, "owner_needed_evt");
fork_fixup (parent, owner_found_evt, "owner_found_evt");
fork_fixup (parent, update_needed_evt, "update_needed_evt");
- fork_fixup (parent, check_write_ready_evt, "check_write_ready_evt");
- fork_fixup (parent, write_ready_ok_evt, "write_ready_ok_evt");
if (close_on_exec ())
/* Prevent a later attempt to close the non-inherited
pipe-instance handles copied from the parent. */
me.winpid = GetCurrentProcessId ();
new cygthread (fifo_reader_thread, this, "fifo_reader", thr_sync_evt);
}
+ if (writer)
+ inc_nwriters ();
}
void
fhandler_fifo::fixup_after_exec ()
{
fhandler_base::fixup_after_exec ();
- if (reader && !close_on_exec ())
+ if (close_on_exec ())
+ return;
+ if (reopen_shmem () < 0)
+ api_fatal ("Can't reopen shared memory during exec, %E");
+ if (reader)
{
/* Make sure the child starts unlocked. */
fifo_client_unlock ();
- if (reopen_shmem () < 0)
- api_fatal ("Can't reopen shared memory during exec, %E");
if (reopen_shared_fc_handler () < 0)
api_fatal ("Can't reopen shared fc_handler memory during exec, %E");
fc_handler = NULL;
me.winpid = GetCurrentProcessId ();
new cygthread (fifo_reader_thread, this, "fifo_reader", thr_sync_evt);
}
+ if (writer)
+ inc_nwriters ();
}
void
set_no_inheritance (owner_needed_evt, val);
set_no_inheritance (owner_found_evt, val);
set_no_inheritance (update_needed_evt, val);
- set_no_inheritance (check_write_ready_evt, val);
- set_no_inheritance (write_ready_ok_evt, val);
fifo_client_lock ();
for (int i = 0; i < nhandlers; i++)
set_no_inheritance (fc_handler[i].h, val);