[newlib-cygwin] Cygwin: POSIX msg queues: move all mq_* functionality into fhandler_mqueue

Corinna Vinschen corinna@sourceware.org
Tue May 25 18:20:04 GMT 2021


https://sourceware.org/git/gitweb.cgi?p=newlib-cygwin.git;h=46f3b0ce85a9884821af0662bea70dde47b3f0c5

commit 46f3b0ce85a9884821af0662bea70dde47b3f0c5
Author: Corinna Vinschen <corinna@vinschen.de>
Date:   Tue May 25 20:15:16 2021 +0200

    Cygwin: POSIX msg queues: move all mq_* functionality into fhandler_mqueue
    
    The POSIX entry points are just wrappers now, calling into
    fhandler_mqueue.  While at it, eliminate mqi_flags, replace with
    standard fhandler nonblocking flag.
    
    Signed-off-by: Corinna Vinschen <corinna@vinschen.de>

Diff:
---
 winsup/cygwin/fhandler.h         |  12 +
 winsup/cygwin/fhandler_mqueue.cc | 436 +++++++++++++++++++++++++++++++-
 winsup/cygwin/mqueue_types.h     |   1 -
 winsup/cygwin/posix_ipc.cc       | 519 ++++-----------------------------------
 4 files changed, 492 insertions(+), 476 deletions(-)

diff --git a/winsup/cygwin/fhandler.h b/winsup/cygwin/fhandler.h
index ff51d29a5..abb13b0e2 100644
--- a/winsup/cygwin/fhandler.h
+++ b/winsup/cygwin/fhandler.h
@@ -3115,6 +3115,11 @@ class fhandler_mqueue: public fhandler_disk_file
 
   int _dup (HANDLE parent, fhandler_mqueue *child);
 
+  int mutex_lock (HANDLE mtx, bool eintr);
+  int mutex_unlock (HANDLE mtx);
+  int cond_timedwait (HANDLE evt, HANDLE mtx, const struct timespec *abstime);
+  void cond_signal (HANDLE evt);
+
 public:
   fhandler_mqueue ();
   fhandler_mqueue (void *) {}
@@ -3126,6 +3131,13 @@ public:
 
   int open (int, mode_t);
   int mq_open (int, mode_t, struct mq_attr *);
+  int mq_getattr (struct mq_attr *);
+  int mq_setattr (const struct mq_attr *, struct mq_attr *);
+  int mq_notify (const struct sigevent *);
+  int mq_timedsend (const char *, size_t, unsigned int,
+		    const struct timespec *);
+  ssize_t mq_timedrecv (char *, size_t, unsigned int *,
+			const struct timespec *);
 
   struct mq_info *mqinfo () { return &mqi; }
 
diff --git a/winsup/cygwin/fhandler_mqueue.cc b/winsup/cygwin/fhandler_mqueue.cc
index c450c0337..28aae314e 100644
--- a/winsup/cygwin/fhandler_mqueue.cc
+++ b/winsup/cygwin/fhandler_mqueue.cc
@@ -11,6 +11,7 @@ details. */
 #include "path.h"
 #include "fhandler.h"
 #include "dtable.h"
+#include "clock.h"
 #include <mqueue.h>
 #include <sys/param.h>
 
@@ -137,7 +138,7 @@ fhandler_mqueue::_mqinfo (SIZE_T filesize, mode_t mode, int flags,
      get pagesize aligned, which breaks the next NtMapViewOfSection in fork. */
   mqinfo ()->mqi_sectsize = filesize;
   mqinfo ()->mqi_mode = mode;
-  mqinfo ()->mqi_flags = flags;
+  set_nonblocking (flags & O_NONBLOCK);
 
   __small_swprintf (buf, L"mqueue/mtx%s", get_name ());
   RtlInitUnicodeString (&uname, buf);
@@ -426,3 +427,436 @@ fhandler_mqueue::close ()
   __endtry
   return 0;
 }
