cygrunsrv + sshd + rsync = 20 times too slow -- throttled?
Corinna Vinschen
corinna-cygwin@cygwin.com
Thu Sep 2 19:35:21 GMT 2021
On Sep 2 21:00, Corinna Vinschen wrote:
> On Sep 2 09:01, Ken Brown wrote:
> > On 9/2/2021 4:17 AM, Corinna Vinschen wrote:
> > > What if the readers never request more than, say, 50 or even 25% of the
> > > available buffer space? Our buffer is 64K and there's no guarantee that
> > > any read > PIPE_BUF (== 4K) is atomic anyway. This can work without
> > > having to check the other side of the pipe. Something like this,
> > > ignoring border cases:
> > >
> > > pipe::create()
> > > {
> > > [...]
> > > mutex = CreateMutex();
> > > }
> > >
> > > pipe::raw_read(char *buf, size_t num_requested)
> > > {
> > > if (blocking)
> > > {
> > > WFSO(mutex);
> > > NtQueryInformationFile(FilePipeLocalInformation);
> > > if (!fpli.ReadDataAvailable
> > > && num_requested > fpli.InboundQuota / 4)
> > > num_requested = fpli.InboundQuota / 4;
> > > NtReadFile(pipe, buf, num_requested);
> > > ReleaseMutex(mutex);
> > > }
> > > }
> > >
> > > It's not entirely foolproof, but it should fix 99% of the cases.
> >
> > I like it!
> >
> > Do you think there's anything we can or should do to avoid a deadlock in the
> > rare cases where this fails? The only thing I can think of immediately is
> > to always impose a timeout if select is called with infinite timeout on the
> > write side of a pipe, after which we report that the pipe is write ready.
> > After all, we've lived since 2008 with a bug that caused select to *always*
> > report write ready.
>
> Indeed. Hmm. What timeout are you thinking of? Seconds? Minutes?
>
> > Alternatively, we could just wait and see if there's an actual use case in
> > which someone encounters a deadlock.
>
> Or that. Fixing up select isn't too hard in that case, I guess.
It's getting late again. I'm dropping off for tonight, but I've attached
the POC code I have so far. It also includes the snippets from my previous
patch, which fix the issues Takashi found during testing. It also fixes
something which looks like a bug in raw_write:
- ptr = ((char *) ptr) + chunk;
+ ptr = ((char *) ptr) + nbytes_now;
Incrementing ptr by chunk bytes while only nbytes_now have been written
looks incorrect.
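To illustrate the point outside of Cygwin, here is a minimal, self-contained sketch. The names fake_pipe and write_all are hypothetical and only stand in for the pipe and for the loop in raw_write; the sketch assumes a sink that may accept fewer bytes than requested per call. It is not the actual raw_write code.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical stand-in for a pipe: accepts at most `capacity` bytes per call.
struct fake_pipe
{
  std::string data;      // everything accepted so far
  std::size_t capacity;  // maximum number of bytes accepted per call

  std::size_t write (const char *p, std::size_t n)
  {
    std::size_t accepted = std::min (n, capacity);
    data.append (p, accepted);
    return accepted;
  }
};

// Write up to len bytes in requests of at most `chunk` bytes.  The pointer
// advances by the number of bytes actually accepted (nbytes_now), not by
// the size of the request.
std::size_t
write_all (fake_pipe &pipe, const char *ptr, std::size_t len,
           std::size_t chunk)
{
  assert (chunk > 0);
  std::size_t nbytes = 0;
  while (nbytes < len)
    {
      std::size_t len1 = std::min (chunk, len - nbytes);
      std::size_t nbytes_now = pipe.write (ptr, len1);
      if (nbytes_now == 0)
        break;                 // nothing accepted: stop instead of spinning
      ptr += nbytes_now;       // advancing by chunk here would skip data
      nbytes += nbytes_now;
    }
  return nbytes;
}
```

With a capacity of 3 and a chunk of 4, every request is cut short by one byte. Advancing by nbytes_now still delivers the input unchanged, while advancing by chunk would silently drop one byte per iteration.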
As for the reader, it makes the # of bytes to read dependent on the
number of reader handles. I don't know if that's such a bright idea,
but this can be changed easily.
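As a rough sketch of that calculation, isolated from the Windows calls: clamp_read_len is a hypothetical helper, and its parameters stand in for fpli.InboundQuota, fpli.ReadDataAvailable and the result of get_obj_handle_count. The guard against a reader count of 0 is an addition made for this sketch; the attached patch does not have it.

```cpp
#include <cstddef>

// Hypothetical helper mirroring the clamp applied to blocking reads.
// If data is already available, the request is left alone.  Otherwise
// the request is limited to half the buffer divided by the number of
// readers, or to 64 bytes if there are 10 or more readers.
std::size_t
clamp_read_len (std::size_t len, std::size_t inbound_quota,
                std::size_t data_available, std::size_t reader_count)
{
  if (data_available != 0)
    return len;

  std::size_t max_len = 64;
  // reader_count > 0 is an assumption of this sketch, to avoid dividing
  // by zero if the handle count could not be determined.
  if (reader_count > 0 && reader_count < 10)
    max_len = inbound_quota / (2 * reader_count);

  return len > max_len ? max_len : len;
}
```

With a 64K buffer, a single reader never requests more than 32K on an empty pipe, two readers at most 16K each, and so on, so pending reads together should leave at least about half of the write quota visible to select.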
Anyway, this passes all my testcases, but they are anything
but thorough.
Patch relative to topic/pipe attached. Would you both mind taking a
close look?
Thanks,
Corinna
-------------- next part --------------
diff --git a/winsup/cygwin/fhandler.h b/winsup/cygwin/fhandler.h
index 132e6002133b..032ab5fb07ae 100644
--- a/winsup/cygwin/fhandler.h
+++ b/winsup/cygwin/fhandler.h
@@ -1171,6 +1171,7 @@ class fhandler_socket_unix : public fhandler_socket
class fhandler_pipe: public fhandler_base
{
private:
+ HANDLE read_mtx;
pid_t popen_pid;
size_t max_atomic_write;
void set_pipe_non_blocking (bool nonblocking);
@@ -1178,6 +1179,7 @@ public:
fhandler_pipe ();
bool ispipe() const { return true; }
+ void set_read_mutex (HANDLE mtx) { read_mtx = mtx; }
void set_popen_pid (pid_t pid) {popen_pid = pid;}
pid_t get_popen_pid () const {return popen_pid;}
@@ -1187,7 +1189,9 @@ public:
select_record *select_except (select_stuff *);
char *get_proc_fd_name (char *buf);
int open (int flags, mode_t mode = 0);
+ void fixup_after_fork (HANDLE);
int dup (fhandler_base *child, int);
+ int close ();
void __reg3 raw_read (void *ptr, size_t& len);
ssize_t __reg3 raw_write (const void *ptr, size_t len);
int ioctl (unsigned int cmd, void *);
diff --git a/winsup/cygwin/fhandler_pipe.cc b/winsup/cygwin/fhandler_pipe.cc
index 2dec0a84817c..7a5cefb3d07c 100644
--- a/winsup/cygwin/fhandler_pipe.cc
+++ b/winsup/cygwin/fhandler_pipe.cc
@@ -240,8 +240,37 @@ fhandler_pipe::raw_read (void *ptr, size_t& len)
keep_looping = false;
if (evt)
ResetEvent (evt);
+ if (!is_nonblocking ())
+ {
+ FILE_PIPE_LOCAL_INFORMATION fpli;
+ ULONG reader_count;
+ ULONG max_len = 64;
+
+ WaitForSingleObject (read_mtx, INFINITE);
+
+ /* Make sure never to request more bytes than half the pipe
+ buffer size. Every pending read lowers WriteQuotaAvailable
+ on the write side and thus affects select's ability to return
+ more or less reliable info whether a write succeeds or not.
+
+ Let the size of the request depend on the number of readers
+ at the time. */
+ status = NtQueryInformationFile (get_handle (), &io,
+ &fpli, sizeof (fpli),
+ FilePipeLocalInformation);
+ if (NT_SUCCESS (status) && fpli.ReadDataAvailable == 0)
+ {
+ reader_count = get_obj_handle_count (get_handle ());
+ if (reader_count < 10)
+ max_len = fpli.InboundQuota / (2 * reader_count);
+ if (len > max_len)
+ len = max_len;
+ }
+ }
status = NtReadFile (get_handle (), evt, NULL, NULL, &io, ptr,
len, NULL, NULL);
+ if (!is_nonblocking ())
+ ReleaseMutex (read_mtx);
if (evt && status == STATUS_PENDING)
{
waitret = cygwait (evt);
@@ -313,7 +342,6 @@ fhandler_pipe::raw_read (void *ptr, size_t& len)
ssize_t __reg3
fhandler_pipe::raw_write (const void *ptr, size_t len)
{
- ssize_t ret = -1;
size_t nbytes = 0;
ULONG chunk;
NTSTATUS status = STATUS_SUCCESS;
@@ -352,8 +380,36 @@ fhandler_pipe::raw_write (const void *ptr, size_t len)
else
len1 = (ULONG) left;
nbytes_now = 0;
- status = NtWriteFile (get_handle (), evt, NULL, NULL, &io,
- (PVOID) ptr, len1, NULL, NULL);
+ /* NtWriteFile returns success with # of bytes written == 0 if writing
+ on a non-blocking pipe fails because the pipe buffer doesn't have
+ sufficient space.
+
+ POSIX requires
+ - A write request for {PIPE_BUF} or fewer bytes shall have the
+ following effect: if there is sufficient space available in the
+ pipe, write() shall transfer all the data and return the number
+ of bytes requested. Otherwise, write() shall transfer no data and
+ return -1 with errno set to [EAGAIN].
+
+ - A write request for more than {PIPE_BUF} bytes shall cause one
+ of the following:
+
+ - When at least one byte can be written, transfer what it can and
+ return the number of bytes written. When all data previously
+ written to the pipe is read, it shall transfer at least {PIPE_BUF}
+ bytes.
+
+ - When no data can be written, transfer no data, and return -1 with
+ errno set to [EAGAIN]. */
+ while (len1 > 0)
+ {
+ status = NtWriteFile (get_handle (), evt, NULL, NULL, &io,
+ (PVOID) ptr, len1, NULL, NULL);
+ if (evt || !NT_SUCCESS (status) || io.Information > 0
+ || len <= PIPE_BUF)
+ break;
+ len1 >>= 1;
+ }
if (evt && status == STATUS_PENDING)
{
waitret = cygwait (evt);
@@ -375,13 +431,11 @@ fhandler_pipe::raw_write (const void *ptr, size_t len)
else if (NT_SUCCESS (status))
{
nbytes_now = io.Information;
- /* NtWriteFile returns success with # of bytes written == 0
- if writing on a non-blocking pipe fails because the pipe
- buffer doesn't have sufficient space. */
- if (nbytes_now == 0)
- set_errno (EAGAIN);
- ptr = ((char *) ptr) + chunk;
+ ptr = ((char *) ptr) + nbytes_now;
nbytes += nbytes_now;
+ /* 0 bytes returned? EAGAIN. See above. */
+ if (nbytes == 0)
+ set_errno (EAGAIN);
}
else if (STATUS_PIPE_IS_CLOSED (status))
{
@@ -392,17 +446,23 @@ fhandler_pipe::raw_write (const void *ptr, size_t len)
__seterrno_from_nt_status (status);
if (nbytes_now == 0)
- len = 0; /* Terminate loop. */
- if (nbytes > 0)
- ret = nbytes;
+ break;
}
if (evt)
CloseHandle (evt);
- if (status == STATUS_THREAD_SIGNALED && ret < 0)
+ if (status == STATUS_THREAD_SIGNALED && nbytes == 0)
set_errno (EINTR);
else if (status == STATUS_THREAD_CANCELED)
pthread::static_cancel_self ();
- return ret;
+ return nbytes ?: -1;
+}
+
+void
+fhandler_pipe::fixup_after_fork (HANDLE parent)
+{
+ if (read_mtx)
+ fork_fixup (parent, read_mtx, "read_mtx");
+ fhandler_base::fixup_after_fork (parent);
}
int
@@ -411,16 +471,31 @@ fhandler_pipe::dup (fhandler_base *child, int flags)
fhandler_pipe *ftp = (fhandler_pipe *) child;
ftp->set_popen_pid (0);
- int res;
- if (get_handle () && fhandler_base::dup (child, flags))
+ int res = 0;
+ if (fhandler_base::dup (child, flags))
res = -1;
- else
- res = 0;
+ else if (read_mtx &&
+ !DuplicateHandle (GetCurrentProcess (), read_mtx,
+ GetCurrentProcess (), &ftp->read_mtx,
+ 0, !(flags & O_CLOEXEC), DUPLICATE_SAME_ACCESS))
+ {
+ __seterrno ();
+ ftp->close ();
+ res = -1;
+ }
debug_printf ("res %d", res);
return res;
}
+int
+fhandler_pipe::close ()
+{
+ if (read_mtx)
+ NtClose (read_mtx);
+ return fhandler_base::close ();
+}
+
#define PIPE_INTRO "\\\\.\\pipe\\cygwin-"
/* Create a pipe, and return handles to the read and write ends,
@@ -608,6 +683,7 @@ fhandler_pipe::create (fhandler_pipe *fhs[2], unsigned psize, int mode)
else if ((fhs[1] = (fhandler_pipe *) build_fh_dev (*pipew_dev)) == NULL)
{
delete fhs[0];
+ CloseHandle (r);
CloseHandle (w);
}
else
@@ -617,7 +693,25 @@ fhandler_pipe::create (fhandler_pipe *fhs[2], unsigned psize, int mode)
unique_id);
fhs[1]->init (w, FILE_CREATE_PIPE_INSTANCE | GENERIC_WRITE, mode,
unique_id);
- res = 0;
+ /* For the read side of the pipe, add a mutex. See raw_read for the
+ usage. */
+ SECURITY_ATTRIBUTES sa = { .nLength = sizeof (SECURITY_ATTRIBUTES),
+ .lpSecurityDescriptor = NULL,
+ .bInheritHandle = !(mode & O_CLOEXEC)
+ };
+ HANDLE mtx = CreateMutexW (&sa, FALSE, NULL);
+ if (!mtx)
+ {
+ delete fhs[0];
+ CloseHandle (r);
+ delete fhs[1];
+ CloseHandle (w);
+ }
+ else
+ {
+ fhs[0]->set_read_mutex (mtx);
+ res = 0;
+ }
}
debug_printf ("%R = pipe([%p, %p], %d, %y)", res, fhs[0], fhs[1], psize, mode);
@@ -658,7 +752,7 @@ nt_create (LPSECURITY_ATTRIBUTES sa_ptr, PHANDLE r, PHANDLE w,
&cygheap->installation_key,
GetCurrentProcessId ());
- access = GENERIC_READ | FILE_WRITE_ATTRIBUTES;
+ access = GENERIC_READ | FILE_WRITE_ATTRIBUTES | SYNCHRONIZE;
ULONG pipe_type = pipe_byte ? FILE_PIPE_BYTE_STREAM_TYPE
: FILE_PIPE_MESSAGE_TYPE;
@@ -737,7 +831,7 @@ nt_create (LPSECURITY_ATTRIBUTES sa_ptr, PHANDLE r, PHANDLE w,
{
debug_printf ("NtOpenFile: name %S", &pipename);
- access = GENERIC_WRITE | FILE_READ_ATTRIBUTES;
+ access = GENERIC_WRITE | FILE_READ_ATTRIBUTES | SYNCHRONIZE;
status = NtOpenFile (w, access, &attr, &io, 0, 0);
if (!NT_SUCCESS (status))
{
diff --git a/winsup/cygwin/flock.cc b/winsup/cygwin/flock.cc
index bd7a16d91ecd..2f12fc07e37b 100644
--- a/winsup/cygwin/flock.cc
+++ b/winsup/cygwin/flock.cc
@@ -216,22 +216,6 @@ allow_others_to_sync ()
done = true;
}
-/* Get the handle count of an object. */
-static ULONG
-get_obj_handle_count (HANDLE h)
-{
- OBJECT_BASIC_INFORMATION obi;
- NTSTATUS status;
- ULONG hdl_cnt = 0;
-
- status = NtQueryObject (h, ObjectBasicInformation, &obi, sizeof obi, NULL);
- if (!NT_SUCCESS (status))
- debug_printf ("NtQueryObject: %y", status);
- else
- hdl_cnt = obi.HandleCount;
- return hdl_cnt;
-}
-
/* Helper struct to construct a local OBJECT_ATTRIBUTES on the stack. */
struct lockfattr_t
{
diff --git a/winsup/cygwin/miscfuncs.cc b/winsup/cygwin/miscfuncs.cc
index f4c3a1c48e8e..dc36030ca572 100644
--- a/winsup/cygwin/miscfuncs.cc
+++ b/winsup/cygwin/miscfuncs.cc
@@ -18,6 +18,22 @@ details. */
#include "tls_pbuf.h"
#include "mmap_alloc.h"
+/* Get handle count of an object. */
+ULONG
+get_obj_handle_count (HANDLE h)
+{
+ OBJECT_BASIC_INFORMATION obi;
+ NTSTATUS status;
+ ULONG hdl_cnt = 0;
+
+ status = NtQueryObject (h, ObjectBasicInformation, &obi, sizeof obi, NULL);
+ if (!NT_SUCCESS (status))
+ debug_printf ("NtQueryObject: %y", status);
+ else
+ hdl_cnt = obi.HandleCount;
+ return hdl_cnt;
+}
+
int __reg2
check_invalid_virtual_addr (const void *s, unsigned sz)
{
diff --git a/winsup/cygwin/miscfuncs.h b/winsup/cygwin/miscfuncs.h
index 1ff7ee0d3fde..47cef6f20c0a 100644
--- a/winsup/cygwin/miscfuncs.h
+++ b/winsup/cygwin/miscfuncs.h
@@ -98,6 +98,9 @@ transform_chars (PUNICODE_STRING upath, USHORT start_idx)
PWCHAR transform_chars_af_unix (PWCHAR, const char *, __socklen_t);
+/* Get handle count of an object. */
+ULONG get_obj_handle_count (HANDLE h);
+
/* Memory checking */
int __reg2 check_invalid_virtual_addr (const void *s, unsigned sz);
diff --git a/winsup/cygwin/select.cc b/winsup/cygwin/select.cc
index 83e1c00e0ac7..ac2fd227eb17 100644
--- a/winsup/cygwin/select.cc
+++ b/winsup/cygwin/select.cc
@@ -612,7 +612,6 @@ pipe_data_available (int fd, fhandler_base *fh, HANDLE h, bool writing)
that. This means that a pipe could still block since you could
be trying to write more to the pipe than is available in the
buffer but that is the hazard of select(). */
- fpli.WriteQuotaAvailable = fpli.OutboundQuota - fpli.ReadDataAvailable;
if (fpli.WriteQuotaAvailable > 0)
{
paranoid_printf ("fd %d, %s, write: size %u, avail %u", fd,