cygrunsrv + sshd + rsync = 20 times too slow -- throttled?

Corinna Vinschen corinna-cygwin@cygwin.com
Thu Sep 2 19:35:21 GMT 2021


On Sep  2 21:00, Corinna Vinschen wrote:
> On Sep  2 09:01, Ken Brown wrote:
> > On 9/2/2021 4:17 AM, Corinna Vinschen wrote:
> > > What if the readers never request more than, say, 50 or even 25% of the
> > > available buffer space?  Our buffer is 64K and there's no guarantee that
> > > any read > PIPE_BUF (== 4K) is atomic anyway.  This can work without
> > > having to check the other side of the pipe.  Something like this,
> > > ignoring border cases:
> > > 
> > > pipe::create()
> > > {
> > >     [...]
> > >     mutex = CreateMutex();
> > > }
> > > 
> > > pipe::raw_read(char *buf, size_t num_requested)
> > > {
> > >    if (blocking)
> > >      {
> > >        WFSO(mutex);
> > >        NtQueryInformationFile(FilePipeLocalInformation);
> > >        if (!fpli.ReadDataAvailable
> > > 	  && num_requested > fpli.InboundQuota / 4)
> > > 	num_requested = fpli.InboundQuota / 4;
> > >        NtReadFile(pipe, buf, num_requested);
> > >        ReleaseMutex(mutex);
> > >      }
> > > }
> > > 
> > > It's not entirely foolproof, but it should fix 99% of the cases.
> > 
> > I like it!
> > 
> > Do you think there's anything we can or should do to avoid a deadlock in the
> > rare cases where this fails?  The only thing I can think of immediately is
> > to always impose a timeout if select is called with infinite timeout on the
> > write side of a pipe, after which we report that the pipe is write ready.
> > After all, we've lived since 2008 with a bug that caused select to *always*
> > report write ready.
> 
> Indeed.  Hmm.  What timeout are you thinking of?  Seconds?  Minutes?
> 
> > Alternatively, we could just wait and see if there's an actual use case in
> > which someone encounters a deadlock.
> 
> Or that.  Fixing up select isn't too hard in that case, I guess.

It's getting too late again.  I'm dropping off for tonight, but I've attached
the POC code I have so far.  It also includes the snippets from my previous
patch, which fix the issues Takashi found during testing.  It also fixes
something which looks like a bug in raw_write:

-	  ptr = ((char *) ptr) + chunk;
+	  ptr = ((char *) ptr) + nbytes_now;

Incrementing ptr by chunk bytes while only nbytes_now have been written
looks incorrect.

As for the reader, the patch makes the number of bytes to read dependent on
the number of reader handles.  I don't know if that's such a bright idea,
but this can be changed easily.

Anyway, this runs all my testcases successfully but they are anything
but thorough.

Patch relative to topic/pipe attached.  Would you both mind taking a
close look at it?


