[PATCH 19/21] Cygwin: FIFO: allow any reader to take ownership

Ken Brown kbrown@cornell.edu
Thu May 7 20:21:22 GMT 2020


Add a take_ownership method, used by raw_read and select.cc:peek_fifo.
It wakes up all fifo_reader_threads and allows the caller to become
owner.  The work is done by the fifo_reader_threads.

For synchronization we introduce several new fhandler_fifo data
members and methods:

- update_needed_evt signals the current owner to stop listening for
  writer connections and update the shared copy of its fc_handler list.

- shared_fc_handler_updated() gets and sets the status of the
  fc_handler update process.

- get_pending_owner() and set_pending_owner() get and set the reader
  that is requesting ownership.

Finally, a new 'reading_lock' prevents two readers from trying to take
ownership simultaneously.
---
 winsup/cygwin/fhandler.h       |  28 ++++++++-
 winsup/cygwin/fhandler_fifo.cc | 106 +++++++++++++++++++++++++++++----
 winsup/cygwin/select.cc        |   4 ++
 3 files changed, 122 insertions(+), 16 deletions(-)

diff --git a/winsup/cygwin/fhandler.h b/winsup/cygwin/fhandler.h
index f8c1b52a4..31c65866e 100644
--- a/winsup/cygwin/fhandler.h
+++ b/winsup/cygwin/fhandler.h
@@ -1323,12 +1323,13 @@ struct fifo_reader_id_t
 class fifo_shmem_t
 {
   LONG _nreaders;
-  fifo_reader_id_t _owner, _prev_owner;
-  af_unix_spinlock_t _owner_lock;
+  fifo_reader_id_t _owner, _prev_owner, _pending_owner;
+  af_unix_spinlock_t _owner_lock, _reading_lock;
 
   /* Info about shared memory block used for temporary storage of the
      owner's fc_handler list. */
-  LONG _sh_nhandlers, _sh_shandlers, _sh_fc_handler_committed;
+  LONG _sh_nhandlers, _sh_shandlers, _sh_fc_handler_committed,
+    _sh_fc_handler_updated;
 
 public:
   int inc_nreaders () { return (int) InterlockedIncrement (&_nreaders); }
@@ -1338,9 +1339,13 @@ public:
   void set_owner (fifo_reader_id_t fr_id) { _owner = fr_id; }
   fifo_reader_id_t get_prev_owner () const { return _prev_owner; }
   void set_prev_owner (fifo_reader_id_t fr_id) { _prev_owner = fr_id; }
+  fifo_reader_id_t get_pending_owner () const { return _pending_owner; }
+  void set_pending_owner (fifo_reader_id_t fr_id) { _pending_owner = fr_id; }
 
   void owner_lock () { _owner_lock.lock (); }
   void owner_unlock () { _owner_lock.unlock (); }
+  void reading_lock () { _reading_lock.lock (); }
+  void reading_unlock () { _reading_lock.unlock (); }
 
   int get_shared_nhandlers () const { return (int) _sh_nhandlers; }
   void set_shared_nhandlers (int n) { InterlockedExchange (&_sh_nhandlers, n); }
@@ -1350,6 +1355,9 @@ public:
   { return (size_t) _sh_fc_handler_committed; }
   void set_shared_fc_handler_committed (size_t n)
   { InterlockedExchange (&_sh_fc_handler_committed, (LONG) n); }
+  bool shared_fc_handler_updated () const { return _sh_fc_handler_updated; }
+  void shared_fc_handler_updated (bool val)
+  { InterlockedExchange (&_sh_fc_handler_updated, val); }
 };
 
 class fhandler_fifo: public fhandler_base
@@ -1362,6 +1370,7 @@ class fhandler_fifo: public fhandler_base
   /* Handles to named events needed by all readers of a given FIFO. */
   HANDLE owner_needed_evt;      /* The owner is closing. */
   HANDLE owner_found_evt;       /* A new owner has taken over. */
+  HANDLE update_needed_evt;     /* shared_fc_handler needs updating. */
 
   /* Handles to non-shared events needed for fifo_reader_threads. */
   HANDLE cancel_evt;            /* Signal thread to terminate. */
@@ -1409,6 +1418,11 @@ class fhandler_fifo: public fhandler_base
   fifo_reader_id_t get_prev_owner () const { return shmem->get_prev_owner (); }
   void set_prev_owner (fifo_reader_id_t fr_id)
   { shmem->set_prev_owner (fr_id); }