+
+int
+fhandler_mqueue::mutex_lock (HANDLE mtx, bool eintr)
+{
+  switch (cygwait (mtx, cw_infinite, cw_cancel | cw_cancel_self
+				     | (eintr ? cw_sig_eintr : cw_sig_restart)))
+    {
+    case WAIT_OBJECT_0:
+    case WAIT_ABANDONED_0:
+      return 0;
+    case WAIT_SIGNALED:
+      set_errno (EINTR);
+      return 1;
+    default:
+      break;
+    }
+  return geterrno_from_win_error ();
+}
+
+int
+fhandler_mqueue::mutex_unlock (HANDLE mtx)
+{
+  return ReleaseMutex (mtx) ? 0 : geterrno_from_win_error ();
+}
+
+int
+fhandler_mqueue::cond_timedwait (HANDLE evt, HANDLE mtx,
+				 const struct timespec *abstime)
+{
+  HANDLE w4[4] = { evt, };
+  DWORD cnt = 2;
+  DWORD timer_idx = 0;
+  int ret = 0;
+
+  wait_signal_arrived here (w4[1]);
+  if ((w4[cnt] = pthread::get_cancel_event ()) != NULL)
+    ++cnt;
+  if (abstime)
+    {
+      if (!valid_timespec (*abstime))
+	return EINVAL;
+
+      /* If a timeout is set, we create a waitable timer to wait for.
+	 This is the easiest way to handle the absolute timeout value, given
+	 that NtSetTimer also takes absolute times and given the double
+	 dependency on evt *and* mtx, which requires to call WFMO twice. */
+      NTSTATUS status;
+      LARGE_INTEGER duetime;
+
+      timer_idx = cnt++;
+      status = NtCreateTimer (&w4[timer_idx], TIMER_ALL_ACCESS, NULL,
+			      NotificationTimer);
+      if (!NT_SUCCESS (status))
+	return geterrno_from_nt_status (status);
+      timespec_to_filetime (abstime, &duetime);
+      status = NtSetTimer (w4[timer_idx], &duetime, NULL, NULL, FALSE, 0, NULL);
+      if (!NT_SUCCESS (status))
+	{
+	  NtClose (w4[timer_idx]);
+	  return geterrno_from_nt_status (status);
+	}
+    }
+  ResetEvent (evt);
+  if ((ret = mutex_unlock (mtx)) != 0)
+    return ret;
+  /* Everything's set up, so now wait for the event to be signalled. */
+restart1:
+  switch (WaitForMultipleObjects (cnt, w4, FALSE, INFINITE))
+    {
+    case WAIT_OBJECT_0:
+      break;
+    case WAIT_OBJECT_0 + 1:
+      if (_my_tls.call_signal_handler ())
+	goto restart1;
+      ret = EINTR;
+      break;
+    case WAIT_OBJECT_0 + 2:
+      if (timer_idx != 2)
+	pthread::static_cancel_self ();
+      fallthrough;
+    case WAIT_OBJECT_0 + 3:
+      ret = ETIMEDOUT;
+      break;
+    default:
+      ret = geterrno_from_win_error ();
+      break;
+    }
+  if (ret == 0)
+    {
+      /* At this point we need to lock the mutex.  The wait is practically
+	 the same as before, just that we now wait on the mutex instead of the
+	 event. */
+    restart2:
+      w4[0] = mtx;
+      switch (WaitForMultipleObjects (cnt, w4, FALSE, INFINITE))
+	{
+	case WAIT_OBJECT_0:
+	case WAIT_ABANDONED_0:
+	  break;
+	case WAIT_OBJECT_0 + 1:
+	  if (_my_tls.call_signal_handler ())
+	    goto restart2;
+	  ret = EINTR;
+	  break;
+	case WAIT_OBJECT_0 + 2:
+	  if (timer_idx != 2)
+	    pthread_testcancel ();
+	  fallthrough;
+	case WAIT_OBJECT_0 + 3:
+	  ret = ETIMEDOUT;
+	  break;
+	default:
+	  ret = geterrno_from_win_error ();
+	  break;
+	}
+    }
+  if (timer_idx)
+    {
+      if (ret != ETIMEDOUT)
+	NtCancelTimer (w4[timer_idx], NULL);
+      NtClose (w4[timer_idx]);
+    }
+  return ret;
+}
+
+void
+fhandler_mqueue::cond_signal (HANDLE evt)
+{
+  SetEvent (evt);
+}
+
+int
+fhandler_mqueue::mq_getattr (struct mq_attr *mqstat)
+{
+  int n;
+  struct mq_hdr *mqhdr;
+  struct mq_fattr *attr;
+
+  __try
+    {
+      mqhdr = mqinfo ()->mqi_hdr;
+      attr = &mqhdr->mqh_attr;
+      if ((n = mutex_lock (mqinfo ()->mqi_lock, false)) != 0)
+	{
+	  errno = n;
+	  __leave;
+	}
+      mqstat->mq_flags = is_nonblocking () ? O_NONBLOCK : 0;   /* per-open */
+      mqstat->mq_maxmsg = attr->mq_maxmsg;    /* remaining three per-queue */
+      mqstat->mq_msgsize = attr->mq_msgsize;
+      mqstat->mq_curmsgs = attr->mq_curmsgs;
+
+      mutex_unlock (mqinfo ()->mqi_lock);
+      return 0;
+    }
+  __except (EBADF) {}
+  __endtry
+  return -1;
+}
+
+int
+fhandler_mqueue::mq_setattr (const struct mq_attr *mqstat,
+			     struct mq_attr *omqstat)
+{
+  int n;
+  struct mq_hdr *mqhdr;
+  struct mq_fattr *attr;
+
+  __try
+    {
+      mqhdr = mqinfo ()->mqi_hdr;
+      attr = &mqhdr->mqh_attr;
+      if ((n = mutex_lock (mqinfo ()->mqi_lock, false)) != 0)
+	{
+	  errno = n;
+	  __leave;
+	}
+
+      if (omqstat != NULL)
+	{
+	  omqstat->mq_flags = is_nonblocking () ? O_NONBLOCK : 0;
+	  omqstat->mq_maxmsg = attr->mq_maxmsg;
+	  omqstat->mq_msgsize = attr->mq_msgsize;
+	  omqstat->mq_curmsgs = attr->mq_curmsgs; /* and current status */
+	}
+
+      set_nonblocking (mqstat->mq_flags & O_NONBLOCK);
+
+      mutex_unlock (mqinfo ()->mqi_lock);
+      return 0;
+    }
+  __except (EBADF) {}
+  __endtry
+  return -1;
+}
+
+int
+fhandler_mqueue::mq_notify (const struct sigevent *notification)
+{
+  int n;
+  pid_t pid;
+  struct mq_hdr *mqhdr;
+
+  __try
+    {
+      mqhdr = mqinfo ()->mqi_hdr;
+      if ((n = mutex_lock (mqinfo ()->mqi_lock, false)) != 0)
+	{
+	  errno = n;
+	  __leave;
+	}
+
+      pid = myself->pid;
+      if (!notification)
+	{
+	  if (mqhdr->mqh_pid == pid)
+	      mqhdr->mqh_pid = 0;     /* unregister calling process */
+	}
+      else
+	{
+	  if (mqhdr->mqh_pid != 0)
+	    {
+	      if (kill (mqhdr->mqh_pid, 0) != -1 || errno != ESRCH)
+		{
+		  set_errno (EBUSY);
+		  mutex_unlock (mqinfo ()->mqi_lock);
+		  __leave;
+		}
+	    }
+	  mqhdr->mqh_pid = pid;
+	  mqhdr->mqh_event = *notification;
+	}
+      mutex_unlock (mqinfo ()->mqi_lock);
+      return 0;
+    }
+  __except (EBADF) {}
+  __endtry
+  return -1;
+}
+
+int
+fhandler_mqueue::mq_timedsend (const char *ptr, size_t len, unsigned int prio,
+			       const struct timespec *abstime)
+{
+  int n;
+  long index, freeindex;
+  int8_t *mptr;
+  struct sigevent *sigev;
+  struct mq_hdr *mqhdr;
+  struct mq_fattr *attr;
+  struct msg_hdr *msghdr, *nmsghdr, *pmsghdr;
+  bool mutex_locked = false;
+  int ret = -1;
+
+  pthread_testcancel ();
+
+  __try
+    {
+      if (prio >= MQ_PRIO_MAX)
+	{
+	  set_errno (EINVAL);
+	  __leave;
+	}
+
+      mqhdr = mqinfo ()->mqi_hdr;     /* struct pointer */
+      mptr = (int8_t *) mqhdr;        /* byte pointer */
+      attr = &mqhdr->mqh_attr;
+      if ((n = mutex_lock (mqinfo ()->mqi_lock, true)) != 0)
+	{
+	  errno = n;
+	  __leave;
+	}
+      mutex_locked = true;
+      if (len > (size_t) attr->mq_msgsize)
+	{
+	  set_errno (EMSGSIZE);
+	  __leave;
+	}
+      if (attr->mq_curmsgs == 0)
+	{
+	  if (mqhdr->mqh_pid != 0 && mqhdr->mqh_nwait == 0)
+	    {
+	      sigev = &mqhdr->mqh_event;
+	      if (sigev->sigev_notify == SIGEV_SIGNAL)
+		sigqueue (mqhdr->mqh_pid, sigev->sigev_signo,
+			  sigev->sigev_value);
+	      mqhdr->mqh_pid = 0;             /* unregister */
+	    }
+	}
+      else if (attr->mq_curmsgs >= attr->mq_maxmsg)
+	{
+	  /* Queue is full */
+	  if (is_nonblocking ())
+	    {
+	      set_errno (EAGAIN);
+	      __leave;
+	    }
+	  /* Wait for room for one message on the queue */
+	  while (attr->mq_curmsgs >= attr->mq_maxmsg)
+	    {
+	      int ret = cond_timedwait (mqinfo ()->mqi_waitsend,
+					mqinfo ()->mqi_lock, abstime);
+	      if (ret != 0)
+		{
+		  set_errno (ret);
+		  __leave;
+		}
+	    }
+	}
+
+      /* nmsghdr will point to new message */
+      if ((freeindex = mqhdr->mqh_free) == 0)
+	api_fatal ("mq_send: curmsgs = %ld; free = 0", attr->mq_curmsgs);
+
+      nmsghdr = (struct msg_hdr *) &mptr[freeindex];
+      nmsghdr->msg_prio = prio;
+      nmsghdr->msg_len = len;
+      memcpy (nmsghdr + 1, ptr, len);         /* copy message from caller */
+      mqhdr->mqh_free = nmsghdr->msg_next;    /* new freelist head */
+
+      /* Find right place for message in linked list */
+      index = mqhdr->mqh_head;
+      pmsghdr = (struct msg_hdr *) &(mqhdr->mqh_head);
+      while (index)
+	{
+	  msghdr = (struct msg_hdr *) &mptr[index];
+	  if (prio > msghdr->msg_prio)
+	    {
+	      nmsghdr->msg_next = index;
+	      pmsghdr->msg_next = freeindex;
+	      break;
+	    }
+	  index = msghdr->msg_next;
+	  pmsghdr = msghdr;
+	}
+      if (index == 0)
+	{
+	  /* Queue was empty or new goes at end of list */
+	  pmsghdr->msg_next = freeindex;
+	  nmsghdr->msg_next = 0;
+	}
+      /* Wake up anyone blocked in mq_receive waiting for a message */
+      if (attr->mq_curmsgs == 0)
+	cond_signal (mqinfo ()->mqi_waitrecv);
+      attr->mq_curmsgs++;
+
+      ret = 0;
+    }
+  __except (EBADF) {}
+  __endtry
+  if (mutex_locked)
+    mutex_unlock (mqinfo ()->mqi_lock);
+  return ret;
+}
+
+ssize_t
+fhandler_mqueue::mq_timedrecv (char *ptr, size_t maxlen, unsigned int *priop,
+			       const struct timespec *abstime)
+{
+  int n;
+  long index;
+  int8_t *mptr;
+  ssize_t len = -1;
+  struct mq_hdr *mqhdr;
+  struct mq_fattr *attr;
+  struct msg_hdr *msghdr;
+  bool mutex_locked = false;
+
+  pthread_testcancel ();
+
+  __try
+    {
+      mqhdr = mqinfo ()->mqi_hdr;     /* struct pointer */
+      mptr = (int8_t *) mqhdr;        /* byte pointer */
+      attr = &mqhdr->mqh_attr;
+      if ((n = mutex_lock (mqinfo ()->mqi_lock, true)) != 0)
+	{
+	  errno = n;
+	  __leave;
+	}
+      mutex_locked = true;
+      if (maxlen < (size_t) attr->mq_msgsize)
+	{
+	  set_errno (EMSGSIZE);
+	  __leave;
+	}
+      if (attr->mq_curmsgs == 0)	/* queue is empty */
+	{
+	  if (is_nonblocking ())
+	    {
+	      set_errno (EAGAIN);
+	      __leave;
+	    }
+	  /* Wait for a message to be placed onto queue */
+	  mqhdr->mqh_nwait++;
+	  while (attr->mq_curmsgs == 0)
+	    {
+	      int ret = cond_timedwait (mqinfo ()->mqi_waitrecv,
+					mqinfo ()->mqi_lock, abstime);
+	      if (ret != 0)
+		{
+		  set_errno (ret);
+		  __leave;
+		}
+	    }
+	  mqhdr->mqh_nwait--;
+	}
+
+      if ((index = mqhdr->mqh_head) == 0)
+	api_fatal ("mq_receive: curmsgs = %ld; head = 0", attr->mq_curmsgs);
+
+      msghdr = (struct msg_hdr *) &mptr[index];
+      mqhdr->mqh_head = msghdr->msg_next;     /* new head of list */
+      len = msghdr->msg_len;
+      memcpy(ptr, msghdr + 1, len);           /* copy the message itself */
+      if (priop != NULL)
+	*priop = msghdr->msg_prio;
+
+      /* Just-read message goes to front of free list */
+      msghdr->msg_next = mqhdr->mqh_free;
+      mqhdr->mqh_free = index;
+
+      /* Wake up anyone blocked in mq_send waiting for room */
+      if (attr->mq_curmsgs == attr->mq_maxmsg)
+	cond_signal (mqinfo ()->mqi_waitsend);
+      attr->mq_curmsgs--;
+    }
+  __except (EBADF) {}
+  __endtry
+  if (mutex_locked)
+    mutex_unlock (mqinfo ()->mqi_lock);
+  return len;
+}
diff --git a/winsup/cygwin/mqueue_types.h b/winsup/cygwin/mqueue_types.h
index 3a4b127ca..4d0d910e4 100644
--- a/winsup/cygwin/mqueue_types.h
+++ b/winsup/cygwin/mqueue_types.h
@@ -58,7 +58,6 @@ struct mq_info
   HANDLE          mqi_waitsend;	 /* and condition variable for full queue */
   HANDLE          mqi_waitrecv;	 /* and condition variable for empty queue */
   uint32_t        mqi_magic;	 /* magic number if open */
