]> sourceware.org Git - newlib-cygwin.git/blob - winsup/cygwin/posix_ipc.cc
* posix_ipc.cc (ipc_mutex_init): Call NtCreateMutant to make sure the
[newlib-cygwin.git] / winsup / cygwin / posix_ipc.cc
1 /* posix_ipc.cc: POSIX IPC API for Cygwin.
2
3 Copyright 2007, 2008, 2009, 2010 Red Hat, Inc.
4
5 This file is part of Cygwin.
6
7 This software is a copyrighted work licensed under the terms of the
8 Cygwin license. Please consult the file "CYGWIN_LICENSE" for
9 details. */
10
11 #include "winsup.h"
12 #include "shared_info.h"
13 #include "thread.h"
14 #include "path.h"
15 #include "cygtls.h"
16 #include "fhandler.h"
17 #include "dtable.h"
18 #include "cygheap.h"
19 #include "sigproc.h"
20 #include "ntdll.h"
21 #include <sys/mman.h>
22 #include <sys/param.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25 #include <mqueue.h>
26 #include <semaphore.h>
27
/* Per-type path information, indexed by ipc_type_t.

   prefix_len is the length of the path prefix including the trailing "/"
   (or "/sem." for semaphores) as well as the trailing NUL.  It is derived
   with sizeof from the literal that spells out exactly that prefix, so the
   value always includes the terminating NUL. */
static struct
{
  const char *prefix;
  const size_t prefix_len;
  const char *description;
} ipc_names[] = {
  { "/dev/shm",    sizeof ("/dev/shm/"),     "POSIX shared memory object" },
  { "/dev/mqueue", sizeof ("/dev/mqueue/"),  "POSIX message queue" },
  { "/dev/shm",    sizeof ("/dev/shm/sem."), "POSIX semaphore" }
};
40
/* Kind of POSIX IPC object.  The values are used as indices into the
   ipc_names table, so they are spelled out explicitly. */
