]> sourceware.org Git - newlib-cygwin.git/blob - winsup/cygwin/posix_ipc.cc
Throughout, update copyrights to reflect dates which correspond to main-branch
[newlib-cygwin.git] / winsup / cygwin / posix_ipc.cc
1 /* posix_ipc.cc: POSIX IPC API for Cygwin.
2
3 Copyright 2007, 2008, 2009, 2010, 2011, 2012 Red Hat, Inc.
4
5 This file is part of Cygwin.
6
7 This software is a copyrighted work licensed under the terms of the
8 Cygwin license. Please consult the file "CYGWIN_LICENSE" for
9 details. */
10
11 #include "winsup.h"
12 #include "shared_info.h"
13 #include "thread.h"
14 #include "path.h"
15 #include "cygtls.h"
16 #include "fhandler.h"
17 #include "dtable.h"
18 #include "cygheap.h"
19 #include "sigproc.h"
20 #include "ntdll.h"
21 #include <sys/mman.h>
22 #include <sys/param.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25 #include <mqueue.h>
26 #include <semaphore.h>
27
28 /* The prefix_len is the length of the path prefix ncluding trailing "/"
29 (or "/sem." for semaphores) as well as the trailing NUL. */
30 static struct
31 {
32 const char *prefix;
33 const size_t prefix_len;
34 const char *description;
35 } ipc_names[] = {
36 { "/dev/shm", 10, "POSIX shared memory object" },
37 { "/dev/mqueue", 13, "POSIX message queue" },
38 { "/dev/shm", 14, "POSIX semaphore" }
39 };
40
41 enum ipc_type_t
42 {
43 shmem,
44 mqueue,
45 semaphore
46 };
47
48 static bool
49 check_path (char *res_name, ipc_type_t type, const char *name, size_t len)
50 {
51 /* Note that we require the existance of the appropriate /dev subdirectories
52 for POSIX IPC object support, similar to Linux (which supports the
53 directories, but doesn't require to mount them). We don't create
54 these directory here, that's the task of the installer. But we check
55 for existance and give ample warning. */
56 path_conv path (ipc_names[type].prefix, PC_SYM_NOFOLLOW);
57 if (path.error || !path.exists () || !path.isdir ())
58 {
59 small_printf (
60 "Warning: '%s' does not exists or is not a directory.\n\n"
61 "%ss require the existance of this directory.\n"
62 "Create the directory '%s' and set the permissions to 01777.\n"
63 "For instance on the command line: mkdir -m 01777 %s\n",
64 ipc_names[type].prefix, ipc_names[type].description,
65 ipc_names[type].prefix, ipc_names[type].prefix);
66 set_errno (EINVAL);
67 return false;
68 }
69 /* Name must not be empty, or just be a single slash, or start with more
70 than one slash. Same for backslash.
71 Apart from handling backslash like slash, the naming rules are identical
72 to Linux, including the names and requirements for subdirectories, if
73 the name contains further slashes. */
74 if (!name || (strchr ("/\\", name[0])
75 && (!name[1] || strchr ("/\\", name[1]))))
76 {
77 debug_printf ("Invalid %s name '%s'", ipc_names[type].description, name);
78 set_errno (EINVAL);
79 return false;
80 }
81 /* Skip leading (back-)slash. */
82 if (strchr ("/\\", name[0]))
83 ++name;
84 if (len > PATH_MAX - ipc_names[type].prefix_len)
85 {
86 debug_printf ("%s name '%s' too long", ipc_names[type].description, name);
87 set_errno (ENAMETOOLONG);
88 return false;
89 }
90 __small_sprintf (res_name, "%s/%s%s", ipc_names[type].prefix,
91 type == semaphore ? "sem." : "",
92 name);
93 return true;
94 }
95
96 static int
97 ipc_mutex_init (HANDLE *pmtx, const char *name)
98 {
99 WCHAR buf[MAX_PATH];
100 UNICODE_STRING uname;
101 OBJECT_ATTRIBUTES attr;
102 NTSTATUS status;
103
104 __small_swprintf (buf, L"mqueue/mtx_%s", name);
105 RtlInitUnicodeString (&uname, buf);
106 InitializeObjectAttributes (&attr, &uname,
107 OBJ_INHERIT | OBJ_OPENIF | OBJ_CASE_INSENSITIVE,
108 get_shared_parent_dir (),
109 everyone_sd (CYG_MUTANT_ACCESS));
110 status = NtCreateMutant (pmtx, CYG_MUTANT_ACCESS, &attr, FALSE);
111 if (!NT_SUCCESS (status))
112 {
113 debug_printf ("NtCreateMutant: %p", status);
114 return geterrno_from_win_error (RtlNtStatusToDosError (status));
115 }
116 return 0;
117 }
118
119 static int
120 ipc_mutex_lock (HANDLE mtx)
121 {
122 switch (cygwait (mtx, cw_infinite, cw_sig_eintr | cw_cancel | cw_cancel_self))
123 {
124 case WAIT_OBJECT_0:
125 case WAIT_ABANDONED_0:
126 return 0;
127 case WAIT_SIGNALED:
128 set_errno (EINTR);
129 return 1;
130 default:
131 break;
132 }
133 return geterrno_from_win_error ();
134 }
135
136 static inline int
137 ipc_mutex_unlock (HANDLE mtx)
138 {
139 return ReleaseMutex (mtx) ? 0 : geterrno_from_win_error ();
140 }
141
142 static inline int
143 ipc_mutex_close (HANDLE mtx)
144 {
145 return CloseHandle (mtx) ? 0 : geterrno_from_win_error ();
146 }
147
148 static int
149 ipc_cond_init (HANDLE *pevt, const char *name, char sr)
150 {
151 WCHAR buf[MAX_PATH];
152 UNICODE_STRING uname;
153 OBJECT_ATTRIBUTES attr;
154 NTSTATUS status;
155
156 __small_swprintf (buf, L"mqueue/evt_%s%c", name, sr);
157 RtlInitUnicodeString (&uname, buf);
158 InitializeObjectAttributes (&attr, &uname,
159 OBJ_INHERIT | OBJ_OPENIF | OBJ_CASE_INSENSITIVE,
160 get_shared_parent_dir (),
161 everyone_sd (CYG_EVENT_ACCESS));
162 status = NtCreateEvent (pevt, CYG_EVENT_ACCESS, &attr,
163 NotificationEvent, FALSE);
164 if (!NT_SUCCESS (status))
165 {
166 debug_printf ("NtCreateEvent: %p", status);
167 return geterrno_from_win_error (RtlNtStatusToDosError (status));
168 }
169 return 0;
170 }
171
172 static int
173 ipc_cond_timedwait (HANDLE evt, HANDLE mtx, const struct timespec *abstime)
174 {
175 HANDLE w4[4] = { evt, };
176 DWORD cnt = 2;
177 DWORD timer_idx = 0;
178 int ret = 0;
179
180 set_signal_arrived here (w4[1]);
181 if ((w4[cnt] = pthread::get_cancel_event ()) != NULL)
182 ++cnt;
183 if (abstime)
184 {
185 if (abstime->tv_sec < 0
186 || abstime->tv_nsec < 0
187 || abstime->tv_nsec > 999999999)
188 return EINVAL;
189
190 /* If a timeout is set, we create a waitable timer to wait for.
191 This is the easiest way to handle the absolute timeout value, given
192 that NtSetTimer also takes absolute times and given the double
193 dependency on evt *and* mtx, which requires to call WFMO twice. */
194 NTSTATUS status;
195 LARGE_INTEGER duetime;
196
197 timer_idx = cnt++;
198 status = NtCreateTimer (&w4[timer_idx], TIMER_ALL_ACCESS, NULL,
199 NotificationTimer);
200 if (!NT_SUCCESS (status))
201 return geterrno_from_nt_status (status);
202 timespec_to_filetime (abstime, (FILETIME *) &duetime);
203 status = NtSetTimer (w4[timer_idx], &duetime, NULL, NULL, FALSE, 0, NULL);
204 if (!NT_SUCCESS (status))
205 {
206 NtClose (w4[timer_idx]);
207 return geterrno_from_nt_status (status);
208 }
209 }
210 ResetEvent (evt);
211 if ((ret = ipc_mutex_unlock (mtx)) != 0)
212 return ret;
213 /* Everything's set up, so now wait for the event to be signalled. */
214 restart1:
215 switch (WaitForMultipleObjects (cnt, w4, FALSE, INFINITE))
216 {
217 case WAIT_OBJECT_0:
218 break;
219 case WAIT_OBJECT_0 + 1:
220 if (_my_tls.call_signal_handler ())
221 goto restart1;
222 ret = EINTR;
223 break;
224 case WAIT_OBJECT_0 + 2:
225 if (timer_idx != 2)
226 pthread::static_cancel_self ();
227 /*FALLTHRU*/
228 case WAIT_OBJECT_0 + 3:
229 ret = ETIMEDOUT;
230 break;
231 default:
232 ret = geterrno_from_win_error ();
233 break;
234 }
235 if (ret == 0)
236 {
237 /* At this point we need to lock the mutex. The wait is practically
238 the same as before, just that we now wait on the mutex instead of the
239 event. */
240 restart2:
241 w4[0] = mtx;
242 switch (WaitForMultipleObjects (cnt, w4, FALSE, INFINITE))
243 {
244 case WAIT_OBJECT_0:
245 case WAIT_ABANDONED_0:
246 break;
247 case WAIT_OBJECT_0 + 1:
248 if (_my_tls.call_signal_handler ())
249 goto restart2;
250 ret = EINTR;
251 break;
252 case WAIT_OBJECT_0 + 2:
253 if (timer_idx != 2)
254 pthread_testcancel ();
255 /*FALLTHRU*/
256 case WAIT_OBJECT_0 + 3:
257 ret = ETIMEDOUT;
258 break;
259 default:
260 ret = geterrno_from_win_error ();
261 break;
262 }
263 }
264 if (timer_idx)
265 {
266 if (ret != ETIMEDOUT)
267 NtCancelTimer (w4[timer_idx], NULL);
268 NtClose (w4[timer_idx]);
269 }
270 return ret;
271 }
272
273 static inline void
274 ipc_cond_signal (HANDLE evt)
275 {
276 SetEvent (evt);
277 }
278
279 static inline void
280 ipc_cond_close (HANDLE evt)
281 {
282 CloseHandle (evt);
283 }
284
285 class ipc_flock
286 {
287 struct __flock64 fl;
288
289 public:
290 ipc_flock () { memset (&fl, 0, sizeof fl); }
291
292 int lock (int fd, size_t size)
293 {
294 fl.l_type = F_WRLCK;
295 fl.l_whence = SEEK_SET;
296 fl.l_start = 0;
297 fl.l_len = size;
298 return fcntl64 (fd, F_SETLKW, &fl);
299 }
300 int unlock (int fd)
301 {
302 if (!fl.l_len)
303 return 0;
304 fl.l_type = F_UNLCK;
305 return fcntl64 (fd, F_SETLKW, &fl);
306 }
307 };
308
309 /* POSIX shared memory object implementation. */
310
311 extern "C" int
312 shm_open (const char *name, int oflag, mode_t mode)
313 {
314 size_t len = strlen (name);
315 char shmname[ipc_names[shmem].prefix_len + len];
316
317 if (!check_path (shmname, shmem, name, len))
318 return -1;
319
320 /* Check for valid flags. */
321 if (((oflag & O_ACCMODE) != O_RDONLY && (oflag & O_ACCMODE) != O_RDWR)
322 || (oflag & ~(O_ACCMODE | O_CREAT | O_EXCL | O_TRUNC)))
323 {
324 debug_printf ("Invalid oflag 0%o", oflag);
325 set_errno (EINVAL);
326 return -1;
327 }
328
329 return open (shmname, oflag | O_CLOEXEC, mode & 0777);
330 }
331
332 extern "C" int
333 shm_unlink (const char *name)
334 {
335 size_t len = strlen (name);
336 char shmname[ipc_names[shmem].prefix_len + len];
337
338 if (!check_path (shmname, shmem, name, len))
339 return -1;
340
341 return unlink (shmname);
342 }
343
344 /* The POSIX message queue implementation is based on W. Richard STEVENS
345 implementation, just tweaked for Cygwin. The main change is
346 the usage of Windows mutexes and events instead of using the pthread
347 synchronization objects. The pathname is massaged so that the
348 files are created under /dev/mqueue. mq_timedsend and mq_timedreceive
349 are implemented additionally. */
350
351 struct mq_hdr
352 {
353 struct mq_attr mqh_attr; /* the queue's attributes */
354 long mqh_head; /* index of first message */
355 long mqh_free; /* index of first free message */
356 long mqh_nwait; /* #threads blocked in mq_receive() */
357 pid_t mqh_pid; /* nonzero PID if mqh_event set */
358 char mqh_uname[36]; /* unique name used to identify synchronization
359 objects connected to this queue */
360 struct sigevent mqh_event; /* for mq_notify() */
361 };
362
363 struct msg_hdr
364 {
365 long msg_next; /* index of next on linked list */
366 ssize_t msg_len; /* actual length */
367 unsigned int msg_prio; /* priority */
368 };
369
370 struct mq_info
371 {
372 struct mq_hdr *mqi_hdr; /* start of mmap'ed region */
373 unsigned long mqi_magic; /* magic number if open */
374 int mqi_flags; /* flags for this process */
375 HANDLE mqi_lock; /* mutex lock */
376 HANDLE mqi_waitsend; /* and condition variable for full queue */
377 HANDLE mqi_waitrecv; /* and condition variable for empty queue */
378 };
379
380 #define MQI_MAGIC 0x98765432UL
381
382 #define MSGSIZE(i) roundup((i), sizeof(long))
383
384 #define MAX_TRIES 10 /* for waiting for initialization */
385
386 struct mq_attr defattr = { 0, 10, 8192, 0 }; /* Linux defaults. */
387
388 extern "C" _off64_t lseek64 (int, _off64_t, int);
389 extern "C" void *mmap64 (void *, size_t, int, int, int, _off64_t);
390
391 extern "C" mqd_t
392 mq_open (const char *name, int oflag, ...)
393 {
394 int i, fd = -1, nonblock, created;
395 long msgsize, index;
396 _off64_t filesize = 0;
397 va_list ap;
398 mode_t mode;
399 int8_t *mptr;
400 struct __stat64 statbuff;
401 struct mq_hdr *mqhdr;
402 struct msg_hdr *msghdr;
403 struct mq_attr *attr;
404 struct mq_info *mqinfo;
405 LUID luid;
406
407 size_t len = strlen (name);
408 char mqname[ipc_names[mqueue].prefix_len + len];
409
410 if (!check_path (mqname, mqueue, name, len))
411 return (mqd_t) -1;
412
413 myfault efault;
414 if (efault.faulted (EFAULT))
415 return (mqd_t) -1;
416
417 oflag &= (O_CREAT | O_EXCL | O_NONBLOCK);
418 created = 0;
419 nonblock = oflag & O_NONBLOCK;
420 oflag &= ~O_NONBLOCK;
421 mptr = (int8_t *) MAP_FAILED;
422 mqinfo = NULL;
423
424 again:
425 if (oflag & O_CREAT)
426 {
427 va_start (ap, oflag); /* init ap to final named argument */
428 mode = va_arg (ap, mode_t) & ~S_IXUSR;
429 attr = va_arg (ap, struct mq_attr *);
430 va_end (ap);
431
432 /* Open and specify O_EXCL and user-execute */
433 fd = open (mqname, oflag | O_EXCL | O_RDWR | O_CLOEXEC, mode | S_IXUSR);
434 if (fd < 0)
435 {
436 if (errno == EEXIST && (oflag & O_EXCL) == 0)
437 goto exists; /* already exists, OK */
438 return (mqd_t) -1;
439 }
440 created = 1;
441 /* First one to create the file initializes it */
442 if (attr == NULL)
443 attr = &defattr;
444 else if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0)
445 {
446 set_errno (EINVAL);
447 goto err;
448 }
449 /* Calculate and set the file size */
450 msgsize = MSGSIZE (attr->mq_msgsize);
451 filesize = sizeof (struct mq_hdr)
452 + (attr->mq_maxmsg * (sizeof (struct msg_hdr) + msgsize));
453 if (lseek64 (fd, filesize - 1, SEEK_SET) == -1)
454 goto err;
455 if (write (fd, "", 1) == -1)
456 goto err;
457
458 /* Memory map the file */
459 mptr = (int8_t *) mmap64 (NULL, (size_t) filesize, PROT_READ | PROT_WRITE,
460 MAP_SHARED, fd, 0);
461 if (mptr == (int8_t *) MAP_FAILED)
462 goto err;
463
464 /* Allocate one mq_info{} for the queue */
465 if (!(mqinfo = (struct mq_info *) calloc (1, sizeof (struct mq_info))))
466 goto err;
467 mqinfo->mqi_hdr = mqhdr = (struct mq_hdr *) mptr;
468 mqinfo->mqi_magic = MQI_MAGIC;
469 mqinfo->mqi_flags = nonblock;
470
471 /* Initialize header at beginning of file */
472 /* Create free list with all messages on it */
473 mqhdr->mqh_attr.mq_flags = 0;
474 mqhdr->mqh_attr.mq_maxmsg = attr->mq_maxmsg;
475 mqhdr->mqh_attr.mq_msgsize = attr->mq_msgsize;
476 mqhdr->mqh_attr.mq_curmsgs = 0;
477 mqhdr->mqh_nwait = 0;
478 mqhdr->mqh_pid = 0;
479 NtAllocateLocallyUniqueId (&luid);
480 __small_sprintf (mqhdr->mqh_uname, "%016X%08x%08x",
481 hash_path_name (0,mqname),
482 luid.HighPart, luid.LowPart);
483 mqhdr->mqh_head = 0;
484 index = sizeof (struct mq_hdr);
485 mqhdr->mqh_free = index;
486 for (i = 0; i < attr->mq_maxmsg - 1; i++)
487 {
488 msghdr = (struct msg_hdr *) &mptr[index];
489 index += sizeof (struct msg_hdr) + msgsize;
490 msghdr->msg_next = index;
491 }
492 msghdr = (struct msg_hdr *) &mptr[index];
493 msghdr->msg_next = 0; /* end of free list */
494
495 /* Initialize mutex & condition variables */
496 i = ipc_mutex_init (&mqinfo->mqi_lock, mqhdr->mqh_uname);
497 if (i != 0)
498 goto pthreaderr;
499
500 i = ipc_cond_init (&mqinfo->mqi_waitsend, mqhdr->mqh_uname, 'S');
501 if (i != 0)
502 goto pthreaderr;
503
504 i = ipc_cond_init (&mqinfo->mqi_waitrecv, mqhdr->mqh_uname, 'R');
505 if (i != 0)
506 goto pthreaderr;
507
508 /* Initialization complete, turn off user-execute bit */
509 if (fchmod (fd, mode) == -1)
510 goto err;
511 close (fd);
512 return ((mqd_t) mqinfo);
513 }
514
515 exists:
516 /* Open the file then memory map */
517 if ((fd = open (mqname, O_RDWR | O_CLOEXEC)) < 0)
518 {
519 if (errno == ENOENT && (oflag & O_CREAT))
520 goto again;
521 goto err;
522 }
523 /* Make certain initialization is complete */
524 for (i = 0; i < MAX_TRIES; i++)
525 {
526 if (stat64 (mqname, &statbuff) == -1)
527 {
528 if (errno == ENOENT && (oflag & O_CREAT))
529 {
530 close (fd);
531 fd = -1;
532 goto again;
533 }
534 goto err;
535 }
536 if ((statbuff.st_mode & S_IXUSR) == 0)
537 break;
538 sleep (1);
539 }
540 if (i == MAX_TRIES)
541 {
542 set_errno (ETIMEDOUT);
543 goto err;
544 }
545
546 filesize = statbuff.st_size;
547 mptr = (int8_t *) mmap64 (NULL, (size_t) filesize, PROT_READ | PROT_WRITE,
548 MAP_SHARED, fd, 0);
549 if (mptr == (int8_t *) MAP_FAILED)
550 goto err;
551 close (fd);
552 fd = -1;
553
554 /* Allocate one mq_info{} for each open */
555 if (!(mqinfo = (struct mq_info *) calloc (1, sizeof (struct mq_info))))
556 goto err;
557 mqinfo->mqi_hdr = mqhdr = (struct mq_hdr *) mptr;
558 mqinfo->mqi_magic = MQI_MAGIC;
559 mqinfo->mqi_flags = nonblock;
560
561 /* Initialize mutex & condition variable */
562 i = ipc_mutex_init (&mqinfo->mqi_lock, mqhdr->mqh_uname);
563 if (i != 0)
564 goto pthreaderr;
565
566 i = ipc_cond_init (&mqinfo->mqi_waitsend, mqhdr->mqh_uname, 'S');
567 if (i != 0)
568 goto pthreaderr;
569
570 i = ipc_cond_init (&mqinfo->mqi_waitrecv, mqhdr->mqh_uname, 'R');
571 if (i != 0)
572 goto pthreaderr;
573
574 return (mqd_t) mqinfo;
575
576 pthreaderr:
577 errno = i;
578 err:
579 /* Don't let following function calls change errno */
580 save_errno save;
581
582 if (created)
583 unlink (mqname);
584 if (mptr != (int8_t *) MAP_FAILED)
585 munmap((void *) mptr, (size_t) filesize);
586 if (mqinfo)
587 {
588 if (mqinfo->mqi_lock)
589 ipc_mutex_close (mqinfo->mqi_lock);
590 if (mqinfo->mqi_waitsend)
591 ipc_cond_close (mqinfo->mqi_waitsend);
592 if (mqinfo->mqi_waitrecv)
593 ipc_cond_close (mqinfo->mqi_waitrecv);
594 free (mqinfo);
595 }
596 if (fd >= 0)
597 close (fd);
598 return (mqd_t) -1;
599 }
600
601 extern "C" int
602 mq_getattr (mqd_t mqd, struct mq_attr *mqstat)
603 {
604 int n;
605 struct mq_hdr *mqhdr;
606 struct mq_attr *attr;
607 struct mq_info *mqinfo;
608
609 myfault efault;
610 if (efault.faulted (EBADF))
611 return -1;
612
613 mqinfo = (struct mq_info *) mqd;
614 if (mqinfo->mqi_magic != MQI_MAGIC)
615 {
616 set_errno (EBADF);
617 return -1;
618 }
619 mqhdr = mqinfo->mqi_hdr;
620 attr = &mqhdr->mqh_attr;
621 if ((n = ipc_mutex_lock (mqinfo->mqi_lock)) != 0)
622 {
623 errno = n;
624 return -1;
625 }
626 mqstat->mq_flags = mqinfo->mqi_flags; /* per-open */
627 mqstat->mq_maxmsg = attr->mq_maxmsg; /* remaining three per-queue */
628 mqstat->mq_msgsize = attr->mq_msgsize;
629 mqstat->mq_curmsgs = attr->mq_curmsgs;
630
631 ipc_mutex_unlock (mqinfo->mqi_lock);
632 return 0;
633 }
634
635 extern "C" int
636 mq_setattr (mqd_t mqd, const struct mq_attr *mqstat, struct mq_attr *omqstat)
637 {
638 int n;
639 struct mq_hdr *mqhdr;
640 struct mq_attr *attr;
641 struct mq_info *mqinfo;
642
643 myfault efault;
644 if (efault.faulted (EBADF))
645 return -1;
646
647 mqinfo = (struct mq_info *) mqd;
648 if (mqinfo->mqi_magic != MQI_MAGIC)
649 {
650 set_errno (EBADF);
651 return -1;
652 }
653 mqhdr = mqinfo->mqi_hdr;
654 attr = &mqhdr->mqh_attr;
655 if ((n = ipc_mutex_lock (mqinfo->mqi_lock)) != 0)
656 {
657 errno = n;
658 return -1;
659 }
660
661 if (omqstat != NULL)
662 {
663 omqstat->mq_flags = mqinfo->mqi_flags; /* previous attributes */
664 omqstat->mq_maxmsg = attr->mq_maxmsg;
665 omqstat->mq_msgsize = attr->mq_msgsize;
666 omqstat->mq_curmsgs = attr->mq_curmsgs; /* and current status */
667 }
668
669 if (mqstat->mq_flags & O_NONBLOCK)
670 mqinfo->mqi_flags |= O_NONBLOCK;
671 else
672 mqinfo->mqi_flags &= ~O_NONBLOCK;
673
674 ipc_mutex_unlock (mqinfo->mqi_lock);
675 return 0;
676 }
677
678 extern "C" int
679 mq_notify (mqd_t mqd, const struct sigevent *notification)
680 {
681 int n;
682 pid_t pid;
683 struct mq_hdr *mqhdr;
684 struct mq_info *mqinfo;
685
686 myfault efault;
687 if (efault.faulted (EBADF))
688 return -1;
689
690 mqinfo = (struct mq_info *) mqd;
691 if (mqinfo->mqi_magic != MQI_MAGIC)
692 {
693 set_errno (EBADF);
694 return -1;
695 }
696 mqhdr = mqinfo->mqi_hdr;
697 if ((n = ipc_mutex_lock (mqinfo->mqi_lock)) != 0)
698 {
699 errno = n;
700 return -1;
701 }
702
703 pid = getpid ();
704 if (!notification)
705 {
706 if (mqhdr->mqh_pid == pid)
707 mqhdr->mqh_pid = 0; /* unregister calling process */
708 }
709 else
710 {
711 if (mqhdr->mqh_pid != 0)
712 {
713 if (kill (mqhdr->mqh_pid, 0) != -1 || errno != ESRCH)
714 {
715 set_errno (EBUSY);
716 ipc_mutex_unlock (mqinfo->mqi_lock);
717 return -1;
718 }
719 }
720 mqhdr->mqh_pid = pid;
721 mqhdr->mqh_event = *notification;
722 }
723 ipc_mutex_unlock (mqinfo->mqi_lock);
724 return 0;
725 }
726
727 static int
728 _mq_send (mqd_t mqd, const char *ptr, size_t len, unsigned int prio,
729 const struct timespec *abstime)
730 {
731 int n;
732 long index, freeindex;
733 int8_t *mptr;
734 struct sigevent *sigev;
735 struct mq_hdr *mqhdr;
736 struct mq_attr *attr;
737 struct msg_hdr *msghdr, *nmsghdr, *pmsghdr;
738 struct mq_info *mqinfo;
739
740 pthread_testcancel ();
741
742 myfault efault;
743 if (efault.faulted (EBADF))
744 return -1;
745
746 mqinfo = (struct mq_info *) mqd;
747 if (mqinfo->mqi_magic != MQI_MAGIC)
748 {
749 set_errno (EBADF);
750 return -1;
751 }
752 if (prio > MQ_PRIO_MAX)
753 {
754 set_errno (EINVAL);
755 return -1;
756 }
757
758 mqhdr = mqinfo->mqi_hdr; /* struct pointer */
759 mptr = (int8_t *) mqhdr; /* byte pointer */
760 attr = &mqhdr->mqh_attr;
761 if ((n = ipc_mutex_lock (mqinfo->mqi_lock)) != 0)
762 {
763 errno = n;
764 return -1;
765 }
766
767 if (len > (size_t) attr->mq_msgsize)
768 {
769 set_errno (EMSGSIZE);
770 goto err;
771 }
772 if (attr->mq_curmsgs == 0)
773 {
774 if (mqhdr->mqh_pid != 0 && mqhdr->mqh_nwait == 0)
775 {
776 sigev = &mqhdr->mqh_event;
777 if (sigev->sigev_notify == SIGEV_SIGNAL)
778 sigqueue (mqhdr->mqh_pid, sigev->sigev_signo, sigev->sigev_value);
779 mqhdr->mqh_pid = 0; /* unregister */
780 }
781 }
782 else if (attr->mq_curmsgs >= attr->mq_maxmsg)
783 {
784 /* Queue is full */
785 if (mqinfo->mqi_flags & O_NONBLOCK)
786 {
787 set_errno (EAGAIN);
788 goto err;
789 }
790 /* Wait for room for one message on the queue */
791 while (attr->mq_curmsgs >= attr->mq_maxmsg)
792 {
793 int ret = ipc_cond_timedwait (mqinfo->mqi_waitsend, mqinfo->mqi_lock,
794 abstime);
795 if (ret != 0)
796 {
797 set_errno (ret);
798 return -1;
799 }
800 }
801 }
802
803 /* nmsghdr will point to new message */
804 if ((freeindex = mqhdr->mqh_free) == 0)
805 api_fatal ("mq_send: curmsgs = %ld; free = 0", attr->mq_curmsgs);
806
807 nmsghdr = (struct msg_hdr *) &mptr[freeindex];
808 nmsghdr->msg_prio = prio;
809 nmsghdr->msg_len = len;
810 memcpy (nmsghdr + 1, ptr, len); /* copy message from caller */
811 mqhdr->mqh_free = nmsghdr->msg_next; /* new freelist head */
812
813 /* Find right place for message in linked list */
814 index = mqhdr->mqh_head;
815 pmsghdr = (struct msg_hdr *) &(mqhdr->mqh_head);
816 while (index)
817 {
818 msghdr = (struct msg_hdr *) &mptr[index];
819 if (prio > msghdr->msg_prio)
820 {
821 nmsghdr->msg_next = index;
822 pmsghdr->msg_next = freeindex;
823 break;
824 }
825 index = msghdr->msg_next;
826 pmsghdr = msghdr;
827 }
828 if (index == 0)
829 {
830 /* Queue was empty or new goes at end of list */
831 pmsghdr->msg_next = freeindex;
832 nmsghdr->msg_next = 0;
833 }
834 /* Wake up anyone blocked in mq_receive waiting for a message */
835 if (attr->mq_curmsgs == 0)
836 ipc_cond_signal (mqinfo->mqi_waitrecv);
837 attr->mq_curmsgs++;
838
839 ipc_mutex_unlock (mqinfo->mqi_lock);
840 return 0;
841
842 err:
843 ipc_mutex_unlock (mqinfo->mqi_lock);
844 return -1;
845 }
846
847 extern "C" int
848 mq_send (mqd_t mqd, const char *ptr, size_t len, unsigned int prio)
849 {
850 return _mq_send (mqd, ptr, len, prio, NULL);
851 }
852
853 extern "C" int
854 mq_timedsend (mqd_t mqd, const char *ptr, size_t len, unsigned int prio,
855 const struct timespec *abstime)
856 {
857 return _mq_send (mqd, ptr, len, prio, abstime);
858 }
859
860 static ssize_t
861 _mq_receive (mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop,
862 const struct timespec *abstime)
863 {
864 int n;
865 long index;
866 int8_t *mptr;
867 ssize_t len;
868 struct mq_hdr *mqhdr;
869 struct mq_attr *attr;
870 struct msg_hdr *msghdr;
871 struct mq_info *mqinfo;
872
873 pthread_testcancel ();
874
875 myfault efault;
876 if (efault.faulted (EBADF))
877 return -1;
878
879 mqinfo = (struct mq_info *) mqd;
880 if (mqinfo->mqi_magic != MQI_MAGIC)
881 {
882 set_errno (EBADF);
883 return -1;
884 }
885 mqhdr = mqinfo->mqi_hdr; /* struct pointer */
886 mptr = (int8_t *) mqhdr; /* byte pointer */
887 attr = &mqhdr->mqh_attr;
888 if ((n = ipc_mutex_lock (mqinfo->mqi_lock)) != 0)
889 {
890 errno = n;
891 return -1;
892 }
893
894 if (maxlen < (size_t) attr->mq_msgsize)
895 {
896 set_errno (EMSGSIZE);
897 goto err;
898 }
899 if (attr->mq_curmsgs == 0) /* queue is empty */
900 {
901 if (mqinfo->mqi_flags & O_NONBLOCK)
902 {
903 set_errno (EAGAIN);
904 goto err;
905 }
906 /* Wait for a message to be placed onto queue */
907 mqhdr->mqh_nwait++;
908 while (attr->mq_curmsgs == 0)
909 {
910 int ret = ipc_cond_timedwait (mqinfo->mqi_waitrecv, mqinfo->mqi_lock,
911 abstime);
912 if (ret != 0)
913 {
914 set_errno (ret);
915 return -1;
916 }
917 }
918 mqhdr->mqh_nwait--;
919 }
920
921 if ((index = mqhdr->mqh_head) == 0)
922 api_fatal ("mq_receive: curmsgs = %ld; head = 0", attr->mq_curmsgs);
923
924 msghdr = (struct msg_hdr *) &mptr[index];
925 mqhdr->mqh_head = msghdr->msg_next; /* new head of list */
926 len = msghdr->msg_len;
927 memcpy(ptr, msghdr + 1, len); /* copy the message itself */
928 if (priop != NULL)
929 *priop = msghdr->msg_prio;
930
931 /* Just-read message goes to front of free list */
932 msghdr->msg_next = mqhdr->mqh_free;
933 mqhdr->mqh_free = index;
934
935 /* Wake up anyone blocked in mq_send waiting for room */
936 if (attr->mq_curmsgs == attr->mq_maxmsg)
937 ipc_cond_signal (mqinfo->mqi_waitsend);
938 attr->mq_curmsgs--;
939
940 ipc_mutex_unlock (mqinfo->mqi_lock);
941 return len;
942
943 err:
944 ipc_mutex_unlock (mqinfo->mqi_lock);
945 return -1;
946 }
947
948 extern "C" ssize_t
949 mq_receive (mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop)
950 {
951 return _mq_receive (mqd, ptr, maxlen, priop, NULL);
952 }
953
954 extern "C" ssize_t
955 mq_timedreceive (mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop,
956 const struct timespec *abstime)
957 {
958 return _mq_receive (mqd, ptr, maxlen, priop, abstime);
959 }
960
961 extern "C" int
962 mq_close (mqd_t mqd)
963 {
964 long msgsize, filesize;
965 struct mq_hdr *mqhdr;
966 struct mq_attr *attr;
967 struct mq_info *mqinfo;
968
969 myfault efault;
970 if (efault.faulted (EBADF))
971 return -1;
972
973 mqinfo = (struct mq_info *) mqd;
974 if (mqinfo->mqi_magic != MQI_MAGIC)
975 {
976 set_errno (EBADF);
977 return -1;
978 }
979 mqhdr = mqinfo->mqi_hdr;
980 attr = &mqhdr->mqh_attr;
981
982 if (mq_notify (mqd, NULL)) /* unregister calling process */
983 return -1;
984
985 msgsize = MSGSIZE (attr->mq_msgsize);
986 filesize = sizeof (struct mq_hdr)
987 + (attr->mq_maxmsg * (sizeof (struct msg_hdr) + msgsize));
988 if (munmap (mqinfo->mqi_hdr, filesize) == -1)
989 return -1;
990
991 mqinfo->mqi_magic = 0; /* just in case */
992 ipc_cond_close (mqinfo->mqi_waitsend);
993 ipc_cond_close (mqinfo->mqi_waitrecv);
994 ipc_mutex_close (mqinfo->mqi_lock);
995 free (mqinfo);
996 return 0;
997 }
998
999 extern "C" int
1000 mq_unlink (const char *name)
1001 {
1002 size_t len = strlen (name);
1003 char mqname[ipc_names[mqueue].prefix_len + len];
1004
1005 if (!check_path (mqname, mqueue, name, len))
1006 return -1;
1007 if (unlink (mqname) == -1)
1008 return -1;
1009 return 0;
1010 }
1011
1012 /* POSIX named semaphore implementation. Loosely based on Richard W. STEPHENS
1013 implementation as far as sem_open is concerned, but under the hood using
1014 the already existing semaphore class in thread.cc. Using a file backed
1015 solution allows to implement kernel persistent named semaphores. */
1016
1017 struct sem_finfo
1018 {
1019 unsigned int value;
1020 unsigned long long hash;
1021 LUID luid;
1022 };
1023
1024 extern "C" sem_t *
1025 sem_open (const char *name, int oflag, ...)
1026 {
1027 int i, fd = -1, created;
1028 va_list ap;
1029 mode_t mode = 0;
1030 unsigned int value = 0;
1031 struct __stat64 statbuff;
1032 sem_t *sem = SEM_FAILED;
1033 sem_finfo sf;
1034 bool wasopen = false;
1035 ipc_flock file;
1036
1037 size_t len = strlen (name);
1038 char semname[ipc_names[semaphore].prefix_len + len];
1039
1040 if (!check_path (semname, semaphore, name, len))
1041 return SEM_FAILED;
1042
1043 myfault efault;
1044 if (efault.faulted (EFAULT))
1045 return SEM_FAILED;
1046
1047 created = 0;
1048 oflag &= (O_CREAT | O_EXCL);
1049
1050 again:
1051 if (oflag & O_CREAT)
1052 {
1053 va_start (ap, oflag); /* init ap to final named argument */
1054 mode = va_arg (ap, mode_t) & ~S_IXUSR;
1055 value = va_arg (ap, unsigned int);
1056 va_end (ap);
1057
1058 /* Open and specify O_EXCL and user-execute */
1059 fd = open (semname, oflag | O_EXCL | O_RDWR | O_CLOEXEC, mode | S_IXUSR);
1060 if (fd < 0)
1061 {
1062 if (errno == EEXIST && (oflag & O_EXCL) == 0)
1063 goto exists; /* already exists, OK */
1064 return SEM_FAILED;
1065 }
1066 created = 1;
1067 /* First one to create the file initializes it. */
1068 NtAllocateLocallyUniqueId (&sf.luid);
1069 sf.value = value;
1070 sf.hash = hash_path_name (0, semname);
1071 if (write (fd, &sf, sizeof sf) != sizeof sf)
1072 goto err;
1073 sem = semaphore::open (sf.hash, sf.luid, fd, oflag, mode, value, wasopen);
1074 if (sem == SEM_FAILED)
1075 goto err;
1076 /* Initialization complete, turn off user-execute bit */
1077 if (fchmod (fd, mode) == -1)
1078 goto err;
1079 /* Don't close (fd); */
1080 return sem;
1081 }
1082
1083 exists:
1084 /* Open the file and fetch the semaphore name. */
1085 if ((fd = open (semname, O_RDWR | O_CLOEXEC)) < 0)
1086 {
1087 if (errno == ENOENT && (oflag & O_CREAT))
1088 goto again;
1089 goto err;
1090 }
1091 /* Make certain initialization is complete */
1092 for (i = 0; i < MAX_TRIES; i++)
1093 {
1094 if (stat64 (semname, &statbuff) == -1)
1095 {
1096 if (errno == ENOENT && (oflag & O_CREAT))
1097 {
1098 close (fd);
1099 fd = -1;
1100 goto again;
1101 }
1102 goto err;
1103 }
1104 if ((statbuff.st_mode & S_IXUSR) == 0)
1105 break;
1106 sleep (1);
1107 }
1108 if (i == MAX_TRIES)
1109 {
1110 set_errno (ETIMEDOUT);
1111 goto err;
1112 }
1113 if (file.lock (fd, sizeof sf))
1114 goto err;
1115 if (read (fd, &sf, sizeof sf) != sizeof sf)
1116 goto err;
1117 sem = semaphore::open (sf.hash, sf.luid, fd, oflag, mode, sf.value, wasopen);
1118 file.unlock (fd);
1119 if (sem == SEM_FAILED)
1120 goto err;
1121 /* If wasopen is set, the semaphore was already opened and we already have
1122 an open file descriptor pointing to the file. This means, we have to
1123 close the file descriptor created in this call. It won't be stored
1124 anywhere anyway. */
1125 if (wasopen)
1126 close (fd);
1127 return sem;
1128
1129 err:
1130 /* Don't let following function calls change errno */
1131 save_errno save;
1132
1133 file.unlock (fd);
1134 if (created)
1135 unlink (semname);
1136 if (sem != SEM_FAILED)
1137 semaphore::close (sem);
1138 if (fd >= 0)
1139 close (fd);
1140 return SEM_FAILED;
1141 }
1142
1143 int
1144 _sem_close (sem_t *sem, bool do_close)
1145 {
1146 sem_finfo sf;
1147 int fd, ret = -1;
1148 ipc_flock file;
1149
1150 if (semaphore::getinternal (sem, &fd, &sf.hash, &sf.luid, &sf.value) == -1)
1151 return -1;
1152 if (!file.lock (fd, sizeof sf)
1153 && lseek64 (fd, 0LL, SEEK_SET) != (_off64_t) -1
1154 && write (fd, &sf, sizeof sf) == sizeof sf)
1155 ret = do_close ? semaphore::close (sem) : 0;
1156
1157 /* Don't let following function calls change errno */
1158 save_errno save;
1159 file.unlock (fd);
1160 close (fd);
1161
1162 return ret;
1163 }
1164
1165 extern "C" int
1166 sem_close (sem_t *sem)
1167 {
1168 return _sem_close (sem, true);
1169 }
1170
1171 extern "C" int
1172 sem_unlink (const char *name)
1173 {
1174 size_t len = strlen (name);
1175 char semname[ipc_names[semaphore].prefix_len + len];
1176
1177 if (!check_path (semname, semaphore, name, len))
1178 return -1;
1179 if (unlink (semname) == -1)
1180 return -1;
1181 return 0;
1182 }
This page took 0.088392 seconds and 5 git commands to generate.