-  int             mqi_flags;	 /* flags for this process */
 };
 
 
diff --git a/winsup/cygwin/posix_ipc.cc b/winsup/cygwin/posix_ipc.cc
index 772072d93..1932ac8db 100644
--- a/winsup/cygwin/posix_ipc.cc
+++ b/winsup/cygwin/posix_ipc.cc
@@ -104,135 +104,6 @@ check_path (char *res_name, ipc_type_t type, const char *name, size_t len)
   return true;
 }
 
-static int
-ipc_mutex_lock (HANDLE mtx, bool eintr)
-{
-  switch (cygwait (mtx, cw_infinite, cw_cancel | cw_cancel_self
-				     | (eintr ? cw_sig_eintr : cw_sig_restart)))
-    {
-    case WAIT_OBJECT_0:
-    case WAIT_ABANDONED_0:
-      return 0;
-    case WAIT_SIGNALED:
-      set_errno (EINTR);
-      return 1;
-    default:
-      break;
-    }
-  return geterrno_from_win_error ();
-}
-
-static inline int
-ipc_mutex_unlock (HANDLE mtx)
-{
-  return ReleaseMutex (mtx) ? 0 : geterrno_from_win_error ();
-}
-
-static int
-ipc_cond_timedwait (HANDLE evt, HANDLE mtx, const struct timespec *abstime)
-{
-  HANDLE w4[4] = { evt, };
-  DWORD cnt = 2;
-  DWORD timer_idx = 0;
-  int ret = 0;
-
-  wait_signal_arrived here (w4[1]);
-  if ((w4[cnt] = pthread::get_cancel_event ()) != NULL)
-    ++cnt;
-  if (abstime)
-    {
-      if (!valid_timespec (*abstime))
-	return EINVAL;
-
-      /* If a timeout is set, we create a waitable timer to wait for.
-	 This is the easiest way to handle the absolute timeout value, given
-	 that NtSetTimer also takes absolute times and given the double
-	 dependency on evt *and* mtx, which requires to call WFMO twice. */
-      NTSTATUS status;
-      LARGE_INTEGER duetime;
-
-      timer_idx = cnt++;
-      status = NtCreateTimer (&w4[timer_idx], TIMER_ALL_ACCESS, NULL,
-			      NotificationTimer);
-      if (!NT_SUCCESS (status))
-	return geterrno_from_nt_status (status);
-      timespec_to_filetime (abstime, &duetime);
-      status = NtSetTimer (w4[timer_idx], &duetime, NULL, NULL, FALSE, 0, NULL);
-      if (!NT_SUCCESS (status))
-	{
-	  NtClose (w4[timer_idx]);
-	  return geterrno_from_nt_status (status);
-	}
-    }
-  ResetEvent (evt);
-  if ((ret = ipc_mutex_unlock (mtx)) != 0)
-    return ret;
-  /* Everything's set up, so now wait for the event to be signalled. */
-restart1:
-  switch (WaitForMultipleObjects (cnt, w4, FALSE, INFINITE))
-    {
-    case WAIT_OBJECT_0:
-      break;
-    case WAIT_OBJECT_0 + 1:
-      if (_my_tls.call_signal_handler ())
-	goto restart1;
-      ret = EINTR;
-      break;
-    case WAIT_OBJECT_0 + 2:
-      if (timer_idx != 2)
-	pthread::static_cancel_self ();
-      fallthrough;
-    case WAIT_OBJECT_0 + 3:
-      ret = ETIMEDOUT;
-      break;
-    default:
-      ret = geterrno_from_win_error ();
-      break;
-    }
-  if (ret == 0)
-    {
-      /* At this point we need to lock the mutex.  The wait is practically
-	 the same as before, just that we now wait on the mutex instead of the
-	 event. */
-    restart2:
-      w4[0] = mtx;
-      switch (WaitForMultipleObjects (cnt, w4, FALSE, INFINITE))
-	{
-	case WAIT_OBJECT_0:
-	case WAIT_ABANDONED_0:
-	  break;
-	case WAIT_OBJECT_0 + 1:
-	  if (_my_tls.call_signal_handler ())
-	    goto restart2;
-	  ret = EINTR;
-	  break;
-	case WAIT_OBJECT_0 + 2:
-	  if (timer_idx != 2)
-	    pthread_testcancel ();
-	  fallthrough;
-	case WAIT_OBJECT_0 + 3:
-	  ret = ETIMEDOUT;
-	  break;
-	default:
-	  ret = geterrno_from_win_error ();
-	  break;
-	}
-    }
-  if (timer_idx)
-    {
-      if (ret != ETIMEDOUT)
-	NtCancelTimer (w4[timer_idx], NULL);
-      NtClose (w4[timer_idx]);
-    }
-  return ret;
-}
-
-static inline void
-ipc_cond_signal (HANDLE evt)
-{
-  SetEvent (evt);
-}
-
 class ipc_flock
 {
   struct flock fl;
@@ -348,388 +219,88 @@ mq_open (const char *name, int oflag, ...)
   return (mqd_t) -1;
 }
 