Thanks,
Corinna
-------------- next part --------------
diff --git a/winsup/cygwin/fhandler.h b/winsup/cygwin/fhandler.h
index 132e6002133b..032ab5fb07ae 100644
--- a/winsup/cygwin/fhandler.h
+++ b/winsup/cygwin/fhandler.h
@@ -1171,6 +1171,7 @@ class fhandler_socket_unix : public fhandler_socket
 class fhandler_pipe: public fhandler_base
 {
 private:
+  HANDLE read_mtx;
   pid_t popen_pid;
   size_t max_atomic_write;
   void set_pipe_non_blocking (bool nonblocking);
@@ -1178,6 +1179,7 @@ public:
   fhandler_pipe ();
 
   bool ispipe() const { return true; }
+  void set_read_mutex (HANDLE mtx) { read_mtx = mtx; }
 
   void set_popen_pid (pid_t pid) {popen_pid = pid;}
   pid_t get_popen_pid () const {return popen_pid;}
@@ -1187,7 +1189,9 @@ public:
   select_record *select_except (select_stuff *);
   char *get_proc_fd_name (char *buf);
   int open (int flags, mode_t mode = 0);
+  void fixup_after_fork (HANDLE);
   int dup (fhandler_base *child, int);
+  int close ();
   void __reg3 raw_read (void *ptr, size_t& len);
   ssize_t __reg3 raw_write (const void *ptr, size_t len);
   int ioctl (unsigned int cmd, void *);
diff --git a/winsup/cygwin/fhandler_pipe.cc b/winsup/cygwin/fhandler_pipe.cc
index 2dec0a84817c..7a5cefb3d07c 100644
--- a/winsup/cygwin/fhandler_pipe.cc
+++ b/winsup/cygwin/fhandler_pipe.cc
@@ -240,8 +240,37 @@ fhandler_pipe::raw_read (void *ptr, size_t& len)
       keep_looping = false;
       if (evt)
 	ResetEvent (evt);
+      if (!is_nonblocking ())
+	{
+	  FILE_PIPE_LOCAL_INFORMATION fpli;
+	  ULONG reader_count;
+	  ULONG max_len = 64;
+
+	  WaitForSingleObject (read_mtx, INFINITE);
+
+	  /* Make sure never to request more bytes than half the pipe
+	     buffer size.  Every pending read lowers WriteQuotaAvailable
+	     on the write side and thus affects select's ability to return
+	     more or less reliable info whether a write succeeds or not.
+
+	     Let the size of the request depend on the number of readers
+	     at the time. */
+	  status = NtQueryInformationFile (get_handle (), &io,
+					   &fpli, sizeof (fpli),
+					   FilePipeLocalInformation);
+	  if (NT_SUCCESS (status) && fpli.ReadDataAvailable == 0)
+	    {
+	      reader_count = get_obj_handle_count (get_handle ());
+	      if (reader_count < 10)
+		max_len = fpli.InboundQuota / (2 * reader_count);
+	      if (len > max_len)
+		len = max_len;
+	    }
+	}
       status = NtReadFile (get_handle (), evt, NULL, NULL, &io, ptr,
 			   len, NULL, NULL);
+      if (!is_nonblocking ())
+	ReleaseMutex (read_mtx);
       if (evt && status == STATUS_PENDING)
 	{
 	  waitret = cygwait (evt);
@@ -313,7 +342,6 @@ fhandler_pipe::raw_read (void *ptr, size_t& len)
 ssize_t __reg3
 fhandler_pipe::raw_write (const void *ptr, size_t len)
 {
-  ssize_t ret = -1;
   size_t nbytes = 0;
   ULONG chunk;
   NTSTATUS status = STATUS_SUCCESS;
@@ -352,8 +380,36 @@ fhandler_pipe::raw_write (const void *ptr, size_t len)
       else
 	len1 = (ULONG) left;
       nbytes_now = 0;
-      status = NtWriteFile (get_handle (), evt, NULL, NULL, &io,
-			    (PVOID) ptr, len1, NULL, NULL);
+      /* NtWriteFile returns success with # of bytes written == 0 if writing
+         on a non-blocking pipe fails because the pipe buffer doesn't have
+	 sufficient space.
+
+	 POSIX requires
+	 - A write request for {PIPE_BUF} or fewer bytes shall have the
+	   following effect: if there is sufficient space available in the
+	   pipe, write() shall transfer all the data and return the number
+	   of bytes requested. Otherwise, write() shall transfer no data and
+	   return -1 with errno set to [EAGAIN].
+
+	 - A write request for more than {PIPE_BUF} bytes shall cause one
+	   of the following:
+
+	  - When at least one byte can be written, transfer what it can and
+	    return the number of bytes written. When all data previously
+	    written to the pipe is read, it shall transfer at least {PIPE_BUF}
+	    bytes.
+
+	  - When no data can be written, transfer no data, and return -1 with
+	    errno set to [EAGAIN]. */
+      while (len1 > 0)
+	{
+	  status = NtWriteFile (get_handle (), evt, NULL, NULL, &io,
+				(PVOID) ptr, len1, NULL, NULL);
+	  if (evt || !NT_SUCCESS (status) || io.Information > 0
+	      || len <= PIPE_BUF)
+	    break;
+	  len1 >>= 1;
+	}
       if (evt && status == STATUS_PENDING)
 	{
 	  waitret = cygwait (evt);
@@ -375,13 +431,11 @@ fhandler_pipe::raw_write (const void *ptr, size_t len)
       else if (NT_SUCCESS (status))
 	{
 	  nbytes_now = io.Information;
-	  /* NtWriteFile returns success with # of bytes written == 0
-	     if writing on a non-blocking pipe fails because the pipe
-	     buffer doesn't have sufficient space. */
-	  if (nbytes_now == 0)
-	    set_errno (EAGAIN);
-	  ptr = ((char *) ptr) + chunk;
+	  ptr = ((char *) ptr) + nbytes_now;
 	  nbytes += nbytes_now;
+	  /* 0 bytes returned?  EAGAIN.  See above. */
+	  if (nbytes == 0)
+	    set_errno (EAGAIN);
 	}
       else if (STATUS_PIPE_IS_CLOSED (status))
 	{
@@ -392,17 +446,23 @@ fhandler_pipe::raw_write (const void *ptr, size_t len)
 	__seterrno_from_nt_status (status);
 
       if (nbytes_now == 0)
-	len = 0;		/* Terminate loop. */
-      if (nbytes > 0)
-	ret = nbytes;
+	break;
     }
   if (evt)
     CloseHandle (evt);
-  if (status == STATUS_THREAD_SIGNALED && ret < 0)
+  if (status == STATUS_THREAD_SIGNALED && nbytes == 0)
     set_errno (EINTR);
   else if (status == STATUS_THREAD_CANCELED)
     pthread::static_cancel_self ();
-  return ret;
+  return nbytes ?: -1;
+}
+
+void
+fhandler_pipe::fixup_after_fork (HANDLE parent)
+{
+  if (read_mtx)
+    fork_fixup (parent, read_mtx, "read_mtx");
+  fhandler_base::fixup_after_fork (parent);
 }
 
 int
@@ -411,16 +471,31 @@ fhandler_pipe::dup (fhandler_base *child, int flags)
   fhandler_pipe *ftp = (fhandler_pipe *) child;
   ftp->set_popen_pid (0);
 
-  int res;
-  if (get_handle () && fhandler_base::dup (child, flags))
+  int res = 0;
+  if (fhandler_base::dup (child, flags))
     res = -1;
-  else
-    res = 0;
+  else if (read_mtx &&
+	   !DuplicateHandle (GetCurrentProcess (), read_mtx,
+			     GetCurrentProcess (), &ftp->read_mtx,
+			     0, !(flags & O_CLOEXEC), DUPLICATE_SAME_ACCESS))
+    {
+      __seterrno ();
+      ftp->close ();
+      res = -1;
+    }
 
   debug_printf ("res %d", res);
   return res;
 }
 
+int
+fhandler_pipe::close ()
+{
+  if (read_mtx)
+    NtClose (read_mtx);
+  return fhandler_base::close ();
+}
+
 #define PIPE_INTRO "\\\\.\\pipe\\cygwin-"
 
 /* Create a pipe, and return handles to the read and write ends,
@@ -608,6 +683,7 @@ fhandler_pipe::create (fhandler_pipe *fhs[2], unsigned psize, int mode)
   else if ((fhs[1] = (fhandler_pipe *) build_fh_dev (*pipew_dev)) == NULL)
     {
       delete fhs[0];
+      CloseHandle (r);
       CloseHandle (w);
     }
   else
@@ -617,7 +693,25 @@ fhandler_pipe::create (fhandler_pipe *fhs[2], unsigned psize, int mode)
 		    unique_id);
       fhs[1]->init (w, FILE_CREATE_PIPE_INSTANCE | GENERIC_WRITE, mode,
 		    unique_id);
-      res = 0;
+      /* For the read side of the pipe, add a mutex.  See raw_read for the
+	 usage. */
+      SECURITY_ATTRIBUTES sa = { .nLength = sizeof (SECURITY_ATTRIBUTES),
+				 .lpSecurityDescriptor = NULL,
+				 .bInheritHandle = !(mode & O_CLOEXEC)
+			       };
+      HANDLE mtx = CreateMutexW (&sa, FALSE, NULL);
+      if (!mtx)
+	{
+	  delete fhs[0];
+	  CloseHandle (r);
+	  delete fhs[1];
+	  CloseHandle (w);
+	}
+      else
+	{
+	  fhs[0]->set_read_mutex (mtx);
+	  res = 0;
+	}
     }
 
   debug_printf ("%R = pipe([%p, %p], %d, %y)", res, fhs[0], fhs[1], psize, mode);
@@ -658,7 +752,7 @@ nt_create (LPSECURITY_ATTRIBUTES sa_ptr, PHANDLE r, PHANDLE w,
 				 &cygheap->installation_key,
 				 GetCurrentProcessId ());
 
-  access = GENERIC_READ | FILE_WRITE_ATTRIBUTES;
+  access = GENERIC_READ | FILE_WRITE_ATTRIBUTES | SYNCHRONIZE;
 
   ULONG pipe_type = pipe_byte ? FILE_PIPE_BYTE_STREAM_TYPE
     : FILE_PIPE_MESSAGE_TYPE;
@@ -737,7 +831,7 @@ nt_create (LPSECURITY_ATTRIBUTES sa_ptr, PHANDLE r, PHANDLE w,
     {
       debug_printf ("NtOpenFile: name %S", &pipename);
 
-      access = GENERIC_WRITE | FILE_READ_ATTRIBUTES;
+      access = GENERIC_WRITE | FILE_READ_ATTRIBUTES | SYNCHRONIZE;
       status = NtOpenFile (w, access, &attr, &io, 0, 0);
       if (!NT_SUCCESS (status))
 	{
diff --git a/winsup/cygwin/flock.cc b/winsup/cygwin/flock.cc
index bd7a16d91ecd..2f12fc07e37b 100644
--- a/winsup/cygwin/flock.cc
+++ b/winsup/cygwin/flock.cc
@@ -216,22 +216,6 @@ allow_others_to_sync ()
   done = true;
 }
 
-/* Get the handle count of an object. */
-static ULONG
-get_obj_handle_count (HANDLE h)
-{
-  OBJECT_BASIC_INFORMATION obi;
-  NTSTATUS status;
-  ULONG hdl_cnt = 0;
-
-  status = NtQueryObject (h, ObjectBasicInformation, &obi, sizeof obi, NULL);
-  if (!NT_SUCCESS (status))
-    debug_printf ("NtQueryObject: %y", status);
-  else
-    hdl_cnt = obi.HandleCount;
-  return hdl_cnt;
-}
-
 /* Helper struct to construct a local OBJECT_ATTRIBUTES on the stack. */
 struct lockfattr_t
 {
diff --git a/winsup/cygwin/miscfuncs.cc b/winsup/cygwin/miscfuncs.cc
index f4c3a1c48e8e..dc36030ca572 100644
--- a/winsup/cygwin/miscfuncs.cc
+++ b/winsup/cygwin/miscfuncs.cc
@@ -18,6 +18,22 @@ details. */
 #include "tls_pbuf.h"
 #include "mmap_alloc.h"
 
+/* Get handle count of an object. */
+ULONG
+get_obj_handle_count (HANDLE h)
+{
+  OBJECT_BASIC_INFORMATION obi;
+  NTSTATUS status;
+  ULONG hdl_cnt = 0;
+
+  status = NtQueryObject (h, ObjectBasicInformation, &obi, sizeof obi, NULL);
+  if (!NT_SUCCESS (status))
+    debug_printf ("NtQueryObject: %y", status);
+  else
+    hdl_cnt = obi.HandleCount;
+  return hdl_cnt;
+}
+
 int __reg2
 check_invalid_virtual_addr (const void *s, unsigned sz)
 {
diff --git a/winsup/cygwin/miscfuncs.h b/winsup/cygwin/miscfuncs.h
index 1ff7ee0d3fde..47cef6f20c0a 100644
--- a/winsup/cygwin/miscfuncs.h
+++ b/winsup/cygwin/miscfuncs.h
@@ -98,6 +98,9 @@ transform_chars (PUNICODE_STRING upath, USHORT start_idx)
 
 PWCHAR transform_chars_af_unix (PWCHAR, const char *, __socklen_t);
 
+/* Get handle count of an object. */
+ULONG get_obj_handle_count (HANDLE h);
+
 /* Memory checking */
 int __reg2 check_invalid_virtual_addr (const void *s, unsigned sz);
 
diff --git a/winsup/cygwin/select.cc b/winsup/cygwin/select.cc
index 83e1c00e0ac7..ac2fd227eb17 100644
--- a/winsup/cygwin/select.cc
+++ b/winsup/cygwin/select.cc
@@ -612,7 +612,6 @@ pipe_data_available (int fd, fhandler_base *fh, HANDLE h, bool writing)
 	   that.  This means that a pipe could still block since you could
 	   be trying to write more to the pipe than is available in the
 	   buffer but that is the hazard of select().  */
-      fpli.WriteQuotaAvailable = fpli.OutboundQuota - fpli.ReadDataAvailable;
       if (fpli.WriteQuotaAvailable > 0)
 	{
 	  paranoid_printf ("fd %d, %s, write: size %u, avail %u", fd,


More information about the Cygwin-developers mailing list