]> sourceware.org Git - newlib-cygwin.git/commitdiff
Cygwin: FIFO: support opening multiple readers
authorKen Brown <kbrown@cornell.edu>
Wed, 6 May 2020 15:31:39 +0000 (11:31 -0400)
committerKen Brown <kbrown@cornell.edu>
Fri, 8 May 2020 10:45:24 +0000 (06:45 -0400)
Although we can have multiple readers open because of dup/fork/exec,
the current code does not support multiple readers opening a FIFO by
explicitly calling 'open'.

The main complication in supporting this is that when a blocking
reader tries to open and there's already one open, it has to check
whether there any writers open.  It can't rely on the write_ready
event, whose state hasn't changed since the first writer opened.

To fix this, add two new named events, check_write_ready_evt and
write_ready_ok_evt, and a new method, check_write_ready().

The first event signals the owner's reader thread to call
check_write_ready(), which polls the fc_handler list to check for
connected writers.  If it finds none, it checks to see if there's a
writer in the process and then sets/resets write_ready appropriately.

When check_write_ready() finishes it sets write_ready_ok_evt to signal
the reader that write_ready has been updated.

The polling is done via fifo_client_handler::pipe_state().  As long as
it's calling that function anyway, check_write_ready() updates the
state of each handler.

Also add a new lock to prevent a race if two readers are trying to
open simultaneously.

winsup/cygwin/fhandler.h
winsup/cygwin/fhandler_fifo.cc

index 31c65866e20ccaad6b9ae9ed59753035ce7d2ca7..8a23d675309f03296227766732148d2093dbd0ea 100644 (file)
@@ -1324,7 +1324,7 @@ class fifo_shmem_t
 {
   LONG _nreaders;
   fifo_reader_id_t _owner, _prev_owner, _pending_owner;
-  af_unix_spinlock_t _owner_lock, _reading_lock;
+  af_unix_spinlock_t _owner_lock, _reading_lock, _reader_opening_lock;
 
   /* Info about shared memory block used for temporary storage of the
      owner's fc_handler list. */
@@ -1346,6 +1346,8 @@ public:
   void owner_unlock () { _owner_lock.unlock (); }
   void reading_lock () { _reading_lock.lock (); }
   void reading_unlock () { _reading_lock.unlock (); }
+  void reader_opening_lock () { _reader_opening_lock.lock (); }
+  void reader_opening_unlock () { _reader_opening_lock.unlock (); }
 
   int get_shared_nhandlers () const { return (int) _sh_nhandlers; }
   void set_shared_nhandlers (int n) { InterlockedExchange (&_sh_nhandlers, n); }
@@ -1371,6 +1373,8 @@ class fhandler_fifo: public fhandler_base
   HANDLE owner_needed_evt;      /* The owner is closing. */
   HANDLE owner_found_evt;       /* A new owner has taken over. */
   HANDLE update_needed_evt;     /* shared_fc_handler needs updating. */
+  HANDLE check_write_ready_evt; /* write_ready needs to be checked. */
+  HANDLE write_ready_ok_evt;    /* check_write_ready is done. */
 
   /* Handles to non-shared events needed for fifo_reader_threads. */
   HANDLE cancel_evt;            /* Signal thread to terminate. */
@@ -1448,6 +1452,9 @@ class fhandler_fifo: public fhandler_base
   { return shmem->shared_fc_handler_updated (); }
   void shared_fc_handler_updated (bool val)
   { shmem->shared_fc_handler_updated (val); }
+  void check_write_ready ();
+  void reader_opening_lock () { shmem->reader_opening_lock (); }
+  void reader_opening_unlock () { shmem->reader_opening_unlock (); }
 
 public:
   fhandler_fifo ();
index 5c059a567e97248448df131eaec3bfa769928719..4c7d51edb89ca5afa198245f657b73674a441316 100644 (file)
@@ -75,6 +75,7 @@ fhandler_fifo::fhandler_fifo ():
   fhandler_base (),
   read_ready (NULL), write_ready (NULL), writer_opening (NULL),
   owner_needed_evt (NULL), owner_found_evt (NULL), update_needed_evt (NULL),
+  check_write_ready_evt (NULL), write_ready_ok_evt (NULL),
   cancel_evt (NULL), thr_sync_evt (NULL), _maybe_eof (false),
   fc_handler (NULL), shandlers (0), nhandlers (0),
   reader (false), writer (false), duplexer (false),
@@ -441,6 +442,45 @@ fhandler_fifo::update_shared_handlers ()
   return 0;
 }
 