-static struct mq_info *
-get_mqinfo (cygheap_fdget &fd)
-{
-  if (fd >= 0)
-    {
-      fhandler_mqueue *fh = fd->is_mqueue ();
-      if (fh)
-	return fh->mqinfo ();
-      set_errno (EINVAL);
-    }
-  return NULL;
-}
-
 extern "C" int
 mq_getattr (mqd_t mqd, struct mq_attr *mqstat)
 {
-  int n;
-  struct mq_hdr *mqhdr;
-  struct mq_fattr *attr;
-  struct mq_info *mqinfo;
-
-  __try
-    {
-      cygheap_fdget fd ((int) mqd, true);
-      mqinfo = get_mqinfo (fd);
-      if (mqinfo->mqi_magic != MQI_MAGIC)
-	{
-	  set_errno (EBADF);
-	  __leave;
-	}
-      mqhdr = mqinfo->mqi_hdr;
-      attr = &mqhdr->mqh_attr;
-      if ((n = ipc_mutex_lock (mqinfo->mqi_lock, false)) != 0)
-	{
-	  errno = n;
-	  __leave;
-	}
-      mqstat->mq_flags = mqinfo->mqi_flags;   /* per-open */
-      mqstat->mq_maxmsg = attr->mq_maxmsg;    /* remaining three per-queue */
-      mqstat->mq_msgsize = attr->mq_msgsize;
-      mqstat->mq_curmsgs = attr->mq_curmsgs;
+  int ret = -1;
 
-      ipc_mutex_unlock (mqinfo->mqi_lock);
-      return 0;
-    }
-  __except (EBADF) {}
-  __endtry
-  return -1;
+  cygheap_fdget fd ((int) mqd, true);
+  fhandler_mqueue *fh = fd->is_mqueue ();
+  if (!fh)
+    set_errno (EBADF);
+  else
+    ret = fh->mq_getattr (mqstat);
+  return ret;
 }
 
 extern "C" int
 mq_setattr (mqd_t mqd, const struct mq_attr *mqstat, struct mq_attr *omqstat)
 {
-  int n;
-  struct mq_hdr *mqhdr;
-  struct mq_fattr *attr;
-  struct mq_info *mqinfo;
-
-  __try
-    {
-      cygheap_fdget fd ((int) mqd, true);
-      mqinfo = get_mqinfo (fd);
-      if (mqinfo->mqi_magic != MQI_MAGIC)
-	{
-	  set_errno (EBADF);
-	  __leave;
-	}
-      mqhdr = mqinfo->mqi_hdr;
-      attr = &mqhdr->mqh_attr;
-      if ((n = ipc_mutex_lock (mqinfo->mqi_lock, false)) != 0)
-	{
-	  errno = n;
-	  __leave;
-	}
-
-      if (omqstat != NULL)
-	{
-	  omqstat->mq_flags = mqinfo->mqi_flags;  /* previous attributes */
-	  omqstat->mq_maxmsg = attr->mq_maxmsg;
-	  omqstat->mq_msgsize = attr->mq_msgsize;
-	  omqstat->mq_curmsgs = attr->mq_curmsgs; /* and current status */
-	}
-
-      if (mqstat->mq_flags & O_NONBLOCK)
-	mqinfo->mqi_flags |= O_NONBLOCK;
-      else
-	mqinfo->mqi_flags &= ~O_NONBLOCK;
+  int ret = -1;
 
-      ipc_mutex_unlock (mqinfo->mqi_lock);
-      return 0;
-    }
-  __except (EBADF) {}
-  __endtry
-  return -1;
+  cygheap_fdget fd ((int) mqd, true);
+  fhandler_mqueue *fh = fd->is_mqueue ();
+  if (!fh)
+    set_errno (EBADF);
+  else
+    ret = fh->mq_setattr (mqstat, omqstat);
+  return ret;
 }
 
 extern "C" int
 mq_notify (mqd_t mqd, const struct sigevent *notification)
 {
-  int n;
-  pid_t pid;
-  struct mq_hdr *mqhdr;
-  struct mq_info *mqinfo;
-
-  __try
-    {
-      cygheap_fdget fd ((int) mqd, true);
-      mqinfo = get_mqinfo (fd);
-      if (mqinfo->mqi_magic != MQI_MAGIC)
-	{
-	  set_errno (EBADF);
-	  __leave;
-	}
-      mqhdr = mqinfo->mqi_hdr;
-      if ((n = ipc_mutex_lock (mqinfo->mqi_lock, false)) != 0)
-	{
-	  errno = n;
-	  __leave;
-	}
+  int ret = -1;
 
-      pid = getpid ();
-      if (!notification)
-	{
-	  if (mqhdr->mqh_pid == pid)
-	      mqhdr->mqh_pid = 0;     /* unregister calling process */
-	}
-      else
-	{
-	  if (mqhdr->mqh_pid != 0)
-	    {
-	      if (kill (mqhdr->mqh_pid, 0) != -1 || errno != ESRCH)
-		{
-		  set_errno (EBUSY);
-		  ipc_mutex_unlock (mqinfo->mqi_lock);
-		  __leave;
-		}
-	    }
-	  mqhdr->mqh_pid = pid;
-	  mqhdr->mqh_event = *notification;
-	}
-      ipc_mutex_unlock (mqinfo->mqi_lock);
-      return 0;
-    }
-  __except (EBADF) {}
-  __endtry
-  return -1;
+  cygheap_fdget fd ((int) mqd, true);
+  fhandler_mqueue *fh = fd->is_mqueue ();
+  if (!fh)
+    set_errno (EBADF);
+  else
+    ret = fh->mq_notify (notification);
+  return ret;
 }
 
