]> sourceware.org Git - newlib-cygwin.git/blob - winsup/cygwin/posix_ipc.cc
* posix_ipc.cc (ipc_cond_timedwait): Also wait for pthread's
[newlib-cygwin.git] / winsup / cygwin / posix_ipc.cc
1 /* posix_ipc.cc: POSIX IPC API for Cygwin.
2
3 Copyright 2007, 2008, 2009, 2010, 2011 Red Hat, Inc.
4
5 This file is part of Cygwin.
6
7 This software is a copyrighted work licensed under the terms of the
8 Cygwin license. Please consult the file "CYGWIN_LICENSE" for
9 details. */
10
11 #include "winsup.h"
12 #include "shared_info.h"
13 #include "thread.h"
14 #include "path.h"
15 #include "cygtls.h"
16 #include "fhandler.h"
17 #include "dtable.h"
18 #include "cygheap.h"
19 #include "sigproc.h"
20 #include "ntdll.h"
21 #include <sys/mman.h>
22 #include <sys/param.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25 #include <mqueue.h>
26 #include <semaphore.h>
27
/* Naming data for each kind of POSIX IPC object; the table is indexed with
   an ipc_type_t value.

   prefix       directory in which the backing files live
   prefix_len   length of the path prefix including the trailing "/"
		(or "/sem." for semaphores) as well as the trailing NUL
   description  human readable object name used in diagnostic output  */
static struct
{
  const char *prefix;
  const size_t prefix_len;
  const char *description;
} ipc_names[] = {
  /* shmem */
  { "/dev/shm", 10, "POSIX shared memory object" },
  /* mqueue */
  { "/dev/mqueue", 13, "POSIX message queue" },
  /* semaphore */
  { "/dev/shm", 14, "POSIX semaphore" }
};
40
/* Kind of POSIX IPC object.  The values are used directly as indices into
   the ipc_names table, so the order of the enumerators must match the
   order of the table entries.  */
