]> sourceware.org Git - newlib-cygwin.git/blob - winsup/cygwin/posix_ipc.cc
* posix_ipc.cc (ipc_mutex_init): Create global object name.
[newlib-cygwin.git] / winsup / cygwin / posix_ipc.cc
1 /* posix_ipc.cc: POSIX IPC API for Cygwin.
2
3 Copyright 2007 Red Hat, Inc.
4
5 This file is part of Cygwin.
6
7 This software is a copyrighted work licensed under the terms of the
8 Cygwin license. Please consult the file "CYGWIN_LICENSE" for
9 details. */
10
11 /* TODO: POSIX semaphores are implemented in thread.cc right now. The
12 implementation in thread.cc disallows implementing kernel
13 persistent semaphores, so in the long run we should move the
14 implementation here, using file based shared memory instead. */
15
16 #include "winsup.h"
17 #include "path.h"
18 #include "cygerrno.h"
19 #include "cygtls.h"
20 #include "security.h"
21 #include "sigproc.h"
22 #include <sys/stat.h>
23 #include <sys/mman.h>
24 #include <sys/param.h>
25 #include <fcntl.h>
26 #include <pwd.h>
27 #include <stdlib.h>
28 #include <limits.h>
29 #include <unistd.h>
30 #include <stdarg.h>
31 #include <mqueue.h>
32
33 struct
34 {
35 const char *prefix;
36 const char *description;
37 } ipc_names[] = {
38 { "/dev/shm", "POSIX shared memory object" },
39 { "/dev/mqueue", "POSIX message queue" },
40 { "/dev/sem", "POSIX semaphore" }
41 };
42
43 enum ipc_type_t
44 {
45 shmem,
46 mqueue,
47 sem
48 };
49
50 static bool
51 check_path (char *res_name, ipc_type_t type, const char *name)
52 {
53 /* Note that we require the existance of the apprpriate /dev subdirectories
54 for POSIX IPC object support, similar to Linux (which supports the
55 directories, but doesn't require to mount them). We don't create
56 these directory here, that's the task of the installer. But we check
57 for existance and give ample warning. */
58 path_conv path (ipc_names[type].prefix, PC_SYM_NOFOLLOW);
59 if (path.error || !path.exists () || !path.isdir ())
60 {
61 small_printf (
62 "Warning: '%s' does not exists or is not a directory.\n\n"
63 "%ss require the existance of this directory.\n"
64 "Create the directory '%s' and set the permissions to 01777.\n"
65 "For instance on the command line: mkdir -m 01777 %s\n",
66 ipc_names[type].prefix, ipc_names[type].description,
67 ipc_names[type].prefix, ipc_names[type].prefix);
68 set_errno (EINVAL);
69 return false;
70 }
71 /* Name must start with a single slash. */
72 if (!name || name[0] != '/' || name[1] == '/')
73 {
74 debug_printf ("Invalid %s name '%s'", ipc_names[type].description, name);
75 set_errno (EINVAL);
76 return false;
77 }
78 if (strlen (name) > CYG_MAX_PATH - sizeof (ipc_names[type].prefix))
79 {
80 debug_printf ("%s name '%s' too long", ipc_names[type].description, name);
81 set_errno (ENAMETOOLONG);
82 return false;
83 }
84 strcpy (res_name, ipc_names[type].prefix);
85 strcat (res_name, name);
86 return true;
87 }
88
89 static int
90 ipc_mutex_init (HANDLE *pmtx, const char *name)
91 {
92 char buf[CYG_MAX_PATH];
93 __small_sprintf (buf, "%scyg_pmtx/%s",
94 wincap.has_terminal_services () ? "Global\\" : "", name);
95 *pmtx = CreateMutex (&sec_all, FALSE, buf);
96 if (!*pmtx)
97 debug_printf ("failed: %E\n");
98 return *pmtx ? 0 : geterrno_from_win_error ();
99 }
100
101 static int
102 ipc_mutex_lock (HANDLE mtx)
103 {
104 HANDLE h[2] = { mtx, signal_arrived };
105
106 switch (WaitForMultipleObjects (2, h, FALSE, INFINITE))
107 {
108 case WAIT_OBJECT_0:
109 case WAIT_ABANDONED_0:
110 return 0;
111 case WAIT_OBJECT_0 + 1:
112 set_errno (EINTR);
113 return 1;
114 default:
115 break;
116 }
117 return geterrno_from_win_error ();
118 }
119
120 static inline int
121 ipc_mutex_unlock (HANDLE mtx)
122 {
123 return ReleaseMutex (mtx) ? 0 : geterrno_from_win_error ();
124 }
125
126 static inline int
127 ipc_mutex_close (HANDLE mtx)
128 {
129 return CloseHandle (mtx) ? 0 : geterrno_from_win_error ();
130 }
131
132 static int
133 ipc_cond_init (HANDLE *pevt, const char *name)
134 {
135 char buf[CYG_MAX_PATH];
136 strcpy (buf, wincap.has_terminal_services () ? "Global\\" : "");
137 __small_sprintf (buf, "%scyg_pevt/%s",
138 wincap.has_terminal_services () ? "Global\\" : "", name);
139 *pevt = CreateEvent (&sec_all, TRUE, FALSE, buf);
140 if (!*pevt)
141 debug_printf ("failed: %E\n");
142 return *pevt ? 0 : geterrno_from_win_error ();
143 }
144
145 static int
146 ipc_cond_timedwait (HANDLE evt, HANDLE mtx, const struct timespec *abstime)
147 {
148 struct timeval tv;
149 DWORD timeout;
150 HANDLE h[2] = { mtx, evt };
151
152 if (!abstime)
153 timeout = INFINITE;
154 else if (abstime->tv_sec < 0
155 || abstime->tv_nsec < 0
156 || abstime->tv_nsec > 999999999)
157 return EINVAL;
158 else
159 {
160 gettimeofday (&tv, NULL);
161 /* Check for immediate timeout. */
162 if (tv.tv_sec > abstime->tv_sec
163 || (tv.tv_sec == abstime->tv_sec
164 && tv.tv_usec > abstime->tv_nsec / 1000))
165 return ETIMEDOUT;
166 timeout = (abstime->tv_sec - tv.tv_sec) * 1000;
167 timeout += (abstime->tv_nsec / 1000 - tv.tv_usec) / 1000;
168 }
169 if (ipc_mutex_unlock (mtx))
170 return -1;
171 switch (WaitForMultipleObjects (2, h, TRUE, timeout))
172 {
173 case WAIT_OBJECT_0:
174 case WAIT_ABANDONED_0:
175 ResetEvent (evt);
176 return 0;
177 case WAIT_TIMEOUT:
178 ipc_mutex_lock (mtx);
179 return ETIMEDOUT;
180 default:
181 break;
182 }
183 return geterrno_from_win_error ();
184 }
185
186 static inline int
187 ipc_cond_signal (HANDLE evt)
188 {
189 return SetEvent (evt) ? 0 : geterrno_from_win_error ();
190 }
191
192 static inline int
193 ipc_cond_close (HANDLE evt)
194 {
195 return CloseHandle (evt) ? 0 : geterrno_from_win_error ();
196 }
197
198 /* POSIX shared memory object implementation. */
199
200 extern "C" int
201 shm_open (const char *name, int oflag, mode_t mode)
202 {
203 char shmname[CYG_MAX_PATH];
204
205 if (!check_path (shmname, shmem, name))
206 return -1;
207
208 /* Check for valid flags. */
209 if (((oflag & O_ACCMODE) != O_RDONLY && (oflag & O_ACCMODE) != O_RDWR)
210 || (oflag & ~(O_ACCMODE | O_CREAT | O_EXCL | O_TRUNC)))
211 {
212 debug_printf ("Invalid oflag 0%o", oflag);
213 set_errno (EINVAL);
214 return -1;
215 }
216
217 return open (shmname, oflag, mode & 0777);
218 }
219
220 extern "C" int
221 shm_unlink (const char *name)
222 {
223 char shmname[CYG_MAX_PATH];
224
225 if (!check_path (shmname, shmem, name))
226 return -1;
227
228 return unlink (shmname);
229 }
230
231 /* The POSIX message queue implementation is based on W. Richard STEVENS
232 implementation, just tweaked for Cygwin. The main change is
233 the usage of Windows mutexes and events instead of using the pthread
234 synchronization objects. The pathname is massaged so that the
235 files are created under /dev/mqueue. mq_timedsend and mq_timedreceive
236 are implemented additionally. */
237
238 struct mq_hdr
239 {
240 struct mq_attr mqh_attr; /* the queue's attributes */
241 long mqh_head; /* index of first message */
242 long mqh_free; /* index of first free message */
243 long mqh_nwait; /* #threads blocked in mq_receive() */
244 pid_t mqh_pid; /* nonzero PID if mqh_event set */
245 char mqh_uname[20]; /* unique name used to identify synchronization
246 objects connected to this queue */
247 struct sigevent mqh_event; /* for mq_notify() */
248 };
249
250 struct msg_hdr
251 {
252 long msg_next; /* index of next on linked list */
253 ssize_t msg_len; /* actual length */
254 unsigned int msg_prio; /* priority */
255 };
256
257 struct mq_info
258 {
259 struct mq_hdr *mqi_hdr; /* start of mmap'ed region */
260 unsigned long mqi_magic; /* magic number if open */
261 int mqi_flags; /* flags for this process */
262 HANDLE mqi_lock; /* mutex lock */
263 HANDLE mqi_wait; /* and condition variable */
264 };
265
266 #define MQI_MAGIC 0x98765432UL
267
268 #define MSGSIZE(i) roundup((i), sizeof(long))
269
270 #define MAX_TRIES 10 /* for waiting for initialization */
271
272 struct mq_attr defattr = { 0, 10, 8192, 0 }; /* Linux defaults. */
273
274 extern "C" _off64_t lseek64 (int, _off64_t, int);
275 extern "C" void *mmap64 (void *, size_t, int, int, int, _off64_t);
276
277 extern "C" mqd_t
278 mq_open (const char *name, int oflag, ...)
279 {
280 int i, fd, nonblock, created;
281 long msgsize, index;
282 _off64_t filesize;
283 va_list ap;
284 mode_t mode;
285 int8_t *mptr;
286 struct __stat64 statbuff;
287 struct mq_hdr *mqhdr;
288 struct msg_hdr *msghdr;
289 struct mq_attr *attr;
290 struct mq_info *mqinfo;
291 char mqname[CYG_MAX_PATH];
292
293 if (!check_path (mqname, mqueue, name))
294 return (mqd_t) -1;
295
296 myfault efault;
297 if (efault.faulted (EFAULT))
298 return (mqd_t) -1;
299
300 created = 0;
301 nonblock = oflag & O_NONBLOCK;
302 oflag &= ~O_NONBLOCK;
303 mptr = (int8_t *) MAP_FAILED;
304 mqinfo = NULL;
305
306 again:
307 if (oflag & O_CREAT)
308 {
309 va_start (ap, oflag); /* init ap to final named argument */
310 mode = va_arg (ap, mode_t) & ~S_IXUSR;
311 attr = va_arg (ap, struct mq_attr *);
312 va_end (ap);
313
314 /* Open and specify O_EXCL and user-execute */
315 fd = open (mqname, oflag | O_EXCL | O_RDWR, mode | S_IXUSR);
316 if (fd < 0)
317 {
318 if (errno == EEXIST && (oflag & O_EXCL) == 0)
319 goto exists; /* already exists, OK */
320 return (mqd_t) -1;
321 }
322 created = 1;
323 /* First one to create the file initializes it */
324 if (attr == NULL)
325 attr = &defattr;
326 else if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0)
327 {
328 set_errno (EINVAL);
329 goto err;
330 }
331 /* Calculate and set the file size */
332 msgsize = MSGSIZE (attr->mq_msgsize);
333 filesize = sizeof (struct mq_hdr)
334 + (attr->mq_maxmsg * (sizeof (struct msg_hdr) + msgsize));
335 if (lseek64 (fd, filesize - 1, SEEK_SET) == -1)
336 goto err;
337 if (write (fd, "", 1) == -1)
338 goto err;
339
340 /* Memory map the file */
341 mptr = (int8_t *) mmap64 (NULL, (size_t) filesize, PROT_READ | PROT_WRITE,
342 MAP_SHARED, fd, 0);
343 if (mptr == (int8_t *) MAP_FAILED)
344 goto err;
345
346 /* Allocate one mq_info{} for the queue */
347 if (!(mqinfo = (struct mq_info *) malloc (sizeof (struct mq_info))))
348 goto err;
349 mqinfo->mqi_hdr = mqhdr = (struct mq_hdr *) mptr;
350 mqinfo->mqi_magic = MQI_MAGIC;
351 mqinfo->mqi_flags = nonblock;
352
353 /* Initialize header at beginning of file */
354 /* Create free list with all messages on it */
355 mqhdr->mqh_attr.mq_flags = 0;
356 mqhdr->mqh_attr.mq_maxmsg = attr->mq_maxmsg;
357 mqhdr->mqh_attr.mq_msgsize = attr->mq_msgsize;
358 mqhdr->mqh_attr.mq_curmsgs = 0;
359 mqhdr->mqh_nwait = 0;
360 mqhdr->mqh_pid = 0;
361 __small_sprintf (mqhdr->mqh_uname, "cyg%016X", hash_path_name (0,mqname));
362 mqhdr->mqh_head = 0;
363 index = sizeof (struct mq_hdr);
364 mqhdr->mqh_free = index;
365 for (i = 0; i < attr->mq_maxmsg - 1; i++)
366 {
367 msghdr = (struct msg_hdr *) &mptr[index];
368 index += sizeof (struct msg_hdr) + msgsize;
369 msghdr->msg_next = index;
370 }
371 msghdr = (struct msg_hdr *) &mptr[index];
372 msghdr->msg_next = 0; /* end of free list */
373
374 /* Initialize mutex & condition variable */
375 i = ipc_mutex_init (&mqinfo->mqi_lock, mqhdr->mqh_uname);
376 if (i != 0)
377 goto pthreaderr;
378
379 i = ipc_cond_init (&mqinfo->mqi_wait, mqhdr->mqh_uname);
380 if (i != 0)
381 goto pthreaderr;
382
383 /* Initialization complete, turn off user-execute bit */
384 if (fchmod (fd, mode) == -1)
385 goto err;
386 close (fd);
387 return ((mqd_t) mqinfo);
388 }
389
390 exists:
391 /* Open the file then memory map */
392 if ((fd = open (mqname, O_RDWR)) < 0)
393 {
394 if (errno == ENOENT && (oflag & O_CREAT))
395 goto again;
396 goto err;
397 }
398 /* Make certain initialization is complete */
399 for (i = 0; i < MAX_TRIES; i++)
400 {
401 if (stat64 (mqname, &statbuff) == -1)
402 {
403 if (errno == ENOENT && (oflag & O_CREAT))
404 {
405 close(fd);
406 goto again;
407 }
408 goto err;
409 }
410 if ((statbuff.st_mode & S_IXUSR) == 0)
411 break;
412 sleep (1);
413 }
414 if (i == MAX_TRIES)
415 {
416 set_errno (ETIMEDOUT);
417 goto err;
418 }
419
420 filesize = statbuff.st_size;
421 mptr = (int8_t *) mmap64 (NULL, (size_t) filesize, PROT_READ | PROT_WRITE,
422 MAP_SHARED, fd, 0);
423 if (mptr == (int8_t *) MAP_FAILED)
424 goto err;
425 close (fd);
426
427 /* Allocate one mq_info{} for each open */
428 if (!(mqinfo = (struct mq_info *) malloc (sizeof (struct mq_info))))
429 goto err;
430 mqinfo->mqi_hdr = (struct mq_hdr *) mptr;
431 mqinfo->mqi_magic = MQI_MAGIC;
432 mqinfo->mqi_flags = nonblock;
433
434 /* Initialize mutex & condition variable */
435 i = ipc_mutex_init (&mqinfo->mqi_lock, mqhdr->mqh_uname);
436 if (i != 0)
437 goto pthreaderr;
438
439 i = ipc_cond_init (&mqinfo->mqi_wait, mqhdr->mqh_uname);
440 if (i != 0)
441 goto pthreaderr;
442
443 return (mqd_t) mqinfo;
444
445 pthreaderr:
446 errno = i;
447 err:
448 /* Don't let following function calls change errno */
449 save_errno save;
450
451 if (created)
452 unlink (mqname);
453 if (mptr != (int8_t *) MAP_FAILED)
454 munmap((void *) mptr, (size_t) filesize);
455 if (mqinfo)
456 free (mqinfo);
457 close (fd);
458 return (mqd_t) -1;
459 }
460
461 extern "C" int
462 mq_getattr (mqd_t mqd, struct mq_attr *mqstat)
463 {
464 int n;
465 struct mq_hdr *mqhdr;
466 struct mq_attr *attr;
467 struct mq_info *mqinfo;
468
469 myfault efault;
470 if (efault.faulted (EBADF))
471 return -1;
472
473 mqinfo = (struct mq_info *) mqd;
474 if (mqinfo->mqi_magic != MQI_MAGIC)
475 {
476 set_errno (EBADF);
477 return -1;
478 }
479 mqhdr = mqinfo->mqi_hdr;
480 attr = &mqhdr->mqh_attr;
481 if ((n = ipc_mutex_lock (mqinfo->mqi_lock)) != 0)
482 {
483 errno = n;
484 return -1;
485 }
486 mqstat->mq_flags = mqinfo->mqi_flags; /* per-open */
487 mqstat->mq_maxmsg = attr->mq_maxmsg; /* remaining three per-queue */
488 mqstat->mq_msgsize = attr->mq_msgsize;
489 mqstat->mq_curmsgs = attr->mq_curmsgs;
490
491 ipc_mutex_unlock (mqinfo->mqi_lock);
492 return 0;
493 }
494
495 extern "C" int
496 mq_setattr (mqd_t mqd, const struct mq_attr *mqstat, struct mq_attr *omqstat)
497 {
498 int n;
499 struct mq_hdr *mqhdr;
500 struct mq_attr *attr;
501 struct mq_info *mqinfo;
502
503 myfault efault;
504 if (efault.faulted (EBADF))
505 return -1;
506
507 mqinfo = (struct mq_info *) mqd;
508 if (mqinfo->mqi_magic != MQI_MAGIC)
509 {
510 set_errno (EBADF);
511 return -1;
512 }
513 mqhdr = mqinfo->mqi_hdr;
514 attr = &mqhdr->mqh_attr;
515 if ((n = ipc_mutex_lock (mqinfo->mqi_lock)) != 0)
516 {
517 errno = n;
518 return -1;
519 }
520
521 if (omqstat != NULL)
522 {
523 omqstat->mq_flags = mqinfo->mqi_flags; /* previous attributes */
524 omqstat->mq_maxmsg = attr->mq_maxmsg;
525 omqstat->mq_msgsize = attr->mq_msgsize;
526 omqstat->mq_curmsgs = attr->mq_curmsgs; /* and current status */
527 }
528
529 if (mqstat->mq_flags & O_NONBLOCK)
530 mqinfo->mqi_flags |= O_NONBLOCK;
531 else
532 mqinfo->mqi_flags &= ~O_NONBLOCK;
533
534 ipc_mutex_unlock (mqinfo->mqi_lock);
535 return 0;
536 }
537
538 extern "C" int
539 mq_notify (mqd_t mqd, const struct sigevent *notification)
540 {
541 int n;
542 pid_t pid;
543 struct mq_hdr *mqhdr;
544 struct mq_info *mqinfo;
545
546 myfault efault;
547 if (efault.faulted (EBADF))
548 return -1;
549
550 mqinfo = (struct mq_info *) mqd;
551 if (mqinfo->mqi_magic != MQI_MAGIC)
552 {
553 set_errno (EBADF);
554 return -1;
555 }
556 mqhdr = mqinfo->mqi_hdr;
557 if ((n = ipc_mutex_lock (mqinfo->mqi_lock)) != 0)
558 {
559 errno = n;
560 return -1;
561 }
562
563 pid = getpid ();
564 if (!notification)
565 {
566 if (mqhdr->mqh_pid == pid)
567 mqhdr->mqh_pid = 0; /* unregister calling process */
568 }
569 else
570 {
571 if (mqhdr->mqh_pid != 0)
572 {
573 if (kill (mqhdr->mqh_pid, 0) != -1 || errno != ESRCH)
574 {
575 set_errno (EBUSY);
576 ipc_mutex_unlock (mqinfo->mqi_lock);
577 return -1;
578 }
579 }
580 mqhdr->mqh_pid = pid;
581 mqhdr->mqh_event = *notification;
582 }
583 ipc_mutex_unlock (mqinfo->mqi_lock);
584 return 0;
585 }
586
587 static int
588 _mq_send (mqd_t mqd, const char *ptr, size_t len, unsigned int prio,
589 const struct timespec *abstime)
590 {
591 int n;
592 long index, freeindex;
593 int8_t *mptr;
594 struct sigevent *sigev;
595 struct mq_hdr *mqhdr;
596 struct mq_attr *attr;
597 struct msg_hdr *msghdr, *nmsghdr, *pmsghdr;
598 struct mq_info *mqinfo;
599
600 myfault efault;
601 if (efault.faulted (EBADF))
602 return -1;
603
604 mqinfo = (struct mq_info *) mqd;
605 if (mqinfo->mqi_magic != MQI_MAGIC)
606 {
607 set_errno (EBADF);
608 return -1;
609 }
610 if (prio > MQ_PRIO_MAX)
611 {
612 set_errno (EINVAL);
613 return -1;
614 }
615
616 mqhdr = mqinfo->mqi_hdr; /* struct pointer */
617 mptr = (int8_t *) mqhdr; /* byte pointer */
618 attr = &mqhdr->mqh_attr;
619 if ((n = ipc_mutex_lock (mqinfo->mqi_lock)) != 0)
620 {
621 errno = n;
622 return -1;
623 }
624
625 if (len > (size_t) attr->mq_msgsize)
626 {
627 set_errno (EMSGSIZE);
628 goto err;
629 }
630 if (attr->mq_curmsgs == 0)
631 {
632 if (mqhdr->mqh_pid != 0 && mqhdr->mqh_nwait == 0)
633 {
634 sigev = &mqhdr->mqh_event;
635 if (sigev->sigev_notify == SIGEV_SIGNAL)
636 sigqueue (mqhdr->mqh_pid, sigev->sigev_signo, sigev->sigev_value);
637 mqhdr->mqh_pid = 0; /* unregister */
638 }
639 }
640 else if (attr->mq_curmsgs >= attr->mq_maxmsg)
641 {
642 /* Queue is full */
643 if (mqinfo->mqi_flags & O_NONBLOCK)
644 {
645 set_errno (EAGAIN);
646 goto err;
647 }
648 /* Wait for room for one message on the queue */
649 while (attr->mq_curmsgs >= attr->mq_maxmsg)
650 ipc_cond_timedwait (mqinfo->mqi_wait, mqinfo->mqi_lock, abstime);
651 }
652
653 /* nmsghdr will point to new message */
654 if ((freeindex = mqhdr->mqh_free) == 0)
655 api_fatal ("mq_send: curmsgs = %ld; free = 0", attr->mq_curmsgs);
656
657 nmsghdr = (struct msg_hdr *) &mptr[freeindex];
658 nmsghdr->msg_prio = prio;
659 nmsghdr->msg_len = len;
660 memcpy (nmsghdr + 1, ptr, len); /* copy message from caller */
661 mqhdr->mqh_free = nmsghdr->msg_next; /* new freelist head */
662
663 /* Find right place for message in linked list */
664 index = mqhdr->mqh_head;
665 pmsghdr = (struct msg_hdr *) &(mqhdr->mqh_head);
666 while (index)
667 {
668 msghdr = (struct msg_hdr *) &mptr[index];
669 if (prio > msghdr->msg_prio)
670 {
671 nmsghdr->msg_next = index;
672 pmsghdr->msg_next = freeindex;
673 break;
674 }
675 index = msghdr->msg_next;
676 pmsghdr = msghdr;
677 }
678 if (index == 0)
679 {
680 /* Queue was empty or new goes at end of list */
681 pmsghdr->msg_next = freeindex;
682 nmsghdr->msg_next = 0;
683 }
684 /* Wake up anyone blocked in mq_receive waiting for a message */
685 if (attr->mq_curmsgs == 0)
686 ipc_cond_signal (mqinfo->mqi_wait);
687 attr->mq_curmsgs++;
688
689 ipc_mutex_unlock (mqinfo->mqi_lock);
690 return 0;
691
692 err:
693 ipc_mutex_unlock (mqinfo->mqi_lock);
694 return -1;
695 }
696
697 extern "C" int
698 mq_send (mqd_t mqd, const char *ptr, size_t len, unsigned int prio)
699 {
700 return _mq_send (mqd, ptr, len, prio, NULL);
701 }
702
703 extern "C" int
704 mq_timedsend (mqd_t mqd, const char *ptr, size_t len, unsigned int prio,
705 const struct timespec *abstime)
706 {
707 return _mq_send (mqd, ptr, len, prio, abstime);
708 }
709
710 static ssize_t
711 _mq_receive (mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop,
712 const struct timespec *abstime)
713 {
714 int n;
715 long index;
716 int8_t *mptr;
717 ssize_t len;
718 struct mq_hdr *mqhdr;
719 struct mq_attr *attr;
720 struct msg_hdr *msghdr;
721 struct mq_info *mqinfo;
722
723 myfault efault;
724 if (efault.faulted (EBADF))
725 return -1;
726
727 mqinfo = (struct mq_info *) mqd;
728 if (mqinfo->mqi_magic != MQI_MAGIC)
729 {
730 set_errno (EBADF);
731 return -1;
732 }
733 mqhdr = mqinfo->mqi_hdr; /* struct pointer */
734 mptr = (int8_t *) mqhdr; /* byte pointer */
735 attr = &mqhdr->mqh_attr;
736 if ((n = ipc_mutex_lock (mqinfo->mqi_lock)) != 0)
737 {
738 errno = n;
739 return -1;
740 }
741
742 if (maxlen < (size_t) attr->mq_msgsize)
743 {
744 set_errno (EMSGSIZE);
745 goto err;
746 }
747 if (attr->mq_curmsgs == 0) /* queue is empty */
748 {
749 if (mqinfo->mqi_flags & O_NONBLOCK)
750 {
751 set_errno (EAGAIN);
752 goto err;
753 }
754 /* Wait for a message to be placed onto queue */
755 mqhdr->mqh_nwait++;
756 while (attr->mq_curmsgs == 0)
757 ipc_cond_timedwait (mqinfo->mqi_wait, mqinfo->mqi_lock, abstime);
758 mqhdr->mqh_nwait--;
759 }
760
761 if ((index = mqhdr->mqh_head) == 0)
762 api_fatal ("mq_receive: curmsgs = %ld; head = 0", attr->mq_curmsgs);
763
764 msghdr = (struct msg_hdr *) &mptr[index];
765 mqhdr->mqh_head = msghdr->msg_next; /* new head of list */
766 len = msghdr->msg_len;
767 memcpy(ptr, msghdr + 1, len); /* copy the message itself */
768 if (priop != NULL)
769 *priop = msghdr->msg_prio;
770
771 /* Just-read message goes to front of free list */
772 msghdr->msg_next = mqhdr->mqh_free;
773 mqhdr->mqh_free = index;
774
775 /* Wake up anyone blocked in mq_send waiting for room */
776 if (attr->mq_curmsgs == attr->mq_maxmsg)
777 ipc_cond_signal (mqinfo->mqi_wait);
778 attr->mq_curmsgs--;
779
780 ipc_mutex_unlock (mqinfo->mqi_lock);
781 return len;
782
783 err:
784 ipc_mutex_unlock (mqinfo->mqi_lock);
785 return -1;
786 }
787
788 extern "C" ssize_t
789 mq_receive (mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop)
790 {
791 return _mq_receive (mqd, ptr, maxlen, priop, NULL);
792 }
793
794 extern "C" ssize_t
795 mq_timedreceive (mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop,
796 const struct timespec *abstime)
797 {
798 return _mq_receive (mqd, ptr, maxlen, priop, abstime);
799 }
800
801 extern "C" int
802 mq_close (mqd_t mqd)
803 {
804 long msgsize, filesize;
805 struct mq_hdr *mqhdr;
806 struct mq_attr *attr;
807 struct mq_info *mqinfo;
808
809 myfault efault;
810 if (efault.faulted (EBADF))
811 return -1;
812
813 mqinfo = (struct mq_info *) mqd;
814 if (mqinfo->mqi_magic != MQI_MAGIC)
815 {
816 set_errno (EBADF);
817 return -1;
818 }
819 mqhdr = mqinfo->mqi_hdr;
820 attr = &mqhdr->mqh_attr;
821
822 if (mq_notify (mqd, NULL)) /* unregister calling process */
823 return -1;
824
825 msgsize = MSGSIZE (attr->mq_msgsize);
826 filesize = sizeof (struct mq_hdr)
827 + (attr->mq_maxmsg * (sizeof (struct msg_hdr) + msgsize));
828 if (munmap (mqinfo->mqi_hdr, filesize) == -1)
829 return -1;
830
831 mqinfo->mqi_magic = 0; /* just in case */
832 ipc_cond_close (mqinfo->mqi_wait);
833 ipc_mutex_close (mqinfo->mqi_lock);
834 free (mqinfo);
835 return 0;
836 }
837
838 extern "C" int
839 mq_unlink (const char *name)
840 {
841 char mqname[CYG_MAX_PATH];
842
843 if (!check_path (mqname, mqueue, name))
844 return -1;
845 if (unlink (mqname) == -1)
846 return -1;
847 return 0;
848 }
849
This page took 0.077658 seconds and 6 git commands to generate.