+  fifo_reader_id_t get_pending_owner () const
+  { return shmem->get_pending_owner (); }
+  void set_pending_owner (fifo_reader_id_t fr_id)
+  { shmem->set_pending_owner (fr_id); }
+
   void owner_needed ()
   {
     ResetEvent (owner_found_evt);
@@ -1430,6 +1444,10 @@ class fhandler_fifo: public fhandler_base
   { shmem->set_shared_fc_handler_committed (n); }
   int update_my_handlers (bool from_exec = false);
   int update_shared_handlers ();
+  bool shared_fc_handler_updated () const
+  { return shmem->shared_fc_handler_updated (); }
+  void shared_fc_handler_updated (bool val)
+  { shmem->shared_fc_handler_updated (val); }
 
 public:
   fhandler_fifo ();
@@ -1449,6 +1467,10 @@ public:
   void owner_lock () { shmem->owner_lock (); }
   void owner_unlock () { shmem->owner_unlock (); }
 
+  void take_ownership ();
+  void reading_lock () { shmem->reading_lock (); }
+  void reading_unlock () { shmem->reading_unlock (); }
+
   int open (int, mode_t);
   off_t lseek (off_t offset, int whence);
   int close ();
diff --git a/winsup/cygwin/fhandler_fifo.cc b/winsup/cygwin/fhandler_fifo.cc
index bf33a52d6..81473015e 100644
--- a/winsup/cygwin/fhandler_fifo.cc
+++ b/winsup/cygwin/fhandler_fifo.cc
@@ -74,7 +74,7 @@ static NO_COPY fifo_reader_id_t null_fr_id = { .winpid = 0, .fh = NULL };
 fhandler_fifo::fhandler_fifo ():
   fhandler_base (),
   read_ready (NULL), write_ready (NULL), writer_opening (NULL),
-  owner_needed_evt (NULL), owner_found_evt (NULL),
+  owner_needed_evt (NULL), owner_found_evt (NULL), update_needed_evt (NULL),
   cancel_evt (NULL), thr_sync_evt (NULL), _maybe_eof (false),
   fc_handler (NULL), shandlers (0), nhandlers (0),
   reader (false), writer (false), duplexer (false),
@@ -436,6 +436,8 @@ fhandler_fifo::update_shared_handlers ()
     }
   set_shared_nhandlers (nhandlers);
   memcpy (shared_fc_handler, fc_handler, nhandlers * sizeof (fc_handler[0]));
+  shared_fc_handler_updated (true);
+  set_prev_owner (me);
   return 0;
 }
 