enum ipc_type_t
{
  shmem = 0,
  mqueue = 1,
  semaphore = 2
};
47
48 static bool
49 check_path (char *res_name, ipc_type_t type, const char *name, size_t len)
50 {
51 /* Note that we require the existance of the appropriate /dev subdirectories
52 for POSIX IPC object support, similar to Linux (which supports the
53 directories, but doesn't require to mount them). We don't create
54 these directory here, that's the task of the installer. But we check
55 for existance and give ample warning. */
56 path_conv path (ipc_names[type].prefix, PC_SYM_NOFOLLOW);
57 if (path.error || !path.exists () || !path.isdir ())
58 {
59 small_printf (
60 "Warning: '%s' does not exists or is not a directory.\n\n"
61 "%ss require the existance of this directory.\n"
62 "Create the directory '%s' and set the permissions to 01777.\n"
63 "For instance on the command line: mkdir -m 01777 %s\n",
64 ipc_names[type].prefix, ipc_names[type].description,
65 ipc_names[type].prefix, ipc_names[type].prefix);
66 set_errno (EINVAL);
67 return false;
68 }
69 /* Name must not be empty, or just be a single slash, or start with more
70 than one slash. Same for backslash.
71 Apart from handling backslash like slash, the naming rules are identical
72 to Linux, including the names and requirements for subdirectories, if
73 the name contains further slashes. */
74 if (!name || (strchr ("/\\", name[0])
75 && (!name[1] || strchr ("/\\", name[1]))))
76 {
77 debug_printf ("Invalid %s name '%s'", ipc_names[type].description, name);
78 set_errno (EINVAL);
79 return false;
80 }
81 /* Skip leading (back-)slash. */
82 if (strchr ("/\\", name[0]))
83 ++name;
84 if (len > PATH_MAX - ipc_names[type].prefix_len)
85 {
86 debug_printf ("%s name '%s' too long", ipc_names[type].description, name);
87 set_errno (ENAMETOOLONG);
88 return false;
89 }
90 __small_sprintf (res_name, "%s/%s%s", ipc_names[type].prefix,
91 type == semaphore ? "sem." : "",
92 name);
93 return true;
94 }
95
96 static int
97 ipc_mutex_init (HANDLE *pmtx, const char *name)
98 {
99 WCHAR buf[MAX_PATH];
100 UNICODE_STRING uname;
101 OBJECT_ATTRIBUTES attr;
102 NTSTATUS status;
103
104 __small_swprintf (buf, L"mqueue/mtx_%s", name);
105 RtlInitUnicodeString (&uname, buf);
106 InitializeObjectAttributes (&attr, &uname,
107 OBJ_INHERIT | OBJ_OPENIF | OBJ_CASE_INSENSITIVE,
108 get_shared_parent_dir (),
109 everyone_sd (CYG_MUTANT_ACCESS));
110 status = NtCreateMutant (pmtx, CYG_MUTANT_ACCESS, &attr, FALSE);
111 if (!NT_SUCCESS (status))
112 {
113 debug_printf ("NtCreateMutant: %p", status);
114 return geterrno_from_win_error (RtlNtStatusToDosError (status));
115 }
116 return 0;
117 }
118
119 static int
120 ipc_mutex_lock (HANDLE mtx)
121 {
122 HANDLE h[2] = { mtx, signal_arrived };
123
124 switch (WaitForMultipleObjects (2, h, FALSE, INFINITE))
125 {
126 case WAIT_OBJECT_0:
127 case WAIT_ABANDONED_0:
128 return 0;
129 case WAIT_OBJECT_0 + 1:
130 set_errno (EINTR);
131 return 1;
132 default:
133 break;
134 }
135 return geterrno_from_win_error ();
136 }
137
138 static inline int
139 ipc_mutex_unlock (HANDLE mtx)
140 {
141 return ReleaseMutex (mtx) ? 0 : geterrno_from_win_error ();
142 }
143
144 static inline int
145 ipc_mutex_close (HANDLE mtx)
146 {
147 return CloseHandle (mtx) ? 0 : geterrno_from_win_error ();
148 }
149
150 static int
151 ipc_cond_init (HANDLE *pevt, const char *name, char sr)
152 {
153 WCHAR buf[MAX_PATH];
154 UNICODE_STRING uname;
155 OBJECT_ATTRIBUTES attr;
156 NTSTATUS status;
157
158 __small_swprintf (buf, L"mqueue/evt_%s%c", name, sr);
159 RtlInitUnicodeString (&uname, buf);
160 InitializeObjectAttributes (&attr, &uname,
161 OBJ_INHERIT | OBJ_OPENIF | OBJ_CASE_INSENSITIVE,
162 get_shared_parent_dir (),
163 everyone_sd (CYG_EVENT_ACCESS));
164 status = NtCreateEvent (pevt, CYG_EVENT_ACCESS, &attr,
165 NotificationEvent, FALSE);
166 if (!NT_SUCCESS (status))
167 {
168 debug_printf ("NtCreateEvent: %p", status);
169 return geterrno_from_win_error (RtlNtStatusToDosError (status));
170 }
171 return 0;
172 }
173
174 static int
175 ipc_cond_timedwait (HANDLE evt, HANDLE mtx, const struct timespec *abstime)
176 {
177 pthread_t thread;
178 HANDLE w4[4] = { evt, signal_arrived, NULL, NULL };
179 DWORD cnt = 2;
180 DWORD timer_idx = 0;
181 int ret = 0;
182
183 thread = pthread::self ();
184 if (thread && thread->cancel_event)
185 w4[cnt++] = thread->cancel_event;
186 if (abstime)
187 {
188 if (abstime->tv_sec < 0
189 || abstime->tv_nsec < 0
190 || abstime->tv_nsec > 999999999)
191 return EINVAL;
192
193 /* If a timeout is set, we create a waitable timer to wait for.
194 This is the easiest way to handle the absolute timeout value, given
195 that NtSetTimer also takes absolute times and given the double
196 dependency on evt *and* mtx, which requires to call WFMO twice. */
197 NTSTATUS status;
198 LARGE_INTEGER duetime;
199
200 timer_idx = cnt++;
201 status = NtCreateTimer (&w4[timer_idx], TIMER_ALL_ACCESS, NULL,
202 NotificationTimer);
203 if (!NT_SUCCESS (status))
204 return geterrno_from_nt_status (status);
205 timespec_to_filetime (abstime, (FILETIME *) &duetime);
206 status = NtSetTimer (w4[timer_idx], &duetime, NULL, NULL, FALSE, 0, NULL);
207 if (!NT_SUCCESS (status))
208 {
209 NtClose (w4[timer_idx]);
210 return geterrno_from_nt_status (status);
211 }
212 }
213 ResetEvent (evt);
214 if ((ret = ipc_mutex_unlock (mtx)) != 0)
215 return ret;
216 /* Everything's set up, so now wait for the event to be signalled. */
217 restart1:
218 switch (WaitForMultipleObjects (cnt, w4, FALSE, INFINITE))
219 {
220 case WAIT_OBJECT_0:
221 break;
222 case WAIT_OBJECT_0 + 1:
223 if (_my_tls.call_signal_handler ())
224 goto restart1;
225 ret = EINTR;
226 break;
227 case WAIT_OBJECT_0 + 2:
228 if (timer_idx != 2)
229 pthread_testcancel ();
230 /*FALLTHRU*/
231 case WAIT_OBJECT_0 + 3:
232 ret = ETIMEDOUT;
233 break;
234 default:
235 ret = geterrno_from_win_error ();
236 break;
237 }
238 if (ret == 0)
239 {
240 /* At this point we need to lock the mutex. The wait is practically
241 the same as before, just that we now wait on the mutex instead of the
242 event. */
243 restart2:
244 w4[0] = mtx;
245 switch (WaitForMultipleObjects (cnt, w4, FALSE, INFINITE))
246 {
247 case WAIT_OBJECT_0:
248 case WAIT_ABANDONED_0:
249 break;
250 case WAIT_OBJECT_0 + 1:
251 if (_my_tls.call_signal_handler ())
252 goto restart2;
253 ret = EINTR;
254 break;
255 case WAIT_OBJECT_0 + 2:
256 if (timer_idx != 2)
257 pthread_testcancel ();
258 /*FALLTHRU*/
259 case WAIT_OBJECT_0 + 3:
260 ret = ETIMEDOUT;
261 break;
262 default:
263 ret = geterrno_from_win_error ();
264 break;
265 }
266 }
267 if (timer_idx)
268 {
269 if (ret != ETIMEDOUT)
270 NtCancelTimer (w4[timer_idx], NULL);
271 NtClose (w4[timer_idx]);
272 }
273 return ret;
274 }
275
276 static inline void
277 ipc_cond_signal (HANDLE evt)
278 {
279 SetEvent (evt);
280 }
281
282 static inline void
283 ipc_cond_close (HANDLE evt)
284 {
285 CloseHandle (evt);
286 }
287
288 class ipc_flock
289 {
290 struct __flock64 fl;
291
292 public:
293 ipc_flock () { memset (&fl, 0, sizeof fl); }
294
295 int lock (int fd, size_t size)
296 {
297 fl.l_type = F_WRLCK;
298 fl.l_whence = SEEK_SET;
299 fl.l_start = 0;
300 fl.l_len = size;
301 return fcntl64 (fd, F_SETLKW, &fl);
302 }
303 int unlock (int fd)
304 {
305 if (!fl.l_len)
306 return 0;
307 fl.l_type = F_UNLCK;
308 return fcntl64 (fd, F_SETLKW, &fl);
309 }
310 };
311
312 /* POSIX shared memory object implementation. */
313
314 extern "C" int
315 shm_open (const char *name, int oflag, mode_t mode)
316 {
317 size_t len = strlen (name);
318 char shmname[ipc_names[shmem].prefix_len + len];
319
320 if (!check_path (shmname, shmem, name, len))
321 return -1;
322
323 /* Check for valid flags. */
324 if (((oflag & O_ACCMODE) != O_RDONLY && (oflag & O_ACCMODE) != O_RDWR)
325 || (oflag & ~(O_ACCMODE | O_CREAT | O_EXCL | O_TRUNC)))
326 {
327 debug_printf ("Invalid oflag 0%o", oflag);
328 set_errno (EINVAL);
329 return -1;
330 }
331
332 return open (shmname, oflag | O_CLOEXEC, mode & 0777);
333 }
334
335 extern "C" int
336 shm_unlink (const char *name)
337 {
338 size_t len = strlen (name);
339 char shmname[ipc_names[shmem].prefix_len + len];
340
341 if (!check_path (shmname, shmem, name, len))
342 return -1;
343
344 return unlink (shmname);
345 }
346
347 /* The POSIX message queue implementation is based on W. Richard STEVENS
348 implementation, just tweaked for Cygwin. The main change is
349 the usage of Windows mutexes and events instead of using the pthread
350 synchronization objects. The pathname is massaged so that the
351 files are created under /dev/mqueue. mq_timedsend and mq_timedreceive
352 are implemented additionally. */
353
/* Header at the start of the memory mapped queue file.  The indices stored
   here are byte offsets from the start of the mapping; 0 is used as the
   "end of list" marker.  */
