]> sourceware.org Git - newlib-cygwin.git/blob - winsup/cygwin/posix_ipc.cc
* Makefile.in (DLL_OFILES): Add posix_ipc.o.
[newlib-cygwin.git] / winsup / cygwin / posix_ipc.cc
1 /* posix_ipc.cc: POSIX IPC API for Cygwin.
2
3 Copyright 2007 Red Hat, Inc.
4
5 This file is part of Cygwin.
6
7 This software is a copyrighted work licensed under the terms of the
8 Cygwin license. Please consult the file "CYGWIN_LICENSE" for
9 details. */
10
11 /* TODO: POSIX semaphores are implemented in thread.cc right now. The
12 implementation in thread.cc disallows implementing kernel
13 persistent semaphores, so in the long run we should move the
14 implementation here, using file based shared memory instead. */
15
16 #include "winsup.h"
17 #include "path.h"
18 #include "cygerrno.h"
19 #include "cygtls.h"
20 #include "security.h"
21 #include "sigproc.h"
22 #include <sys/stat.h>
23 #include <sys/mman.h>
24 #include <sys/param.h>
25 #include <fcntl.h>
26 #include <pwd.h>
27 #include <stdlib.h>
28 #include <limits.h>
29 #include <unistd.h>
30 #include <stdarg.h>
31 #include <mqueue.h>
32
33 struct
34 {
35 const char *prefix;
36 const char *description;
37 } ipc_names[] = {
38 { "/dev/shm", "POSIX shared memory object" },
39 { "/dev/mqueue", "POSIX message queue" },
40 { "/dev/sem", "POSIX semaphore" }
41 };
42
43 enum ipc_type_t
44 {
45 shmem,
46 mqueue,
47 sem
48 };
49
50 static bool
51 check_path (char *res_name, ipc_type_t type, const char *name)
52 {
53 /* Note that we require the existance of the apprpriate /dev subdirectories
54 for POSIX IPC object support, similar to Linux (which supports the
55 directories, but doesn't require to mount them). We don't create
56 these directory here, that's the task of the installer. But we check
57 for existance and give ample warning. */
58 path_conv path (ipc_names[type].prefix, PC_SYM_NOFOLLOW);
59 if (path.error || !path.exists () || !path.isdir ())
60 {
61 small_printf (
62 "Warning: '%s' does not exists or is not a directory.\n\n"
63 "%ss require the existance of this directory.\n"
64 "Create the directory '%s' and set the permissions to 01777.\n"
65 "For instance on the command line: mkdir -m 01777 %s\n",
66 ipc_names[type].prefix, ipc_names[type].description,
67 ipc_names[type].prefix, ipc_names[type].prefix);
68 set_errno (EINVAL);
69 return false;
70 }
71 /* Name must start with a single slash. */
72 if (!name || name[0] != '/' || name[1] == '/')
73 {
74 debug_printf ("Invalid %s name '%s'", ipc_names[type].description, name);
75 set_errno (EINVAL);
76 return false;
77 }
78 if (strlen (name) > CYG_MAX_PATH - sizeof (ipc_names[type].prefix))
79 {
80 debug_printf ("%s name '%s' too long", ipc_names[type].description, name);
81 set_errno (ENAMETOOLONG);
82 return false;
83 }
84 strcpy (res_name, ipc_names[type].prefix);
85 strcat (res_name, name);
86 return true;
87 }
88
89 static int
90 ipc_mutex_init (HANDLE *pmtx, const char *name)
91 {
92 char buf[CYG_MAX_PATH];
93 strcpy (buf, "cyg_pmtx");
94 strcat (buf, name);
95 for (char *c = buf; c = strchr (c + 1, '\\'); ++c)
96 *c = '/';
97 *pmtx = CreateMutex (&sec_all, FALSE, buf);
98 if (!*pmtx)
99 debug_printf ("failed: %E\n");
100 return *pmtx ? 0 : geterrno_from_win_error ();
101 }
102
103 static int
104 ipc_mutex_lock (HANDLE mtx)
105 {
106 HANDLE h[2] = { mtx, signal_arrived };
107
108 switch (WaitForMultipleObjects (2, h, FALSE, INFINITE))
109 {
110 case WAIT_OBJECT_0:
111 case WAIT_ABANDONED_0:
112 return 0;
113 case WAIT_OBJECT_0 + 1:
114 set_errno (EINTR);
115 return 1;
116 default:
117 break;
118 }
119 return geterrno_from_win_error ();
120 }
121
122 static inline int
123 ipc_mutex_unlock (HANDLE mtx)
124 {
125 return ReleaseMutex (mtx) ? 0 : geterrno_from_win_error ();
126 }
127
128 static inline int
129 ipc_mutex_close (HANDLE mtx)
130 {
131 return CloseHandle (mtx) ? 0 : geterrno_from_win_error ();
132 }
133
134 static int
135 ipc_cond_init (HANDLE *pevt, const char *name)
136 {
137 char buf[CYG_MAX_PATH];
138 strcpy (buf, "cyg_pevt");
139 strcat (buf, name);
140 for (char *c = buf; c = strchr (c + 1, '\\'); ++c)
141 *c = '/';
142 *pevt = CreateEvent (&sec_all, TRUE, FALSE, buf);
143 if (!*pevt)
144 debug_printf ("failed: %E\n");
145 return *pevt ? 0 : geterrno_from_win_error ();
146 }
147
148 static int
149 ipc_cond_timedwait (HANDLE evt, HANDLE mtx, const struct timespec *abstime)
150 {
151 struct timeval tv;
152 DWORD timeout;
153 HANDLE h[2] = { mtx, evt };
154
155 if (!abstime)
156 timeout = INFINITE;
157 else if (abstime->tv_sec < 0
158 || abstime->tv_nsec < 0
159 || abstime->tv_nsec > 999999999)
160 return EINVAL;
161 else
162 {
163 gettimeofday (&tv, NULL);
164 /* Check for immediate timeout. */
165 if (tv.tv_sec > abstime->tv_sec
166 || (tv.tv_sec == abstime->tv_sec
167 && tv.tv_usec > abstime->tv_nsec / 1000))
168 return ETIMEDOUT;
169 timeout = (abstime->tv_sec - tv.tv_sec) * 1000;
170 timeout += (abstime->tv_nsec / 1000 - tv.tv_usec) / 1000;
171 }
172 if (ipc_mutex_unlock (mtx))
173 return -1;
174 switch (WaitForMultipleObjects (2, h, TRUE, timeout))
175 {
176 case WAIT_OBJECT_0:
177 case WAIT_ABANDONED_0:
178 ResetEvent (evt);
179 return 0;
180 case WAIT_TIMEOUT:
181 ipc_mutex_lock (mtx);
182 return ETIMEDOUT;
183 default:
184 break;
185 }
186 return geterrno_from_win_error ();
187 }
188
189 static inline int
190 ipc_cond_signal (HANDLE evt)
191 {
192 return SetEvent (evt) ? 0 : geterrno_from_win_error ();
193 }
194
195 static inline int
196 ipc_cond_close (HANDLE evt)
197 {
198 return CloseHandle (evt) ? 0 : geterrno_from_win_error ();
199 }
200
201 /* POSIX shared memory object implementation. */
202
203 extern "C" int
204 shm_open (const char *name, int oflag, mode_t mode)
205 {
206 char shmname[CYG_MAX_PATH];
207
208 if (!check_path (shmname, shmem, name))
209 return -1;
210
211 /* Check for valid flags. */
212 if (((oflag & O_ACCMODE) != O_RDONLY && (oflag & O_ACCMODE) != O_RDWR)
213 || (oflag & ~(O_ACCMODE | O_CREAT | O_EXCL | O_TRUNC)))
214 {
215 debug_printf ("Invalid oflag 0%o", oflag);
216 set_errno (EINVAL);
217 return -1;
218 }
219
220 return open (shmname, oflag, mode & 0777);
221 }
222
223 extern "C" int
224 shm_unlink (const char *name)
225 {
226 char shmname[CYG_MAX_PATH];
227
228 if (!check_path (shmname, shmem, name))
229 return -1;
230
231 return unlink (shmname);
232 }
233
234 /* The POSIX message queue implementation is based on W. Richard STEVENS
235 implementation, just tweaked for Cygwin. The main change is
236 the usage of Windows mutexes and events instead of using the pthread
237 synchronization objects. The pathname is massaged so that the
238 files are created under /dev/mqueue. mq_timedsend and mq_timedreceive
239 are implemented additionally. */
240
241 struct mq_hdr
242 {
243 struct mq_attr mqh_attr; /* the queue's attributes */
244 long mqh_head; /* index of first message */
245 long mqh_free; /* index of first free message */
246 long mqh_nwait; /* #threads blocked in mq_receive() */
247 pid_t mqh_pid; /* nonzero PID if mqh_event set */
248 struct sigevent mqh_event; /* for mq_notify() */
249 };
250
251 struct msg_hdr
252 {
253 long msg_next; /* index of next on linked list */
254 ssize_t msg_len; /* actual length */
255 unsigned int msg_prio; /* priority */
256 };
257
258 struct mq_info
259 {
260 struct mq_hdr *mqi_hdr; /* start of mmap'ed region */
261 unsigned long mqi_magic; /* magic number if open */
262 int mqi_flags; /* flags for this process */
263 HANDLE mqi_lock; /* mutex lock */
264 HANDLE mqi_wait; /* and condition variable */
265 };
266
267 #define MQI_MAGIC 0x98765432UL
268
269 #define MSGSIZE(i) roundup((i), sizeof(long))
270
271 #define MAX_TRIES 10 /* for waiting for initialization */
272
273 struct mq_attr defattr = { 0, 10, 8192, 0 }; /* Linux defaults. */
274
275 extern "C" _off64_t lseek64 (int, _off64_t, int);
276 extern "C" void *mmap64 (void *, size_t, int, int, int, _off64_t);
277
278 extern "C" mqd_t
279 mq_open (const char *name, int oflag, ...)
280 {
281 int i, fd, nonblock, created;
282 long msgsize, index;
283 _off64_t filesize;
284 va_list ap;
285 mode_t mode;
286 int8_t *mptr;
287 struct __stat64 statbuff;
288 struct mq_hdr *mqhdr;
289 struct msg_hdr *msghdr;
290 struct mq_attr *attr;
291 struct mq_info *mqinfo;
292 char mqname[CYG_MAX_PATH];
293
294 if (!check_path (mqname, mqueue, name))
295 return (mqd_t) -1;
296
297 myfault efault;
298 if (efault.faulted (EFAULT))
299 return (mqd_t) -1;
300
301 created = 0;
302 nonblock = oflag & O_NONBLOCK;
303 oflag &= ~O_NONBLOCK;
304 mptr = (int8_t *) MAP_FAILED;
305 mqinfo = NULL;
306
307 again:
308 if (oflag & O_CREAT)
309 {
310 va_start (ap, oflag); /* init ap to final named argument */
311 mode = va_arg (ap, mode_t) & ~S_IXUSR;
312 attr = va_arg (ap, struct mq_attr *);
313 va_end (ap);
314
315 /* Open and specify O_EXCL and user-execute */
316 fd = open (mqname, oflag | O_EXCL | O_RDWR, mode | S_IXUSR);
317 if (fd < 0)
318 {
319 if (errno == EEXIST && (oflag & O_EXCL) == 0)
320 goto exists; /* already exists, OK */
321 return (mqd_t) -1;
322 }
323 created = 1;
324 /* First one to create the file initializes it */
325 if (attr == NULL)
326 attr = &defattr;
327 else if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0)
328 {
329 set_errno (EINVAL);
330 goto err;
331 }
332 /* Calculate and set the file size */
333 msgsize = MSGSIZE (attr->mq_msgsize);
334 filesize = sizeof (struct mq_hdr)
335 + (attr->mq_maxmsg * (sizeof (struct msg_hdr) + msgsize));
336 if (lseek64 (fd, filesize - 1, SEEK_SET) == -1)
337 goto err;
338 if (write (fd, "", 1) == -1)
339 goto err;
340
341 /* Memory map the file */
342 mptr = (int8_t *) mmap64 (NULL, (size_t) filesize, PROT_READ | PROT_WRITE,
343 MAP_SHARED, fd, 0);
344 if (mptr == (int8_t *) MAP_FAILED)
345 goto err;
346
347 /* Allocate one mq_info{} for the queue */
348 if (!(mqinfo = (struct mq_info *) malloc (sizeof (struct mq_info))))
349 goto err;
350 mqinfo->mqi_hdr = mqhdr = (struct mq_hdr *) mptr;
351 mqinfo->mqi_magic = MQI_MAGIC;
352 mqinfo->mqi_flags = nonblock;
353
354 /* Initialize header at beginning of file */
355 /* Create free list with all messages on it */
356 mqhdr->mqh_attr.mq_flags = 0;
357 mqhdr->mqh_attr.mq_maxmsg = attr->mq_maxmsg;
358 mqhdr->mqh_attr.mq_msgsize = attr->mq_msgsize;
359 mqhdr->mqh_attr.mq_curmsgs = 0;
360 mqhdr->mqh_nwait = 0;
361 mqhdr->mqh_pid = 0;
362 mqhdr->mqh_head = 0;
363 index = sizeof (struct mq_hdr);
364 mqhdr->mqh_free = index;
365 for (i = 0; i < attr->mq_maxmsg - 1; i++)
366 {
367 msghdr = (struct msg_hdr *) &mptr[index];
368 index += sizeof (struct msg_hdr) + msgsize;
369 msghdr->msg_next = index;
370 }
371 msghdr = (struct msg_hdr *) &mptr[index];
372 msghdr->msg_next = 0; /* end of free list */
373
374 /* Initialize mutex & condition variable */
375 i = ipc_mutex_init (&mqinfo->mqi_lock, mqname);
376 if (i != 0)
377 goto pthreaderr;
378
379 i = ipc_cond_init (&mqinfo->mqi_wait, mqname);
380 if (i != 0)
381 goto pthreaderr;
382
383 /* Initialization complete, turn off user-execute bit */
384 if (fchmod (fd, mode) == -1)
385 goto err;
386 close (fd);
387 return ((mqd_t) mqinfo);
388 }
389
390 exists:
391 /* Open the file then memory map */
392 if ((fd = open (mqname, O_RDWR)) < 0)
393 {
394 if (errno == ENOENT && (oflag & O_CREAT))
395 goto again;
396 goto err;
397 }
398 /* Make certain initialization is complete */
399 for (i = 0; i < MAX_TRIES; i++)
400 {
401 if (stat64 (mqname, &statbuff) == -1)
402 {
403 if (errno == ENOENT && (oflag & O_CREAT))
404 {
405 close(fd);
406 goto again;
407 }
408 goto err;
409 }
410 if ((statbuff.st_mode & S_IXUSR) == 0)
411 break;
412 sleep (1);
413 }
414 if (i == MAX_TRIES)
415 {
416 set_errno (ETIMEDOUT);
417 goto err;
418 }
419
420 filesize = statbuff.st_size;
421 mptr = (int8_t *) mmap64 (NULL, (size_t) filesize, PROT_READ | PROT_WRITE,
422 MAP_SHARED, fd, 0);
423 if (mptr == (int8_t *) MAP_FAILED)
424 goto err;
425 close (fd);
426
427 /* Allocate one mq_info{} for each open */
428 if (!(mqinfo = (struct mq_info *) malloc (sizeof (struct mq_info))))
429 goto err;
430 mqinfo->mqi_hdr = (struct mq_hdr *) mptr;
431 mqinfo->mqi_magic = MQI_MAGIC;
432 mqinfo->mqi_flags = nonblock;
433
434 /* Initialize mutex & condition variable */
435 i = ipc_mutex_init (&mqinfo->mqi_lock, mqname);
436 if (i != 0)
437 goto pthreaderr;
438
439 i = ipc_cond_init (&mqinfo->mqi_wait, mqname);
440 if (i != 0)
441 goto pthreaderr;
442
443 return (mqd_t) mqinfo;
444
445 pthreaderr:
446 errno = i;
447 err:
448 /* Don't let following function calls change errno */
449 save_errno save;
450
451 if (created)
452 unlink (mqname);
453 if (mptr != (int8_t *) MAP_FAILED)
454 munmap((void *) mptr, (size_t) filesize);
455 if (mqinfo)
456 free (mqinfo);
457 close (fd);
458 return (mqd_t) -1;
459 }
460
461 extern "C" int
462 mq_getattr (mqd_t mqd, struct mq_attr *mqstat)
463 {
464 int n;
465 struct mq_hdr *mqhdr;
466 struct mq_attr *attr;
467 struct mq_info *mqinfo;
468
469 myfault efault;
470 if (efault.faulted (EBADF))
471 return -1;
472
473 mqinfo = (struct mq_info *) mqd;
474 if (mqinfo->mqi_magic != MQI_MAGIC)
475 {
476 set_errno (EBADF);
477 return -1;
478 }
479 mqhdr = mqinfo->mqi_hdr;
480 attr = &mqhdr->mqh_attr;
481 if ((n = ipc_mutex_lock (mqinfo->mqi_lock)) != 0)
482 {
483 errno = n;
484 return -1;
485 }
486 mqstat->mq_flags = mqinfo->mqi_flags; /* per-open */
487 mqstat->mq_maxmsg = attr->mq_maxmsg; /* remaining three per-queue */
488 mqstat->mq_msgsize = attr->mq_msgsize;
489 mqstat->mq_curmsgs = attr->mq_curmsgs;
490
491 ipc_mutex_unlock (mqinfo->mqi_lock);
492 return 0;
493 }
494
495 extern "C" int
496 mq_setattr (mqd_t mqd, const struct mq_attr *mqstat, struct mq_attr *omqstat)
497 {
498 int n;
499 struct mq_hdr *mqhdr;
500 struct mq_attr *attr;
501 struct mq_info *mqinfo;
502
503 myfault efault;
504 if (efault.faulted (EBADF))
505 return -1;
506
507 mqinfo = (struct mq_info *) mqd;
508 if (mqinfo->mqi_magic != MQI_MAGIC)
509 {
510 set_errno (EBADF);
511 return -1;
512 }
513 mqhdr = mqinfo->mqi_hdr;
514 attr = &mqhdr->mqh_attr;
515 if ((n = ipc_mutex_lock (mqinfo->mqi_lock)) != 0)
516 {
517 errno = n;
518 return -1;
519 }
520
521 if (omqstat != NULL)
522 {
523 omqstat->mq_flags = mqinfo->mqi_flags; /* previous attributes */
524 omqstat->mq_maxmsg = attr->mq_maxmsg;
525 omqstat->mq_msgsize = attr->mq_msgsize;
526 omqstat->mq_curmsgs = attr->mq_curmsgs; /* and current status */
527 }
528
529 if (mqstat->mq_flags & O_NONBLOCK)
530 mqinfo->mqi_flags |= O_NONBLOCK;
531 else
532 mqinfo->mqi_flags &= ~O_NONBLOCK;
533
534 ipc_mutex_unlock (mqinfo->mqi_lock);
535 return 0;
536 }
537
538 extern "C" int
539 mq_notify (mqd_t mqd, const struct sigevent *notification)
540 {
541 int n;
542 pid_t pid;
543 struct mq_hdr *mqhdr;
544 struct mq_info *mqinfo;
545
546 myfault efault;
547 if (efault.faulted (EBADF))
548 return -1;
549
550 mqinfo = (struct mq_info *) mqd;
551 if (mqinfo->mqi_magic != MQI_MAGIC)
552 {
553 set_errno (EBADF);
554 return -1;
555 }
556 mqhdr = mqinfo->mqi_hdr;
557 if ((n = ipc_mutex_lock (mqinfo->mqi_lock)) != 0)
558 {
559 errno = n;
560 return -1;
561 }
562
563 pid = getpid ();
564 if (!notification)
565 {
566 if (mqhdr->mqh_pid == pid)
567 mqhdr->mqh_pid = 0; /* unregister calling process */
568 }
569 else
570 {
571 if (mqhdr->mqh_pid != 0)
572 {
573 if (kill (mqhdr->mqh_pid, 0) != -1 || errno != ESRCH)
574 {
575 set_errno (EBUSY);
576 ipc_mutex_unlock (mqinfo->mqi_lock);
577 return -1;
578 }
579 }
580 mqhdr->mqh_pid = pid;
581 mqhdr->mqh_event = *notification;
582 }
583 ipc_mutex_unlock (mqinfo->mqi_lock);
584 return 0;
585 }
586
587 static int
588 _mq_send (mqd_t mqd, const char *ptr, size_t len, unsigned int prio,
589 const struct timespec *abstime)
590 {
591 int n;
592 long index, freeindex;
593 int8_t *mptr;
594 struct sigevent *sigev;
595 struct mq_hdr *mqhdr;
596 struct mq_attr *attr;
597 struct msg_hdr *msghdr, *nmsghdr, *pmsghdr;
598 struct mq_info *mqinfo;
599
600 myfault efault;
601 if (efault.faulted (EBADF))
602 return -1;
603
604 mqinfo = (struct mq_info *) mqd;
605 if (mqinfo->mqi_magic != MQI_MAGIC)
606 {
607 set_errno (EBADF);
608 return -1;
609 }
610 if (prio > MQ_PRIO_MAX)
611 {
612 set_errno (EINVAL);
613 return -1;
614 }
615
616 mqhdr = mqinfo->mqi_hdr; /* struct pointer */
617 mptr = (int8_t *) mqhdr; /* byte pointer */
618 attr = &mqhdr->mqh_attr;
619 if ((n = ipc_mutex_lock (mqinfo->mqi_lock)) != 0)
620 {
621 errno = n;
622 return -1;
623 }
624
625 if (len > (size_t) attr->mq_msgsize)
626 {
627 set_errno (EMSGSIZE);
628 goto err;
629 }
630 if (attr->mq_curmsgs == 0)
631 {
632 if (mqhdr->mqh_pid != 0 && mqhdr->mqh_nwait == 0)
633 {
634 sigev = &mqhdr->mqh_event;
635 if (sigev->sigev_notify == SIGEV_SIGNAL)
636 sigqueue (mqhdr->mqh_pid, sigev->sigev_signo, sigev->sigev_value);
637 mqhdr->mqh_pid = 0; /* unregister */
638 }
639 }
640 else if (attr->mq_curmsgs >= attr->mq_maxmsg)
641 {
642 /* Queue is full */
643 if (mqinfo->mqi_flags & O_NONBLOCK)
644 {
645 set_errno (EAGAIN);
646 goto err;
647 }
648 /* Wait for room for one message on the queue */
649 while (attr->mq_curmsgs >= attr->mq_maxmsg)
650 ipc_cond_timedwait (mqinfo->mqi_wait, mqinfo->mqi_lock, abstime);
651 }
652
653 /* nmsghdr will point to new message */
654 if ((freeindex = mqhdr->mqh_free) == 0)
655 api_fatal ("mq_send: curmsgs = %ld; free = 0", attr->mq_curmsgs);
656
657 nmsghdr = (struct msg_hdr *) &mptr[freeindex];
658 nmsghdr->msg_prio = prio;
659 nmsghdr->msg_len = len;
660 memcpy (nmsghdr + 1, ptr, len); /* copy message from caller */
661 mqhdr->mqh_free = nmsghdr->msg_next; /* new freelist head */
662
663 /* Find right place for message in linked list */
664 index = mqhdr->mqh_head;
665 pmsghdr = (struct msg_hdr *) &(mqhdr->mqh_head);
666 while (index)
667 {
668 msghdr = (struct msg_hdr *) &mptr[index];
669 if (prio > msghdr->msg_prio)
670 {
671 nmsghdr->msg_next = index;
672 pmsghdr->msg_next = freeindex;
673 break;
674 }
675 index = msghdr->msg_next;
676 pmsghdr = msghdr;
677 }
678 if (index == 0)
679 {
680 /* Queue was empty or new goes at end of list */
681 pmsghdr->msg_next = freeindex;
682 nmsghdr->msg_next = 0;
683 }
684 /* Wake up anyone blocked in mq_receive waiting for a message */
685 if (attr->mq_curmsgs == 0)
686 ipc_cond_signal (mqinfo->mqi_wait);
687 attr->mq_curmsgs++;
688
689 ipc_mutex_unlock (mqinfo->mqi_lock);
690 return 0;
691
692 err:
693 ipc_mutex_unlock (mqinfo->mqi_lock);
694 return -1;
695 }
696
697 extern "C" int
698 mq_send (mqd_t mqd, const char *ptr, size_t len, unsigned int prio)
699 {
700 return _mq_send (mqd, ptr, len, prio, NULL);
701 }
702
703 extern "C" int
704 mq_timedsend (mqd_t mqd, const char *ptr, size_t len, unsigned int prio,
705 const struct timespec *abstime)
706 {
707 return _mq_send (mqd, ptr, len, prio, abstime);
708 }
709
710 static ssize_t
711 _mq_receive (mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop,
712 const struct timespec *abstime)
713 {
714 int n;
715 long index;
716 int8_t *mptr;
717 ssize_t len;
718 struct mq_hdr *mqhdr;
719 struct mq_attr *attr;
720 struct msg_hdr *msghdr;
721 struct mq_info *mqinfo;
722
723 myfault efault;
724 if (efault.faulted (EBADF))
725 return -1;
726
727 mqinfo = (struct mq_info *) mqd;
728 if (mqinfo->mqi_magic != MQI_MAGIC)
729 {
730 set_errno (EBADF);
731 return -1;
732 }
733 mqhdr = mqinfo->mqi_hdr; /* struct pointer */
734 mptr = (int8_t *) mqhdr; /* byte pointer */
735 attr = &mqhdr->mqh_attr;
736 if ((n = ipc_mutex_lock (mqinfo->mqi_lock)) != 0)
737 {
738 errno = n;
739 return -1;
740 }
741
742 if (maxlen < (size_t) attr->mq_msgsize)
743 {
744 set_errno (EMSGSIZE);
745 goto err;
746 }
747 if (attr->mq_curmsgs == 0) /* queue is empty */
748 {
749 if (mqinfo->mqi_flags & O_NONBLOCK)
750 {
751 set_errno (EAGAIN);
752 goto err;
753 }
754 /* Wait for a message to be placed onto queue */
755 mqhdr->mqh_nwait++;
756 while (attr->mq_curmsgs == 0)
757 ipc_cond_timedwait (mqinfo->mqi_wait, mqinfo->mqi_lock, abstime);
758 mqhdr->mqh_nwait--;
759 }
760
761 if ((index = mqhdr->mqh_head) == 0)
762 api_fatal ("mq_receive: curmsgs = %ld; head = 0", attr->mq_curmsgs);
763
764 msghdr = (struct msg_hdr *) &mptr[index];
765 mqhdr->mqh_head = msghdr->msg_next; /* new head of list */
766 len = msghdr->msg_len;
767 memcpy(ptr, msghdr + 1, len); /* copy the message itself */
768 if (priop != NULL)
769 *priop = msghdr->msg_prio;
770
771 /* Just-read message goes to front of free list */
772 msghdr->msg_next = mqhdr->mqh_free;
773 mqhdr->mqh_free = index;
774
775 /* Wake up anyone blocked in mq_send waiting for room */
776 if (attr->mq_curmsgs == attr->mq_maxmsg)
777 ipc_cond_signal (mqinfo->mqi_wait);
778 attr->mq_curmsgs--;
779
780 ipc_mutex_unlock (mqinfo->mqi_lock);
781 return len;
782
783 err:
784 ipc_mutex_unlock (mqinfo->mqi_lock);
785 return -1;
786 }
787
788 extern "C" ssize_t
789 mq_receive (mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop)
790 {
791 return _mq_receive (mqd, ptr, maxlen, priop, NULL);
792 }
793
794 extern "C" ssize_t
795 mq_timedreceive (mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop,
796 const struct timespec *abstime)
797 {
798 return _mq_receive (mqd, ptr, maxlen, priop, abstime);
799 }
800
801 extern "C" int
802 mq_close (mqd_t mqd)
803 {
804 long msgsize, filesize;
805 struct mq_hdr *mqhdr;
806 struct mq_attr *attr;
807 struct mq_info *mqinfo;
808
809 myfault efault;
810 if (efault.faulted (EBADF))
811 return -1;
812
813 mqinfo = (struct mq_info *) mqd;
814 if (mqinfo->mqi_magic != MQI_MAGIC)
815 {
816 set_errno (EBADF);
817 return -1;
818 }
819 mqhdr = mqinfo->mqi_hdr;
820 attr = &mqhdr->mqh_attr;
821
822 if (mq_notify (mqd, NULL)) /* unregister calling process */
823 return -1;
824
825 msgsize = MSGSIZE (attr->mq_msgsize);
826 filesize = sizeof (struct mq_hdr)
827 + (attr->mq_maxmsg * (sizeof (struct msg_hdr) + msgsize));
828 if (munmap (mqinfo->mqi_hdr, filesize) == -1)
829 return -1;
830
831 mqinfo->mqi_magic = 0; /* just in case */
832 ipc_cond_close (mqinfo->mqi_wait);
833 ipc_mutex_close (mqinfo->mqi_lock);
834 free (mqinfo);
835 return 0;
836 }
837
838 extern "C" int
839 mq_unlink (const char *name)
840 {
841 char mqname[CYG_MAX_PATH];
842
843 if (!check_path (mqname, mqueue, name))
844 return -1;
845 if (unlink (mqname) == -1)
846 return -1;
847 return 0;
848 }
849
This page took 0.076605 seconds and 6 git commands to generate.