[PATCH v4 1/2] POSIX Asynchronous I/O support: aio files

Mark Geisert mark@maxrnd.com
Fri Jul 20 08:44:00 GMT 2018


This is the core of the AIO implementation: aio.cc and aio.h.  The
header is used within the Cygwin DLL by aio.cc and the fhandler* modules,
as well as by user programs that want the AIO functionality.
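
For reference, here is a minimal user-level sketch of the interface the
patch provides.  It is only an illustration: it uses nothing beyond the
standard POSIX calls declared in <aio.h>, and the file name and buffer
size are arbitrary.

    #include <aio.h>
    #include <fcntl.h>
    #include <stdio.h>
    #include <string.h>
    #include <unistd.h>

    int
    main (void)
    {
      static char buf[4096];
      struct aiocb cb;
      const struct aiocb *list[1];

      int fd = open ("/etc/passwd", O_RDONLY);   /* any readable file */
      if (fd < 0)
        return 1;

      memset (&cb, 0, sizeof cb);                /* zeroed aiocb; no signal */
      cb.aio_fildes = fd;
      cb.aio_buf = buf;
      cb.aio_nbytes = sizeof buf;
      cb.aio_offset = 0;

      if (aio_read (&cb) == -1)                  /* submit the async read */
        return 1;

      list[0] = &cb;
      while (aio_error (&cb) == EINPROGRESS)     /* wait for completion */
        aio_suspend (list, 1, NULL);

      printf ("read %zd bytes\n", aio_return (&cb));
      close (fd);
      return 0;
    }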
---
 winsup/cygwin/aio.cc        | 1006 +++++++++++++++++++++++++++++++++++
 winsup/cygwin/include/aio.h |   82 +++
 2 files changed, 1088 insertions(+)
 create mode 100644 winsup/cygwin/aio.cc
 create mode 100644 winsup/cygwin/include/aio.h

diff --git a/winsup/cygwin/aio.cc b/winsup/cygwin/aio.cc
new file mode 100644
index 000000000..0244edf60
--- /dev/null
+++ b/winsup/cygwin/aio.cc
@@ -0,0 +1,1006 @@
+/* aio.cc: Posix asynchronous i/o functions.
+
+This file is part of Cygwin.
+
+This software is a copyrighted work licensed under the terms of the
+Cygwin license.  Please consult the file "CYGWIN_LICENSE" for
+details. */
+
+#include "winsup.h"
+#include "path.h"
+#include "fhandler.h"
+#include "dtable.h"
+#include "cygheap.h"
+#include "sigproc.h"
+#include <aio.h>
+#include <fcntl.h>
+#include <semaphore.h>
+#include <unistd.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* 'aioinitialized' is a thread-safe status of AIO feature initialization:
+ * 0 means uninitialized, >0 means initializing, <0 means initialized
+ */
+static NO_COPY volatile LONG    aioinitialized = 0;
+
+/* This implementation supports two flavors of asynchronous operation:
+ * "inline" and "queued".  Inline AIOs are used when:
+ *     (1) fd refers to a local non-locked disk file opened in binary mode,
+ *     (2) no more than AIO_MAX inline AIOs are in progress at the same time.
+ * In all other cases queued AIOs will be used.
+ *
+ * An inline AIO is performed by the calling app's thread as a pread|pwrite on
+ * a shadow fd that permits Windows asynchronous i/o, with event notification
+ * on completion.  Event arrival causes AIO context for the fd to be updated.
+ *
+ * A queued AIO is performed in a similar manner, but by an AIO worker thread
+ * rather than the calling app's thread.  The queued flavor can also operate
+ * on sockets, pipes, non-binary files, mandatory-locked files, and files
+ * that don't support pread|pwrite.  Generally all these cases are handled as
+ * synchronous read|write operations, but still don't delay the app because
+ * they're taken care of by AIO worker threads.
+ */
+
+/* These variables support inline AIO operations */
+static NO_COPY HANDLE            evt_handles[AIO_MAX];
+static NO_COPY struct aiocb     *evt_aiocbs[AIO_MAX];
+static NO_COPY CRITICAL_SECTION  evt_locks[AIO_MAX]; /* per-slot locks */
+static NO_COPY CRITICAL_SECTION  slotcrit; /* lock for slot variables in toto */
+
+/* These variables support queued AIO operations */
+static NO_COPY sem_t             worksem;   /* tells whether AIOs are queued */
+static NO_COPY CRITICAL_SECTION  workcrit;        /* lock for AIO work queue */
+TAILQ_HEAD(queue, aiocb) worklist = TAILQ_HEAD_INITIALIZER(worklist);
+
+static int
+aiochkslot (struct aiocb *aio)
+{
+  EnterCriticalSection (&slotcrit);
+
+  /* Sanity check.. make sure this AIO is not already busy */
+  for (int slot = 0; slot < AIO_MAX; ++slot)
+    if (evt_aiocbs[slot] == aio)
+      {
+        debug_printf ("aio %p is already busy in slot %d", aio, slot);
+        LeaveCriticalSection (&slotcrit);
+        return slot;
+      }
+
+  LeaveCriticalSection (&slotcrit);
+  return -1;
+}
+
+static int
+aiogetslot (struct aiocb *aio)
+{
+  EnterCriticalSection (&slotcrit);
+
+  /* Find free slot for this inline AIO; if none available AIO will be queued */
+  for (int slot = 0; slot < AIO_MAX; ++slot)
+    if (evt_aiocbs[slot] == NULL)
+      {
+        /* If aio is NULL this is just an availability check.. no change made */
+        if (aio)
+          evt_aiocbs[slot] = aio;
+        LeaveCriticalSection (&slotcrit);
+        return slot;
+      }
+
+  LeaveCriticalSection (&slotcrit);
+  return -1;
+}
+
+static int
+aiorelslot (struct aiocb *aio)
+{
+  EnterCriticalSection (&slotcrit);
+
+  /* Find slot associated with this inline AIO and free it */
+  for (int slot = 0; slot < AIO_MAX; ++slot)
+    if (evt_aiocbs[slot] == aio)
+      {
+        evt_aiocbs[slot] = NULL;
+        LeaveCriticalSection (&slotcrit);
+        return slot;
+      }
+
+  LeaveCriticalSection (&slotcrit);
+  return -1;
+}
+
+static void
+aionotify_on_pthread (struct sigevent *evp)
+{
+  pthread_attr_t *attr;
+  pthread_attr_t  default_attr;
+  int             rc;
+  pthread_t       vaquita; /* == "little porpoise", endangered, see below */
+
+  if (evp->sigev_notify_attributes)
+    attr = evp->sigev_notify_attributes;
+  else
+    {
+      pthread_attr_init (attr = &default_attr);
+      pthread_attr_setdetachstate (attr, PTHREAD_CREATE_DETACHED);
+    }
+
+  /* A "vaquita" thread is a temporary pthread created to deliver a signal to
+   * the application.  We don't wait around for the thread to return from the
+   * app.  There's some symbolism here of sending a little creature off to tell
+   * the app something important.  If all the vaquitas end up wiped out in the
+   * wild, a distinct near-term possibility, at least this code remembers them.
+   */
+  rc = pthread_create (&vaquita, attr,
+                       (void * (*) (void *)) evp->sigev_notify_function,
+                       evp->sigev_value.sival_ptr);
+
+  /* The following error is not expected. If seen often, develop a recovery. */
+  if (rc)
+    debug_printf ("aio vaquita thread creation failed, %E");
+
+  /* Should we wait for the signal delivery thread to finish?  We can't: Who
+   * knows what mischief the app coder may have in their handler?  Worst case
+   * is they accidentally used non-signal-safe functions in their handler.  We
+   * return hoping for the best and finish cleaning up our end of notification.
+   */
+  return;
+}
+
+static void
+aionotify (struct aiocb *aio)
+{
+  siginfo_t si = {0};
+  si.si_code = SI_ASYNCIO;
+
+  /* If signal notification wanted, send AIO-complete signal */
+  switch (aio->aio_sigevent.sigev_notify) {
+  case SIGEV_NONE:
+    break;
+
+  case SIGEV_SIGNAL:
+    si.si_signo = aio->aio_sigevent.sigev_signo;
+    si.si_value = aio->aio_sigevent.sigev_value;
+    if (si.si_signo)
+      sig_send (myself, si);
+    break;
+
+  case SIGEV_THREAD:
+    aionotify_on_pthread (&aio->aio_sigevent);
+    break;
+  }
+
+  /* If this op is on LIO list and is last op, send LIO-complete signal */
+  if (aio->aio_liocb)
+    {
+      if (1 == InterlockedExchangeAdd (&aio->aio_liocb->lio_count, -1))
+        {
+          /* LIO's count has decremented to zero */
+          switch (aio->aio_liocb->lio_sigevent->sigev_notify) {
+          case SIGEV_NONE:
+            break;
+
+          case SIGEV_SIGNAL:
+            si.si_signo = aio->aio_liocb->lio_sigevent->sigev_signo;
+            si.si_value = aio->aio_liocb->lio_sigevent->sigev_value;
+            if (si.si_signo)
+              sig_send (myself, si);
+            break;
+
+          case SIGEV_THREAD:
+            aionotify_on_pthread (aio->aio_liocb->lio_sigevent);
+            break;
+          }
+
+          free (aio->aio_liocb);
+          aio->aio_liocb = NULL;
+        }
+    }
+}
+
+static DWORD WINAPI __attribute__ ((noreturn))
+aiowaiter (void *unused)
+{ /* One instance, called on its own cygthread; runs until program exits */
+  struct aiocb *aio;
+
+  while (1)
+    {
+      /* Wait forever for at least one event to be set */
+      DWORD res = WaitForMultipleObjects(AIO_MAX, evt_handles, FALSE, INFINITE);
+      switch (res)
+        {
+          case WAIT_FAILED:
+            api_fatal ("aiowaiter fatal error, %E");
+
+          default:
+            if (res < WAIT_OBJECT_0 || res >= WAIT_OBJECT_0 + AIO_MAX)
+              api_fatal ("aiowaiter unexpected WFMO result %d", res);
+            int slot = res - WAIT_OBJECT_0;
+
+            /* Guard against "saw completion before request finished" gotcha */
+            EnterCriticalSection (&evt_locks[slot]);
+            LeaveCriticalSection (&evt_locks[slot]);
+
+            aio = evt_aiocbs[slot];
+            debug_printf ("WFMO returns %d, aio %p", res, aio);
+
+            if (aio->aio_errno == EBUSY)
+              {
+                /* Capture Windows status and convert to Cygwin status */
+                NTSTATUS status = (NTSTATUS) aio->aio_wincb.status;
+                if (NT_SUCCESS (status))
+                  {
+                    aio->aio_rbytes = (ssize_t) aio->aio_wincb.info;
+                    aio->aio_errno = 0;
+                  }
+                else
+                  {
+                    aio->aio_rbytes = -1;
+                    aio->aio_errno = geterrno_from_nt_status (status);
+                  }
+              }
+            else
+              {
+                /* Async operation was simulated; AIO status already updated */
+              }
+
+            /* Send completion signal if user requested it */
+            aionotify (aio);
+
+            /* Free up the slot used for this inline AIO.  We do this
+             * manually rather than calling aiorelslot() because we
+             * already have the slot number handy.
+             */
+            EnterCriticalSection (&slotcrit);
+            evt_aiocbs[slot] = NULL;
+            LeaveCriticalSection (&slotcrit);
+            debug_printf ("retired aio %p; slot %d released", aio, slot);
+
+            /* Notify workers that a slot has opened up */
+            sem_post (&worksem);
+        }
+    }
+}
+
+static int
+asyncread (struct aiocb *aio)
+{ /* Try to initiate an asynchronous read, either from app or worker thread */
+  ssize_t       res = 0;
+
+  cygheap_fdget cfd (aio->aio_fildes);
+  if (cfd < 0)
+    res = -1; /* errno has been set to EBADF */
+  else
+    {
+      int slot = aiogetslot (aio);
+      debug_printf ("slot %d%s", slot, slot >= 0 ? " acquired" : "");
+      if (slot >= 0)
+        {
+          EnterCriticalSection (&evt_locks[slot]);
+          aio->aio_errno = EBUSY; /* Mark AIO as physically underway now */
+          aio->aio_wincb.event = (void *) evt_handles[slot];
+          res = cfd->pread ((void *) aio->aio_buf, aio->aio_nbytes,
+                            aio->aio_offset, (void *) aio);
+          LeaveCriticalSection (&evt_locks[slot]);
+        }
+      else
+        {
+          set_errno (ENOBUFS); /* Internal use only */
+          res = -1;
+        }
+    }
+
+  return res;
+}
+
+static int
+asyncwrite (struct aiocb *aio)
+{ /* Try to initiate an asynchronous write, either from app or worker thread */
+  ssize_t       res = 0;
+
+  cygheap_fdget cfd (aio->aio_fildes);
+  if (cfd < 0)
+    res = -1; /* errno has been set to EBADF */
+  else
+    {
+      int slot = aiogetslot (aio);
+      debug_printf ("slot %d%s", slot, slot >= 0 ? " acquired" : "");
+      if (slot >= 0)
+        {
+          EnterCriticalSection (&evt_locks[slot]);
+          aio->aio_errno = EBUSY; /* Mark AIO as physically underway now */
+          aio->aio_wincb.event = (void *) evt_handles[slot];
+          res = cfd->pwrite ((void *) aio->aio_buf, aio->aio_nbytes,
+                             aio->aio_offset, (void *) aio);
+          LeaveCriticalSection (&evt_locks[slot]);
+        }
+      else
+        {
+          set_errno (ENOBUFS); /* Internal use only */
+          res = -1;
+        }
+    }
+
+  return res;
+}
+
+/* Have to forward ref because of chicken v. egg situation */
+static DWORD WINAPI __attribute__ ((noreturn)) aioworker (void *);
+
+static void
+aioinit (void)
+{
+  /* First a cheap test to speed processing after initialization completes */
+  if (aioinitialized >= 0)
+    {
+      /* Guard against multiple threads initializing at same time */
+      if (0 == InterlockedExchangeAdd (&aioinitialized, 1))
+        {
+          int       i = AIO_MAX;
+          char     *tnames = (char *) malloc (AIO_MAX * 8);
+
+          if (!tnames)
+            api_fatal ("couldn't create aioworker tname table");
+
+          InitializeCriticalSection (&slotcrit);
+          InitializeCriticalSection (&workcrit);
+          sem_init (&worksem, 0, 0);
+          TAILQ_INIT(&worklist);
+
+          /* Create AIO_MAX number of aioworker threads for queued AIOs */
+          while (i--)
+            {
+              __small_sprintf (&tnames[i * 8], "aio%d", AIO_MAX - i);
+              if (!new cygthread (aioworker, NULL, &tnames[i * 8]))
+                api_fatal ("couldn't create an aioworker thread, %E");
+            }
+
+          /* Initialize event handles and slot locks arrays for inline AIOs */
+          for (i = 0; i < AIO_MAX; ++i)
+            {
+              /* Events are non-inheritable, auto-reset, init unset, unnamed */
+              evt_handles[i] = CreateEvent (NULL, FALSE, FALSE, NULL);
+              if (!evt_handles[i])
+                api_fatal ("couldn't create an event, %E");
+
+              InitializeCriticalSection (&evt_locks[i]);
+            }
+
+          /* Create aiowaiter thread; waits for inline AIO completion events */
+          if (!new cygthread (aiowaiter, NULL, "aio"))
+            api_fatal ("couldn't create aiowaiter thread, %E");
+
+          /* Indicate we have completed initialization */
+          InterlockedExchange (&aioinitialized, -1);
+        }
+      else
+        /* If 'aioinitialized' is greater than zero, another thread is
+         * initializing for us; wait until 'aioinitialized' goes negative
+         */
+        while (InterlockedExchangeAdd (&aioinitialized, 0) >= 0)
+          yield ();
+    }
+}
+
+static int
+aioqueue (struct aiocb *aio)
+{ /* Add an AIO to the worklist, to be serviced by a worker thread */
+  if (aioinitialized >= 0)
+    aioinit ();
+
+  EnterCriticalSection (&workcrit);
+  TAILQ_INSERT_TAIL(&worklist, aio, aio_chain);
+  LeaveCriticalSection (&workcrit);
+
+  debug_printf ("queued aio %p", aio);
+  sem_post (&worksem);
+
+  return 0;
+}
+
+static DWORD WINAPI __attribute__ ((noreturn))
+aioworker (void *unused)
+{ /* Multiple instances; called on own cygthreads; runs 'til program exits */
+  struct aiocb *aio;
+
+  while (1)
+    {
+      /* Park here until there's work to do or a slot becomes available */
+      sem_wait (&worksem);
+
+look4work:
+      EnterCriticalSection (&workcrit);
+      if (TAILQ_EMPTY(&worklist))
+        {
+          /* Another aioworker picked up the work already */
+          LeaveCriticalSection (&workcrit);
+          continue;
+        }
+
+      /* Make sure a slot is available before starting this AIO */
+      aio = TAILQ_FIRST(&worklist);
+      int slot = aiogetslot (NULL);
+      if (slot >= 0) // a slot is available
+        TAILQ_REMOVE(&worklist, aio, aio_chain);
+      LeaveCriticalSection (&workcrit);
+      if (slot < 0) // no slot is available, so worklist unchanged and we park
+        continue;
+
+      debug_printf ("starting aio %p", aio);
+      switch (aio->aio_lio_opcode)
+        {
+          case LIO_NOP:
+            aio->aio_rbytes = 0;
+            break;
+
+          case LIO_READ:
+            aio->aio_rbytes = asyncread (aio);
+            break;
+
+          case LIO_WRITE:
+            aio->aio_rbytes = asyncwrite (aio);
+            break;
+
+          default:
+            errno = EINVAL;
+            aio->aio_rbytes = -1;
+            break;
+        }
+
+      /* If operation still underway, let aiowaiter hear about its finish */
+      if (aio->aio_rbytes == 0 && aio->aio_nbytes != 0) // not racy
+        continue;
+
+      /* If operation errored, save error number, else clear it */
+      if (aio->aio_rbytes == -1)
+        aio->aio_errno = get_errno ();
+      else
+        aio->aio_errno = 0;
+
+      /* A slot seemed free earlier, but another thread grabbed it first */
+      if (aio->aio_errno == ENOBUFS)
+        {
+          aio->aio_errno = EINPROGRESS;
+          aioqueue (aio); /* Re-queue the AIO */
+
+          /* Another option would be to fail the AIO with error EAGAIN, but
+           * experience with iozone showed apps might not expect to see a
+           * deferred EAGAIN.  I.e. they should expect EAGAIN on their call to
+           * aio_read() or aio_write() but probably not expect to see EAGAIN
+           * on an aio_error() query after they'd previously seen EINPROGRESS
+           * on the initial AIO call.
+           */
+          continue;
+        }
+
+      /* If seeks aren't permitted on given fd, or pread|pwrite not legal */
+      if (aio->aio_errno == ESPIPE)
+        {
+          ssize_t res = 0;
+          off_t curpos;
+
+          cygheap_fdget cfd (aio->aio_fildes);
+          if (cfd < 0)
+            {
+              res = -1;
+              goto done; /* errno has been set to EBADF */
+            }
+
+          /* If we can get current file position, seek to aio_offset */
+          curpos = cfd->lseek (0, SEEK_CUR);
+          if (curpos < 0 || cfd->lseek (aio->aio_offset, SEEK_SET) < 0)
+            {
+              /* Can't seek */
+              res = curpos;
+              set_errno (0); /* Get rid of ESPIPE we've incurred */
+              aio->aio_errno = 0; /* Here too */
+            }
+
+          /* Do the requested AIO operation manually, synchronously */
+          switch (aio->aio_lio_opcode)
+            {
+              case LIO_READ:
+                /* 2nd argument to cfd->read() is passed by reference... */
+                cfd->read ((void *) aio->aio_buf, aio->aio_nbytes);
+                /* ...so on return it contains the number of bytes read */
+                res = aio->aio_nbytes;
+                break;
+
+              case LIO_WRITE:
+                res = cfd->write ((void *) aio->aio_buf, aio->aio_nbytes);
+                break;
+            }
+
+          /* If we had seeked successfully, restore original file position */
+          if (curpos >= 0)
+            if (cfd->lseek (curpos, SEEK_SET) < 0)
+              res = -1;
+
+done:
+          /* Update AIO to reflect final result */
+          aio->aio_rbytes = res;
+          aio->aio_errno = res == -1 ? get_errno () : 0;
+
+          /* Make like the requested async operation completed normally */
+          for (int i = 0; i < AIO_MAX; i++)
+            if (evt_aiocbs[i] == aio)
+              {
+                SetEvent (evt_handles[i]);
+                goto truly_done;
+              }
+
+          /* Free up the slot we ended up not using */
+          int slot = aiorelslot (aio);
+          debug_printf ("slot %d released", slot);
+        }
+
+      /* Send completion signal if user requested it */
+      aionotify (aio);
+
+truly_done:
+      debug_printf ("completed aio %p", aio);
+      goto look4work;
+    }
+}
+
+int
+aio_cancel (int fildes, struct aiocb *aio)
+{
+  int           aiocount = 0;
+  struct aiocb *ptr;
+  siginfo_t     si = {0};
+  si.si_code = SI_ASYNCIO;
+
+  /* Note 'aio' is allowed to be NULL here; it's used as a wildcard */
+restart:
+  EnterCriticalSection (&workcrit);
+  TAILQ_FOREACH(ptr, &worklist, aio_chain)
+    {
+      if (ptr->aio_fildes == fildes && (!aio || ptr == aio))
+        {
+          /* This queued AIO qualifies for cancellation */
+          TAILQ_REMOVE(&worklist, ptr, aio_chain);
+          LeaveCriticalSection (&workcrit);
+
+          ptr->aio_errno = ECANCELED;
+          ptr->aio_rbytes = -1;
+
+          /* If signal notification wanted, send AIO-canceled signal */
+          switch (ptr->aio_sigevent.sigev_notify) {
+          case SIGEV_NONE:
+            break;
+
+          case SIGEV_SIGNAL:
+            si.si_signo = ptr->aio_sigevent.sigev_signo;
+            si.si_value = ptr->aio_sigevent.sigev_value;
+            if (si.si_signo)
+              sig_send (myself, si);
+            break;
+
+          case SIGEV_THREAD:
+            aionotify_on_pthread (&ptr->aio_sigevent);
+            break;
+          }
+
+          ++aiocount;
+          goto restart;
+        }
+    }
+  LeaveCriticalSection (&workcrit);
+
+  /* Note that AIO_NOTCANCELED is not possible in this implementation.  That's
+   * because AIOs are dequeued to execute; the worklist search above won't
+   * find an AIO that's been dequeued from the worklist.
+   */
+  if (aiocount)
+    return AIO_CANCELED;
+  else
+    return AIO_ALLDONE;
+}
+
+int
+aio_error (const struct aiocb *aio)
+{
+  int res;
+
+  if (!aio)
+    {
+      set_errno (EINVAL);
+      return -1;
+    }
+
+  switch (aio->aio_errno)
+    {
+      case EBUSY:   /* This state for internal use only; not visible to app */
+      case ENOBUFS: /* This state for internal use only; not visible to app */
+        res = EINPROGRESS;
+        break;
+
+      default:
+        res = aio->aio_errno;
+    }
+
+  return res;
+}
+
+int
+aio_fsync (int mode, struct aiocb *aio)
+{
+  if (!aio)
+    {
+      set_errno (EINVAL);
+      return -1;
+    }
+
+  switch (mode)
+    {
+#if defined(O_SYNC)
+      case O_SYNC:
+        aio->aio_rbytes = fsync (aio->aio_fildes);
+        break;
+
+#if defined(O_DSYNC) && O_DSYNC != O_SYNC
+      case O_DSYNC:
+        aio->aio_rbytes = fdatasync (aio->aio_fildes);
+        break;
+#endif
+#endif
+
+      default:
+        set_errno (EINVAL);
+        return -1;
+    }
+
+  if (aio->aio_rbytes == -1)
+    aio->aio_errno = get_errno ();
+
+  return aio->aio_rbytes;
+}
+
+int
+aio_read (struct aiocb *aio)
+{
+  ssize_t       res = 0;
+
+  if (!aio)
+    {
+      set_errno (EINVAL);
+      return -1;
+    }
+  if (aioinitialized >= 0)
+    aioinit ();
+  if (aio->aio_errno == EINPROGRESS || -1 != aiochkslot (aio))
+    {
+      set_errno (EAGAIN);
+      return -1;
+    }
+
+  aio->aio_lio_opcode = LIO_READ;
+  aio->aio_errno = EINPROGRESS;
+  aio->aio_rbytes = -1;
+
+  /* Ensure zeroed (i.e. initialized but unused) aio_sigevent doesn't signal */
+  if (aio->aio_sigevent.sigev_signo == 0)
+    aio->aio_sigevent.sigev_notify = SIGEV_NONE;
+
+  /* Try to launch inline async read; only on ESPIPE/ENOBUFS is it queued */
+  pthread_testcancel ();
+  res = asyncread (aio);
+
+  /* If async read couldn't be launched, queue the AIO for a worker thread */
+  if (res == -1)
+    switch (get_errno ()) {
+    case ESPIPE:
+      {
+        int slot = aiorelslot (aio);
+        if (slot >= 0)
+          debug_printf ("slot %d released", slot);
+      }
+      /* fall through */
+
+    case ENOBUFS:
+      aio->aio_errno = EINPROGRESS;
+      aio->aio_rbytes = -1;
+
+      res = aioqueue (aio);
+      break;
+
+    default:
+      ; /* I think this is not possible */
+    }
+
+  return res;
+}
+
+ssize_t
+aio_return (struct aiocb *aio)
+{
+  if (!aio)
+    {
+      set_errno (EINVAL);
+      return -1;
+    }
+
+  switch (aio->aio_errno)
+    {
+      case EBUSY:       /* AIO is currently underway (internal state) */
+      case ENOBUFS:     /* AIO is currently underway (internal state) */
+      case EINPROGRESS: /* AIO has been queued successfully */
+        set_errno (EINPROGRESS);
+        return -1;
+
+      case EINVAL:      /* aio_return() has already been called on this AIO */
+        set_errno (aio->aio_errno);
+        return -1;
+
+      default:          /* AIO has completed, successfully or not */
+        ;
+    }
+
+  /* This AIO has completed so grab any error status if present */
+  if (aio->aio_rbytes == -1)
+    set_errno (aio->aio_errno);
+
+  /* Set this AIO's errno so later aio_return() calls on this AIO fail */
+  aio->aio_errno = EINVAL;
+
+  return aio->aio_rbytes;
+}
+
+static int
+aiosuspend (const struct aiocb *const aiolist[],
+         int nent, const struct timespec *timeout)
+{
+  /* Returns lowest list index of completed aios, else 'nent' if all completed.
+   * If none completed on entry, wait for interval specified by 'timeout'.
+   */
+  int       res;
+  sigset_t  sigmask;
+  siginfo_t si;
+  ULONGLONG ticks = 0;
+  ULONGLONG time0, time1;
+  struct timespec to = {0};
+
+  if (timeout)
+    {
+      to = *timeout;
+      if (to.tv_sec < 0 || to.tv_nsec < 0 || to.tv_nsec > NSPERSEC)
+        {
+          set_errno (EINVAL);
+          return -1;
+        }
+      ticks = (NS100PERSEC * to.tv_sec) +
+              ((to.tv_nsec + (NSPERSEC/NS100PERSEC) - 1) /
+                             (NSPERSEC/NS100PERSEC));
+    }
+
+retry:
+  sigemptyset (&sigmask);
+  int aiocount = 0;
+  for (int i = 0; i < nent; ++i)
+    if (aiolist[i] && aiolist[i]->aio_liocb)
+      {
+        if (aiolist[i]->aio_errno == EINPROGRESS ||
+            aiolist[i]->aio_errno == ENOBUFS ||
+            aiolist[i]->aio_errno == EBUSY)
+          {
+            ++aiocount;
+            if (aiolist[i]->aio_sigevent.sigev_notify == SIGEV_SIGNAL ||
+                aiolist[i]->aio_sigevent.sigev_notify == SIGEV_THREAD)
+              sigaddset (&sigmask, aiolist[i]->aio_sigevent.sigev_signo);
+          }
+        else
+          return i;
+      }
+
+  if (aiocount == 0)
+    return nent;
+
+  if (timeout && ticks == 0)
+    {
+      set_errno (EAGAIN);
+      return -1;
+    }
+
+  QueryUnbiasedInterruptTime (&time0);
+  /* Note wait below is abortable even w/ empty sigmask and infinite timeout */
+  res = sigtimedwait (&sigmask, &si, timeout ? &to : NULL);
+  if (res == -1)
+    return -1; /* Return with errno set by failed sigtimedwait() */
+  QueryUnbiasedInterruptTime (&time1);
+
+  /* Adjust timeout to account for time just waited */
+  time1 -= time0;
+  if (time1 > ticks)
+    ticks = 0; // just in case we didn't get scheduled quickly
+  else
+    ticks -= time1;
+  to.tv_sec = ticks / NS100PERSEC;
+  to.tv_nsec = (ticks % NS100PERSEC) * (NSPERSEC / NS100PERSEC);
+
+  goto retry;
+}
+
+int
+aio_suspend (const struct aiocb *const aiolist[],
+             int nent, const struct timespec *timeout)
+{
+  int res;
+
+  if (nent < 0)
+    {
+      set_errno (EINVAL);
+      return -1;
+    }
+
+  pthread_testcancel ();
+  res = aiosuspend (aiolist, nent, timeout);
+
+  /* If there was an error, or no AIOs completed before or during timeout */
+  if (res == -1)
+    return res; /* If no AIOs completed, errno has been set to EAGAIN */
+
+  /* Else if all AIOs have completed */
+  else if (res == nent)
+    return 0;
+
+  /* Else at least one of the AIOs completed */
+  else
+    return 0;
+}
+
+int
+aio_write (struct aiocb *aio)
+{
+  ssize_t       res = 0;
+
+  if (!aio)
+    {
+      set_errno (EINVAL);
+      return -1;
+    }
+  if (aioinitialized >= 0)
+    aioinit ();
+  if (aio->aio_errno == EINPROGRESS || -1 != aiochkslot (aio))
+    {
+      set_errno (EAGAIN);
+      return -1;
+    }
+
+  aio->aio_lio_opcode = LIO_WRITE;
+  aio->aio_errno = EINPROGRESS;
+  aio->aio_rbytes = -1;
+
+  /* Ensure zeroed (i.e. initialized but unused) aio_sigevent doesn't signal */
+  if (aio->aio_sigevent.sigev_signo == 0)
+    aio->aio_sigevent.sigev_notify = SIGEV_NONE;
+
+  /* Try to launch inline async write; only on ESPIPE/ENOBUFS is it queued */
+  pthread_testcancel ();
+  res = asyncwrite (aio);
+
+  /* If async write couldn't be launched, queue the AIO for a worker thread */
+  if (res == -1)
+    switch (get_errno ()) {
+    case ESPIPE:
+      {
+        int slot = aiorelslot (aio);
+        if (slot >= 0)
+          debug_printf ("slot %d released", slot);
+      }
+      /* fall through */
+
+    case ENOBUFS:
+      aio->aio_errno = EINPROGRESS;
+      aio->aio_rbytes = -1;
+
+      res = aioqueue (aio);
+      break;
+
+    default:
+      ; /* I think this is not possible */
+    }
+
+  return res;
+}
+
+int
+lio_listio (int mode, struct aiocb *__restrict const aiolist[__restrict],
+            int nent, struct sigevent *__restrict sig)
+{
+  struct aiocb *aio;
+  struct __liocb *lio;
+
+  pthread_testcancel ();
+
+  if ((mode != LIO_WAIT && mode != LIO_NOWAIT) ||
+      (nent < 0 || nent > AIO_LISTIO_MAX))
+    {
+      set_errno (EINVAL);
+      return -1;
+    }
+
+  if (sig && nent && mode == LIO_NOWAIT)
+    {
+      lio = (struct __liocb *) malloc (sizeof (struct __liocb));
+      if (!lio)
+        {
+          set_errno (ENOMEM);
+          return -1;
+        }
+
+      lio->lio_count = nent;
+      lio->lio_sigevent = sig;
+    }
+  else
+    lio = NULL;
+
+  int aiocount = 0;
+  for (int i = 0; i < nent; ++i)
+    {
+      aio = (struct aiocb *) aiolist[i];
+      if (!aio)
+        {
+          if (lio)
+            InterlockedDecrement (&lio->lio_count);
+          continue;
+        }
+
+      aio->aio_liocb = lio;
+      switch (aio->aio_lio_opcode)
+        {
+          case LIO_NOP:
+            if (lio)
+              InterlockedDecrement (&lio->lio_count);
+            continue;
+
+          case LIO_READ:
+            aio_read (aio);
+            ++aiocount;
+            continue;
+
+          case LIO_WRITE:
+            aio_write (aio);
+            ++aiocount;
+            continue;
+
+          default:
+            break;
+        }
+
+      if (lio)
+        InterlockedDecrement (&lio->lio_count);
+      aio->aio_errno = EINVAL;
+      aio->aio_rbytes = -1;
+    }
+
+  /* mode is LIO_NOWAIT so return some kind of answer immediately */
+  if (mode == LIO_NOWAIT)
+    {
+      /* At least one AIO has been launched or queued */
+      if (aiocount)
+        return 0;
+
+      /* No AIOs have been launched or queued */
+      set_errno (EAGAIN);
+      return -1;
+    }
+
+  /* Else mode is LIO_WAIT so wait for all AIOs to complete or error */
+  while (nent)
+    {
+      int i = aiosuspend ((const struct aiocb *const *) aiolist, nent, NULL);
+      if (i >= nent)
+        break;
+      else
+        aiolist[i]->aio_liocb = NULL; /* Avoids repeating notify on this AIO */
+    }
+
+  return 0;
+}
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/winsup/cygwin/include/aio.h b/winsup/cygwin/include/aio.h
new file mode 100644
index 000000000..523a47870
--- /dev/null
+++ b/winsup/cygwin/include/aio.h
@@ -0,0 +1,82 @@
+/* aio.h: Support for Posix asynchronous i/o routines.
+
+This file is part of Cygwin.
+
+This software is a copyrighted work licensed under the terms of the
+Cygwin license.  Please consult the file "CYGWIN_LICENSE" for
+details. */
+
+#ifndef _AIO_H
+#define _AIO_H
+
+#include <sys/features.h>
+#include <sys/queue.h>
+#include <sys/signal.h>
+#include <sys/types.h>
+#include <limits.h> /* for AIO_LISTIO_MAX, AIO_MAX, AIO_PRIO_DELTA_MAX */
+
+/* defines for return value of aio_cancel() */
+#define AIO_ALLDONE     0
+#define AIO_CANCELED    1
+#define AIO_NOTCANCELED 2
+
+/* defines for 'mode' argument of lio_listio() */
+#define LIO_NOWAIT      0
+#define LIO_WAIT        1
+
+/* defines for 'aio_lio_opcode' element of struct aiocb */
+#define LIO_NOP         0
+#define LIO_READ        1
+#define LIO_WRITE       2
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* struct __liocb is Cygwin-specific */
+struct __liocb {
+    volatile uint32_t    lio_count;
+    struct sigevent     *lio_sigevent;
+};
+
+/* struct __wincb is Cygwin-specific */
+struct __wincb {
+    int32_t              status; /* These two fields must be first... */
+    uintptr_t            info;   /* ...and must be adjacent, in this order */
+    void                *event;
+};
+
+/* struct aiocb is defined by Posix */
+struct aiocb {
+    /* these elements of aiocb are Cygwin-specific */
+    TAILQ_ENTRY(aiocb)   aio_chain;
+    struct __liocb      *aio_liocb;
+    struct __wincb       aio_wincb;
+    ssize_t              aio_rbytes;
+    int                  aio_errno;
+    /* the remaining elements of aiocb are defined by Posix */
+    int                  aio_lio_opcode;
+    int                  aio_reqprio; /* Not yet implemented; must be zero */
+    int                  aio_fildes;
+    volatile void       *aio_buf;
+    size_t               aio_nbytes;
+    off_t                aio_offset;
+    struct sigevent      aio_sigevent;
+};
+
+/* function prototypes as defined by Posix */
+int     aio_cancel  (int, struct aiocb *);
+int     aio_error   (const struct aiocb *);
+int     aio_fsync   (int, struct aiocb *);
+int     aio_read    (struct aiocb *);
+ssize_t aio_return  (struct aiocb *);
+int     aio_suspend (const struct aiocb *const [], int,
+                        const struct timespec *);
+int     aio_write   (struct aiocb *);
+int     lio_listio  (int, struct aiocb *__restrict const [__restrict], int,
+                        struct sigevent *__restrict);
+
+#ifdef __cplusplus
+}
+#endif
+#endif /* _AIO_H */
-- 
2.17.0