struct mq_hdr
{
  struct mq_attr mqh_attr;	/* attributes of the queue */
  long mqh_head;		/* offset of the first queued message */
  long mqh_free;		/* offset of the first free message slot */
  long mqh_nwait;		/* number of threads blocked in mq_receive() */
  pid_t mqh_pid;		/* PID registered via mq_notify(), else 0 */
  char mqh_uname[36];		/* unique name identifying the synchronization
				   objects belonging to this queue */
  struct sigevent mqh_event;	/* notification requested via mq_notify() */
};
365
/* Header preceding each message slot in the mapped file.  msg_next has to
   stay the first member: _mq_send treats the address of mq_hdr::mqh_head
   as a msg_hdr when linking in a message at the head of the list.  */
struct msg_hdr
{
  long msg_next;		/* offset of the next slot on the list, 0 = end */
  ssize_t msg_len;		/* actual length of the message */
  unsigned int msg_prio;	/* priority of the message */
};
372
373 struct mq_info
374 {
375 struct mq_hdr *mqi_hdr; /* start of mmap'ed region */
376 unsigned long mqi_magic; /* magic number if open */
377 int mqi_flags; /* flags for this process */
378 HANDLE mqi_lock; /* mutex lock */
379 HANDLE mqi_waitsend; /* and condition variable for full queue */
380 HANDLE mqi_waitrecv; /* and condition variable for empty queue */
381 };
382
383 #define MQI_MAGIC 0x98765432UL
384
385 #define MSGSIZE(i) roundup((i), sizeof(long))
386
387 #define MAX_TRIES 10 /* for waiting for initialization */
388
389 struct mq_attr defattr = { 0, 10, 8192, 0 }; /* Linux defaults. */
390
391 extern "C" _off64_t lseek64 (int, _off64_t, int);
392 extern "C" void *mmap64 (void *, size_t, int, int, int, _off64_t);
393
394 extern "C" mqd_t
395 mq_open (const char *name, int oflag, ...)
396 {
397 int i, fd = -1, nonblock, created;
398 long msgsize, index;
399 _off64_t filesize = 0;
400 va_list ap;
401 mode_t mode;
402 int8_t *mptr;
403 struct __stat64 statbuff;
404 struct mq_hdr *mqhdr;
405 struct msg_hdr *msghdr;
406 struct mq_attr *attr;
407 struct mq_info *mqinfo;
408 LUID luid;
409
410 size_t len = strlen (name);
411 char mqname[ipc_names[mqueue].prefix_len + len];
412
413 if (!check_path (mqname, mqueue, name, len))
414 return (mqd_t) -1;
415
416 myfault efault;
417 if (efault.faulted (EFAULT))
418 return (mqd_t) -1;
419
420 oflag &= (O_CREAT | O_EXCL | O_NONBLOCK);
421 created = 0;
422 nonblock = oflag & O_NONBLOCK;
423 oflag &= ~O_NONBLOCK;
424 mptr = (int8_t *) MAP_FAILED;
425 mqinfo = NULL;
426
427 again:
428 if (oflag & O_CREAT)
429 {
430 va_start (ap, oflag); /* init ap to final named argument */
431 mode = va_arg (ap, mode_t) & ~S_IXUSR;
432 attr = va_arg (ap, struct mq_attr *);
433 va_end (ap);
434
435 /* Open and specify O_EXCL and user-execute */
436 fd = open (mqname, oflag | O_EXCL | O_RDWR | O_CLOEXEC, mode | S_IXUSR);
437 if (fd < 0)
438 {
439 if (errno == EEXIST && (oflag & O_EXCL) == 0)
440 goto exists; /* already exists, OK */
441 return (mqd_t) -1;
442 }
443 created = 1;
444 /* First one to create the file initializes it */
445 if (attr == NULL)
446 attr = &defattr;
447 else if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0)
448 {
449 set_errno (EINVAL);
450 goto err;
451 }
452 /* Calculate and set the file size */
453 msgsize = MSGSIZE (attr->mq_msgsize);
454 filesize = sizeof (struct mq_hdr)
455 + (attr->mq_maxmsg * (sizeof (struct msg_hdr) + msgsize));
456 if (lseek64 (fd, filesize - 1, SEEK_SET) == -1)
457 goto err;
458 if (write (fd, "", 1) == -1)
459 goto err;
460
461 /* Memory map the file */
462 mptr = (int8_t *) mmap64 (NULL, (size_t) filesize, PROT_READ | PROT_WRITE,
463 MAP_SHARED, fd, 0);
464 if (mptr == (int8_t *) MAP_FAILED)
465 goto err;
466
467 /* Allocate one mq_info{} for the queue */
468 if (!(mqinfo = (struct mq_info *) calloc (1, sizeof (struct mq_info))))
469 goto err;
470 mqinfo->mqi_hdr = mqhdr = (struct mq_hdr *) mptr;
471 mqinfo->mqi_magic = MQI_MAGIC;
472 mqinfo->mqi_flags = nonblock;
473
474 /* Initialize header at beginning of file */
475 /* Create free list with all messages on it */
476 mqhdr->mqh_attr.mq_flags = 0;
477 mqhdr->mqh_attr.mq_maxmsg = attr->mq_maxmsg;
478 mqhdr->mqh_attr.mq_msgsize = attr->mq_msgsize;
479 mqhdr->mqh_attr.mq_curmsgs = 0;
480 mqhdr->mqh_nwait = 0;
481 mqhdr->mqh_pid = 0;
482 NtAllocateLocallyUniqueId (&luid);
483 __small_sprintf (mqhdr->mqh_uname, "%016X%08x%08x",
484 hash_path_name (0,mqname),
485 luid.HighPart, luid.LowPart);
486 mqhdr->mqh_head = 0;
487 index = sizeof (struct mq_hdr);
488 mqhdr->mqh_free = index;
489 for (i = 0; i < attr->mq_maxmsg - 1; i++)
490 {
491 msghdr = (struct msg_hdr *) &mptr[index];
492 index += sizeof (struct msg_hdr) + msgsize;
493 msghdr->msg_next = index;
494 }
495 msghdr = (struct msg_hdr *) &mptr[index];
496 msghdr->msg_next = 0; /* end of free list */
497
498 /* Initialize mutex & condition variables */
499 i = ipc_mutex_init (&mqinfo->mqi_lock, mqhdr->mqh_uname);
500 if (i != 0)
501 goto pthreaderr;
502
503 i = ipc_cond_init (&mqinfo->mqi_waitsend, mqhdr->mqh_uname, 'S');
504 if (i != 0)
505 goto pthreaderr;
506
507 i = ipc_cond_init (&mqinfo->mqi_waitrecv, mqhdr->mqh_uname, 'R');
508 if (i != 0)
509 goto pthreaderr;
510
511 /* Initialization complete, turn off user-execute bit */
512 if (fchmod (fd, mode) == -1)
513 goto err;
514 close (fd);
515 return ((mqd_t) mqinfo);
516 }
517
518 exists:
519 /* Open the file then memory map */
520 if ((fd = open (mqname, O_RDWR | O_CLOEXEC)) < 0)
521 {
522 if (errno == ENOENT && (oflag & O_CREAT))
523 goto again;
524 goto err;
525 }
526 /* Make certain initialization is complete */
527 for (i = 0; i < MAX_TRIES; i++)
528 {
529 if (stat64 (mqname, &statbuff) == -1)
530 {
531 if (errno == ENOENT && (oflag & O_CREAT))
532 {
533 close (fd);
534 fd = -1;
535 goto again;
536 }
537 goto err;
538 }
539 if ((statbuff.st_mode & S_IXUSR) == 0)
540 break;
541 sleep (1);
542 }
543 if (i == MAX_TRIES)
544 {
545 set_errno (ETIMEDOUT);
546 goto err;
547 }
548
549 filesize = statbuff.st_size;
550 mptr = (int8_t *) mmap64 (NULL, (size_t) filesize, PROT_READ | PROT_WRITE,
551 MAP_SHARED, fd, 0);
552 if (mptr == (int8_t *) MAP_FAILED)
553 goto err;
554 close (fd);
555 fd = -1;
556
557 /* Allocate one mq_info{} for each open */
558 if (!(mqinfo = (struct mq_info *) calloc (1, sizeof (struct mq_info))))
559 goto err;
560 mqinfo->mqi_hdr = mqhdr = (struct mq_hdr *) mptr;
561 mqinfo->mqi_magic = MQI_MAGIC;
562 mqinfo->mqi_flags = nonblock;
563
564 /* Initialize mutex & condition variable */
565 i = ipc_mutex_init (&mqinfo->mqi_lock, mqhdr->mqh_uname);
566 if (i != 0)
567 goto pthreaderr;
568
569 i = ipc_cond_init (&mqinfo->mqi_waitsend, mqhdr->mqh_uname, 'S');
570 if (i != 0)
571 goto pthreaderr;
572
573 i = ipc_cond_init (&mqinfo->mqi_waitrecv, mqhdr->mqh_uname, 'R');
574 if (i != 0)
575 goto pthreaderr;
576
577 return (mqd_t) mqinfo;
578
579 pthreaderr:
580 errno = i;
581 err:
582 /* Don't let following function calls change errno */
583 save_errno save;
584
585 if (created)
586 unlink (mqname);
587 if (mptr != (int8_t *) MAP_FAILED)
588 munmap((void *) mptr, (size_t) filesize);
589 if (mqinfo)
590 {
591 if (mqinfo->mqi_lock)
592 ipc_mutex_close (mqinfo->mqi_lock);
593 if (mqinfo->mqi_waitsend)
594 ipc_cond_close (mqinfo->mqi_waitsend);
595 if (mqinfo->mqi_waitrecv)
596 ipc_cond_close (mqinfo->mqi_waitrecv);
597 free (mqinfo);
598 }
599 if (fd >= 0)
600 close (fd);
601 return (mqd_t) -1;
602 }
603
604 extern "C" int
605 mq_getattr (mqd_t mqd, struct mq_attr *mqstat)
606 {
607 int n;
608 struct mq_hdr *mqhdr;
609 struct mq_attr *attr;
610 struct mq_info *mqinfo;
611
612 myfault efault;
613 if (efault.faulted (EBADF))
614 return -1;
615
616 mqinfo = (struct mq_info *) mqd;
617 if (mqinfo->mqi_magic != MQI_MAGIC)
618 {
619 set_errno (EBADF);
620 return -1;
621 }
622 mqhdr = mqinfo->mqi_hdr;
623 attr = &mqhdr->mqh_attr;
624 if ((n = ipc_mutex_lock (mqinfo->mqi_lock)) != 0)
625 {
626 errno = n;
627 return -1;
628 }
629 mqstat->mq_flags = mqinfo->mqi_flags; /* per-open */
630 mqstat->mq_maxmsg = attr->mq_maxmsg; /* remaining three per-queue */
631 mqstat->mq_msgsize = attr->mq_msgsize;
632 mqstat->mq_curmsgs = attr->mq_curmsgs;
633
634 ipc_mutex_unlock (mqinfo->mqi_lock);
635 return 0;
636 }
637
638 extern "C" int
639 mq_setattr (mqd_t mqd, const struct mq_attr *mqstat, struct mq_attr *omqstat)
640 {
641 int n;
642 struct mq_hdr *mqhdr;
643 struct mq_attr *attr;
644 struct mq_info *mqinfo;
645
646 myfault efault;
647 if (efault.faulted (EBADF))
648 return -1;
649
650 mqinfo = (struct mq_info *) mqd;
651 if (mqinfo->mqi_magic != MQI_MAGIC)
652 {
653 set_errno (EBADF);
654 return -1;
655 }
656 mqhdr = mqinfo->mqi_hdr;
657 attr = &mqhdr->mqh_attr;
658 if ((n = ipc_mutex_lock (mqinfo->mqi_lock)) != 0)
659 {
660 errno = n;
661 return -1;
662 }
663
664 if (omqstat != NULL)
665 {
666 omqstat->mq_flags = mqinfo->mqi_flags; /* previous attributes */
667 omqstat->mq_maxmsg = attr->mq_maxmsg;
668 omqstat->mq_msgsize = attr->mq_msgsize;
669 omqstat->mq_curmsgs = attr->mq_curmsgs; /* and current status */
670 }
671
672 if (mqstat->mq_flags & O_NONBLOCK)
673 mqinfo->mqi_flags |= O_NONBLOCK;
674 else
675 mqinfo->mqi_flags &= ~O_NONBLOCK;
676
677 ipc_mutex_unlock (mqinfo->mqi_lock);
678 return 0;
679 }
680
681 extern "C" int
682 mq_notify (mqd_t mqd, const struct sigevent *notification)
683 {
684 int n;
685 pid_t pid;
686 struct mq_hdr *mqhdr;
687 struct mq_info *mqinfo;
688
689 myfault efault;
690 if (efault.faulted (EBADF))
691 return -1;
692
693 mqinfo = (struct mq_info *) mqd;
694 if (mqinfo->mqi_magic != MQI_MAGIC)
695 {
696 set_errno (EBADF);
697 return -1;
698 }
699 mqhdr = mqinfo->mqi_hdr;
700 if ((n = ipc_mutex_lock (mqinfo->mqi_lock)) != 0)
701 {
702 errno = n;
703 return -1;
704 }
705
706 pid = getpid ();
707 if (!notification)
708 {
709 if (mqhdr->mqh_pid == pid)
710 mqhdr->mqh_pid = 0; /* unregister calling process */
711 }
712 else
713 {
714 if (mqhdr->mqh_pid != 0)
715 {
716 if (kill (mqhdr->mqh_pid, 0) != -1 || errno != ESRCH)
717 {
718 set_errno (EBUSY);
719 ipc_mutex_unlock (mqinfo->mqi_lock);
720 return -1;
721 }
722 }
723 mqhdr->mqh_pid = pid;
724 mqhdr->mqh_event = *notification;
725 }
726 ipc_mutex_unlock (mqinfo->mqi_lock);
727 return 0;
728 }
729
730 static int
731 _mq_send (mqd_t mqd, const char *ptr, size_t len, unsigned int prio,
732 const struct timespec *abstime)
733 {
734 int n;
735 long index, freeindex;
736 int8_t *mptr;
737 struct sigevent *sigev;
738 struct mq_hdr *mqhdr;
739 struct mq_attr *attr;
740 struct msg_hdr *msghdr, *nmsghdr, *pmsghdr;
741 struct mq_info *mqinfo;
742
743 pthread_testcancel ();
744
745 myfault efault;
746 if (efault.faulted (EBADF))
747 return -1;
748
749 mqinfo = (struct mq_info *) mqd;
750 if (mqinfo->mqi_magic != MQI_MAGIC)
751 {
752 set_errno (EBADF);
753 return -1;
754 }
755 if (prio > MQ_PRIO_MAX)
756 {
757 set_errno (EINVAL);
758 return -1;
759 }
760
761 mqhdr = mqinfo->mqi_hdr; /* struct pointer */
762 mptr = (int8_t *) mqhdr; /* byte pointer */
763 attr = &mqhdr->mqh_attr;
764 if ((n = ipc_mutex_lock (mqinfo->mqi_lock)) != 0)
765 {
766 errno = n;
767 return -1;
768 }
769
770 if (len > (size_t) attr->mq_msgsize)
771 {
772 set_errno (EMSGSIZE);
773 goto err;
774 }
775 if (attr->mq_curmsgs == 0)
776 {
777 if (mqhdr->mqh_pid != 0 && mqhdr->mqh_nwait == 0)
778 {
779 sigev = &mqhdr->mqh_event;
780 if (sigev->sigev_notify == SIGEV_SIGNAL)
781 sigqueue (mqhdr->mqh_pid, sigev->sigev_signo, sigev->sigev_value);
782 mqhdr->mqh_pid = 0; /* unregister */
783 }
784 }
785 else if (attr->mq_curmsgs >= attr->mq_maxmsg)
786 {
787 /* Queue is full */
788 if (mqinfo->mqi_flags & O_NONBLOCK)
789 {
790 set_errno (EAGAIN);
791 goto err;
792 }
793 /* Wait for room for one message on the queue */
794 while (attr->mq_curmsgs >= attr->mq_maxmsg)
795 {
796 int ret = ipc_cond_timedwait (mqinfo->mqi_waitsend, mqinfo->mqi_lock,
797 abstime);
798 if (ret != 0)
799 {
800 set_errno (ret);
801 return -1;
802 }
803 }
804 }
805
806 /* nmsghdr will point to new message */
807 if ((freeindex = mqhdr->mqh_free) == 0)
808 api_fatal ("mq_send: curmsgs = %ld; free = 0", attr->mq_curmsgs);
809
810 nmsghdr = (struct msg_hdr *) &mptr[freeindex];
811 nmsghdr->msg_prio = prio;
812 nmsghdr->msg_len = len;
813 memcpy (nmsghdr + 1, ptr, len); /* copy message from caller */
814 mqhdr->mqh_free = nmsghdr->msg_next; /* new freelist head */
815
816 /* Find right place for message in linked list */
817 index = mqhdr->mqh_head;
818 pmsghdr = (struct msg_hdr *) &(mqhdr->mqh_head);
819 while (index)
820 {
821 msghdr = (struct msg_hdr *) &mptr[index];
822 if (prio > msghdr->msg_prio)
823 {
824 nmsghdr->msg_next = index;
825 pmsghdr->msg_next = freeindex;
826 break;
827 }
828 index = msghdr->msg_next;
829 pmsghdr = msghdr;
830 }
831 if (index == 0)
832 {
833 /* Queue was empty or new goes at end of list */
834 pmsghdr->msg_next = freeindex;
835 nmsghdr->msg_next = 0;
836 }
837 /* Wake up anyone blocked in mq_receive waiting for a message */
838 if (attr->mq_curmsgs == 0)
839 ipc_cond_signal (mqinfo->mqi_waitrecv);
840 attr->mq_curmsgs++;
841
842 ipc_mutex_unlock (mqinfo->mqi_lock);
843 return 0;
844
845 err:
846 ipc_mutex_unlock (mqinfo->mqi_lock);
847 return -1;
848 }
849
850 extern "C" int
851 mq_send (mqd_t mqd, const char *ptr, size_t len, unsigned int prio)
852 {
853 return _mq_send (mqd, ptr, len, prio, NULL);
854 }
855
856 extern "C" int
857 mq_timedsend (mqd_t mqd, const char *ptr, size_t len, unsigned int prio,
858 const struct timespec *abstime)
859 {
860 return _mq_send (mqd, ptr, len, prio, abstime);
861 }
862
863 static ssize_t
864 _mq_receive (mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop,
865 const struct timespec *abstime)
866 {
867 int n;
868 long index;
869 int8_t *mptr;
870 ssize_t len;
871 struct mq_hdr *mqhdr;
872 struct mq_attr *attr;
873 struct msg_hdr *msghdr;
874 struct mq_info *mqinfo;
875
876 pthread_testcancel ();
877
878 myfault efault;
879 if (efault.faulted (EBADF))
880 return -1;
881
882 mqinfo = (struct mq_info *) mqd;
883 if (mqinfo->mqi_magic != MQI_MAGIC)
884 {
885 set_errno (EBADF);
886 return -1;
887 }
888 mqhdr = mqinfo->mqi_hdr; /* struct pointer */
889 mptr = (int8_t *) mqhdr; /* byte pointer */
890 attr = &mqhdr->mqh_attr;
891 if ((n = ipc_mutex_lock (mqinfo->mqi_lock)) != 0)
892 {
893 errno = n;
894 return -1;
895 }
896
897 if (maxlen < (size_t) attr->mq_msgsize)
898 {
899 set_errno (EMSGSIZE);
900 goto err;
901 }
902 if (attr->mq_curmsgs == 0) /* queue is empty */
903 {
904 if (mqinfo->mqi_flags & O_NONBLOCK)
905 {
906 set_errno (EAGAIN);
907 goto err;
908 }
909 /* Wait for a message to be placed onto queue */
910 mqhdr->mqh_nwait++;
911 while (attr->mq_curmsgs == 0)
912 {
913 int ret = ipc_cond_timedwait (mqinfo->mqi_waitrecv, mqinfo->mqi_lock,
914 abstime);
915 if (ret != 0)
916 {
917 set_errno (ret);
918 return -1;
919 }
920 }
921 mqhdr->mqh_nwait--;
922 }
923
924 if ((index = mqhdr->mqh_head) == 0)
925 api_fatal ("mq_receive: curmsgs = %ld; head = 0", attr->mq_curmsgs);
926
927 msghdr = (struct msg_hdr *) &mptr[index];
928 mqhdr->mqh_head = msghdr->msg_next; /* new head of list */
929 len = msghdr->msg_len;
930 memcpy(ptr, msghdr + 1, len); /* copy the message itself */
931 if (priop != NULL)
932 *priop = msghdr->msg_prio;
933
934 /* Just-read message goes to front of free list */
935 msghdr->msg_next = mqhdr->mqh_free;
936 mqhdr->mqh_free = index;
937
938 /* Wake up anyone blocked in mq_send waiting for room */
939 if (attr->mq_curmsgs == attr->mq_maxmsg)
940 ipc_cond_signal (mqinfo->mqi_waitsend);
941 attr->mq_curmsgs--;
942
943 ipc_mutex_unlock (mqinfo->mqi_lock);
944 return len;
945
946 err:
947 ipc_mutex_unlock (mqinfo->mqi_lock);
948 return -1;
949 }
950
951 extern "C" ssize_t
952 mq_receive (mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop)
953 {
954 return _mq_receive (mqd, ptr, maxlen, priop, NULL);
955 }
956
957 extern "C" ssize_t
958 mq_timedreceive (mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop,
959 const struct timespec *abstime)
960 {
961 return _mq_receive (mqd, ptr, maxlen, priop, abstime);
962 }
963
964 extern "C" int
965 mq_close (mqd_t mqd)
966 {
967 long msgsize, filesize;
968 struct mq_hdr *mqhdr;
969 struct mq_attr *attr;
970 struct mq_info *mqinfo;
971
972 myfault efault;
973 if (efault.faulted (EBADF))
974 return -1;
975
976 mqinfo = (struct mq_info *) mqd;
977 if (mqinfo->mqi_magic != MQI_MAGIC)
978 {
979 set_errno (EBADF);
980 return -1;
981 }
982 mqhdr = mqinfo->mqi_hdr;
983 attr = &mqhdr->mqh_attr;
984
985 if (mq_notify (mqd, NULL)) /* unregister calling process */
986 return -1;
987
988 msgsize = MSGSIZE (attr->mq_msgsize);
989 filesize = sizeof (struct mq_hdr)
990 + (attr->mq_maxmsg * (sizeof (struct msg_hdr) + msgsize));
991 if (munmap (mqinfo->mqi_hdr, filesize) == -1)
992 return -1;
993
994 mqinfo->mqi_magic = 0; /* just in case */
995 ipc_cond_close (mqinfo->mqi_waitsend);
996 ipc_cond_close (mqinfo->mqi_waitrecv);
997 ipc_mutex_close (mqinfo->mqi_lock);
998 free (mqinfo);
999 return 0;
1000 }
1001
1002 extern "C" int
1003 mq_unlink (const char *name)
1004 {
1005 size_t len = strlen (name);
1006 char mqname[ipc_names[mqueue].prefix_len + len];
1007
1008 if (!check_path (mqname, mqueue, name, len))
1009 return -1;
1010 if (unlink (mqname) == -1)
1011 return -1;
1012 return 0;
1013 }
1014
/* POSIX named semaphore implementation.  Loosely based on W. Richard
   Stevens' implementation as far as sem_open is concerned, but under the
   hood using the already existing semaphore class in thread.cc.  Using a
   file-backed solution makes it possible to implement kernel-persistent
   named semaphores. */