+/* The write_ready event gets set when a writer opens, to indicate
+   that a blocking reader can open.  If a second reader wants to open,
+   we need to see if there are still any writers open. */
+void
+fhandler_fifo::check_write_ready ()
+{
+  bool set = false;
+
+  fifo_client_lock ();
+  for (int i = 0; i < nhandlers && !set; i++)
+    switch (fc_handler[i].pipe_state ())
+      {
+      case FILE_PIPE_CONNECTED_STATE:
+       fc_handler[i].state = fc_connected;
+       set = true;
+       break;
+      case FILE_PIPE_INPUT_AVAILABLE_STATE:
+       fc_handler[i].state = fc_input_avail;
+       set = true;
+       break;
+      case FILE_PIPE_DISCONNECTED_STATE:
+       fc_handler[i].state = fc_disconnected;
+       break;
+      case FILE_PIPE_LISTENING_STATE:
+       fc_handler[i].state = fc_listening;
+      case FILE_PIPE_CLOSING_STATE:
+       fc_handler[i].state = fc_closing;
+      default:
+       fc_handler[i].state = fc_error;
+       break;
+      }
+  fifo_client_unlock ();
+  if (set || IsEventSignalled (writer_opening))
+    SetEvent (write_ready);
+  else
+    ResetEvent (write_ready);
+  SetEvent (write_ready_ok_evt);
+}
+
 static DWORD WINAPI
 fifo_reader_thread (LPVOID param)
 {
@@ -526,13 +566,15 @@ fhandler_fifo::fifo_reader_thread_func ()
          IO_STATUS_BLOCK io;
          bool cancel = false;
          bool update = false;
+         bool check = false;
 
          status = NtFsControlFile (fc.h, conn_evt, NULL, NULL, &io,
                                    FSCTL_PIPE_LISTEN, NULL, 0, NULL, 0);
          if (status == STATUS_PENDING)
            {
-             HANDLE w[3] = { conn_evt, update_needed_evt, cancel_evt };
-             switch (WaitForMultipleObjects (3, w, false, INFINITE))
+             HANDLE w[4] = { conn_evt, update_needed_evt,
+               check_write_ready_evt, cancel_evt };
+             switch (WaitForMultipleObjects (4, w, false, INFINITE))
                {
                case WAIT_OBJECT_0:
                  status = io.Status;
@@ -544,6 +586,10 @@ fhandler_fifo::fifo_reader_thread_func ()
                  update = true;
                  break;
                case WAIT_OBJECT_0 + 2:
+                 status = STATUS_WAIT_2;
+                 check = true;
+                 break;
+               case WAIT_OBJECT_0 + 3:
                  status = STATUS_THREAD_IS_TERMINATING;
                  cancel = true;
                  update = true;
@@ -570,6 +616,7 @@ fhandler_fifo::fifo_reader_thread_func ()
              break;
            case STATUS_THREAD_IS_TERMINATING:
            case STATUS_WAIT_1:
+           case STATUS_WAIT_2:
              /* Try to connect a bogus client.  Otherwise fc is still
                 listening, and the next connection might not get recorded. */
              status1 = open_pipe (ph);
@@ -602,6 +649,8 @@ fhandler_fifo::fifo_reader_thread_func ()
            NtClose (ph);
          if (update && update_shared_handlers () < 0)
            api_fatal ("Can't update shared handlers, %E");
+         if (check)
+           check_write_ready ();
          if (cancel)
            goto canceled;
        }
@@ -833,14 +882,19 @@ fhandler_fifo::open (int flags, mode_t)
      and start the fifo_reader_thread. */
   if (reader)
     {
+      bool first = true;
+
       SetEvent (read_ready);
       if (create_shmem () < 0)
        goto err_close_writer_opening;
       if (create_shared_fc_handler () < 0)
        goto err_close_shmem;
-      inc_nreaders ();
-      /* Reinitialize _sh_fc_handler_updated, which starts as 0. */
-      shared_fc_handler_updated (true);
+      reader_opening_lock ();
+      if (inc_nreaders () == 1)
+       /* Reinitialize _sh_fc_handler_updated, which starts as 0. */
+       shared_fc_handler_updated (true);
+      else
+       first = false;
       npbuf[0] = 'n';
       if (!(owner_needed_evt = CreateEvent (sa_buf, true, false, npbuf)))
        {
@@ -862,9 +916,23 @@ fhandler_fifo::open (int flags, mode_t)
          __seterrno ();
          goto err_close_owner_found_evt;
        }
+      npbuf[0] = 'c';
+      if (!(check_write_ready_evt = CreateEvent (sa_buf, false, false, npbuf)))
+       {
+         debug_printf ("CreateEvent for %s failed, %E", npbuf);
+         __seterrno ();
+         goto err_close_update_needed_evt;
+       }
+      npbuf[0] = 'k';
+      if (!(write_ready_ok_evt = CreateEvent (sa_buf, false, false, npbuf)))
+       {
+         debug_printf ("CreateEvent for %s failed, %E", npbuf);
+         __seterrno ();
+         goto err_close_check_write_ready_evt;
+       }
       /* Make cancel and sync inheritable for exec. */
       if (!(cancel_evt = create_event (true)))
-       goto err_close_update_needed_evt;
+       goto err_close_write_ready_ok_evt;
       if (!(thr_sync_evt = create_event (true)))
        goto err_close_cancel_evt;
       me.winpid = GetCurrentProcessId ();
@@ -908,9 +976,19 @@ fhandler_fifo::open (int flags, mode_t)
                }
            }
        }