-static int
-_mq_send (mqd_t mqd, const char *ptr, size_t len, unsigned int prio,
-	  const struct timespec *abstime)
+extern "C" int
+mq_timedsend (mqd_t mqd, const char *ptr, size_t len, unsigned int prio,
+	      const struct timespec *abstime)
 {
-  int n;
-  long index, freeindex;
-  int8_t *mptr;
-  struct sigevent *sigev;
-  struct mq_hdr *mqhdr;
-  struct mq_fattr *attr;
-  struct msg_hdr *msghdr, *nmsghdr, *pmsghdr;
-  struct mq_info *mqinfo = NULL;
-  bool ipc_mutex_locked = false;
   int ret = -1;
 
-  pthread_testcancel ();
-
-  __try
-    {
-      cygheap_fdget fd ((int) mqd);
-      mqinfo = get_mqinfo (fd);
-      if (mqinfo->mqi_magic != MQI_MAGIC)
-	{
-	  set_errno (EBADF);
-	  __leave;
-	}
-      if (prio >= MQ_PRIO_MAX)
-	{
-	  set_errno (EINVAL);
-	  __leave;
-	}
-
-      mqhdr = mqinfo->mqi_hdr;        /* struct pointer */
-      mptr = (int8_t *) mqhdr;        /* byte pointer */
-      attr = &mqhdr->mqh_attr;
-      if ((n = ipc_mutex_lock (mqinfo->mqi_lock, true)) != 0)
-	{
-	  errno = n;
-	  __leave;
-	}
-      ipc_mutex_locked = true;
-      if (len > (size_t) attr->mq_msgsize)
-	{
-	  set_errno (EMSGSIZE);
-	  __leave;
-	}
-      if (attr->mq_curmsgs == 0)
-	{
-	  if (mqhdr->mqh_pid != 0 && mqhdr->mqh_nwait == 0)
-	    {
-	      sigev = &mqhdr->mqh_event;
-	      if (sigev->sigev_notify == SIGEV_SIGNAL)
-		sigqueue (mqhdr->mqh_pid, sigev->sigev_signo,
-			  sigev->sigev_value);
-	      mqhdr->mqh_pid = 0;             /* unregister */
-	    }
-	}
-      else if (attr->mq_curmsgs >= attr->mq_maxmsg)
-	{
-	  /* Queue is full */
-	  if (mqinfo->mqi_flags & O_NONBLOCK)
-	    {
-	      set_errno (EAGAIN);
-	      __leave;
-	    }
-	  /* Wait for room for one message on the queue */
-	  while (attr->mq_curmsgs >= attr->mq_maxmsg)
-	    {
-	      int ret = ipc_cond_timedwait (mqinfo->mqi_waitsend,
-					    mqinfo->mqi_lock, abstime);
-	      if (ret != 0)
-		{
-		  set_errno (ret);
-		  __leave;
-		}
-	    }
-	}
-
-      /* nmsghdr will point to new message */
-      if ((freeindex = mqhdr->mqh_free) == 0)
-	api_fatal ("mq_send: curmsgs = %ld; free = 0", attr->mq_curmsgs);
-
-      nmsghdr = (struct msg_hdr *) &mptr[freeindex];
-      nmsghdr->msg_prio = prio;
-      nmsghdr->msg_len = len;
-      memcpy (nmsghdr + 1, ptr, len);         /* copy message from caller */
-      mqhdr->mqh_free = nmsghdr->msg_next;    /* new freelist head */
-
-      /* Find right place for message in linked list */
-      index = mqhdr->mqh_head;
-      pmsghdr = (struct msg_hdr *) &(mqhdr->mqh_head);
-      while (index)
-	{
-	  msghdr = (struct msg_hdr *) &mptr[index];
-	  if (prio > msghdr->msg_prio)
-	    {
-	      nmsghdr->msg_next = index;
-	      pmsghdr->msg_next = freeindex;
-	      break;
-	    }
-	  index = msghdr->msg_next;
-	  pmsghdr = msghdr;
-	}
-      if (index == 0)
-	{
-	  /* Queue was empty or new goes at end of list */
-	  pmsghdr->msg_next = freeindex;
-	  nmsghdr->msg_next = 0;
-	}
-      /* Wake up anyone blocked in mq_receive waiting for a message */
-      if (attr->mq_curmsgs == 0)
-	ipc_cond_signal (mqinfo->mqi_waitrecv);
-      attr->mq_curmsgs++;
-
-      ret = 0;
-    }
-  __except (EBADF) {}
-  __endtry
-  if (ipc_mutex_locked)
-    ipc_mutex_unlock (mqinfo->mqi_lock);
+  cygheap_fdget fd ((int) mqd, true);
+  fhandler_mqueue *fh = fd->is_mqueue ();
+  if (!fh)
+    set_errno (EBADF);
+  else
+    ret = fh->mq_timedsend (ptr, len, prio, abstime);
   return ret;
 }
 
 extern "C" int
 mq_send (mqd_t mqd, const char *ptr, size_t len, unsigned int prio)
 {
-  return _mq_send (mqd, ptr, len, prio, NULL);
-}
-
-extern "C" int
-mq_timedsend (mqd_t mqd, const char *ptr, size_t len, unsigned int prio,
-	      const struct timespec *abstime)
-{
-  return _mq_send (mqd, ptr, len, prio, abstime);
+  return mq_timedsend (mqd, ptr, len, prio, NULL);
 }
 
