This is the mail archive of the
cygwin-patches
mailing list for the Cygwin project.
[PATCH v3 1/3] POSIX Asynchronous I/O support: aio files
- From: Mark Geisert <mark at maxrnd dot com>
- To: cygwin-patches at cygwin dot com
- Cc: Mark Geisert <mark at maxrnd dot com>
- Date: Sun, 15 Jul 2018 01:20:23 -0700
- Subject: [PATCH v3 1/3] POSIX Asynchronous I/O support: aio files
- References: <20180715082025.4920-1-mark@maxrnd.com>
This is the core of the AIO implementation: aio.cc and aio.h. The
latter is used within the Cygwin DLL by aio.cc and the fhandler* modules,
as well as by user programs wanting the AIO functionality.
---
winsup/cygwin/aio.cc | 984 ++++++++++++++++++++++++++++++++++++
winsup/cygwin/include/aio.h | 82 +++
2 files changed, 1066 insertions(+)
create mode 100644 winsup/cygwin/aio.cc
create mode 100644 winsup/cygwin/include/aio.h
diff --git a/winsup/cygwin/aio.cc b/winsup/cygwin/aio.cc
new file mode 100644
index 000000000..b8f367dc6
--- /dev/null
+++ b/winsup/cygwin/aio.cc
@@ -0,0 +1,984 @@
+/* aio.cc: Posix asynchronous i/o functions.
+
+This file is part of Cygwin.
+
+This software is a copyrighted work licensed under the terms of the
+Cygwin license. Please consult the file "CYGWIN_LICENSE" for
+details. */
+
+#include "winsup.h"
+#include "path.h"
+#include "fhandler.h"
+#include "dtable.h"
+#include "cygheap.h"
+#include "sigproc.h"
+#include <aio.h>
+#include <fcntl.h>
+#include <semaphore.h>
+#include <unistd.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* 'aioinitialized' is a thread-safe status of AIO feature initialization:
+ * 0 means uninitialized, >0 means initializing, <0 means initialized.
+ * It is only changed through Interlocked* operations in aioinit(); the entry
+ * points test 'aioinitialized >= 0' as a cheap "do I need to call aioinit()"
+ * check.
+ */
+static NO_COPY volatile LONG aioinitialized = 0;
+
+/* This implementation supports two flavors of asynchronous operation:
+ * "inline" and "queued". Inline AIOs are used when:
+ * (1) fd refers to a local non-locked disk file opened in binary mode,
+ * (2) no more than AIO_MAX inline AIOs will be in progress at same time.
+ * In all other cases queued AIOs will be used.
+ *
+ * An inline AIO is performed by the calling app's thread as a pread|pwrite on
+ * a shadow fd that permits Windows asynchronous i/o, with event notification
+ * on completion. Event arrival causes AIO context for the fd to be updated.
+ *
+ * A queued AIO is performed in a similar manner, but by an AIO worker thread
+ * rather than the calling app's thread. The queued flavor can also operate
+ * on sockets, pipes, non-binary files, mandatory-locked files, and files
+ * that don't support pread|pwrite. Generally all these cases are handled as
+ * synchronous read|write operations, but still don't delay the app because
+ * they're taken care of by AIO worker threads.
+ */
+
+/* These variables support inline AIO operations.  A "slot" is one index into
+ * the three parallel arrays below; all of them are set up by aioinit().
+ */
+/* Per-slot event handle; aiowaiter() waits on the whole array */
+static NO_COPY HANDLE evt_handles[AIO_MAX];
+/* Per-slot owning AIO; NULL means the slot is free */
+static NO_COPY struct aiocb *evt_aiocbs[AIO_MAX];
+static NO_COPY CRITICAL_SECTION evt_locks[AIO_MAX]; /* per-slot locks */
+static NO_COPY CRITICAL_SECTION slotcrit; /* lock for slot variables in toto */
+
+/* These variables support queued AIO operations */
+static NO_COPY sem_t worksem; /* tells whether AIOs are queued */
+static NO_COPY CRITICAL_SECTION workcrit; /* lock for AIO work queue */
+/* FIFO of AIOs waiting for an aioworker thread; guarded by 'workcrit' */
+TAILQ_HEAD(queue, aiocb) worklist = TAILQ_HEAD_INITIALIZER(worklist);
+
+/* Report whether 'aio' currently occupies an inline slot.
+ * Returns the slot index holding 'aio', or -1 if no slot refers to it.
+ * The slot table is only read here; it is not locked and not modified.
+ */
+static int
+aiochkslot (struct aiocb *aio)
+{
+  int idx = 0;
+
+  while (idx < AIO_MAX)
+    {
+      if (evt_aiocbs[idx] == aio)
+        {
+          /* Same aiocb submitted again while still in flight */
+          debug_printf ("aio %p is already busy in slot %d", aio, idx);
+          return idx;
+        }
+      ++idx;
+    }
+
+  return -1;
+}
+
+/* Locate the lowest-numbered free inline slot while holding 'slotcrit'.
+ * If 'aio' is non-NULL the slot is claimed for it; if 'aio' is NULL this is
+ * merely an availability probe and the table is left untouched.
+ * Returns the slot index, or -1 when every slot is occupied (the caller is
+ * then expected to queue the AIO instead).
+ */
+static int
+aiogetslot (struct aiocb *aio)
+{
+  int found = -1;
+
+  EnterCriticalSection (&slotcrit);
+  for (int idx = 0; idx < AIO_MAX && found < 0; ++idx)
+    if (!evt_aiocbs[idx])
+      {
+        if (aio)
+          evt_aiocbs[idx] = aio;
+        found = idx;
+      }
+  LeaveCriticalSection (&slotcrit);
+
+  return found;
+}
+
+/* Give back the inline slot owned by 'aio', if it owns one.
+ * Runs under 'slotcrit'.  Returns the index of the slot that was freed, or
+ * -1 if 'aio' was not found in the slot table.
+ */
+static int
+aiorelslot (struct aiocb *aio)
+{
+  int freed = -1;
+
+  EnterCriticalSection (&slotcrit);
+  for (int idx = 0; idx < AIO_MAX; ++idx)
+    {
+      if (evt_aiocbs[idx] != aio)
+        continue;
+
+      evt_aiocbs[idx] = NULL;
+      freed = idx;
+      break;
+    }
+  LeaveCriticalSection (&slotcrit);
+
+  return freed;
+}
+
+/* Deliver a SIGEV_THREAD notification: start a new pthread that runs the
+ * application's sigev_notify_function with sigev_value.sival_ptr as argument.
+ *
+ * If the app supplied thread attributes they are used as is; otherwise a
+ * temporary attribute object requesting a detached thread is created here and
+ * destroyed again once pthread_create() has consumed it.
+ *
+ * Failure to create the thread is only logged; there is no recovery.
+ */
+static void
+aionotify_on_pthread (struct sigevent *evp)
+{
+  pthread_attr_t *attr;
+  pthread_attr_t default_attr;
+  int rc;
+  pthread_t vaquita; /* == "little porpoise", endangered */
+
+  if (evp->sigev_notify_attributes)
+    attr = evp->sigev_notify_attributes;
+  else
+    {
+      attr = &default_attr;
+      pthread_attr_init (attr);
+      pthread_attr_setdetachstate (attr, PTHREAD_CREATE_DETACHED);
+    }
+
+  rc = pthread_create (&vaquita, attr,
+                       (void * (*) (void *)) evp->sigev_notify_function,
+                       evp->sigev_value.sival_ptr);
+
+  /* The attribute object is ours only when we initialized it above; destroy
+   * it so repeated notifications don't leak one attribute object each.
+   * Destroying the attributes does not affect a thread created with them.
+   */
+  if (attr == &default_attr)
+    pthread_attr_destroy (attr);
+
+  /* The following error is not expected. If seen often, develop a recovery.
+   * pthread_create() reports failure via its return value, so log that.
+   */
+  if (rc)
+    debug_printf ("aio vaquita thread creation failed, error %d", rc);
+
+  /* Should we wait for the signal delivery thread to finish?  We can't: Who
+   * knows what mischief the app coder may have in their handler?  Worst case
+   * is they accidentally used non-signal-safe functions in their handler.  We
+   * return hoping for the best and finish cleaning up our end of notification.
+   */
+}
+
+/* Perform the single notification described by 'evp'.  SIGEV_SIGNAL sends
+ * the requested signal (unless the signal number is zero) using 'si' as the
+ * carrier; SIGEV_THREAD starts a notification thread; anything else,
+ * including SIGEV_NONE, does nothing.
+ */
+static void
+aiodeliver (struct sigevent *evp, siginfo_t *si)
+{
+  switch (evp->sigev_notify)
+    {
+    case SIGEV_SIGNAL:
+      si->si_signo = evp->sigev_signo;
+      si->si_value = evp->sigev_value;
+      if (si->si_signo)
+        sig_send (myself, *si);
+      break;
+
+    case SIGEV_THREAD:
+      aionotify_on_pthread (evp);
+      break;
+
+    case SIGEV_NONE:
+      break;
+    }
+}
+
+/* Tell the application that 'aio' has finished.
+ *
+ * First the AIO's own aio_sigevent is honored.  Then, if the AIO belongs to
+ * an lio_listio() batch (aio_liocb set), the batch's outstanding count is
+ * decremented atomically; whichever AIO takes it from 1 to 0 also delivers
+ * the batch's sigevent, frees the liocb and clears its own aio_liocb.
+ */
+static void
+aionotify (struct aiocb *aio)
+{
+  siginfo_t si = {0};
+  si.si_code = SI_ASYNCIO;
+
+  /* Per-AIO completion notification */
+  aiodeliver (&aio->aio_sigevent, &si);
+
+  /* Nothing more to do unless this AIO is part of an LIO list */
+  struct liocb *lio = aio->aio_liocb;
+  if (!lio)
+    return;
+
+  /* InterlockedExchangeAdd returns the value prior to the decrement */
+  if (InterlockedExchangeAdd (&lio->lio_count, -1) != 1)
+    return;
+
+  /* LIO's count has decremented to zero: send LIO-complete notification */
+  aiodeliver (lio->lio_sigevent, &si);
+
+  free (lio);
+  aio->aio_liocb = NULL;
+}
+
+/* Thread body that retires inline AIOs.  It blocks on the per-slot event
+ * handles; when one fires it finalizes the status of the AIO recorded in that
+ * slot, notifies the application, frees the slot and posts 'worksem' so a
+ * parked aioworker can retry.  Never returns.
+ */
+static DWORD WINAPI __attribute__ ((noreturn))
+aiowaiter (void *unused)
+{ /* One instance, called on its own cygthread; runs until program exits */
+ struct aiocb *aio;
+
+ while (1)
+ {
+ /* Wait forever for at least one event to be set */
+ DWORD res = WaitForMultipleObjects(AIO_MAX, evt_handles, FALSE, INFINITE);
+ switch (res)
+ {
+ case WAIT_FAILED:
+ api_fatal ("aiowaiter fatal error, %E");
+
+ default:
+ /* Anything outside the signaled-object range is treated as fatal */
+ if (res < WAIT_OBJECT_0 || res >= WAIT_OBJECT_0 + AIO_MAX)
+ api_fatal ("aiowaiter unexpected WFMO result %d", res);
+ int slot = res - WAIT_OBJECT_0;
+
+ /* Guard against "saw completion before request finished" gotcha */
+ /* asyncread()/asyncwrite() hold this lock across the pread/pwrite call,
+ * so acquiring and releasing it here waits for the submitter to finish.
+ */
+ EnterCriticalSection (&evt_locks[slot]);
+ LeaveCriticalSection (&evt_locks[slot]);
+
+ /* NOTE(review): 'aio' is dereferenced below without a NULL check;
+ * this relies on the slot still being owned when its event fires.
+ */
+ aio = evt_aiocbs[slot];
+ debug_printf ("WFMO returns %d, aio %p", res, aio);
+
+ /* EBUSY is the internal "physically underway" marker set at submit */
+ if (aio->aio_errno == EBUSY)
+ {
+ /* Capture Windows status and convert to Cygwin status */
+ NTSTATUS status = aio->aio_win_iosb.Status;
+ if (NT_SUCCESS (status))
+ {
+ aio->aio_rbytes = aio->aio_win_iosb.Information;
+ aio->aio_errno = 0;
+ }
+ else
+ {
+ aio->aio_rbytes = -1;
+ aio->aio_errno = geterrno_from_nt_status (status);
+ }
+ }
+ else
+ {
+ /* Async operation was simulated; AIO status already updated */
+ }
+
+ /* Send completion signal if user requested it */
+ aionotify (aio);
+
+ /* Free up the slot used for this inline AIO. We do this
+ * manually rather than calling aiorelslot() because we
+ * already have the slot number handy.
+ */
+ EnterCriticalSection (&slotcrit);
+ evt_aiocbs[slot] = NULL;
+ LeaveCriticalSection (&slotcrit);
+ debug_printf ("retired aio %p; slot %d released", aio, slot);
+
+ /* Notify workers that a slot has opened up */
+ sem_post (&worksem);
+ }
+ }
+}
+
+/* Try to initiate an asynchronous read, either from app or worker thread.
+ *
+ * Returns whatever the fd's pread() returns once a slot has been claimed for
+ * 'aio'.  Returns -1 when the fd lookup fails (errno as set by the lookup) or
+ * when no inline slot is free (errno ENOBUFS, an internal-only indication).
+ */
+static int
+asyncread (struct aiocb *aio)
+{
+  cygheap_fdget cfd (aio->aio_fildes);
+  if (cfd < 0)
+    return -1; /* errno has been set to EBADF */
+
+  int slot = aiogetslot (aio);
+  debug_printf ("slot %d%s", slot, slot >= 0 ? " acquired" : "");
+  if (slot < 0)
+    {
+      set_errno (ENOBUFS); /* Internal use only */
+      return -1;
+    }
+
+  /* Hold the per-slot lock for the duration of the submit so aiowaiter()
+   * cannot process the completion before the request has been issued.
+   */
+  ssize_t nread;
+  EnterCriticalSection (&evt_locks[slot]);
+  aio->aio_errno = EBUSY; /* Mark AIO as physically underway now */
+  aio->aio_win_event = evt_handles[slot];
+  nread = cfd->pread ((void *) aio->aio_buf, aio->aio_nbytes,
+                      aio->aio_offset, (void *) aio);
+  LeaveCriticalSection (&evt_locks[slot]);
+
+  return nread;
+}
+
+/* Try to initiate an asynchronous write, either from app or worker thread.
+ *
+ * Returns whatever the fd's pwrite() returns once a slot has been claimed for
+ * 'aio'.  Returns -1 when the fd lookup fails (errno as set by the lookup) or
+ * when no inline slot is free (errno ENOBUFS, an internal-only indication).
+ */
+static int
+asyncwrite (struct aiocb *aio)
+{
+  cygheap_fdget cfd (aio->aio_fildes);
+  if (cfd < 0)
+    return -1; /* errno has been set to EBADF */
+
+  int slot = aiogetslot (aio);
+  debug_printf ("slot %d%s", slot, slot >= 0 ? " acquired" : "");
+  if (slot < 0)
+    {
+      set_errno (ENOBUFS); /* Internal use only */
+      return -1;
+    }
+
+  /* Hold the per-slot lock for the duration of the submit so aiowaiter()
+   * cannot process the completion before the request has been issued.
+   */
+  ssize_t nwritten;
+  EnterCriticalSection (&evt_locks[slot]);
+  aio->aio_errno = EBUSY; /* Mark AIO as physically underway now */
+  aio->aio_win_event = evt_handles[slot];
+  nwritten = cfd->pwrite ((void *) aio->aio_buf, aio->aio_nbytes,
+                          aio->aio_offset, (void *) aio);
+  LeaveCriticalSection (&evt_locks[slot]);
+
+  return nwritten;
+}
+
+/* Have to forward ref because of chicken v. egg situation */
+/* (aioinit() below starts aioworker threads, while aioworker() itself calls
+ * aioqueue(), which in turn calls aioinit().)
+ */
+static DWORD WINAPI __attribute__ ((noreturn)) aioworker (void *);
+
+/* One-time setup of the AIO machinery: locks, the work semaphore and queue,
+ * AIO_MAX aioworker threads, AIO_MAX slot events with their locks, and the
+ * single aiowaiter thread.  Safe to call from several threads at once: the
+ * first caller to bump 'aioinitialized' from 0 does the work while the others
+ * spin with yield() until it goes negative.  Any setup failure is fatal.
+ */
+static void
+aioinit (void)
+{
+ /* First a cheap test to speed processing after initialization completes */
+ if (aioinitialized >= 0)
+ {
+ /* Guard against multiple threads initializing at same time */
+ if (0 == InterlockedExchangeAdd (&aioinitialized, 1))
+ {
+ int i = AIO_MAX;
+ /* Thread-name storage, 8 bytes per worker; never freed here */
+ char *tnames = (char *) malloc (AIO_MAX * 8);
+
+ if (!tnames)
+ api_fatal ("couldn't create aioworker tname table");
+
+ InitializeCriticalSection (&slotcrit);
+ InitializeCriticalSection (&workcrit);
+ sem_init (&worksem, 0, 0);
+ TAILQ_INIT(&worklist);
+
+ /* Create AIO_MAX number of aioworker threads for queued AIOs */
+ while (i--)
+ {
+ /* Names run "aio1" .. "aio<AIO_MAX>", built from the table's end */
+ __small_sprintf (&tnames[i * 8], "aio%d", AIO_MAX - i);
+ if (!new cygthread (aioworker, NULL, &tnames[i * 8]))
+ api_fatal ("couldn't create an aioworker thread, %E");
+ }
+
+ /* Initialize event handles and slot locks arrays for inline AIOs */
+ for (i = 0; i < AIO_MAX; ++i)
+ {
+ /* Events are non-inheritable, auto-reset, init unset, unnamed */
+ evt_handles[i] = CreateEvent (NULL, FALSE, FALSE, NULL);
+ if (!evt_handles[i])
+ api_fatal ("couldn't create an event, %E");
+
+ InitializeCriticalSection (&evt_locks[i]);
+ }
+
+ /* Create aiowaiter thread; waits for inline AIO completion events */
+ if (!new cygthread (aiowaiter, NULL, "aio"))
+ api_fatal ("couldn't create aiowaiter thread, %E");
+
+ /* Indicate we have completed initialization */
+ InterlockedExchange (&aioinitialized, -1);
+ }
+ else
+ /* If 'aioinitialized' is greater than zero, another thread is
+ * initializing for us; wait until 'aioinitialized' goes negative
+ */
+ /* The add of 0 is used purely as an interlocked read of the flag */
+ while (InterlockedExchangeAdd (&aioinitialized, 0) >= 0)
+ yield ();
+ }
+}
+
+/* Add an AIO to the worklist, to be serviced by a worker thread.
+ * Makes sure the AIO machinery exists, appends 'aio' to the tail of the
+ * queue under 'workcrit', then posts 'worksem' to wake one aioworker.
+ * Always returns 0.
+ */
+static int
+aioqueue (struct aiocb *aio)
+{
+  /* Negative means fully initialized; anything else needs aioinit() */
+  if (!(aioinitialized < 0))
+    aioinit ();
+
+  EnterCriticalSection (&workcrit);
+  TAILQ_INSERT_TAIL(&worklist, aio, aio_chain);
+  LeaveCriticalSection (&workcrit);
+  debug_printf ("queued aio %p", aio);
+
+  sem_post (&worksem);
+  return 0;
+}
+
+/* Thread body that services queued AIOs.  Each pass takes the first AIO off
+ * the worklist (only if an inline slot looks available), tries to launch it
+ * asynchronously, and then either leaves completion to aiowaiter(), re-queues
+ * it (ENOBUFS), falls back to a synchronous seek+read/write (ESPIPE), or
+ * records the immediate result and notifies the application.  Never returns.
+ */
+static DWORD WINAPI __attribute__ ((noreturn))
+aioworker (void *unused)
+{ /* Multiple instances; called on own cygthreads; runs 'til program exits */
+ struct aiocb *aio;
+
+ while (1)
+ {
+ /* Park here until there's work to do or a slot becomes available */
+ sem_wait (&worksem);
+
+look4work:
+ EnterCriticalSection (&workcrit);
+ if (TAILQ_EMPTY(&worklist))
+ {
+ /* Another aioworker picked up the work already */
+ LeaveCriticalSection (&workcrit);
+ continue;
+ }
+
+ /* Make sure we have a slot before starting this AIO */
+ /* aiogetslot (NULL) only probes; the slot is actually claimed later by
+ * asyncread()/asyncwrite(), so it can still be lost in between.
+ */
+ aio = TAILQ_FIRST(&worklist);
+ int slot = aiogetslot (NULL);
+ if (slot >= 0) // we have a slot
+ TAILQ_REMOVE(&worklist, aio, aio_chain);
+ LeaveCriticalSection (&workcrit);
+ if (slot < 0) // we have no slot, so worklist unchanged and we park
+ continue;
+
+ debug_printf ("starting aio %p", aio);
+ switch (aio->aio_lio_opcode)
+ {
+ case LIO_NOP:
+ aio->aio_rbytes = 0;
+ break;
+
+ case LIO_READ:
+ aio->aio_rbytes = asyncread (aio);
+ break;
+
+ case LIO_WRITE:
+ aio->aio_rbytes = asyncwrite (aio);
+ break;
+
+ default:
+ errno = EINVAL;
+ aio->aio_rbytes = -1;
+ break;
+ }
+
+ /* If operation still underway, let aiowaiter hear about its finish */
+ /* NOTE(review): this 'continue' parks on sem_wait() again rather than
+ * jumping to look4work -- presumably intentional; confirm.
+ */
+ if (aio->aio_rbytes == 0 && aio->aio_nbytes != 0) // not racy
+ continue;
+
+ /* If operation errored, save error number, else clear it */
+ if (aio->aio_rbytes == -1)
+ aio->aio_errno = get_errno ();
+ else
+ aio->aio_errno = 0;
+
+ /* If we thought we had a slot for this queued async AIO, but lost out */
+ if (aio->aio_errno == ENOBUFS)
+ {
+ aio->aio_errno = EINPROGRESS;
+ aioqueue (aio); /* Re-queue the AIO */
+
+ /* Another option would be to fail the AIO with error EAGAIN, but
+ * experience with iozone showed apps might not expect to see a
+ * deferred EAGAIN. I.e. they should expect EAGAIN on their call to
+ * aio_read() or aio_write() but probably not expect to see EAGAIN
+ * on an aio_error() query after they'd previously seen EINPROGRESS
+ * on the initial AIO call.
+ */
+ continue;
+ }
+
+ /* If seeks aren't permitted on given fd, or pread|pwrite not legal */
+ if (aio->aio_errno == ESPIPE)
+ {
+ ssize_t res = 0;
+ off_t curpos;
+
+ cygheap_fdget cfd (aio->aio_fildes);
+ if (cfd < 0)
+ {
+ res = -1;
+ goto done; /* errno has been set to EBADF */
+ }
+
+ /* If we can get current file position, seek to aio_offset */
+ curpos = cfd->lseek (0, SEEK_CUR);
+ if (curpos < 0 || cfd->lseek (aio->aio_offset, SEEK_SET) < 0)
+ {
+ /* Can't seek */
+ /* NOTE(review): 'res' set here is overwritten by the LIO_READ and
+ * LIO_WRITE cases below; it only survives for other opcodes.
+ */
+ res = curpos;
+ set_errno (0); /* Get rid of ESPIPE we've incurred */
+ aio->aio_errno = 0; /* Here too */
+ }
+
+ /* Do the requested AIO operation manually, synchronously */
+ switch (aio->aio_lio_opcode)
+ {
+ case LIO_READ:
+ /*XXX Hmm, no direct result? This OK? */
+ /* NOTE(review): the result is taken from aio_nbytes after the call,
+ * which presumably means read() updates its length argument in
+ * place (and so alters the caller-visible aio_nbytes) -- confirm.
+ */
+ cfd->read ((void *) aio->aio_buf, aio->aio_nbytes);
+ res = aio->aio_nbytes;
+ break;
+
+ case LIO_WRITE:
+ res = cfd->write ((void *) aio->aio_buf, aio->aio_nbytes);
+ break;
+ }
+
+ /* If we had seeked successfully, restore original file position */
+ /* NOTE(review): this tests only 'curpos', so it also runs when the
+ * SEEK_CUR query worked but the seek to aio_offset failed.
+ */
+ if (curpos >= 0)
+ if (cfd->lseek (curpos, SEEK_SET) < 0)
+ res = -1;
+
+done:
+ /* Update AIO to reflect final result */
+ aio->aio_rbytes = res;
+ aio->aio_errno = res == -1 ? get_errno () : 0;
+
+ /* Make like the requested async operation completed normally */
+ /* If the AIO still owns a slot, signal that slot's event so that
+ * aiowaiter() does the notification and slot release instead of us.
+ */
+ for (int i = 0; i < AIO_MAX; i++)
+ if (evt_aiocbs[i] == aio)
+ {
+ SetEvent (evt_handles[i]);
+ goto truly_done;
+ }
+
+ /* Free up the slot we ended up not using */
+ /* Reached only when the scan above found no slot owned by this AIO */
+ int slot = aiorelslot (aio);
+ debug_printf ("slot %d released", slot);
+ }
+
+ /* Send completion signal if user requested it */
+ aionotify (aio);
+
+truly_done:
+ debug_printf ("completed aio %p", aio);
+ /* Check the queue again right away instead of waiting on the semaphore */
+ goto look4work;
+ }
+}
+
+/* Cancel queued AIOs on file descriptor 'fildes'.
+ *
+ * 'aio' selects a single request; NULL acts as a wildcard matching every
+ * queued request on 'fildes'.  Each AIO removed from the worklist is marked
+ * ECANCELED with a result of -1 and the notification requested in its
+ * aio_sigevent is delivered.
+ *
+ * Returns AIO_CANCELED if at least one AIO was canceled, else AIO_ALLDONE.
+ */
+int
+aio_cancel (int fildes, struct aiocb *aio)
+{
+  int aiocount = 0;
+  struct aiocb *ptr;
+  siginfo_t si = {0};
+  si.si_code = SI_ASYNCIO;
+
+  /* 'workcrit' is only usable once aioinit() has run.  An app may call
+   * aio_cancel() before any other AIO function, so make sure of that here
+   * just like the other entry points do.
+   */
+  if (aioinitialized >= 0)
+    aioinit ();
+
+  /* Note 'aio' is allowed to be NULL here; it's used as a wildcard */
+restart:
+  EnterCriticalSection (&workcrit);
+  TAILQ_FOREACH(ptr, &worklist, aio_chain)
+    {
+      if (ptr->aio_fildes == fildes && (!aio || ptr == aio))
+        {
+          /* This queued AIO qualifies for cancellation */
+          TAILQ_REMOVE(&worklist, ptr, aio_chain);
+          LeaveCriticalSection (&workcrit);
+
+          ptr->aio_errno = ECANCELED;
+          ptr->aio_rbytes = -1;
+
+          /* If signal notification wanted, send AIO-canceled signal */
+          switch (ptr->aio_sigevent.sigev_notify)
+            {
+            case SIGEV_NONE:
+              break;
+
+            case SIGEV_SIGNAL:
+              si.si_signo = ptr->aio_sigevent.sigev_signo;
+              si.si_value = ptr->aio_sigevent.sigev_value;
+              if (si.si_signo)
+                sig_send (myself, si);
+              break;
+
+            case SIGEV_THREAD:
+              aionotify_on_pthread (&ptr->aio_sigevent);
+              break;
+            }
+
+          ++aiocount;
+          /* The list was modified and the lock dropped, so the iteration
+           * state is stale: rescan the worklist from the top.
+           */
+          goto restart;
+        }
+    }
+  LeaveCriticalSection (&workcrit);
+
+  /* Note that AIO_NOTCANCELED is not possible in this implementation.  That's
+   * because AIOs are dequeued to execute; the worklist search above won't
+   * find an AIO that's been dequeued from the worklist.
+   */
+  if (aiocount)
+    return AIO_CANCELED;
+  else
+    return AIO_ALLDONE;
+}
+
+int
+aio_error (const struct aiocb *aio)
+{
+ int res;
+
+ if (!aio)
+ {
+ set_errno (EINVAL);
+ return -1;
+ }
+
+ switch (aio->aio_errno)
+ {
+ case EBUSY: /* This state for internal use only; not visible to app */
+ case ENOBUFS: /* This state for internal use only; not visible to app */
+ res = EINPROGRESS;
+ break;
+
+ default:
+ res = aio->aio_errno;
+ }
+
+ return res;
+}
+
+#ifdef _POSIX_SYNCHRONIZED_IO
+/* Synchronize the file referred to by aio->aio_fildes.
+ *
+ * This is carried out synchronously: 'mode' O_SYNC calls fsync() and, where
+ * it is a distinct flag, O_DSYNC calls fdatasync().  The result is stored in
+ * the aiocb (aio_rbytes and aio_errno) so aio_error()/aio_return() report it,
+ * and is also returned directly.
+ *
+ * Returns 0 on success; -1 with errno set on failure, including EINVAL for a
+ * NULL 'aio' or an unsupported 'mode'.
+ */
+int
+aio_fsync (int mode, struct aiocb *aio)
+{
+  if (!aio)
+    {
+      set_errno (EINVAL);
+      return -1;
+    }
+
+  switch (mode)
+    {
+#if defined(O_SYNC)
+    case O_SYNC:
+      aio->aio_rbytes = fsync (aio->aio_fildes);
+      break;
+
+#if defined(O_DSYNC) && O_DSYNC != O_SYNC
+    case O_DSYNC:
+      aio->aio_rbytes = fdatasync (aio->aio_fildes);
+      break;
+#endif
+#endif
+
+    default:
+      set_errno (EINVAL);
+      return -1;
+    }
+
+  /* Record the outcome either way.  Leaving aio_errno alone on success would
+   * let a stale status (e.g. EINVAL left behind by an earlier aio_return())
+   * make a successful sync look like a failure.
+   */
+  if (aio->aio_rbytes == -1)
+    aio->aio_errno = get_errno ();
+  else
+    aio->aio_errno = 0;
+
+  return aio->aio_rbytes;
+}
+#endif /* _POSIX_SYNCHRONIZED_IO */
+
+/* Start an asynchronous read described by 'aio'.
+ *
+ * The read is first attempted "inline" via asyncread().  If that fails with
+ * ESPIPE (fd can't do pread) or ENOBUFS (no inline slot free) the AIO is put
+ * on the worklist for an aioworker thread instead.
+ *
+ * Returns 0 if the AIO was launched or queued.  Returns -1 with errno set to
+ * EINVAL for a NULL 'aio', EAGAIN if this aiocb is already in progress, or
+ * the launch error otherwise; in the last case the error is also stored in
+ * the aiocb so it does not appear to be in progress forever.
+ */
+int
+aio_read (struct aiocb *aio)
+{
+  ssize_t res = 0;
+
+  if (!aio)
+    {
+      set_errno (EINVAL);
+      return -1;
+    }
+  if (aioinitialized >= 0)
+    aioinit ();
+  if (aio->aio_errno == EINPROGRESS || -1 != aiochkslot (aio))
+    {
+      set_errno (EAGAIN);
+      return -1;
+    }
+
+  aio->aio_lio_opcode = LIO_READ;
+  aio->aio_errno = EINPROGRESS;
+  aio->aio_rbytes = -1;
+
+  /* Ensure zeroed (i.e. initialized but unused) aio_sigevent doesn't signal */
+  if (aio->aio_sigevent.sigev_signo == 0)
+    aio->aio_sigevent.sigev_notify = SIGEV_NONE;
+
+  /* Try to launch inline async read; only on ESPIPE/ENOBUFS is it queued */
+  pthread_testcancel ();
+  res = asyncread (aio);
+
+  /* If async read couldn't be launched, queue the AIO for a worker thread */
+  if (res == -1)
+    {
+      /* Capture errno first; the calls below must not get to disturb it */
+      int err = get_errno ();
+
+      switch (err)
+        {
+        case ESPIPE:
+          {
+            int slot = aiorelslot (aio);
+            if (slot >= 0)
+              debug_printf ("slot %d released", slot);
+          }
+          /* fall through */
+
+        case ENOBUFS:
+          aio->aio_errno = EINPROGRESS;
+          aio->aio_rbytes = -1;
+
+          res = aioqueue (aio);
+          break;
+
+        default:
+          /* The launch failed outright (e.g. EBADF from the fd lookup) and
+           * nothing is in flight.  Give back any slot that was claimed and
+           * record the failure in the aiocb; otherwise it would stay marked
+           * EINPROGRESS forever and every later submit would get EAGAIN.
+           */
+          aiorelslot (aio);
+          aio->aio_errno = err;
+          aio->aio_rbytes = -1;
+          set_errno (err);
+          break;
+        }
+    }
+
+  return res;
+}
+
+/* Fetch the final result of a completed AIO.
+ *
+ * Returns -1 with errno EINVAL for a NULL 'aio' or when the result has
+ * already been collected, and -1 with errno EINPROGRESS while the AIO is
+ * still queued or underway.  Otherwise returns the stored result, setting
+ * errno from the AIO's error status if that result is -1.  Collecting the
+ * result marks the aiocb so a repeated call fails with EINVAL.
+ */
+ssize_t
+aio_return (struct aiocb *aio)
+{
+  if (!aio)
+    {
+      set_errno (EINVAL);
+      return -1;
+    }
+
+  int status = aio->aio_errno;
+
+  /* EBUSY and ENOBUFS are internal "underway" states; EINPROGRESS means the
+   * AIO has been queued successfully.
+   */
+  if (status == EBUSY || status == ENOBUFS || status == EINPROGRESS)
+    {
+      set_errno (EINPROGRESS);
+      return -1;
+    }
+
+  /* aio_return() has already been called on this AIO */
+  if (status == EINVAL)
+    {
+      set_errno (aio->aio_errno);
+      return -1;
+    }
+
+  /* AIO has completed, successfully or not; surface its error if any */
+  if (aio->aio_rbytes == -1)
+    set_errno (aio->aio_errno);
+
+  /* Set this AIO's errno so later aio_return() calls on this AIO fail */
+  aio->aio_errno = EINVAL;
+
+  return aio->aio_rbytes;
+}
+
+/* Wait until at least one AIO in 'aiolist' is no longer in progress.
+ *
+ * Returns the lowest list index of a completed AIO, or 'nent' if none of the
+ * considered entries is in progress.  Returns -1 with errno EAGAIN once the
+ * timeout is used up, or with errno as set by a failed sigtimedwait().
+ *
+ * 'timeout' may be NULL, meaning wait without limit.  The caller's timespec
+ * is never modified; the remaining time is tracked in a local copy.
+ *
+ * Only non-NULL entries whose aio_liocb is set are examined.
+ */
+static int
+aiosuspend (const struct aiocb *const aiolist[],
+            int nent, const struct timespec *timeout)
+{
+  ULONGLONG msecs = 0;          /* time budget left, in milliseconds */
+  int res;
+  sigset_t sigmask;
+  siginfo_t si;
+  DWORD time0, elapsed;
+  struct timespec to = {0, 0};  /* private, adjustable copy of *timeout */
+
+  if (timeout)
+    {
+      to = *timeout;
+      /* Round nanoseconds up so a short nonzero timeout isn't seen as zero */
+      msecs = (1000ULL * to.tv_sec) + ((to.tv_nsec + 999999) / 1000000);
+    }
+
+retry:
+  sigemptyset (&sigmask);
+  int aiocount = 0;
+  for (int i = 0; i < nent; ++i)
+    if (aiolist[i] && aiolist[i]->aio_liocb)
+      {
+        if (aiolist[i]->aio_errno == EINPROGRESS ||
+            aiolist[i]->aio_errno == ENOBUFS ||
+            aiolist[i]->aio_errno == EBUSY)
+          {
+            /* Still pending; remember which signal would announce it */
+            ++aiocount;
+            if (aiolist[i]->aio_sigevent.sigev_notify == SIGEV_SIGNAL ||
+                aiolist[i]->aio_sigevent.sigev_notify == SIGEV_THREAD)
+              sigaddset (&sigmask, aiolist[i]->aio_sigevent.sigev_signo);
+          }
+        else
+          return i;
+      }
+
+  if (aiocount == 0)
+    return nent;
+
+  /* A timeout was given and nothing is left of it */
+  if (timeout && msecs == 0)
+    {
+      set_errno (EAGAIN);
+      return -1;
+    }
+
+  time0 = GetTickCount ();
+  //XXX Is it possible to have an empty signal mask and infinite timeout?
+  res = sigtimedwait (&sigmask, &si, timeout ? &to : NULL);
+  if (res == -1)
+    return -1; /* Return with errno set by failed sigtimedwait() */
+
+  if (timeout)
+    {
+      /* Adjust timeout to account for time just waited.  The unsigned
+       * subtraction gives the right elapsed time even across a tick-count
+       * wraparound; clamp so the remaining budget cannot go below zero.
+       */
+      elapsed = GetTickCount () - time0;
+      msecs = (elapsed >= msecs) ? 0 : msecs - elapsed;
+      to.tv_sec = msecs / 1000;
+      to.tv_nsec = (msecs % 1000) * 1000000;
+    }
+
+  goto retry;
+}
+
+/* Suspend the caller until one of the listed AIOs completes or 'timeout'
+ * expires.  This is a cancellation point.
+ *
+ * Returns -1 with errno EINVAL if 'nent' is negative, and -1 with errno as
+ * left by aiosuspend() (EAGAIN if no AIO completed in time) on failure.
+ * Returns 0 otherwise, whether one AIO or all of them completed.
+ */
+int
+aio_suspend (const struct aiocb *const aiolist[],
+             int nent, const struct timespec *timeout)
+{
+  if (nent < 0)
+    {
+      set_errno (EINVAL);
+      return -1;
+    }
+
+  pthread_testcancel ();
+
+  /* aiosuspend() yields -1 on error/timeout, else an index in [0, nent];
+   * the caller of aio_suspend() only needs to know success or failure.
+   */
+  return aiosuspend (aiolist, nent, timeout) == -1 ? -1 : 0;
+}
+
+/* Start an asynchronous write described by 'aio'.
+ *
+ * The write is first attempted "inline" via asyncwrite().  If that fails with
+ * ESPIPE (fd can't do pwrite) or ENOBUFS (no inline slot free) the AIO is put
+ * on the worklist for an aioworker thread instead.
+ *
+ * Returns 0 if the AIO was launched or queued.  Returns -1 with errno set to
+ * EINVAL for a NULL 'aio', EAGAIN if this aiocb is already in progress, or
+ * the launch error otherwise; in the last case the error is also stored in
+ * the aiocb so it does not appear to be in progress forever.
+ */
+int
+aio_write (struct aiocb *aio)
+{
+  ssize_t res = 0;
+
+  if (!aio)
+    {
+      set_errno (EINVAL);
+      return -1;
+    }
+  if (aioinitialized >= 0)
+    aioinit ();
+  if (aio->aio_errno == EINPROGRESS || -1 != aiochkslot (aio))
+    {
+      set_errno (EAGAIN);
+      return -1;
+    }
+
+  aio->aio_lio_opcode = LIO_WRITE;
+  aio->aio_errno = EINPROGRESS;
+  aio->aio_rbytes = -1;
+
+  /* Ensure zeroed (i.e. initialized but unused) aio_sigevent doesn't signal */
+  if (aio->aio_sigevent.sigev_signo == 0)
+    aio->aio_sigevent.sigev_notify = SIGEV_NONE;
+
+  /* Try to launch inline async write; only on ESPIPE/ENOBUFS is it queued */
+  pthread_testcancel ();
+  res = asyncwrite (aio);
+
+  /* If async write couldn't be launched, queue the AIO for a worker thread */
+  if (res == -1)
+    {
+      /* Capture errno first; the calls below must not get to disturb it */
+      int err = get_errno ();
+
+      switch (err)
+        {
+        case ESPIPE:
+          {
+            int slot = aiorelslot (aio);
+            if (slot >= 0)
+              debug_printf ("slot %d released", slot);
+          }
+          /* fall through */
+
+        case ENOBUFS:
+          aio->aio_errno = EINPROGRESS;
+          aio->aio_rbytes = -1;
+
+          res = aioqueue (aio);
+          break;
+
+        default:
+          /* The launch failed outright (e.g. EBADF from the fd lookup) and
+           * nothing is in flight.  Give back any slot that was claimed and
+           * record the failure in the aiocb; otherwise it would stay marked
+           * EINPROGRESS forever and every later submit would get EAGAIN.
+           */
+          aiorelslot (aio);
+          aio->aio_errno = err;
+          aio->aio_rbytes = -1;
+          set_errno (err);
+          break;
+        }
+    }
+
+  return res;
+}
+
+/* Submit a list of AIOs in one call.  This is a cancellation point.
+ *
+ * mode     LIO_WAIT to return only after the submitted AIOs are done, or
+ *          LIO_NOWAIT to return right after submitting them.
+ * aiolist  'nent' aiocb pointers; NULL entries and LIO_NOP entries are
+ *          skipped, entries with an unknown opcode are failed with EINVAL.
+ * sig      For LIO_NOWAIT only: notification to deliver once every
+ *          submitted AIO on the list has completed (may be NULL).
+ *
+ * Returns 0 on success.  Returns -1 with errno EINVAL for a bad 'mode' or
+ * 'nent', ENOMEM if list bookkeeping can't be allocated, or EAGAIN if
+ * LIO_NOWAIT was requested but no AIO was launched or queued.
+ */
+int
+lio_listio (int mode, struct aiocb *__restrict const aiolist[__restrict],
+            int nent, struct sigevent *__restrict sig)
+{
+  struct aiocb *aio;
+  struct liocb *lio;
+
+  pthread_testcancel ();
+
+  if ((mode != LIO_WAIT && mode != LIO_NOWAIT) ||
+      (nent < 0 || nent > AIO_LISTIO_MAX))
+    {
+      set_errno (EINVAL);
+      return -1;
+    }
+
+  /* A shared liocb is only needed when list-complete notification applies */
+  if (sig && nent && mode == LIO_NOWAIT)
+    {
+      lio = (struct liocb *) malloc (sizeof (struct liocb));
+      if (!lio)
+        {
+          set_errno (ENOMEM);
+          return -1;
+        }
+
+      lio->lio_count = nent;
+      lio->lio_sigevent = sig;
+    }
+  else
+    lio = NULL;
+
+  int aiocount = 0;
+  for (int i = 0; i < nent; ++i)
+    {
+      aio = (struct aiocb *) aiolist[i];
+      if (!aio)
+        {
+          /* Entry not submitted, so it must not be counted as outstanding */
+          if (lio)
+            InterlockedDecrement (&lio->lio_count);
+          continue;
+        }
+
+      aio->aio_liocb = lio;
+      switch (aio->aio_lio_opcode)
+        {
+        case LIO_NOP:
+          if (lio)
+            InterlockedDecrement (&lio->lio_count);
+          continue;
+
+        case LIO_READ:
+          aio_read (aio);
+          ++aiocount;
+          continue;
+
+        case LIO_WRITE:
+          aio_write (aio);
+          ++aiocount;
+          continue;
+
+        default:
+          break;
+        }
+
+      /* Unknown opcode: fail this entry and don't count it as outstanding */
+      if (lio)
+        InterlockedDecrement (&lio->lio_count);
+      aio->aio_errno = EINVAL;
+      aio->aio_rbytes = -1;
+    }
+
+  /* mode is LIO_NOWAIT so return some kind of answer immediately */
+  if (mode == LIO_NOWAIT)
+    {
+      /* At least one AIO has been launched or queued */
+      if (aiocount)
+        return 0;
+
+      /* No AIOs have been launched or queued.  That also means no completion
+       * will ever come along to free the liocb, so release it here and make
+       * sure no aiocb is left pointing at the freed memory.
+       */
+      if (lio)
+        {
+          for (int i = 0; i < nent; ++i)
+            if (aiolist[i] && aiolist[i]->aio_liocb == lio)
+              aiolist[i]->aio_liocb = NULL;
+          free (lio);
+        }
+
+      set_errno (EAGAIN);
+      return -1;
+    }
+
+  /* Else mode is LIO_WAIT so wait for all AIOs to complete or error */
+  while (nent)
+    {
+      int i = aiosuspend ((const struct aiocb *const *) aiolist, nent, NULL);
+      if (i >= nent)
+        break;
+      else
+        aiolist[i]->aio_liocb = NULL; /* Avoids repeating notify on this AIO */
+    }
+
+  return 0;
+}
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/winsup/cygwin/include/aio.h b/winsup/cygwin/include/aio.h
new file mode 100644
index 000000000..34ff29969
--- /dev/null
+++ b/winsup/cygwin/include/aio.h
@@ -0,0 +1,82 @@
+/* aio.h: Support for Posix asynchronous i/o routines.
+
+This file is part of Cygwin.
+
+This software is a copyrighted work licensed under the terms of the
+Cygwin license. Please consult the file "CYGWIN_LICENSE" for
+details. */
+
+#ifndef _AIO_H
+#define _AIO_H
+
+#include <sys/features.h>
+#include <sys/queue.h>
+#include <sys/signal.h>
+#include <sys/types.h>
+#include <limits.h> // for AIO_LISTIO_MAX, AIO_MAX, and AIO_PRIO_DELTA_MAX
+
+#ifndef __INSIDE_CYGWIN__
+#include <w32api/winternl.h> // for HANDLE and IO_STATUS_BLOCK
+#endif
+
+/* defines for return value of aio_cancel() */
+#define AIO_ALLDONE 0
+#define AIO_CANCELED 1
+#define AIO_NOTCANCELED 2
+
+/* defines for 'mode' argument of lio_listio() */
+#define LIO_NOWAIT 0
+#define LIO_WAIT 1
+
+/* defines for 'aio_lio_opcode' element of struct aiocb; only consulted for
+ * requests submitted via lio_listio() -- aio_read() and aio_write() set the
+ * opcode themselves
+ */
+#define LIO_NOP 0
+#define LIO_READ 1
+#define LIO_WRITE 2
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* struct liocb is Cygwin-specific.  One is allocated per lio_listio() call
+ * that requests list-complete notification, and is shared by the aiocbs
+ * submitted in that call through their aio_liocb member.
+ */
+struct liocb {
+ volatile LONG lio_count; /* AIOs on the list that have not completed yet */
+ struct sigevent *lio_sigevent; /* notification to send when count hits 0 */
+};
+
+/* struct aiocb is defined by Posix */
+struct aiocb {
+ /* these elements of aiocb are Cygwin-specific */
+ TAILQ_ENTRY(aiocb) aio_chain; /* linkage on the queued-AIO worklist */
+ struct liocb *aio_liocb; /* owning lio_listio() batch, or NULL */
+ HANDLE aio_win_event; /* event of the inline slot servicing this AIO */
+ IO_STATUS_BLOCK aio_win_iosb; /* Windows completion status and byte count */
+ ssize_t aio_rbytes; /* result handed out by aio_return() */
+ int aio_errno; /* status handed out by aio_error() */
+ /* the remaining elements of aiocb are defined by Posix */
+ int aio_lio_opcode;
+ int aio_reqprio; /* Not yet implemented; must be zero */
+ int aio_fildes;
+ volatile void *aio_buf;
+ size_t aio_nbytes;
+ off_t aio_offset;
+ struct sigevent aio_sigevent;
+};
+
+/* function prototypes as defined by Posix */
+int aio_cancel (int, struct aiocb *);
+int aio_error (const struct aiocb *);
+/* aio_fsync() is only declared when synchronized I/O is available */
+#ifdef _POSIX_SYNCHRONIZED_IO
+int aio_fsync (int, struct aiocb *);
+#endif
+int aio_read (struct aiocb *);
+ssize_t aio_return (struct aiocb *);
+int aio_suspend (const struct aiocb *const [], int,
+ const struct timespec *);
+int aio_write (struct aiocb *);
+int lio_listio (int, struct aiocb *__restrict const [__restrict], int,
+ struct sigevent *__restrict);
+
+#ifdef __cplusplus
+}
+#endif
+#endif /* _AIO_H */
--
2.17.0