enum ipc_type_t
{
  shmem = 0,
  mqueue = 1,
  semaphore = 2
};
47
48 static bool
49 check_path (char *res_name, ipc_type_t type, const char *name, size_t len)
50 {
51 /* Note that we require the existance of the appropriate /dev subdirectories
52 for POSIX IPC object support, similar to Linux (which supports the
53 directories, but doesn't require to mount them). We don't create
54 these directory here, that's the task of the installer. But we check
55 for existance and give ample warning. */
56 path_conv path (ipc_names[type].prefix, PC_SYM_NOFOLLOW);
57 if (path.error || !path.exists () || !path.isdir ())
58 {
59 small_printf (
60 "Warning: '%s' does not exists or is not a directory.\n\n"
61 "%ss require the existance of this directory.\n"
62 "Create the directory '%s' and set the permissions to 01777.\n"
63 "For instance on the command line: mkdir -m 01777 %s\n",
64 ipc_names[type].prefix, ipc_names[type].description,
65 ipc_names[type].prefix, ipc_names[type].prefix);
66 set_errno (EINVAL);
67 return false;
68 }
69 /* Name must not be empty, or just be a single slash, or start with more
70 than one slash. Same for backslash.
71 Apart from handling backslash like slash, the naming rules are identical
72 to Linux, including the names and requirements for subdirectories, if
73 the name contains further slashes. */
74 if (!name || (strchr ("/\\", name[0])
75 && (!name[1] || strchr ("/\\", name[1]))))
76 {
77 debug_printf ("Invalid %s name '%s'", ipc_names[type].description, name);
78 set_errno (EINVAL);
79 return false;
80 }
81 /* Skip leading (back-)slash. */
82 if (strchr ("/\\", name[0]))
83 ++name;
84 if (len > PATH_MAX - ipc_names[type].prefix_len)
85 {
86 debug_printf ("%s name '%s' too long", ipc_names[type].description, name);
87 set_errno (ENAMETOOLONG);
88 return false;
89 }
90 __small_sprintf (res_name, "%s/%s%s", ipc_names[type].prefix,
91 type == semaphore ? "sem." : "",
92 name);
93 return true;
94 }
95
96 static int
97 ipc_mutex_init (HANDLE *pmtx, const char *name)
98 {
99 WCHAR buf[MAX_PATH];
100 UNICODE_STRING uname;
101 OBJECT_ATTRIBUTES attr;
102 NTSTATUS status;
103
104 __small_swprintf (buf, L"mqueue/mtx_%s", name);
105 RtlInitUnicodeString (&uname, buf);
106 InitializeObjectAttributes (&attr, &uname,
107 OBJ_INHERIT | OBJ_OPENIF | OBJ_CASE_INSENSITIVE,
108 get_shared_parent_dir (),
109 everyone_sd (CYG_MUTANT_ACCESS));
110 status = NtCreateMutant (pmtx, CYG_MUTANT_ACCESS, &attr, FALSE);
111 if (!NT_SUCCESS (status))
112 {
113 debug_printf ("NtCreateMutant: %p", status);
114 return geterrno_from_win_error (RtlNtStatusToDosError (status));
115 }
116 return 0;
117 }
118
119 static int
120 ipc_mutex_lock (HANDLE mtx)
121 {
122 HANDLE h[2] = { mtx, signal_arrived };
123
124 switch (WaitForMultipleObjects (2, h, FALSE, INFINITE))
125 {
126 case WAIT_OBJECT_0:
127 case WAIT_ABANDONED_0:
128 return 0;
129 case WAIT_OBJECT_0 + 1:
130 set_errno (EINTR);
131 return 1;
132 default:
133 break;
134 }
135 return geterrno_from_win_error ();
136 }
137
138 static inline int
139 ipc_mutex_unlock (HANDLE mtx)
140 {
141 return ReleaseMutex (mtx) ? 0 : geterrno_from_win_error ();
142 }
143
144 static inline int
145 ipc_mutex_close (HANDLE mtx)
146 {
147 return CloseHandle (mtx) ? 0 : geterrno_from_win_error ();
148 }
149
150 static int
151 ipc_cond_init (HANDLE *pevt, const char *name, char sr)
152 {
153 WCHAR buf[MAX_PATH];
154 UNICODE_STRING uname;
155 OBJECT_ATTRIBUTES attr;
156 NTSTATUS status;
157
158 __small_swprintf (buf, L"mqueue/evt_%s%c", name, sr);
159 RtlInitUnicodeString (&uname, buf);
160 InitializeObjectAttributes (&attr, &uname,
161 OBJ_INHERIT | OBJ_OPENIF | OBJ_CASE_INSENSITIVE,
162 get_shared_parent_dir (),
163 everyone_sd (CYG_EVENT_ACCESS));
164 status = NtCreateEvent (pevt, CYG_EVENT_ACCESS, &attr,
165 NotificationEvent, FALSE);
166 if (!NT_SUCCESS (status))
167 {
168 debug_printf ("NtCreateEvent: %p", status);
169 return geterrno_from_win_error (RtlNtStatusToDosError (status));
170 }
171 return 0;
172 }
173
174 static int
175 ipc_cond_timedwait (HANDLE evt, HANDLE mtx, const struct timespec *abstime)
176 {
177 struct timeval tv;
178 DWORD timeout;
179 HANDLE h[2] = { mtx, evt };
180
181 if (!abstime)
182 timeout = INFINITE;
183 else if (abstime->tv_sec < 0
184 || abstime->tv_nsec < 0
185 || abstime->tv_nsec > 999999999)
186 return EINVAL;
187 else
188 {
189 gettimeofday (&tv, NULL);
190 /* Check for immediate timeout. */
191 if (tv.tv_sec > abstime->tv_sec
192 || (tv.tv_sec == abstime->tv_sec
193 && tv.tv_usec > abstime->tv_nsec / 1000))
194 return ETIMEDOUT;
195 timeout = (abstime->tv_sec - tv.tv_sec) * 1000;
196 timeout += (abstime->tv_nsec / 1000 - tv.tv_usec) / 1000;
197 }
198 ResetEvent (evt);
199 if (ipc_mutex_unlock (mtx))
200 return -1;
201 switch (WaitForMultipleObjects (2, h, TRUE, timeout))
202 {
203 case WAIT_OBJECT_0:
204 case WAIT_ABANDONED_0:
205 return 0;
206 case WAIT_TIMEOUT:
207 ipc_mutex_lock (mtx);
208 return ETIMEDOUT;
209 default:
210 break;
211 }
212 return geterrno_from_win_error ();
213 }
214
215 static inline int
216 ipc_cond_signal (HANDLE evt)
217 {
218 return SetEvent (evt) ? 0 : geterrno_from_win_error ();
219 }
220
221 static inline int
222 ipc_cond_close (HANDLE evt)
223 {
224 return CloseHandle (evt) ? 0 : geterrno_from_win_error ();
225 }
226
227 class ipc_flock
228 {
229 struct __flock64 fl;
230
231 public:
232 ipc_flock () { memset (&fl, 0, sizeof fl); }
233
234 int lock (int fd, size_t size)
235 {
236 fl.l_type = F_WRLCK;
237 fl.l_whence = SEEK_SET;
238 fl.l_start = 0;
239 fl.l_len = size;
240 return fcntl64 (fd, F_SETLKW, &fl);
241 }
242 int unlock (int fd)
243 {
244 if (!fl.l_len)
245 return 0;
246 fl.l_type = F_UNLCK;
247 return fcntl64 (fd, F_SETLKW, &fl);
248 }
249 };
250
251 /* POSIX shared memory object implementation. */
252
253 extern "C" int
254 shm_open (const char *name, int oflag, mode_t mode)
255 {
256 size_t len = strlen (name);
257 char shmname[ipc_names[shmem].prefix_len + len];
258
259 if (!check_path (shmname, shmem, name, len))
260 return -1;
261
262 /* Check for valid flags. */
263 if (((oflag & O_ACCMODE) != O_RDONLY && (oflag & O_ACCMODE) != O_RDWR)
264 || (oflag & ~(O_ACCMODE | O_CREAT | O_EXCL | O_TRUNC)))
265 {
266 debug_printf ("Invalid oflag 0%o", oflag);
267 set_errno (EINVAL);
268 return -1;
269 }
270
271 return open (shmname, oflag | O_CLOEXEC, mode & 0777);
272 }
273
274 extern "C" int
275 shm_unlink (const char *name)
276 {
277 size_t len = strlen (name);
278 char shmname[ipc_names[shmem].prefix_len + len];
279
280 if (!check_path (shmname, shmem, name, len))
281 return -1;
282
283 return unlink (shmname);
284 }
285
/* The POSIX message queue implementation is based on W. Richard Stevens'
   implementation, just tweaked for Cygwin.  The main change is the use of
   Windows mutexes and events instead of the pthread synchronization
   objects.  The pathname is massaged so that the files are created under
   /dev/mqueue.  mq_timedsend and mq_timedreceive are implemented
   additionally. */