1019
1020 struct sem_finfo
1021 {
1022 unsigned int value;
1023 unsigned long long hash;
1024 LUID luid;
1025 };
1026
1027 extern "C" sem_t *
1028 sem_open (const char *name, int oflag, ...)
1029 {
1030 int i, fd = -1, created;
1031 va_list ap;
1032 mode_t mode = 0;
1033 unsigned int value = 0;
1034 struct __stat64 statbuff;
1035 sem_t *sem = SEM_FAILED;
1036 sem_finfo sf;
1037 bool wasopen = false;
1038 ipc_flock file;
1039
1040 size_t len = strlen (name);
1041 char semname[ipc_names[semaphore].prefix_len + len];
1042
1043 if (!check_path (semname, semaphore, name, len))
1044 return SEM_FAILED;
1045
1046 myfault efault;
1047 if (efault.faulted (EFAULT))
1048 return SEM_FAILED;
1049
1050 created = 0;
1051 oflag &= (O_CREAT | O_EXCL);
1052
1053 again:
1054 if (oflag & O_CREAT)
1055 {
1056 va_start (ap, oflag); /* init ap to final named argument */
1057 mode = va_arg (ap, mode_t) & ~S_IXUSR;
1058 value = va_arg (ap, unsigned int);
1059 va_end (ap);
1060
1061 /* Open and specify O_EXCL and user-execute */
1062 fd = open (semname, oflag | O_EXCL | O_RDWR | O_CLOEXEC, mode | S_IXUSR);
1063 if (fd < 0)
1064 {
1065 if (errno == EEXIST && (oflag & O_EXCL) == 0)
1066 goto exists; /* already exists, OK */
1067 return SEM_FAILED;
1068 }
1069 created = 1;
1070 /* First one to create the file initializes it. */
1071 NtAllocateLocallyUniqueId (&sf.luid);
1072 sf.value = value;
1073 sf.hash = hash_path_name (0, semname);
1074 if (write (fd, &sf, sizeof sf) != sizeof sf)
1075 goto err;
1076 sem = semaphore::open (sf.hash, sf.luid, fd, oflag, mode, value, wasopen);
1077 if (sem == SEM_FAILED)
1078 goto err;
1079 /* Initialization complete, turn off user-execute bit */
1080 if (fchmod (fd, mode) == -1)
1081 goto err;
1082 /* Don't close (fd); */
1083 return sem;
1084 }
1085
1086 exists:
1087 /* Open the file and fetch the semaphore name. */
1088 if ((fd = open (semname, O_RDWR | O_CLOEXEC)) < 0)
1089 {
1090 if (errno == ENOENT && (oflag & O_CREAT))
1091 goto again;
1092 goto err;
1093 }
1094 /* Make certain initialization is complete */
1095 for (i = 0; i < MAX_TRIES; i++)
1096 {
1097 if (stat64 (semname, &statbuff) == -1)
1098 {
1099 if (errno == ENOENT && (oflag & O_CREAT))
1100 {
1101 close (fd);
1102 fd = -1;
1103 goto again;
1104 }
1105 goto err;
1106 }
1107 if ((statbuff.st_mode & S_IXUSR) == 0)
1108 break;
1109 sleep (1);
1110 }
1111 if (i == MAX_TRIES)
1112 {
1113 set_errno (ETIMEDOUT);
1114 goto err;
1115 }
1116 if (file.lock (fd, sizeof sf))
1117 goto err;
1118 if (read (fd, &sf, sizeof sf) != sizeof sf)
1119 goto err;
1120 sem = semaphore::open (sf.hash, sf.luid, fd, oflag, mode, sf.value, wasopen);
1121 file.unlock (fd);
1122 if (sem == SEM_FAILED)
1123 goto err;
1124 /* If wasopen is set, the semaphore was already opened and we already have
1125 an open file descriptor pointing to the file. This means, we have to
1126 close the file descriptor created in this call. It won't be stored
1127 anywhere anyway. */
1128 if (wasopen)
1129 close (fd);
1130 return sem;
1131
1132 err:
1133 /* Don't let following function calls change errno */
1134 save_errno save;
1135
1136 file.unlock (fd);
1137 if (created)
1138 unlink (semname);
1139 if (sem != SEM_FAILED)
1140 semaphore::close (sem);
1141 if (fd >= 0)
1142 close (fd);
1143 return SEM_FAILED;
1144 }
1145
1146 int
1147 _sem_close (sem_t *sem, bool do_close)
1148 {
1149 sem_finfo sf;
1150 int fd, ret = -1;
1151 ipc_flock file;
1152
1153 if (semaphore::getinternal (sem, &fd, &sf.hash, &sf.luid, &sf.value) == -1)
1154 return -1;
1155 if (!file.lock (fd, sizeof sf)
1156 && lseek64 (fd, 0LL, SEEK_SET) != (_off64_t) -1
1157 && write (fd, &sf, sizeof sf) == sizeof sf)
1158 ret = do_close ? semaphore::close (sem) : 0;
1159
1160 /* Don't let following function calls change errno */
1161 save_errno save;
1162 file.unlock (fd);
1163 close (fd);
1164
1165 return ret;
1166 }
1167
1168 extern "C" int
1169 sem_close (sem_t *sem)
1170 {
1171 return _sem_close (sem, true);
1172 }
1173
1174 extern "C" int
1175 sem_unlink (const char *name)
1176 {
1177 size_t len = strlen (name);
1178 char semname[ipc_names[semaphore].prefix_len + len];
1179
1180 if (!check_path (semname, semaphore, name, len))
1181 return -1;
1182 if (unlink (semname) == -1)
1183 return -1;
1184 return 0;
1185 }
This page took 0.089378 seconds and 6 git commands to generate.