@@ -456,20 +458,44 @@ fhandler_fifo::fifo_reader_thread_func ()
 
   while (1)
     {
-      fifo_reader_id_t cur_owner;
+      fifo_reader_id_t cur_owner, pending_owner;
+      bool idle = false, take_ownership = false;
 
       owner_lock ();
       cur_owner = get_owner ();
-      if (!cur_owner)
+      pending_owner = get_pending_owner ();
+
+      if (pending_owner)
 	{
-	  set_owner (me);
-	  if (update_my_handlers () < 0)
-	    api_fatal ("Can't update my handlers, %E");
-	  owner_found ();
-	  owner_unlock ();
-	  continue;
+	  if (pending_owner != me)
+	    idle = true;
+	  else
+	    take_ownership = true;
 	}
+      else if (!cur_owner)
+	take_ownership = true;
       else if (cur_owner != me)
+	idle = true;
+      if (take_ownership)
+	{
+	  if (!shared_fc_handler_updated ())
+	    {
+	      owner_unlock ();
+	      yield ();
+	      continue;
+	    }
+	  else
+	    {
+	      set_owner (me);
+	      set_pending_owner (null_fr_id);
+	      if (update_my_handlers () < 0)
+		api_fatal ("Can't update my handlers, %E");
+	      owner_found ();
+	      owner_unlock ();
+	      continue;
+	    }
+	}
+      else if (idle)
 	{
 	  owner_unlock ();
 	  HANDLE w[2] = { owner_needed_evt, cancel_evt };
@@ -494,6 +520,7 @@ fhandler_fifo::fifo_reader_thread_func ()
 	  /* Listen for a writer to connect to the new client handler. */
 	  fifo_client_handler& fc = fc_handler[nhandlers - 1];
 	  fifo_client_unlock ();
+	  shared_fc_handler_updated (false);
 	  owner_unlock ();
 	  NTSTATUS status;
 	  IO_STATUS_BLOCK io;
@@ -504,8 +531,8 @@ fhandler_fifo::fifo_reader_thread_func ()
 				    FSCTL_PIPE_LISTEN, NULL, 0, NULL, 0);
 	  if (status == STATUS_PENDING)
 	    {
-	      HANDLE w[2] = { conn_evt, cancel_evt };
-	      switch (WaitForMultipleObjects (2, w, false, INFINITE))
+	      HANDLE w[3] = { conn_evt, update_needed_evt, cancel_evt };
+	      switch (WaitForMultipleObjects (3, w, false, INFINITE))
 		{
 		case WAIT_OBJECT_0:
 		  status = io.Status;
@@ -513,6 +540,10 @@ fhandler_fifo::fifo_reader_thread_func ()
 				status);
 		  break;
 		case WAIT_OBJECT_0 + 1:
+		  status = STATUS_WAIT_1;
+		  update = true;
+		  break;
+		case WAIT_OBJECT_0 + 2:
 		  status = STATUS_THREAD_IS_TERMINATING;
 		  cancel = true;
 		  update = true;
@@ -538,6 +569,7 @@ fhandler_fifo::fifo_reader_thread_func ()
 	      record_connection (fc, fc_closing);
 	      break;
 	    case STATUS_THREAD_IS_TERMINATING:
+	    case STATUS_WAIT_1:
 	      /* Try to connect a bogus client.  Otherwise fc is still
 		 listening, and the next connection might not get recorded. */
 	      status1 = open_pipe (ph);
@@ -807,6 +839,8 @@ fhandler_fifo::open (int flags, mode_t)
       if (create_shared_fc_handler () < 0)
 	goto err_close_shmem;
       inc_nreaders ();
+      /* Reinitialize _sh_fc_handler_updated, which starts as 0. */
+      shared_fc_handler_updated (true);
       npbuf[0] = 'n';
       if (!(owner_needed_evt = CreateEvent (sa_buf, true, false, npbuf)))
 	{
@@ -821,9 +855,16 @@ fhandler_fifo::open (int flags, mode_t)
 	  __seterrno ();
 	  goto err_close_owner_needed_evt;
 	}
+      npbuf[0] = 'u';
+      if (!(update_needed_evt = CreateEvent (sa_buf, false, false, npbuf)))
+	{
+	  debug_printf ("CreateEvent for %s failed, %E", npbuf);
+	  __seterrno ();
+	  goto err_close_owner_found_evt;
+	}
       /* Make cancel and sync inheritable for exec. */
       if (!(cancel_evt = create_event (true)))
-	goto err_close_owner_found_evt;
+	goto err_close_update_needed_evt;
       if (!(thr_sync_evt = create_event (true)))
 	goto err_close_cancel_evt;
       me.winpid = GetCurrentProcessId ();
@@ -943,6 +984,8 @@ err_close_reader:
   return 0;
 err_close_cancel_evt:
   NtClose (cancel_evt);
+err_close_update_needed_evt:
+  NtClose (update_needed_evt);
 err_close_owner_found_evt:
   NtClose (owner_found_evt);
 err_close_owner_needed_evt:
@@ -1136,6 +1179,24 @@ fhandler_fifo::hit_eof ()
   return ret;
 }
 
+/* Called from raw_read and select.cc:peek_fifo. */
+void
+fhandler_fifo::take_ownership ()
+{
+  owner_lock ();
+  if (get_owner () == me)
+    {
+      owner_unlock ();
+      return;
+    }
+  set_pending_owner (me);
+  owner_needed ();
+  SetEvent (update_needed_evt);
+  owner_unlock ();
+  /* The reader threads should now do the transfer.  */
+  WaitForSingleObject (owner_found_evt, INFINITE);
+}
+
 void __reg3
 fhandler_fifo::raw_read (void *in_ptr, size_t& len)
 {
@@ -1144,6 +1205,9 @@ fhandler_fifo::raw_read (void *in_ptr, size_t& len)
 
   while (1)
     {
+      /* No one else can take ownership while we hold the reading_lock. */
+      reading_lock ();
+      take_ownership ();
       /* Poll the connected clients for input. */
       int nconnected = 0;
       fifo_client_lock ();
@@ -1167,6 +1231,7 @@ fhandler_fifo::raw_read (void *in_ptr, size_t& len)
 		  {
 		    len = nbytes;
 		    fifo_client_unlock ();
+		    reading_unlock ();
 		    return;
 		  }
 		break;
@@ -1187,9 +1252,11 @@ fhandler_fifo::raw_read (void *in_ptr, size_t& len)
       fifo_client_unlock ();
       if (maybe_eof () && hit_eof ())
 	{
+	  reading_unlock ();
 	  len = 0;
 	  return;
 	}
+      reading_unlock ();
       if (is_nonblocking ())
 	{
 	  set_errno (EAGAIN);
@@ -1327,6 +1394,8 @@ fhandler_fifo::close ()
 	NtClose (owner_needed_evt);
       if (owner_found_evt)
 	NtClose (owner_found_evt);
+      if (update_needed_evt)
+	NtClose (update_needed_evt);
       if (cancel_evt)
 	NtClose (cancel_evt);
       if (thr_sync_evt)
@@ -1443,8 +1512,15 @@ fhandler_fifo::dup (fhandler_base *child, int flags)
 	  __seterrno ();
 	  goto err_close_owner_needed_evt;
 	}
+      if (!DuplicateHandle (GetCurrentProcess (), update_needed_evt,
+			    GetCurrentProcess (), &fhf->update_needed_evt,
+			    0, !(flags & O_CLOEXEC), DUPLICATE_SAME_ACCESS))
+	{
+	  __seterrno ();
+	  goto err_close_owner_found_evt;
+	}
       if (!(fhf->cancel_evt = create_event (true)))
-	goto err_close_owner_found_evt;
+	goto err_close_update_needed_evt;
       if (!(fhf->thr_sync_evt = create_event (true)))
 	goto err_close_cancel_evt;
       inc_nreaders ();
@@ -1454,6 +1530,8 @@ fhandler_fifo::dup (fhandler_base *child, int flags)
   return 0;
 err_close_cancel_evt:
   NtClose (fhf->cancel_evt);
+err_close_update_needed_evt:
+  NtClose (fhf->update_needed_evt);
 err_close_owner_found_evt:
   NtClose (fhf->owner_found_evt);
 err_close_owner_needed_evt:
@@ -1496,6 +1574,7 @@ fhandler_fifo::fixup_after_fork (HANDLE parent)
 	api_fatal ("Can't reopen shared fc_handler memory during fork, %E");
       fork_fixup (parent, owner_needed_evt, "owner_needed_evt");
       fork_fixup (parent, owner_found_evt, "owner_found_evt");
+      fork_fixup (parent, update_needed_evt, "update_needed_evt");
       if (close_on_exec ())
 	/* Prevent a later attempt to close the non-inherited
 	   pipe-instance handles copied from the parent. */
@@ -1578,6 +1657,7 @@ fhandler_fifo::set_close_on_exec (bool val)
     {
       set_no_inheritance (owner_needed_evt, val);
       set_no_inheritance (owner_found_evt, val);
+      set_no_inheritance (update_needed_evt, val);
       set_no_inheritance (cancel_evt, val);
       set_no_inheritance (thr_sync_evt, val);
       fifo_client_lock ();
diff --git a/winsup/cygwin/select.cc b/winsup/cygwin/select.cc
index 9323c423f..2c299acf7 100644
--- a/winsup/cygwin/select.cc
+++ b/winsup/cygwin/select.cc
@@ -866,6 +866,8 @@ peek_fifo (select_record *s, bool from_select)
 	  goto out;
 	}
 
+      fh->reading_lock ();
+      fh->take_ownership ();
       fh->fifo_client_lock ();
       int nconnected = 0;
       for (int i = 0; i < fh->get_nhandlers (); i++)
@@ -888,6 +890,7 @@ peek_fifo (select_record *s, bool from_select)
 		fh->get_fc_handler (i).get_state () = fc_input_avail;
 		select_printf ("read: %s, ready for read", fh->get_name ());
 		fh->fifo_client_unlock ();
+		fh->reading_unlock ();
 		gotone += s->read_ready = true;
 		goto out;
 	      default:
@@ -905,6 +908,7 @@ peek_fifo (select_record *s, bool from_select)
 	  if (s->except_selected)
 	    gotone += s->except_ready = true;
 	}
+      fh->reading_unlock ();
     }
 out:
   if (s->write_selected)
-- 
2.21.0



More information about the Cygwin-patches mailing list