|| _s == STATUS_PIPE_EMPTY; })
fhandler_fifo::fhandler_fifo ():
- fhandler_base (),
- read_ready (NULL), write_ready (NULL)
+ fhandler_base (), read_ready (NULL), write_ready (NULL),
+ listen_client_thr (NULL), lct_termination_evt (NULL), nclients (0),
+ nconnected (0)
{
pipe_name_buf[0] = L'\0';
need_fork_fixup (true);
return res;
}
+static HANDLE
+create_event ()
+{
+ NTSTATUS status;
+ OBJECT_ATTRIBUTES attr;
+ HANDLE evt = NULL;
+
+ InitializeObjectAttributes (&attr, NULL, 0, NULL, NULL);
+ status = NtCreateEvent (&evt, EVENT_ALL_ACCESS, &attr,
+ NotificationEvent, FALSE);
+ if (!NT_SUCCESS (status))
+ __seterrno_from_nt_status (status);
+ return evt;
+}
+
+
+static void
+set_pipe_non_blocking (HANDLE ph, bool nonblocking)
+{
+ NTSTATUS status;
+ IO_STATUS_BLOCK io;
+ FILE_PIPE_INFORMATION fpi;
+
+ fpi.ReadMode = FILE_PIPE_MESSAGE_MODE;
+ fpi.CompletionMode = nonblocking ? FILE_PIPE_COMPLETE_OPERATION
+ : FILE_PIPE_QUEUE_OPERATION;
+ status = NtSetInformationFile (ph, &io, &fpi, sizeof fpi,
+ FilePipeInformation);
+ if (!NT_SUCCESS (status))
+ debug_printf ("NtSetInformationFile(FilePipeInformation): %y", status);
+}
+
+/* The pipe instance is always in blocking mode when this is called. */
+int
+fifo_client_handler::connect ()
+{
+ NTSTATUS status;
+ IO_STATUS_BLOCK io;
+
+ if (connect_evt)
+ ResetEvent (connect_evt);
+ else if (!(connect_evt = create_event ()))
+ return -1;
+ status = NtFsControlFile (fh->get_handle (), connect_evt, NULL, NULL, &io,
+ FSCTL_PIPE_LISTEN, NULL, 0, NULL, 0);
+ switch (status)
+ {
+ case STATUS_PENDING:
+ case STATUS_PIPE_LISTENING:
+ state = fc_connecting;
+ break;
+ case STATUS_PIPE_CONNECTED:
+ state = fc_connected;
+ set_pipe_non_blocking (fh->get_handle (), true);
+ break;
+ default:
+ __seterrno_from_nt_status (status);
+ return -1;
+ }
+ return 0;
+}
+
+int
+fhandler_fifo::disconnect_and_reconnect (int i)
+{
+ NTSTATUS status;
+ IO_STATUS_BLOCK io;
+ HANDLE ph = client[i].fh->get_handle ();
+
+ status = NtFsControlFile (ph, NULL, NULL, NULL, &io, FSCTL_PIPE_DISCONNECT,
+ NULL, 0, NULL, 0);
+ /* Short-lived. Don't use cygwait. We don't want to be interrupted. */
+ if (status == STATUS_PENDING
+ && NtWaitForSingleObject (ph, FALSE, NULL) == WAIT_OBJECT_0)
+ status = io.Status;
+ if (!NT_SUCCESS (status))
+ {
+ __seterrno_from_nt_status (status);
+ return -1;
+ }
+ set_pipe_non_blocking (client[i].fh->get_handle (), false);
+ if (client[i].connect () < 0)
+ return -1;
+ if (client[i].state == fc_connected)
+ nconnected++;
+ return 0;
+}
+
NTSTATUS
fhandler_fifo::npfs_handle (HANDLE &nph)
{
return status;
}
-/* Called when pipe is opened for reading. */
+/* Called when a FIFO is first opened for reading and again each time
+ a new client is needed. Each pipe instance is created in blocking
+ mode so that we can easily wait for a connection. After it is
+ connected, it is put in nonblocking mode. */
HANDLE
-fhandler_fifo::create_pipe ()
+fhandler_fifo::create_pipe_instance (bool first)
{
NTSTATUS status;
HANDLE npfsh;
ULONG hattr;
ULONG sharing;
ULONG nonblocking = FILE_PIPE_QUEUE_OPERATION;
- ULONG max_instances = 1;
+ ULONG max_instances = -1;
LARGE_INTEGER timeout;
status = npfs_handle (npfsh);
access = GENERIC_READ | FILE_READ_ATTRIBUTES | FILE_WRITE_ATTRIBUTES
| SYNCHRONIZE;
sharing = FILE_SHARE_READ | FILE_SHARE_WRITE;
- hattr = OBJ_INHERIT | OBJ_CASE_INSENSITIVE;
+ hattr = OBJ_INHERIT;
+ if (first)
+ hattr |= OBJ_CASE_INSENSITIVE;
InitializeObjectAttributes (&attr, get_pipe_name (),
hattr, npfsh, NULL);
timeout.QuadPart = -500000;
status = NtCreateNamedPipeFile (&ph, access, &attr, &io, sharing,
- FILE_CREATE, 0,
+ first ? FILE_CREATE : FILE_OPEN, 0,
FILE_PIPE_MESSAGE_TYPE,
FILE_PIPE_MESSAGE_MODE,
nonblocking, max_instances,
return ph;
}
-/* Called when file is opened for writing. */
+/* Called when a FIFO is opened for writing. */
NTSTATUS
fhandler_fifo::open_pipe ()
{
return status;
}
+int
+fhandler_fifo::add_client ()
+{
+ fifo_client_handler fc;
+ fhandler_base *fh;
+ bool first = (nclients == 0);
+
+ if (nclients == MAX_CLIENTS)
+ {
+ set_errno (EMFILE);
+ return -1;
+ }
+ if (!(fc.dummy_evt = create_event ()))
+ return -1;
+ if (!(fh = build_fh_dev (dev ())))
+ {
+ set_errno (EMFILE);
+ return -1;
+ }
+ fc.fh = fh;
+ HANDLE ph = create_pipe_instance (first);
+ if (!ph)
+ goto errout;
+ fh->set_io_handle (ph);
+ fh->set_flags (get_flags ());
+ if (fc.connect () < 0)
+ {
+ fc.close ();
+ goto errout;
+ }
+ if (fc.state == fc_connected)
+ nconnected++;
+ client[nclients++] = fc;
+ return 0;
+errout:
+ delete fh;
+ return -1;
+
+}
+
+/* Just hop to the listen_client_thread method. */
+DWORD WINAPI
+listen_client_func (LPVOID param)
+{
+ fhandler_fifo *fh = (fhandler_fifo *) param;
+ return fh->listen_client_thread ();
+}
+
+/* Start a thread that listens for client connections. Whenever a new
+ client connects, it creates a new pipe_instance if necessary.
+ (There may already be an available instance if a client has
+ disconnected.) */
+bool
+fhandler_fifo::listen_client ()
+{
+ if (!(lct_termination_evt = create_event ()))
+ return false;
+
+ listen_client_thr = CreateThread (NULL, PREFERRED_IO_BLKSIZE,
+ listen_client_func, (PVOID) this, 0, NULL);
+ if (!listen_client_thr)
+ {
+ __seterrno ();
+ HANDLE evt = InterlockedExchangePointer (&lct_termination_evt, NULL);
+ if (evt)
+ CloseHandle (evt);
+ return false;
+ }
+ return true;
+}
+
+DWORD
+fhandler_fifo::listen_client_thread ()
+{
+ while (1)
+ {
+ bool found;
+ HANDLE w[MAX_CLIENTS + 1];
+ int i;
+ DWORD wait_ret;
+
+ found = false;
+ for (i = 0; i < nclients; i++)
+ switch (client[i].state)
+ {
+ case fc_invalid:
+ if (disconnect_and_reconnect (i) < 0)
+ goto errout;
+ /* Fall through. */
+ case fc_connected:
+ w[i] = client[i].dummy_evt;
+ break;
+ case fc_connecting:
+ found = true;
+ w[i] = client[i].connect_evt;
+ break;
+ case fc_unknown: /* Shouldn't happen. */
+ default:
+ break;
+ }
+ w[nclients] = lct_termination_evt;
+ if (!found)
+ {
+ if (add_client () < 0)
+ goto errout;
+ else
+ continue;
+ }
+ if (!arm (read_ready))
+ {
+ __seterrno ();
+ goto errout;
+ }
+
+ /* Wait for a client to connect. */
+ wait_ret = WaitForMultipleObjects (nclients + 1, w, false, INFINITE);
+ i = wait_ret - WAIT_OBJECT_0;
+ if (i < 0 || i > nclients)
+ goto errout;
+ else if (i == nclients) /* Reader is closing. */
+ return 0;
+ else
+ {
+ client[i].state = fc_connected;
+ nconnected++;
+ set_pipe_non_blocking (client[i].fh->get_handle (), true);
+ yield ();
+ }
+ }
+errout:
+ ResetEvent (read_ready);
+ return -1;
+}
+
int
fhandler_fifo::open (int flags, mode_t)
{
error_set_errno
} res;
bool reader, writer, duplexer;
- HANDLE ph = NULL;
/* Determine what we're doing with this fhandler: reading, writing, both */
switch (flags & O_ACCMODE)
debug_only_printf ("reader %d, writer %d, duplexer %d", reader, writer, duplexer);
set_flags (flags);
+ if (reader)
+ nohandle (true);
+
/* Create control events for this named pipe */
char char_sa_buf[1024];
LPSECURITY_ATTRIBUTES sa_buf;
goto out;
}
- /* If we're reading, create the pipe, signal that we're ready and wait for
- a writer.
- FIXME: Probably need to special case O_RDWR case. */
+ /* If we're reading, start the listen_client thread (which should
+ signal read_ready), and wait for a writer. */
if (reader)
{
- ph = create_pipe ();
- if (!ph)
+ if (!listen_client ())
{
- debug_printf ("create of reader failed");
+ debug_printf ("create of listen_client thread failed");
res = error_errno_set;
goto out;
}
- else if (!arm (read_ready))
+ /* Wait for the listen_client thread to create the pipe and
+ signal read_ready. This should be quick. */
+ HANDLE w[2] = { listen_client_thr, read_ready };
+ switch (WaitForMultipleObjects (2, w, FALSE, INFINITE))
{
+ case WAIT_OBJECT_0:
+ debug_printf ("listen_client_thread exited unexpectedly");
+ DWORD err;
+ GetExitCodeThread (listen_client_thr, &err);
+ __seterrno_from_win_error (err);
+ res = error_errno_set;
+ goto out;
+ break;
+ case WAIT_OBJECT_0 + 1:
+ if (!arm (read_ready))
+ {
+ res = error_set_errno;
+ goto out;
+ }
+ break;
+ default:
res = error_set_errno;
goto out;
+ break;
}
- else if (!duplexer && !wait (write_ready))
+ if (!duplexer && !wait (write_ready))
{
res = error_errno_set;
goto out;
}
/* If we're writing, wait for read_ready and then connect to the
- pipe. Then signal write_ready. */
+ pipe. This should always succeed quickly if the reader's
+ listen_client thread is running. Then signal write_ready. */
if (writer)
{
if (!wait (read_ready))
goto out;
}
else
- res = success;
+ {
+ set_pipe_non_blocking (get_handle (), true);
+ res = success;
+ }
}
out:
if (res == error_set_errno)
}
if (get_io_handle ())
CloseHandle (get_io_handle ());
+ if (listen_client_thr)
+ CloseHandle (listen_client_thr);
}
debug_printf ("res %d", res);
return res == success;
fhandler_fifo::raw_read (void *in_ptr, size_t& len)
{
size_t orig_len = len;
+
+ /* Start the listen_client thread if necessary (e.g., after dup or fork). */
+ if (!listen_client_thr && !listen_client ())
+ goto errout;
+
while (1)
{
- len = orig_len;
- fhandler_base::raw_read (in_ptr, len);
- ssize_t nread = (ssize_t) len;
- if (nread > 0)
- return;
- else if (nread < 0 && GetLastError () != ERROR_NO_DATA)
- goto errout;
- else if (nread == 0) /* Writer has disconnected. */
+ if (nconnected == 0) /* EOF */
{
- /* Not implemented yet. */
+ len = 0;
+ return;
}
+
+ /* Poll the connected clients for input. */
+ for (int i = 0; i < nclients; i++)
+ if (client[i].state == fc_connected)
+ {
+ len = orig_len;
+ client[i].fh->fhandler_base::raw_read (in_ptr, len);
+ ssize_t nread = (ssize_t) len;
+ if (nread > 0)
+ return;
+ else if (nread < 0 && GetLastError () != ERROR_NO_DATA)
+ goto errout;
+ else if (nread == 0) /* Client has disconnected. */
+ {
+ client[i].state = fc_invalid;
+ nconnected--;
+ }
+ }
if (is_nonblocking ())
{
set_errno (EAGAIN);
return fh.fstatvfs (sfs);
}
+int
+fifo_client_handler::close ()
+{
+ int res = 0;
+
+ if (fh)
+ res = fh->close ();
+ if (connect_evt)
+ CloseHandle (connect_evt);
+ if (dummy_evt)
+ CloseHandle (dummy_evt);
+ return res;
+}
+
int
fhandler_fifo::close ()
{
- CloseHandle (read_ready);
- CloseHandle (write_ready);
- return fhandler_base::close ();
+ int res = 0;
+ HANDLE evt = InterlockedExchangePointer (&lct_termination_evt, NULL);
+ HANDLE thr = InterlockedExchangePointer (&listen_client_thr, NULL);
+ if (thr)
+ {
+ if (evt)
+ SetEvent (evt);
+ WaitForSingleObject (thr, INFINITE);
+ DWORD err;
+ GetExitCodeThread (thr, &err);
+ if (err)
+ debug_printf ("listen_client_thread exited with code %d", err);
+ CloseHandle (thr);
+ }
+ if (evt)
+ CloseHandle (evt);
+ if (read_ready)
+ CloseHandle (read_ready);
+ if (write_ready)
+ CloseHandle (write_ready);
+ for (int i = 0; i < nclients; i++)
+ if (client[i].close () < 0)
+ res = -1;
+ return fhandler_base::close () || res;
}
int