-      /* Not a duplexer; wait for a writer to connect. */
-      else if (!wait (write_ready))
-       goto err_close_reader;
+      /* Not a duplexer; wait for a writer to connect if we're blocking. */
+      else if (!(flags & O_NONBLOCK))
+       {
+         if (!first)
+           {
+             /* Ask the owner to update write_ready. */
+             SetEvent (check_write_ready_evt);
+             WaitForSingleObject (write_ready_ok_evt, INFINITE);
+           }
+         if (!wait (write_ready))
+           goto err_close_reader;
+       }
+      reader_opening_unlock ();
       goto success;
     }
 
@@ -984,6 +1062,10 @@ err_close_reader:
   return 0;
 err_close_cancel_evt:
   NtClose (cancel_evt);
+err_close_write_ready_ok_evt:
+  NtClose (write_ready_ok_evt);
+err_close_check_write_ready_evt:
+  NtClose (check_write_ready_evt);
 err_close_update_needed_evt:
   NtClose (update_needed_evt);
 err_close_owner_found_evt:
@@ -993,6 +1075,7 @@ err_close_owner_needed_evt:
 err_dec_nreaders:
   if (dec_nreaders () == 0)
     ResetEvent (read_ready);
+  reader_opening_unlock ();
 /* err_close_shared_fc_handler: */
   NtUnmapViewOfSection (NtCurrentProcess (), shared_fc_handler);
   NtClose (shared_fc_hdl);
@@ -1396,6 +1479,10 @@ fhandler_fifo::close ()
        NtClose (owner_found_evt);
       if (update_needed_evt)
        NtClose (update_needed_evt);
+      if (check_write_ready_evt)
+       NtClose (check_write_ready_evt);
+      if (write_ready_ok_evt)
+       NtClose (write_ready_ok_evt);
       if (cancel_evt)
        NtClose (cancel_evt);
       if (thr_sync_evt)
@@ -1519,8 +1606,22 @@ fhandler_fifo::dup (fhandler_base *child, int flags)
          __seterrno ();
          goto err_close_owner_found_evt;
        }
+      if (!DuplicateHandle (GetCurrentProcess (), check_write_ready_evt,
+                           GetCurrentProcess (), &fhf->check_write_ready_evt,
+                           0, !(flags & O_CLOEXEC), DUPLICATE_SAME_ACCESS))
+       {
+         __seterrno ();
+         goto err_close_update_needed_evt;
+       }
+      if (!DuplicateHandle (GetCurrentProcess (), write_ready_ok_evt,
+                           GetCurrentProcess (), &fhf->write_ready_ok_evt,
+                           0, !(flags & O_CLOEXEC), DUPLICATE_SAME_ACCESS))
+       {
+         __seterrno ();
+         goto err_close_check_write_ready_evt;
+       }
       if (!(fhf->cancel_evt = create_event (true)))
-       goto err_close_update_needed_evt;
+       goto err_close_write_ready_ok_evt;
       if (!(fhf->thr_sync_evt = create_event (true)))
        goto err_close_cancel_evt;
       inc_nreaders ();
@@ -1530,6 +1631,10 @@ fhandler_fifo::dup (fhandler_base *child, int flags)
   return 0;
 err_close_cancel_evt:
   NtClose (fhf->cancel_evt);
+err_close_write_ready_ok_evt:
+  NtClose (fhf->write_ready_ok_evt);
+err_close_check_write_ready_evt:
+  NtClose (fhf->check_write_ready_evt);
 err_close_update_needed_evt:
   NtClose (fhf->update_needed_evt);
 err_close_owner_found_evt:
@@ -1575,6 +1680,8 @@ fhandler_fifo::fixup_after_fork (HANDLE parent)
       fork_fixup (parent, owner_needed_evt, "owner_needed_evt");
       fork_fixup (parent, owner_found_evt, "owner_found_evt");
       fork_fixup (parent, update_needed_evt, "update_needed_evt");
+      fork_fixup (parent, check_write_ready_evt, "check_write_ready_evt");
+      fork_fixup (parent, write_ready_ok_evt, "write_ready_ok_evt");
       if (close_on_exec ())
        /* Prevent a later attempt to close the non-inherited
           pipe-instance handles copied from the parent. */
@@ -1658,6 +1765,8 @@ fhandler_fifo::set_close_on_exec (bool val)
       set_no_inheritance (owner_needed_evt, val);
       set_no_inheritance (owner_found_evt, val);
       set_no_inheritance (update_needed_evt, val);
+      set_no_inheritance (check_write_ready_evt, val);
+      set_no_inheritance (write_ready_ok_evt, val);
       set_no_inheritance (cancel_evt, val);
       set_no_inheritance (thr_sync_evt, val);
       fifo_client_lock ();
This page took 0.044552 seconds and 5 git commands to generate.