-static ssize_t
-_mq_receive (mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop,
+extern "C" ssize_t
+mq_timedreceive (mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop,
 	     const struct timespec *abstime)
 {
-  int n;
-  long index;
-  int8_t *mptr;
-  ssize_t len = -1;
-  struct mq_hdr *mqhdr;
-  struct mq_fattr *attr;
-  struct msg_hdr *msghdr;
-  struct mq_info *mqinfo;
-  bool ipc_mutex_locked = false;
-
-  pthread_testcancel ();
-
-  __try
-    {
-      cygheap_fdget fd ((int) mqd);
-      mqinfo = get_mqinfo (fd);
-      if (mqinfo->mqi_magic != MQI_MAGIC)
-	{
-	  set_errno (EBADF);
-	  __leave;
-	}
-      mqhdr = mqinfo->mqi_hdr;        /* struct pointer */
-      mptr = (int8_t *) mqhdr;        /* byte pointer */
-      attr = &mqhdr->mqh_attr;
-      if ((n = ipc_mutex_lock (mqinfo->mqi_lock, true)) != 0)
-	{
-	  errno = n;
-	  __leave;
-	}
-      ipc_mutex_locked = true;
-      if (maxlen < (size_t) attr->mq_msgsize)
-	{
-	  set_errno (EMSGSIZE);
-	  __leave;
-	}
-      if (attr->mq_curmsgs == 0)	/* queue is empty */
-	{
-	  if (mqinfo->mqi_flags & O_NONBLOCK)
-	    {
-	      set_errno (EAGAIN);
-	      __leave;
-	    }
-	  /* Wait for a message to be placed onto queue */
-	  mqhdr->mqh_nwait++;
-	  while (attr->mq_curmsgs == 0)
-	    {
-	      int ret = ipc_cond_timedwait (mqinfo->mqi_waitrecv,
-					    mqinfo->mqi_lock, abstime);
-	      if (ret != 0)
-		{
-		  set_errno (ret);
-		  __leave;
-		}
-	    }
-	  mqhdr->mqh_nwait--;
-	}
-
-      if ((index = mqhdr->mqh_head) == 0)
-	api_fatal ("mq_receive: curmsgs = %ld; head = 0", attr->mq_curmsgs);
-
-      msghdr = (struct msg_hdr *) &mptr[index];
-      mqhdr->mqh_head = msghdr->msg_next;     /* new head of list */
-      len = msghdr->msg_len;
-      memcpy(ptr, msghdr + 1, len);           /* copy the message itself */
-      if (priop != NULL)
-	*priop = msghdr->msg_prio;
-
-      /* Just-read message goes to front of free list */
-      msghdr->msg_next = mqhdr->mqh_free;
-      mqhdr->mqh_free = index;
+  int ret = -1;
 
-      /* Wake up anyone blocked in mq_send waiting for room */
-      if (attr->mq_curmsgs == attr->mq_maxmsg)
-	ipc_cond_signal (mqinfo->mqi_waitsend);
-      attr->mq_curmsgs--;
-    }
-  __except (EBADF) {}
-  __endtry
-  if (ipc_mutex_locked)
-    ipc_mutex_unlock (mqinfo->mqi_lock);
-  return len;
+  cygheap_fdget fd ((int) mqd, true);
+  fhandler_mqueue *fh = fd->is_mqueue ();
+  if (!fh)
+    set_errno (EBADF);
+  else
+    ret = fh->mq_timedrecv (ptr, maxlen, priop, abstime);
+  return ret;
 }
 
 extern "C" ssize_t
 mq_receive (mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop)
 {
-  return _mq_receive (mqd, ptr, maxlen, priop, NULL);
-}
-
-extern "C" ssize_t
-mq_timedreceive (mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop,
-		 const struct timespec *abstime)
-{
-  return _mq_receive (mqd, ptr, maxlen, priop, abstime);
+  return mq_timedreceive (mqd, ptr, maxlen, priop, NULL);
 }
 
 extern "C" int


More information about the Cygwin-cvs mailing list