292
/* Header at the very start of a message queue's memory-mapped file.
   "Indices" are byte offsets from the start of the mapping; 0 means
   "none" and terminates a list. */
struct mq_hdr
{
  /* The queue's attributes. */
  struct mq_attr mqh_attr;
  /* Index of the first queued message. */
  long mqh_head;
  /* Index of the first free message slot. */
  long mqh_free;
  /* Number of threads blocked in mq_receive (). */
  long mqh_nwait;
  /* Nonzero PID of the process registered for notification, i.e. for
     which mqh_event is set. */
  pid_t mqh_pid;
  /* Unique name used to identify the synchronization objects connected to
     this queue. */
  char mqh_uname[36];
  /* Notification request stored by mq_notify (). */
  struct sigevent mqh_event;
};
304
/* Header preceding every message slot in the mapped file; the message
   data follows it immediately. */
struct msg_hdr
{
  /* Index of the next slot on the linked list (0 ends the list). */
  long msg_next;
  /* Actual length of the message stored in this slot. */
  ssize_t msg_len;
  /* Priority of the message. */
  unsigned int msg_prio;
};
311
312 struct mq_info
313 {
314 struct mq_hdr *mqi_hdr; /* start of mmap'ed region */
315 unsigned long mqi_magic; /* magic number if open */
316 int mqi_flags; /* flags for this process */
317 HANDLE mqi_lock; /* mutex lock */
318 HANDLE mqi_waitsend; /* and condition variable for full queue */
319 HANDLE mqi_waitrecv; /* and condition variable for empty queue */
320 };
321
322 #define MQI_MAGIC 0x98765432UL
323
324 #define MSGSIZE(i) roundup((i), sizeof(long))
325
326 #define MAX_TRIES 10 /* for waiting for initialization */
327
328 struct mq_attr defattr = { 0, 10, 8192, 0 }; /* Linux defaults. */
329
330 extern "C" _off64_t lseek64 (int, _off64_t, int);
331 extern "C" void *mmap64 (void *, size_t, int, int, int, _off64_t);
332
333 extern "C" mqd_t
334 mq_open (const char *name, int oflag, ...)
335 {
336 int i, fd = -1, nonblock, created;
337 long msgsize, index;
338 _off64_t filesize = 0;
339 va_list ap;
340 mode_t mode;
341 int8_t *mptr;
342 struct __stat64 statbuff;
343 struct mq_hdr *mqhdr;
344 struct msg_hdr *msghdr;
345 struct mq_attr *attr;
346 struct mq_info *mqinfo;
347 LUID luid;
348
349 size_t len = strlen (name);
350 char mqname[ipc_names[mqueue].prefix_len + len];
351
352 if (!check_path (mqname, mqueue, name, len))
353 return (mqd_t) -1;
354
355 myfault efault;
356 if (efault.faulted (EFAULT))
357 return (mqd_t) -1;
358
359 oflag &= (O_CREAT | O_EXCL | O_NONBLOCK);
360 created = 0;
361 nonblock = oflag & O_NONBLOCK;
362 oflag &= ~O_NONBLOCK;
363 mptr = (int8_t *) MAP_FAILED;
364 mqinfo = NULL;
365
366 again:
367 if (oflag & O_CREAT)
368 {
369 va_start (ap, oflag); /* init ap to final named argument */
370 mode = va_arg (ap, mode_t) & ~S_IXUSR;
371 attr = va_arg (ap, struct mq_attr *);
372 va_end (ap);
373
374 /* Open and specify O_EXCL and user-execute */
375 fd = open (mqname, oflag | O_EXCL | O_RDWR | O_CLOEXEC, mode | S_IXUSR);
376 if (fd < 0)
377 {
378 if (errno == EEXIST && (oflag & O_EXCL) == 0)
379 goto exists; /* already exists, OK */
380 return (mqd_t) -1;
381 }
382 created = 1;
383 /* First one to create the file initializes it */
384 if (attr == NULL)
385 attr = &defattr;
386 else if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0)
387 {
388 set_errno (EINVAL);
389 goto err;
390 }
391 /* Calculate and set the file size */
392 msgsize = MSGSIZE (attr->mq_msgsize);
393 filesize = sizeof (struct mq_hdr)
394 + (attr->mq_maxmsg * (sizeof (struct msg_hdr) + msgsize));
395 if (lseek64 (fd, filesize - 1, SEEK_SET) == -1)
396 goto err;
397 if (write (fd, "", 1) == -1)
398 goto err;
399
400 /* Memory map the file */
401 mptr = (int8_t *) mmap64 (NULL, (size_t) filesize, PROT_READ | PROT_WRITE,
402 MAP_SHARED, fd, 0);
403 if (mptr == (int8_t *) MAP_FAILED)
404 goto err;
405
406 /* Allocate one mq_info{} for the queue */
407 if (!(mqinfo = (struct mq_info *) calloc (1, sizeof (struct mq_info))))
408 goto err;
409 mqinfo->mqi_hdr = mqhdr = (struct mq_hdr *) mptr;
410 mqinfo->mqi_magic = MQI_MAGIC;
411 mqinfo->mqi_flags = nonblock;
412
413 /* Initialize header at beginning of file */
414 /* Create free list with all messages on it */
415 mqhdr->mqh_attr.mq_flags = 0;
416 mqhdr->mqh_attr.mq_maxmsg = attr->mq_maxmsg;
417 mqhdr->mqh_attr.mq_msgsize = attr->mq_msgsize;
418 mqhdr->mqh_attr.mq_curmsgs = 0;
419 mqhdr->mqh_nwait = 0;
420 mqhdr->mqh_pid = 0;
421 if (!AllocateLocallyUniqueId (&luid))
422 {
423 __seterrno ();
424 goto err;
425 }
426 __small_sprintf (mqhdr->mqh_uname, "%016X%08x%08x",
427 hash_path_name (0,mqname),
428 luid.HighPart, luid.LowPart);
429 mqhdr->mqh_head = 0;
430 index = sizeof (struct mq_hdr);
431 mqhdr->mqh_free = index;
432 for (i = 0; i < attr->mq_maxmsg - 1; i++)
433 {
434 msghdr = (struct msg_hdr *) &mptr[index];
435 index += sizeof (struct msg_hdr) + msgsize;
436 msghdr->msg_next = index;
437 }
438 msghdr = (struct msg_hdr *) &mptr[index];
439 msghdr->msg_next = 0; /* end of free list */
440
441 /* Initialize mutex & condition variables */
442 i = ipc_mutex_init (&mqinfo->mqi_lock, mqhdr->mqh_uname);
443 if (i != 0)
444 goto pthreaderr;
445
446 i = ipc_cond_init (&mqinfo->mqi_waitsend, mqhdr->mqh_uname, 'S');
447 if (i != 0)
448 goto pthreaderr;
449
450 i = ipc_cond_init (&mqinfo->mqi_waitrecv, mqhdr->mqh_uname, 'R');
451 if (i != 0)
452 goto pthreaderr;
453
454 /* Initialization complete, turn off user-execute bit */
455 if (fchmod (fd, mode) == -1)
456 goto err;
457 close (fd);
458 return ((mqd_t) mqinfo);
459 }
460
461 exists:
462 /* Open the file then memory map */
463 if ((fd = open (mqname, O_RDWR | O_CLOEXEC)) < 0)
464 {
465 if (errno == ENOENT && (oflag & O_CREAT))
466 goto again;
467 goto err;
468 }
469 /* Make certain initialization is complete */
470 for (i = 0; i < MAX_TRIES; i++)
471 {
472 if (stat64 (mqname, &statbuff) == -1)
473 {
474 if (errno == ENOENT && (oflag & O_CREAT))
475 {
476 close (fd);
477 fd = -1;
478 goto again;
479 }
480 goto err;
481 }
482 if ((statbuff.st_mode & S_IXUSR) == 0)
483 break;
484 sleep (1);
485 }
486 if (i == MAX_TRIES)
487 {
488 set_errno (ETIMEDOUT);
489 goto err;
490 }
491
492 filesize = statbuff.st_size;
493 mptr = (int8_t *) mmap64 (NULL, (size_t) filesize, PROT_READ | PROT_WRITE,
494 MAP_SHARED, fd, 0);
495 if (mptr == (int8_t *) MAP_FAILED)
496 goto err;
497 close (fd);
498 fd = -1;
499
500 /* Allocate one mq_info{} for each open */
501 if (!(mqinfo = (struct mq_info *) calloc (1, sizeof (struct mq_info))))
502 goto err;
503 mqinfo->mqi_hdr = mqhdr = (struct mq_hdr *) mptr;
504 mqinfo->mqi_magic = MQI_MAGIC;
505 mqinfo->mqi_flags = nonblock;
506
507 /* Initialize mutex & condition variable */
508 i = ipc_mutex_init (&mqinfo->mqi_lock, mqhdr->mqh_uname);
509 if (i != 0)
510 goto pthreaderr;
511
512 i = ipc_cond_init (&mqinfo->mqi_waitsend, mqhdr->mqh_uname, 'S');
513 if (i != 0)
514 goto pthreaderr;
515
516 i = ipc_cond_init (&mqinfo->mqi_waitrecv, mqhdr->mqh_uname, 'R');
517 if (i != 0)
518 goto pthreaderr;
519
520 return (mqd_t) mqinfo;
521
522 pthreaderr:
523 errno = i;
524 err:
525 /* Don't let following function calls change errno */
526 save_errno save;
527
528 if (created)
529 unlink (mqname);
530 if (mptr != (int8_t *) MAP_FAILED)
531 munmap((void *) mptr, (size_t) filesize);
532 if (mqinfo)
533 {
534 if (mqinfo->mqi_lock)
535 ipc_mutex_close (mqinfo->mqi_lock);
536 if (mqinfo->mqi_waitsend)
537 ipc_cond_close (mqinfo->mqi_waitsend);
538 if (mqinfo->mqi_waitrecv)
539 ipc_cond_close (mqinfo->mqi_waitrecv);
540 free (mqinfo);
541 }
542 if (fd >= 0)
543 close (fd);
544 return (mqd_t) -1;
545 }
546
547 extern "C" int
548 mq_getattr (mqd_t mqd, struct mq_attr *mqstat)
549 {
550 int n;
551 struct mq_hdr *mqhdr;
552 struct mq_attr *attr;
553 struct mq_info *mqinfo;
554
555 myfault efault;
556 if (efault.faulted (EBADF))
557 return -1;
558
559 mqinfo = (struct mq_info *) mqd;
560 if (mqinfo->mqi_magic != MQI_MAGIC)
561 {
562 set_errno (EBADF);
563 return -1;
564 }
565 mqhdr = mqinfo->mqi_hdr;
566 attr = &mqhdr->mqh_attr;
567 if ((n = ipc_mutex_lock (mqinfo->mqi_lock)) != 0)
568 {
569 errno = n;
570 return -1;
571 }
572 mqstat->mq_flags = mqinfo->mqi_flags; /* per-open */
573 mqstat->mq_maxmsg = attr->mq_maxmsg; /* remaining three per-queue */
574 mqstat->mq_msgsize = attr->mq_msgsize;
575 mqstat->mq_curmsgs = attr->mq_curmsgs;
576
577 ipc_mutex_unlock (mqinfo->mqi_lock);
578 return 0;
579 }
580
581 extern "C" int
582 mq_setattr (mqd_t mqd, const struct mq_attr *mqstat, struct mq_attr *omqstat)
583 {
584 int n;
585 struct mq_hdr *mqhdr;
586 struct mq_attr *attr;
587 struct mq_info *mqinfo;
588
589 myfault efault;
590 if (efault.faulted (EBADF))
591 return -1;
592
593 mqinfo = (struct mq_info *) mqd;
594 if (mqinfo->mqi_magic != MQI_MAGIC)
595 {
596 set_errno (EBADF);
597 return -1;
598 }
599 mqhdr = mqinfo->mqi_hdr;
600 attr = &mqhdr->mqh_attr;
601 if ((n = ipc_mutex_lock (mqinfo->mqi_lock)) != 0)
602 {
603 errno = n;
604 return -1;
605 }
606
607 if (omqstat != NULL)
608 {
609 omqstat->mq_flags = mqinfo->mqi_flags; /* previous attributes */
610 omqstat->mq_maxmsg = attr->mq_maxmsg;
611 omqstat->mq_msgsize = attr->mq_msgsize;
612 omqstat->mq_curmsgs = attr->mq_curmsgs; /* and current status */
613 }
614
615 if (mqstat->mq_flags & O_NONBLOCK)
616 mqinfo->mqi_flags |= O_NONBLOCK;
617 else
618 mqinfo->mqi_flags &= ~O_NONBLOCK;
619
620 ipc_mutex_unlock (mqinfo->mqi_lock);
621 return 0;
622 }
623
624 extern "C" int
625 mq_notify (mqd_t mqd, const struct sigevent *notification)
626 {
627 int n;
628 pid_t pid;
629 struct mq_hdr *mqhdr;
630 struct mq_info *mqinfo;
631
632 myfault efault;
633 if (efault.faulted (EBADF))
634 return -1;
635
636 mqinfo = (struct mq_info *) mqd;
637 if (mqinfo->mqi_magic != MQI_MAGIC)
638 {
639 set_errno (EBADF);
640 return -1;
641 }
642 mqhdr = mqinfo->mqi_hdr;
643 if ((n = ipc_mutex_lock (mqinfo->mqi_lock)) != 0)
644 {
645 errno = n;
646 return -1;
647 }
648
649 pid = getpid ();
650 if (!notification)
651 {
652 if (mqhdr->mqh_pid == pid)
653 mqhdr->mqh_pid = 0; /* unregister calling process */
654 }
655 else
656 {
657 if (mqhdr->mqh_pid != 0)
658 {
659 if (kill (mqhdr->mqh_pid, 0) != -1 || errno != ESRCH)
660 {
661 set_errno (EBUSY);
662 ipc_mutex_unlock (mqinfo->mqi_lock);
663 return -1;
664 }
665 }
666 mqhdr->mqh_pid = pid;
667 mqhdr->mqh_event = *notification;
668 }
669 ipc_mutex_unlock (mqinfo->mqi_lock);
670 return 0;
671 }
672
673 static int
674 _mq_send (mqd_t mqd, const char *ptr, size_t len, unsigned int prio,
675 const struct timespec *abstime)
676 {
677 int n;
678 long index, freeindex;
679 int8_t *mptr;
680 struct sigevent *sigev;
681 struct mq_hdr *mqhdr;
682 struct mq_attr *attr;
683 struct msg_hdr *msghdr, *nmsghdr, *pmsghdr;
684 struct mq_info *mqinfo;
685
686 myfault efault;
687 if (efault.faulted (EBADF))
688 return -1;
689
690 mqinfo = (struct mq_info *) mqd;
691 if (mqinfo->mqi_magic != MQI_MAGIC)
692 {
693 set_errno (EBADF);
694 return -1;
695 }
696 if (prio > MQ_PRIO_MAX)
697 {
698 set_errno (EINVAL);
699 return -1;
700 }
701
702 mqhdr = mqinfo->mqi_hdr; /* struct pointer */
703 mptr = (int8_t *) mqhdr; /* byte pointer */
704 attr = &mqhdr->mqh_attr;
705 if ((n = ipc_mutex_lock (mqinfo->mqi_lock)) != 0)
706 {
707 errno = n;
708 return -1;
709 }
710
711 if (len > (size_t) attr->mq_msgsize)
712 {
713 set_errno (EMSGSIZE);
714 goto err;
715 }
716 if (attr->mq_curmsgs == 0)
717 {
718 if (mqhdr->mqh_pid != 0 && mqhdr->mqh_nwait == 0)
719 {
720 sigev = &mqhdr->mqh_event;
721 if (sigev->sigev_notify == SIGEV_SIGNAL)
722 sigqueue (mqhdr->mqh_pid, sigev->sigev_signo, sigev->sigev_value);
723 mqhdr->mqh_pid = 0; /* unregister */
724 }
725 }
726 else if (attr->mq_curmsgs >= attr->mq_maxmsg)
727 {
728 /* Queue is full */
729 if (mqinfo->mqi_flags & O_NONBLOCK)
730 {
731 set_errno (EAGAIN);
732 goto err;
733 }
734 /* Wait for room for one message on the queue */
735 while (attr->mq_curmsgs >= attr->mq_maxmsg)
736 ipc_cond_timedwait (mqinfo->mqi_waitsend, mqinfo->mqi_lock, abstime);
737 }
738
739 /* nmsghdr will point to new message */
740 if ((freeindex = mqhdr->mqh_free) == 0)
741 api_fatal ("mq_send: curmsgs = %ld; free = 0", attr->mq_curmsgs);
742
743 nmsghdr = (struct msg_hdr *) &mptr[freeindex];
744 nmsghdr->msg_prio = prio;
745 nmsghdr->msg_len = len;
746 memcpy (nmsghdr + 1, ptr, len); /* copy message from caller */
747 mqhdr->mqh_free = nmsghdr->msg_next; /* new freelist head */
748
749 /* Find right place for message in linked list */
750 index = mqhdr->mqh_head;
751 pmsghdr = (struct msg_hdr *) &(mqhdr->mqh_head);
752 while (index)
753 {
754 msghdr = (struct msg_hdr *) &mptr[index];
755 if (prio > msghdr->msg_prio)
756 {
757 nmsghdr->msg_next = index;
758 pmsghdr->msg_next = freeindex;
759 break;
760 }
761 index = msghdr->msg_next;
762 pmsghdr = msghdr;
763 }
764 if (index == 0)
765 {
766 /* Queue was empty or new goes at end of list */
767 pmsghdr->msg_next = freeindex;
768 nmsghdr->msg_next = 0;
769 }
770 /* Wake up anyone blocked in mq_receive waiting for a message */
771 if (attr->mq_curmsgs == 0)
772 ipc_cond_signal (mqinfo->mqi_waitrecv);
773 attr->mq_curmsgs++;
774
775 ipc_mutex_unlock (mqinfo->mqi_lock);
776 return 0;
777
778 err:
779 ipc_mutex_unlock (mqinfo->mqi_lock);
780 return -1;
781 }
782
783 extern "C" int
784 mq_send (mqd_t mqd, const char *ptr, size_t len, unsigned int prio)
785 {
786 return _mq_send (mqd, ptr, len, prio, NULL);
787 }
788
789 extern "C" int
790 mq_timedsend (mqd_t mqd, const char *ptr, size_t len, unsigned int prio,
791 const struct timespec *abstime)
792 {
793 return _mq_send (mqd, ptr, len, prio, abstime);
794 }
795
796 static ssize_t
797 _mq_receive (mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop,
798 const struct timespec *abstime)
799 {
800 int n;
801 long index;
802 int8_t *mptr;
803 ssize_t len;
804 struct mq_hdr *mqhdr;
805 struct mq_attr *attr;
806 struct msg_hdr *msghdr;
807 struct mq_info *mqinfo;
808
809 myfault efault;
810 if (efault.faulted (EBADF))
811 return -1;
812
813 mqinfo = (struct mq_info *) mqd;
814 if (mqinfo->mqi_magic != MQI_MAGIC)
815 {
816 set_errno (EBADF);
817 return -1;
818 }
819 mqhdr = mqinfo->mqi_hdr; /* struct pointer */
820 mptr = (int8_t *) mqhdr; /* byte pointer */
821 attr = &mqhdr->mqh_attr;
822 if ((n = ipc_mutex_lock (mqinfo->mqi_lock)) != 0)
823 {
824 errno = n;
825 return -1;
826 }
827
828 if (maxlen < (size_t) attr->mq_msgsize)
829 {
830 set_errno (EMSGSIZE);
831 goto err;
832 }
833 if (attr->mq_curmsgs == 0) /* queue is empty */
834 {
835 if (mqinfo->mqi_flags & O_NONBLOCK)
836 {
837 set_errno (EAGAIN);
838 goto err;
839 }
840 /* Wait for a message to be placed onto queue */
841 mqhdr->mqh_nwait++;
842 while (attr->mq_curmsgs == 0)
843 ipc_cond_timedwait (mqinfo->mqi_waitrecv, mqinfo->mqi_lock, abstime);
844 mqhdr->mqh_nwait--;
845 }
846
847 if ((index = mqhdr->mqh_head) == 0)
848 api_fatal ("mq_receive: curmsgs = %ld; head = 0", attr->mq_curmsgs);
849
850 msghdr = (struct msg_hdr *) &mptr[index];
851 mqhdr->mqh_head = msghdr->msg_next; /* new head of list */
852 len = msghdr->msg_len;
853 memcpy(ptr, msghdr + 1, len); /* copy the message itself */
854 if (priop != NULL)
855 *priop = msghdr->msg_prio;
856
857 /* Just-read message goes to front of free list */
858 msghdr->msg_next = mqhdr->mqh_free;
859 mqhdr->mqh_free = index;
860
861 /* Wake up anyone blocked in mq_send waiting for room */
862 if (attr->mq_curmsgs == attr->mq_maxmsg)
863 ipc_cond_signal (mqinfo->mqi_waitsend);
864 attr->mq_curmsgs--;
865
866 ipc_mutex_unlock (mqinfo->mqi_lock);
867 return len;
868
869 err:
870 ipc_mutex_unlock (mqinfo->mqi_lock);
871 return -1;
872 }
873
874 extern "C" ssize_t
875 mq_receive (mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop)
876 {
877 return _mq_receive (mqd, ptr, maxlen, priop, NULL);
878 }
879
880 extern "C" ssize_t
881 mq_timedreceive (mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop,
882 const struct timespec *abstime)
883 {
884 return _mq_receive (mqd, ptr, maxlen, priop, abstime);
885 }
886
887 extern "C" int
888 mq_close (mqd_t mqd)
889 {
890 long msgsize, filesize;
891 struct mq_hdr *mqhdr;
892 struct mq_attr *attr;
893 struct mq_info *mqinfo;
894
895 myfault efault;
896 if (efault.faulted (EBADF))
897 return -1;
898
899 mqinfo = (struct mq_info *) mqd;
900 if (mqinfo->mqi_magic != MQI_MAGIC)
901 {
902 set_errno (EBADF);
903 return -1;
904 }
905 mqhdr = mqinfo->mqi_hdr;
906 attr = &mqhdr->mqh_attr;
907
908 if (mq_notify (mqd, NULL)) /* unregister calling process */
909 return -1;
910
911 msgsize = MSGSIZE (attr->mq_msgsize);
912 filesize = sizeof (struct mq_hdr)
913 + (attr->mq_maxmsg * (sizeof (struct msg_hdr) + msgsize));
914 if (munmap (mqinfo->mqi_hdr, filesize) == -1)
915 return -1;
916
917 mqinfo->mqi_magic = 0; /* just in case */
918 ipc_cond_close (mqinfo->mqi_waitsend);
919 ipc_cond_close (mqinfo->mqi_waitrecv);
920 ipc_mutex_close (mqinfo->mqi_lock);
921 free (mqinfo);
922 return 0;
923 }
924
925 extern "C" int
926 mq_unlink (const char *name)
927 {
928 size_t len = strlen (name);
929 char mqname[ipc_names[mqueue].prefix_len + len];
930
931 if (!check_path (mqname, mqueue, name, len))
932 return -1;
933 if (unlink (mqname) == -1)
934 return -1;
935 return 0;
936 }
937
/* POSIX named semaphore implementation.  Loosely based on W. Richard
   Stevens' implementation as far as sem_open is concerned, but under the
   hood using the already existing semaphore class in thread.cc.  Using a
   file-backed solution makes it possible to implement kernel-persistent
   named semaphores. */
942
943 struct sem_finfo
944 {
945 unsigned int value;
946 unsigned long long hash;
947 LUID luid;
948 };
949
950 extern "C" sem_t *
951 sem_open (const char *name, int oflag, ...)
952 {
953 int i, fd = -1, created;
954 va_list ap;
955 mode_t mode = 0;
956 unsigned int value = 0;
957 struct __stat64 statbuff;
958 sem_t *sem = SEM_FAILED;
959 sem_finfo sf;
960 bool wasopen = false;
961 ipc_flock file;
962
963 size_t len = strlen (name);
964 char semname[ipc_names[semaphore].prefix_len + len];
965
966 if (!check_path (semname, semaphore, name, len))
967 return SEM_FAILED;
968
969 myfault efault;
970 if (efault.faulted (EFAULT))
971 return SEM_FAILED;
972
973 created = 0;
974 oflag &= (O_CREAT | O_EXCL);
975
976 again:
977 if (oflag & O_CREAT)
978 {
979 va_start (ap, oflag); /* init ap to final named argument */
980 mode = va_arg (ap, mode_t) & ~S_IXUSR;
981 value = va_arg (ap, unsigned int);
982 va_end (ap);
983
984 /* Open and specify O_EXCL and user-execute */
985 fd = open (semname, oflag | O_EXCL | O_RDWR | O_CLOEXEC, mode | S_IXUSR);
986 if (fd < 0)
987 {
988 if (errno == EEXIST && (oflag & O_EXCL) == 0)
989 goto exists; /* already exists, OK */
990 return SEM_FAILED;
991 }
992 created = 1;
993 /* First one to create the file initializes it. */
994 if (!AllocateLocallyUniqueId (&sf.luid))
995 {
996 __seterrno ();
997 goto err;
998 }
999 sf.value = value;
1000 sf.hash = hash_path_name (0, semname);
1001 if (write (fd, &sf, sizeof sf) != sizeof sf)
1002 goto err;
1003 sem = semaphore::open (sf.hash, sf.luid, fd, oflag, mode, value, wasopen);
1004 if (sem == SEM_FAILED)
1005 goto err;
1006 /* Initialization complete, turn off user-execute bit */
1007 if (fchmod (fd, mode) == -1)
1008 goto err;
1009 /* Don't close (fd); */
1010 return sem;
1011 }
1012
1013 exists:
1014 /* Open the file and fetch the semaphore name. */
1015 if ((fd = open (semname, O_RDWR | O_CLOEXEC)) < 0)
1016 {
1017 if (errno == ENOENT && (oflag & O_CREAT))
1018 goto again;
1019 goto err;
1020 }
1021 /* Make certain initialization is complete */
1022 for (i = 0; i < MAX_TRIES; i++)
1023 {
1024 if (stat64 (semname, &statbuff) == -1)
1025 {
1026 if (errno == ENOENT && (oflag & O_CREAT))
1027 {
1028 close (fd);
1029 fd = -1;
1030 goto again;
1031 }
1032 goto err;
1033 }
1034 if ((statbuff.st_mode & S_IXUSR) == 0)
1035 break;
1036 sleep (1);
1037 }
1038 if (i == MAX_TRIES)
1039 {
1040 set_errno (ETIMEDOUT);
1041 goto err;
1042 }
1043 if (file.lock (fd, sizeof sf))
1044 goto err;
1045 if (read (fd, &sf, sizeof sf) != sizeof sf)
1046 goto err;
1047 sem = semaphore::open (sf.hash, sf.luid, fd, oflag, mode, sf.value, wasopen);
1048 file.unlock (fd);
1049 if (sem == SEM_FAILED)
1050 goto err;
1051 /* If wasopen is set, the semaphore was already opened and we already have
1052 an open file descriptor pointing to the file. This means, we have to
1053 close the file descriptor created in this call. It won't be stored
1054 anywhere anyway. */
1055 if (wasopen)
1056 close (fd);
1057 return sem;
1058
1059 err:
1060 /* Don't let following function calls change errno */
1061 save_errno save;
1062
1063 file.unlock (fd);
1064 if (created)
1065 unlink (semname);
1066 if (sem != SEM_FAILED)
1067 semaphore::close (sem);
1068 if (fd >= 0)
1069 close (fd);
1070 return SEM_FAILED;
1071 }
1072
1073 int
1074 _sem_close (sem_t *sem, bool do_close)
1075 {
1076 sem_finfo sf;
1077 int fd, ret = -1;
1078 ipc_flock file;
1079
1080 if (semaphore::getinternal (sem, &fd, &sf.hash, &sf.luid, &sf.value) == -1)
1081 return -1;
1082 if (!file.lock (fd, sizeof sf)
1083 && lseek64 (fd, 0LL, SEEK_SET) != (_off64_t) -1
1084 && write (fd, &sf, sizeof sf) == sizeof sf)
1085 ret = do_close ? semaphore::close (sem) : 0;
1086
1087 /* Don't let following function calls change errno */
1088 save_errno save;
1089 file.unlock (fd);
1090 close (fd);
1091
1092 return ret;
1093 }
1094
1095 extern "C" int
1096 sem_close (sem_t *sem)
1097 {
1098 return _sem_close (sem, true);
1099 }
1100
1101 extern "C" int
1102 sem_unlink (const char *name)
1103 {
1104 size_t len = strlen (name);
1105 char semname[ipc_names[semaphore].prefix_len + len];
1106
1107 if (!check_path (semname, semaphore, name, len))
1108 return -1;
1109 if (unlink (semname) == -1)
1110 return -1;
1111 return 0;
1112 }
This page took 0.08828 seconds and 6 git commands to generate.