]> sourceware.org Git - newlib-cygwin.git/blob - winsup/cygwin/posix_ipc.cc
* posix_ipc.cc (ipc_cond_timedwait): If ipc_mutex_unlock fails, return
[newlib-cygwin.git] / winsup / cygwin / posix_ipc.cc
1 /* posix_ipc.cc: POSIX IPC API for Cygwin.
2
3 Copyright 2007, 2008, 2009, 2010, 2011 Red Hat, Inc.
4
5 This file is part of Cygwin.
6
7 This software is a copyrighted work licensed under the terms of the
8 Cygwin license. Please consult the file "CYGWIN_LICENSE" for
9 details. */
10
11 #include "winsup.h"
12 #include "shared_info.h"
13 #include "thread.h"
14 #include "path.h"
15 #include "cygtls.h"
16 #include "fhandler.h"
17 #include "dtable.h"
18 #include "cygheap.h"
19 #include "sigproc.h"
20 #include "ntdll.h"
21 #include <sys/mman.h>
22 #include <sys/param.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25 #include <mqueue.h>
26 #include <semaphore.h>
27
/* Naming data for each kind of POSIX IPC object, indexed by ipc_type_t.

   prefix       Directory in which the backing files are created.
   prefix_len   Length of the path prefix including the trailing "/"
                (or "/sem." for semaphores) as well as the trailing NUL.
                Callers in this file size their path buffers as
                prefix_len + strlen (name).
   description  Object name used in diagnostic messages.  */
static struct
{
  const char *prefix;
  const size_t prefix_len;
  const char *description;
} ipc_names[] = {
  { "/dev/shm", 10, "POSIX shared memory object" },
  { "/dev/mqueue", 13, "POSIX message queue" },
  { "/dev/shm", 14, "POSIX semaphore" }
};
40
/* Kinds of POSIX IPC objects handled in this file.  The enumerator values
   are used as indices into ipc_names[], so the order here has to match the
   order of the entries in that table.  */
enum ipc_type_t
{
  shmem,
  mqueue,
  semaphore
};
47
48 static bool
49 check_path (char *res_name, ipc_type_t type, const char *name, size_t len)
50 {
51 /* Note that we require the existance of the appropriate /dev subdirectories
52 for POSIX IPC object support, similar to Linux (which supports the
53 directories, but doesn't require to mount them). We don't create
54 these directory here, that's the task of the installer. But we check
55 for existance and give ample warning. */
56 path_conv path (ipc_names[type].prefix, PC_SYM_NOFOLLOW);
57 if (path.error || !path.exists () || !path.isdir ())
58 {
59 small_printf (
60 "Warning: '%s' does not exists or is not a directory.\n\n"
61 "%ss require the existance of this directory.\n"
62 "Create the directory '%s' and set the permissions to 01777.\n"
63 "For instance on the command line: mkdir -m 01777 %s\n",
64 ipc_names[type].prefix, ipc_names[type].description,
65 ipc_names[type].prefix, ipc_names[type].prefix);
66 set_errno (EINVAL);
67 return false;
68 }
69 /* Name must not be empty, or just be a single slash, or start with more
70 than one slash. Same for backslash.
71 Apart from handling backslash like slash, the naming rules are identical
72 to Linux, including the names and requirements for subdirectories, if
73 the name contains further slashes. */
74 if (!name || (strchr ("/\\", name[0])
75 && (!name[1] || strchr ("/\\", name[1]))))
76 {
77 debug_printf ("Invalid %s name '%s'", ipc_names[type].description, name);
78 set_errno (EINVAL);
79 return false;
80 }
81 /* Skip leading (back-)slash. */
82 if (strchr ("/\\", name[0]))
83 ++name;
84 if (len > PATH_MAX - ipc_names[type].prefix_len)
85 {
86 debug_printf ("%s name '%s' too long", ipc_names[type].description, name);
87 set_errno (ENAMETOOLONG);
88 return false;
89 }
90 __small_sprintf (res_name, "%s/%s%s", ipc_names[type].prefix,
91 type == semaphore ? "sem." : "",
92 name);
93 return true;
94 }
95
96 static int
97 ipc_mutex_init (HANDLE *pmtx, const char *name)
98 {
99 WCHAR buf[MAX_PATH];
100 UNICODE_STRING uname;
101 OBJECT_ATTRIBUTES attr;
102 NTSTATUS status;
103
104 __small_swprintf (buf, L"mqueue/mtx_%s", name);
105 RtlInitUnicodeString (&uname, buf);
106 InitializeObjectAttributes (&attr, &uname,
107 OBJ_INHERIT | OBJ_OPENIF | OBJ_CASE_INSENSITIVE,
108 get_shared_parent_dir (),
109 everyone_sd (CYG_MUTANT_ACCESS));
110 status = NtCreateMutant (pmtx, CYG_MUTANT_ACCESS, &attr, FALSE);
111 if (!NT_SUCCESS (status))
112 {
113 debug_printf ("NtCreateMutant: %p", status);
114 return geterrno_from_win_error (RtlNtStatusToDosError (status));
115 }
116 return 0;
117 }
118
119 static int
120 ipc_mutex_lock (HANDLE mtx)
121 {
122 HANDLE h[2] = { mtx, signal_arrived };
123
124 switch (WaitForMultipleObjects (2, h, FALSE, INFINITE))
125 {
126 case WAIT_OBJECT_0:
127 case WAIT_ABANDONED_0:
128 return 0;
129 case WAIT_OBJECT_0 + 1:
130 set_errno (EINTR);
131 return 1;
132 default:
133 break;
134 }
135 return geterrno_from_win_error ();
136 }
137
138 static inline int
139 ipc_mutex_unlock (HANDLE mtx)
140 {
141 return ReleaseMutex (mtx) ? 0 : geterrno_from_win_error ();
142 }
143
144 static inline int
145 ipc_mutex_close (HANDLE mtx)
146 {
147 return CloseHandle (mtx) ? 0 : geterrno_from_win_error ();
148 }
149
150 static int
151 ipc_cond_init (HANDLE *pevt, const char *name, char sr)
152 {
153 WCHAR buf[MAX_PATH];
154 UNICODE_STRING uname;
155 OBJECT_ATTRIBUTES attr;
156 NTSTATUS status;
157
158 __small_swprintf (buf, L"mqueue/evt_%s%c", name, sr);
159 RtlInitUnicodeString (&uname, buf);
160 InitializeObjectAttributes (&attr, &uname,
161 OBJ_INHERIT | OBJ_OPENIF | OBJ_CASE_INSENSITIVE,
162 get_shared_parent_dir (),
163 everyone_sd (CYG_EVENT_ACCESS));
164 status = NtCreateEvent (pevt, CYG_EVENT_ACCESS, &attr,
165 NotificationEvent, FALSE);
166 if (!NT_SUCCESS (status))
167 {
168 debug_printf ("NtCreateEvent: %p", status);
169 return geterrno_from_win_error (RtlNtStatusToDosError (status));
170 }
171 return 0;
172 }
173
174 static int
175 ipc_cond_timedwait (HANDLE evt, HANDLE mtx, const struct timespec *abstime)
176 {
177 struct timeval tv;
178 DWORD timeout;
179 HANDLE h[2] = { mtx, evt };
180 int err;
181
182 if (!abstime)
183 timeout = INFINITE;
184 else if (abstime->tv_sec < 0
185 || abstime->tv_nsec < 0
186 || abstime->tv_nsec > 999999999)
187 return EINVAL;
188 else
189 {
190 gettimeofday (&tv, NULL);
191 /* Check for immediate timeout. */
192 if (tv.tv_sec > abstime->tv_sec
193 || (tv.tv_sec == abstime->tv_sec
194 && tv.tv_usec > abstime->tv_nsec / 1000))
195 return ETIMEDOUT;
196 timeout = (abstime->tv_sec - tv.tv_sec) * 1000;
197 timeout += (abstime->tv_nsec / 1000 - tv.tv_usec) / 1000;
198 }
199 ResetEvent (evt);
200 if ((err = ipc_mutex_unlock (mtx)) != 0)
201 return err;
202 switch (WaitForMultipleObjects (2, h, TRUE, timeout))
203 {
204 case WAIT_OBJECT_0:
205 case WAIT_ABANDONED_0:
206 return 0;
207 case WAIT_TIMEOUT:
208 ipc_mutex_lock (mtx);
209 return ETIMEDOUT;
210 default:
211 break;
212 }
213 return geterrno_from_win_error ();
214 }
215
216 static inline int
217 ipc_cond_signal (HANDLE evt)
218 {
219 return SetEvent (evt) ? 0 : geterrno_from_win_error ();
220 }
221
222 static inline int
223 ipc_cond_close (HANDLE evt)
224 {
225 return CloseHandle (evt) ? 0 : geterrno_from_win_error ();
226 }
227
228 class ipc_flock
229 {
230 struct __flock64 fl;
231
232 public:
233 ipc_flock () { memset (&fl, 0, sizeof fl); }
234
235 int lock (int fd, size_t size)
236 {
237 fl.l_type = F_WRLCK;
238 fl.l_whence = SEEK_SET;
239 fl.l_start = 0;
240 fl.l_len = size;
241 return fcntl64 (fd, F_SETLKW, &fl);
242 }
243 int unlock (int fd)
244 {
245 if (!fl.l_len)
246 return 0;
247 fl.l_type = F_UNLCK;
248 return fcntl64 (fd, F_SETLKW, &fl);
249 }
250 };
251
252 /* POSIX shared memory object implementation. */
253
254 extern "C" int
255 shm_open (const char *name, int oflag, mode_t mode)
256 {
257 size_t len = strlen (name);
258 char shmname[ipc_names[shmem].prefix_len + len];
259
260 if (!check_path (shmname, shmem, name, len))
261 return -1;
262
263 /* Check for valid flags. */
264 if (((oflag & O_ACCMODE) != O_RDONLY && (oflag & O_ACCMODE) != O_RDWR)
265 || (oflag & ~(O_ACCMODE | O_CREAT | O_EXCL | O_TRUNC)))
266 {
267 debug_printf ("Invalid oflag 0%o", oflag);
268 set_errno (EINVAL);
269 return -1;
270 }
271
272 return open (shmname, oflag | O_CLOEXEC, mode & 0777);
273 }
274
275 extern "C" int
276 shm_unlink (const char *name)
277 {
278 size_t len = strlen (name);
279 char shmname[ipc_names[shmem].prefix_len + len];
280
281 if (!check_path (shmname, shmem, name, len))
282 return -1;
283
284 return unlink (shmname);
285 }
286
/* The POSIX message queue implementation is based on W. Richard Stevens'
   implementation, just tweaked for Cygwin.  The main change is the use of
   Windows mutexes and events instead of the pthread synchronization
   objects.  The pathname is massaged so that the files are created under
   /dev/mqueue.  mq_timedsend and mq_timedreceive are implemented
   additionally. */
293
/* Header stored at the start of the memory-mapped queue file.  The index
   fields are byte offsets from the start of the mapping; the value 0 is
   used as the end-of-list marker.  */
struct mq_hdr
{
  struct mq_attr mqh_attr; /* the queue's attributes */
  long mqh_head; /* index of first message */
  long mqh_free; /* index of first free message */
  long mqh_nwait; /* #threads blocked in mq_receive() */
  pid_t mqh_pid; /* nonzero PID if mqh_event set */
  char mqh_uname[36]; /* unique name used to identify synchronization
                         objects connected to this queue */
  struct sigevent mqh_event; /* for mq_notify() */
};
305
/* Header preceding each message slot in the mapped file; the message data
   follows the header directly.  msg_next has to remain the first member:
   _mq_send treats the address of mq_hdr::mqh_head as a msg_hdr in order to
   update the list head through msg_next.  */
struct msg_hdr
{
  long msg_next; /* index of next on linked list */
  ssize_t msg_len; /* actual length */
  unsigned int msg_prio; /* priority */
};
312
/* Per-open bookkeeping, allocated by mq_open.  A pointer to this structure,
   cast to mqd_t, is what applications get as the queue descriptor.  */
struct mq_info
{
  struct mq_hdr *mqi_hdr; /* start of mmap'ed region */
  unsigned long mqi_magic; /* magic number if open */
  int mqi_flags; /* flags for this process */
  HANDLE mqi_lock; /* mutex lock */
  HANDLE mqi_waitsend; /* and condition variable for full queue */
  HANDLE mqi_waitrecv; /* and condition variable for empty queue */
};
322
/* Value stored in mq_info::mqi_magic while the descriptor is open; every
   mq_* entry point checks it to detect invalid descriptors. */
#define MQI_MAGIC 0x98765432UL

/* Size of the data area of one message slot: the message size rounded up
   to a multiple of sizeof (long). */
#define MSGSIZE(i) roundup((i), sizeof(long))

#define MAX_TRIES 10 /* for waiting for initialization */

/* Attributes used by mq_open when the caller passes a NULL attr. */
struct mq_attr defattr = { 0, 10, 8192, 0 }; /* Linux defaults. */

extern "C" _off64_t lseek64 (int, _off64_t, int);
extern "C" void *mmap64 (void *, size_t, int, int, int, _off64_t);
333
334 extern "C" mqd_t
335 mq_open (const char *name, int oflag, ...)
336 {
337 int i, fd = -1, nonblock, created;
338 long msgsize, index;
339 _off64_t filesize = 0;
340 va_list ap;
341 mode_t mode;
342 int8_t *mptr;
343 struct __stat64 statbuff;
344 struct mq_hdr *mqhdr;
345 struct msg_hdr *msghdr;
346 struct mq_attr *attr;
347 struct mq_info *mqinfo;
348 LUID luid;
349
350 size_t len = strlen (name);
351 char mqname[ipc_names[mqueue].prefix_len + len];
352
353 if (!check_path (mqname, mqueue, name, len))
354 return (mqd_t) -1;
355
356 myfault efault;
357 if (efault.faulted (EFAULT))
358 return (mqd_t) -1;
359
360 oflag &= (O_CREAT | O_EXCL | O_NONBLOCK);
361 created = 0;
362 nonblock = oflag & O_NONBLOCK;
363 oflag &= ~O_NONBLOCK;
364 mptr = (int8_t *) MAP_FAILED;
365 mqinfo = NULL;
366
367 again:
368 if (oflag & O_CREAT)
369 {
370 va_start (ap, oflag); /* init ap to final named argument */
371 mode = va_arg (ap, mode_t) & ~S_IXUSR;
372 attr = va_arg (ap, struct mq_attr *);
373 va_end (ap);
374
375 /* Open and specify O_EXCL and user-execute */
376 fd = open (mqname, oflag | O_EXCL | O_RDWR | O_CLOEXEC, mode | S_IXUSR);
377 if (fd < 0)
378 {
379 if (errno == EEXIST && (oflag & O_EXCL) == 0)
380 goto exists; /* already exists, OK */
381 return (mqd_t) -1;
382 }
383 created = 1;
384 /* First one to create the file initializes it */
385 if (attr == NULL)
386 attr = &defattr;
387 else if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0)
388 {
389 set_errno (EINVAL);
390 goto err;
391 }
392 /* Calculate and set the file size */
393 msgsize = MSGSIZE (attr->mq_msgsize);
394 filesize = sizeof (struct mq_hdr)
395 + (attr->mq_maxmsg * (sizeof (struct msg_hdr) + msgsize));
396 if (lseek64 (fd, filesize - 1, SEEK_SET) == -1)
397 goto err;
398 if (write (fd, "", 1) == -1)
399 goto err;
400
401 /* Memory map the file */
402 mptr = (int8_t *) mmap64 (NULL, (size_t) filesize, PROT_READ | PROT_WRITE,
403 MAP_SHARED, fd, 0);
404 if (mptr == (int8_t *) MAP_FAILED)
405 goto err;
406
407 /* Allocate one mq_info{} for the queue */
408 if (!(mqinfo = (struct mq_info *) calloc (1, sizeof (struct mq_info))))
409 goto err;
410 mqinfo->mqi_hdr = mqhdr = (struct mq_hdr *) mptr;
411 mqinfo->mqi_magic = MQI_MAGIC;
412 mqinfo->mqi_flags = nonblock;
413
414 /* Initialize header at beginning of file */
415 /* Create free list with all messages on it */
416 mqhdr->mqh_attr.mq_flags = 0;
417 mqhdr->mqh_attr.mq_maxmsg = attr->mq_maxmsg;
418 mqhdr->mqh_attr.mq_msgsize = attr->mq_msgsize;
419 mqhdr->mqh_attr.mq_curmsgs = 0;
420 mqhdr->mqh_nwait = 0;
421 mqhdr->mqh_pid = 0;
422 if (!AllocateLocallyUniqueId (&luid))
423 {
424 __seterrno ();
425 goto err;
426 }
427 __small_sprintf (mqhdr->mqh_uname, "%016X%08x%08x",
428 hash_path_name (0,mqname),
429 luid.HighPart, luid.LowPart);
430 mqhdr->mqh_head = 0;
431 index = sizeof (struct mq_hdr);
432 mqhdr->mqh_free = index;
433 for (i = 0; i < attr->mq_maxmsg - 1; i++)
434 {
435 msghdr = (struct msg_hdr *) &mptr[index];
436 index += sizeof (struct msg_hdr) + msgsize;
437 msghdr->msg_next = index;
438 }
439 msghdr = (struct msg_hdr *) &mptr[index];
440 msghdr->msg_next = 0; /* end of free list */
441
442 /* Initialize mutex & condition variables */
443 i = ipc_mutex_init (&mqinfo->mqi_lock, mqhdr->mqh_uname);
444 if (i != 0)
445 goto pthreaderr;
446
447 i = ipc_cond_init (&mqinfo->mqi_waitsend, mqhdr->mqh_uname, 'S');
448 if (i != 0)
449 goto pthreaderr;
450
451 i = ipc_cond_init (&mqinfo->mqi_waitrecv, mqhdr->mqh_uname, 'R');
452 if (i != 0)
453 goto pthreaderr;
454
455 /* Initialization complete, turn off user-execute bit */
456 if (fchmod (fd, mode) == -1)
457 goto err;
458 close (fd);
459 return ((mqd_t) mqinfo);
460 }
461
462 exists:
463 /* Open the file then memory map */
464 if ((fd = open (mqname, O_RDWR | O_CLOEXEC)) < 0)
465 {
466 if (errno == ENOENT && (oflag & O_CREAT))
467 goto again;
468 goto err;
469 }
470 /* Make certain initialization is complete */
471 for (i = 0; i < MAX_TRIES; i++)
472 {
473 if (stat64 (mqname, &statbuff) == -1)
474 {
475 if (errno == ENOENT && (oflag & O_CREAT))
476 {
477 close (fd);
478 fd = -1;
479 goto again;
480 }
481 goto err;
482 }
483 if ((statbuff.st_mode & S_IXUSR) == 0)
484 break;
485 sleep (1);
486 }
487 if (i == MAX_TRIES)
488 {
489 set_errno (ETIMEDOUT);
490 goto err;
491 }
492
493 filesize = statbuff.st_size;
494 mptr = (int8_t *) mmap64 (NULL, (size_t) filesize, PROT_READ | PROT_WRITE,
495 MAP_SHARED, fd, 0);
496 if (mptr == (int8_t *) MAP_FAILED)
497 goto err;
498 close (fd);
499 fd = -1;
500
501 /* Allocate one mq_info{} for each open */
502 if (!(mqinfo = (struct mq_info *) calloc (1, sizeof (struct mq_info))))
503 goto err;
504 mqinfo->mqi_hdr = mqhdr = (struct mq_hdr *) mptr;
505 mqinfo->mqi_magic = MQI_MAGIC;
506 mqinfo->mqi_flags = nonblock;
507
508 /* Initialize mutex & condition variable */
509 i = ipc_mutex_init (&mqinfo->mqi_lock, mqhdr->mqh_uname);
510 if (i != 0)
511 goto pthreaderr;
512
513 i = ipc_cond_init (&mqinfo->mqi_waitsend, mqhdr->mqh_uname, 'S');
514 if (i != 0)
515 goto pthreaderr;
516
517 i = ipc_cond_init (&mqinfo->mqi_waitrecv, mqhdr->mqh_uname, 'R');
518 if (i != 0)
519 goto pthreaderr;
520
521 return (mqd_t) mqinfo;
522
523 pthreaderr:
524 errno = i;
525 err:
526 /* Don't let following function calls change errno */
527 save_errno save;
528
529 if (created)
530 unlink (mqname);
531 if (mptr != (int8_t *) MAP_FAILED)
532 munmap((void *) mptr, (size_t) filesize);
533 if (mqinfo)
534 {
535 if (mqinfo->mqi_lock)
536 ipc_mutex_close (mqinfo->mqi_lock);
537 if (mqinfo->mqi_waitsend)
538 ipc_cond_close (mqinfo->mqi_waitsend);
539 if (mqinfo->mqi_waitrecv)
540 ipc_cond_close (mqinfo->mqi_waitrecv);
541 free (mqinfo);
542 }
543 if (fd >= 0)
544 close (fd);
545 return (mqd_t) -1;
546 }
547
548 extern "C" int
549 mq_getattr (mqd_t mqd, struct mq_attr *mqstat)
550 {
551 int n;
552 struct mq_hdr *mqhdr;
553 struct mq_attr *attr;
554 struct mq_info *mqinfo;
555
556 myfault efault;
557 if (efault.faulted (EBADF))
558 return -1;
559
560 mqinfo = (struct mq_info *) mqd;
561 if (mqinfo->mqi_magic != MQI_MAGIC)
562 {
563 set_errno (EBADF);
564 return -1;
565 }
566 mqhdr = mqinfo->mqi_hdr;
567 attr = &mqhdr->mqh_attr;
568 if ((n = ipc_mutex_lock (mqinfo->mqi_lock)) != 0)
569 {
570 errno = n;
571 return -1;
572 }
573 mqstat->mq_flags = mqinfo->mqi_flags; /* per-open */
574 mqstat->mq_maxmsg = attr->mq_maxmsg; /* remaining three per-queue */
575 mqstat->mq_msgsize = attr->mq_msgsize;
576 mqstat->mq_curmsgs = attr->mq_curmsgs;
577
578 ipc_mutex_unlock (mqinfo->mqi_lock);
579 return 0;
580 }
581
582 extern "C" int
583 mq_setattr (mqd_t mqd, const struct mq_attr *mqstat, struct mq_attr *omqstat)
584 {
585 int n;
586 struct mq_hdr *mqhdr;
587 struct mq_attr *attr;
588 struct mq_info *mqinfo;
589
590 myfault efault;
591 if (efault.faulted (EBADF))
592 return -1;
593
594 mqinfo = (struct mq_info *) mqd;
595 if (mqinfo->mqi_magic != MQI_MAGIC)
596 {
597 set_errno (EBADF);
598 return -1;
599 }
600 mqhdr = mqinfo->mqi_hdr;
601 attr = &mqhdr->mqh_attr;
602 if ((n = ipc_mutex_lock (mqinfo->mqi_lock)) != 0)
603 {
604 errno = n;
605 return -1;
606 }
607
608 if (omqstat != NULL)
609 {
610 omqstat->mq_flags = mqinfo->mqi_flags; /* previous attributes */
611 omqstat->mq_maxmsg = attr->mq_maxmsg;
612 omqstat->mq_msgsize = attr->mq_msgsize;
613 omqstat->mq_curmsgs = attr->mq_curmsgs; /* and current status */
614 }
615
616 if (mqstat->mq_flags & O_NONBLOCK)
617 mqinfo->mqi_flags |= O_NONBLOCK;
618 else
619 mqinfo->mqi_flags &= ~O_NONBLOCK;
620
621 ipc_mutex_unlock (mqinfo->mqi_lock);
622 return 0;
623 }
624
625 extern "C" int
626 mq_notify (mqd_t mqd, const struct sigevent *notification)
627 {
628 int n;
629 pid_t pid;
630 struct mq_hdr *mqhdr;
631 struct mq_info *mqinfo;
632
633 myfault efault;
634 if (efault.faulted (EBADF))
635 return -1;
636
637 mqinfo = (struct mq_info *) mqd;
638 if (mqinfo->mqi_magic != MQI_MAGIC)
639 {
640 set_errno (EBADF);
641 return -1;
642 }
643 mqhdr = mqinfo->mqi_hdr;
644 if ((n = ipc_mutex_lock (mqinfo->mqi_lock)) != 0)
645 {
646 errno = n;
647 return -1;
648 }
649
650 pid = getpid ();
651 if (!notification)
652 {
653 if (mqhdr->mqh_pid == pid)
654 mqhdr->mqh_pid = 0; /* unregister calling process */
655 }
656 else
657 {
658 if (mqhdr->mqh_pid != 0)
659 {
660 if (kill (mqhdr->mqh_pid, 0) != -1 || errno != ESRCH)
661 {
662 set_errno (EBUSY);
663 ipc_mutex_unlock (mqinfo->mqi_lock);
664 return -1;
665 }
666 }
667 mqhdr->mqh_pid = pid;
668 mqhdr->mqh_event = *notification;
669 }
670 ipc_mutex_unlock (mqinfo->mqi_lock);
671 return 0;
672 }
673
674 static int
675 _mq_send (mqd_t mqd, const char *ptr, size_t len, unsigned int prio,
676 const struct timespec *abstime)
677 {
678 int n;
679 long index, freeindex;
680 int8_t *mptr;
681 struct sigevent *sigev;
682 struct mq_hdr *mqhdr;
683 struct mq_attr *attr;
684 struct msg_hdr *msghdr, *nmsghdr, *pmsghdr;
685 struct mq_info *mqinfo;
686
687 myfault efault;
688 if (efault.faulted (EBADF))
689 return -1;
690
691 mqinfo = (struct mq_info *) mqd;
692 if (mqinfo->mqi_magic != MQI_MAGIC)
693 {
694 set_errno (EBADF);
695 return -1;
696 }
697 if (prio > MQ_PRIO_MAX)
698 {
699 set_errno (EINVAL);
700 return -1;
701 }
702
703 mqhdr = mqinfo->mqi_hdr; /* struct pointer */
704 mptr = (int8_t *) mqhdr; /* byte pointer */
705 attr = &mqhdr->mqh_attr;
706 if ((n = ipc_mutex_lock (mqinfo->mqi_lock)) != 0)
707 {
708 errno = n;
709 return -1;
710 }
711
712 if (len > (size_t) attr->mq_msgsize)
713 {
714 set_errno (EMSGSIZE);
715 goto err;
716 }
717 if (attr->mq_curmsgs == 0)
718 {
719 if (mqhdr->mqh_pid != 0 && mqhdr->mqh_nwait == 0)
720 {
721 sigev = &mqhdr->mqh_event;
722 if (sigev->sigev_notify == SIGEV_SIGNAL)
723 sigqueue (mqhdr->mqh_pid, sigev->sigev_signo, sigev->sigev_value);
724 mqhdr->mqh_pid = 0; /* unregister */
725 }
726 }
727 else if (attr->mq_curmsgs >= attr->mq_maxmsg)
728 {
729 /* Queue is full */
730 if (mqinfo->mqi_flags & O_NONBLOCK)
731 {
732 set_errno (EAGAIN);
733 goto err;
734 }
735 /* Wait for room for one message on the queue */
736 while (attr->mq_curmsgs >= attr->mq_maxmsg)
737 {
738 int ret = ipc_cond_timedwait (mqinfo->mqi_waitsend, mqinfo->mqi_lock,
739 abstime);
740 if (ret != 0)
741 {
742 set_errno (ret);
743 goto err;
744 }
745 }
746 }
747
748 /* nmsghdr will point to new message */
749 if ((freeindex = mqhdr->mqh_free) == 0)
750 api_fatal ("mq_send: curmsgs = %ld; free = 0", attr->mq_curmsgs);
751
752 nmsghdr = (struct msg_hdr *) &mptr[freeindex];
753 nmsghdr->msg_prio = prio;
754 nmsghdr->msg_len = len;
755 memcpy (nmsghdr + 1, ptr, len); /* copy message from caller */
756 mqhdr->mqh_free = nmsghdr->msg_next; /* new freelist head */
757
758 /* Find right place for message in linked list */
759 index = mqhdr->mqh_head;
760 pmsghdr = (struct msg_hdr *) &(mqhdr->mqh_head);
761 while (index)
762 {
763 msghdr = (struct msg_hdr *) &mptr[index];
764 if (prio > msghdr->msg_prio)
765 {
766 nmsghdr->msg_next = index;
767 pmsghdr->msg_next = freeindex;
768 break;
769 }
770 index = msghdr->msg_next;
771 pmsghdr = msghdr;
772 }
773 if (index == 0)
774 {
775 /* Queue was empty or new goes at end of list */
776 pmsghdr->msg_next = freeindex;
777 nmsghdr->msg_next = 0;
778 }
779 /* Wake up anyone blocked in mq_receive waiting for a message */
780 if (attr->mq_curmsgs == 0)
781 ipc_cond_signal (mqinfo->mqi_waitrecv);
782 attr->mq_curmsgs++;
783
784 ipc_mutex_unlock (mqinfo->mqi_lock);
785 return 0;
786
787 err:
788 ipc_mutex_unlock (mqinfo->mqi_lock);
789 return -1;
790 }
791
792 extern "C" int
793 mq_send (mqd_t mqd, const char *ptr, size_t len, unsigned int prio)
794 {
795 return _mq_send (mqd, ptr, len, prio, NULL);
796 }
797
798 extern "C" int
799 mq_timedsend (mqd_t mqd, const char *ptr, size_t len, unsigned int prio,
800 const struct timespec *abstime)
801 {
802 return _mq_send (mqd, ptr, len, prio, abstime);
803 }
804
805 static ssize_t
806 _mq_receive (mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop,
807 const struct timespec *abstime)
808 {
809 int n;
810 long index;
811 int8_t *mptr;
812 ssize_t len;
813 struct mq_hdr *mqhdr;
814 struct mq_attr *attr;
815 struct msg_hdr *msghdr;
816 struct mq_info *mqinfo;
817
818 myfault efault;
819 if (efault.faulted (EBADF))
820 return -1;
821
822 mqinfo = (struct mq_info *) mqd;
823 if (mqinfo->mqi_magic != MQI_MAGIC)
824 {
825 set_errno (EBADF);
826 return -1;
827 }
828 mqhdr = mqinfo->mqi_hdr; /* struct pointer */
829 mptr = (int8_t *) mqhdr; /* byte pointer */
830 attr = &mqhdr->mqh_attr;
831 if ((n = ipc_mutex_lock (mqinfo->mqi_lock)) != 0)
832 {
833 errno = n;
834 return -1;
835 }
836
837 if (maxlen < (size_t) attr->mq_msgsize)
838 {
839 set_errno (EMSGSIZE);
840 goto err;
841 }
842 if (attr->mq_curmsgs == 0) /* queue is empty */
843 {
844 if (mqinfo->mqi_flags & O_NONBLOCK)
845 {
846 set_errno (EAGAIN);
847 goto err;
848 }
849 /* Wait for a message to be placed onto queue */
850 mqhdr->mqh_nwait++;
851 while (attr->mq_curmsgs == 0)
852 {
853 int ret = ipc_cond_timedwait (mqinfo->mqi_waitrecv, mqinfo->mqi_lock,
854 abstime);
855 if (ret != 0)
856 {
857 set_errno (ret);
858 goto err;
859 }
860 }
861 mqhdr->mqh_nwait--;
862 }
863
864 if ((index = mqhdr->mqh_head) == 0)
865 api_fatal ("mq_receive: curmsgs = %ld; head = 0", attr->mq_curmsgs);
866
867 msghdr = (struct msg_hdr *) &mptr[index];
868 mqhdr->mqh_head = msghdr->msg_next; /* new head of list */
869 len = msghdr->msg_len;
870 memcpy(ptr, msghdr + 1, len); /* copy the message itself */
871 if (priop != NULL)
872 *priop = msghdr->msg_prio;
873
874 /* Just-read message goes to front of free list */
875 msghdr->msg_next = mqhdr->mqh_free;
876 mqhdr->mqh_free = index;
877
878 /* Wake up anyone blocked in mq_send waiting for room */
879 if (attr->mq_curmsgs == attr->mq_maxmsg)
880 ipc_cond_signal (mqinfo->mqi_waitsend);
881 attr->mq_curmsgs--;
882
883 ipc_mutex_unlock (mqinfo->mqi_lock);
884 return len;
885
886 err:
887 ipc_mutex_unlock (mqinfo->mqi_lock);
888 return -1;
889 }
890
891 extern "C" ssize_t
892 mq_receive (mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop)
893 {
894 return _mq_receive (mqd, ptr, maxlen, priop, NULL);
895 }
896
897 extern "C" ssize_t
898 mq_timedreceive (mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop,
899 const struct timespec *abstime)
900 {
901 return _mq_receive (mqd, ptr, maxlen, priop, abstime);
902 }
903
904 extern "C" int
905 mq_close (mqd_t mqd)
906 {
907 long msgsize, filesize;
908 struct mq_hdr *mqhdr;
909 struct mq_attr *attr;
910 struct mq_info *mqinfo;
911
912 myfault efault;
913 if (efault.faulted (EBADF))
914 return -1;
915
916 mqinfo = (struct mq_info *) mqd;
917 if (mqinfo->mqi_magic != MQI_MAGIC)
918 {
919 set_errno (EBADF);
920 return -1;
921 }
922 mqhdr = mqinfo->mqi_hdr;
923 attr = &mqhdr->mqh_attr;
924
925 if (mq_notify (mqd, NULL)) /* unregister calling process */
926 return -1;
927
928 msgsize = MSGSIZE (attr->mq_msgsize);
929 filesize = sizeof (struct mq_hdr)
930 + (attr->mq_maxmsg * (sizeof (struct msg_hdr) + msgsize));
931 if (munmap (mqinfo->mqi_hdr, filesize) == -1)
932 return -1;
933
934 mqinfo->mqi_magic = 0; /* just in case */
935 ipc_cond_close (mqinfo->mqi_waitsend);
936 ipc_cond_close (mqinfo->mqi_waitrecv);
937 ipc_mutex_close (mqinfo->mqi_lock);
938 free (mqinfo);
939 return 0;
940 }
941
942 extern "C" int
943 mq_unlink (const char *name)
944 {
945 size_t len = strlen (name);
946 char mqname[ipc_names[mqueue].prefix_len + len];
947
948 if (!check_path (mqname, mqueue, name, len))
949 return -1;
950 if (unlink (mqname) == -1)
951 return -1;
952 return 0;
953 }
954
/* POSIX named semaphore implementation.  Loosely based on W. Richard
   Stevens' implementation as far as sem_open is concerned, but under the
   hood using the already existing semaphore class in thread.cc.  Using a
   file-backed solution makes it possible to implement kernel-persistent
   named semaphores. */
959
/* Contents of the file backing a named semaphore.  sem_open writes this
   record when it creates the file and reads it back when opening an
   existing one; _sem_close rewrites it with the values returned by
   semaphore::getinternal.  */
struct sem_finfo
{
  unsigned int value; /* semaphore value */
  unsigned long long hash; /* set from hash_path_name of the file name */
  LUID luid; /* set from AllocateLocallyUniqueId at creation */
};
966
/* Open, and optionally create, the named semaphore NAME.

   The semaphore is backed by a file "sem.<name>" below /dev/shm which
   holds a struct sem_finfo.  With O_CREAT the variable arguments are a
   mode_t and the initial value as unsigned int.  While a new file is being
   initialized it carries the user-execute bit; other openers poll until
   the bit has been cleared.

   Returns the semaphore, or SEM_FAILED with errno set.  */
extern "C" sem_t *
sem_open (const char *name, int oflag, ...)
{
  int i, fd = -1, created;
  va_list ap;
  mode_t mode = 0;
  unsigned int value = 0;
  struct __stat64 statbuff;
  sem_t *sem = SEM_FAILED;
  sem_finfo sf;
  bool wasopen = false;
  ipc_flock file;

  size_t len = strlen (name);
  char semname[ipc_names[semaphore].prefix_len + len];

  if (!check_path (semname, semaphore, name, len))
    return SEM_FAILED;

  myfault efault;
  if (efault.faulted (EFAULT))
    return SEM_FAILED;

  created = 0;
  /* Only O_CREAT and O_EXCL are honored, all other flags are dropped. */
  oflag &= (O_CREAT | O_EXCL);

again:
  if (oflag & O_CREAT)
    {
      va_start (ap, oflag); /* init ap to final named argument */
      mode = va_arg (ap, mode_t) & ~S_IXUSR;
      value = va_arg (ap, unsigned int);
      va_end (ap);

      /* Open and specify O_EXCL and user-execute */
      fd = open (semname, oflag | O_EXCL | O_RDWR | O_CLOEXEC, mode | S_IXUSR);
      if (fd < 0)
        {
          if (errno == EEXIST && (oflag & O_EXCL) == 0)
            goto exists; /* already exists, OK */
          return SEM_FAILED;
        }
      created = 1;
      /* First one to create the file initializes it. */
      if (!AllocateLocallyUniqueId (&sf.luid))
        {
          __seterrno ();
          goto err;
        }
      sf.value = value;
      sf.hash = hash_path_name (0, semname);
      if (write (fd, &sf, sizeof sf) != sizeof sf)
        goto err;
      sem = semaphore::open (sf.hash, sf.luid, fd, oflag, mode, value, wasopen);
      if (sem == SEM_FAILED)
        goto err;
      /* Initialization complete, turn off user-execute bit */
      if (fchmod (fd, mode) == -1)
        goto err;
      /* Don't close (fd); */
      return sem;
    }

exists:
  /* Open the file and fetch the semaphore name. */
  if ((fd = open (semname, O_RDWR | O_CLOEXEC)) < 0)
    {
      /* The file vanished between the failed exclusive create and this
         open; start over and try to create it. */
      if (errno == ENOENT && (oflag & O_CREAT))
        goto again;
      goto err;
    }
  /* Make certain initialization is complete */
  for (i = 0; i < MAX_TRIES; i++)
    {
      if (stat64 (semname, &statbuff) == -1)
        {
          if (errno == ENOENT && (oflag & O_CREAT))
            {
              close (fd);
              fd = -1;
              goto again;
            }
          goto err;
        }
      if ((statbuff.st_mode & S_IXUSR) == 0)
        break;
      sleep (1);
    }
  if (i == MAX_TRIES)
    {
      set_errno (ETIMEDOUT);
      goto err;
    }
  /* Read the record under a file lock; _sem_close rewrites it under the
     same kind of lock. */
  if (file.lock (fd, sizeof sf))
    goto err;
  if (read (fd, &sf, sizeof sf) != sizeof sf)
    goto err;
  sem = semaphore::open (sf.hash, sf.luid, fd, oflag, mode, sf.value, wasopen);
  file.unlock (fd);
  if (sem == SEM_FAILED)
    goto err;
  /* If wasopen is set, the semaphore was already opened and we already have
     an open file descriptor pointing to the file.  This means, we have to
     close the file descriptor created in this call.  It won't be stored
     anywhere anyway. */
  if (wasopen)
    close (fd);
  return sem;

err:
  /* Don't let following function calls change errno */
  save_errno save;

  /* unlock does nothing if lock was never called on this object. */
  file.unlock (fd);
  if (created)
    unlink (semname);
  if (sem != SEM_FAILED)
    semaphore::close (sem);
  if (fd >= 0)
    close (fd);
  return SEM_FAILED;
}
1089
1090 int
1091 _sem_close (sem_t *sem, bool do_close)
1092 {
1093 sem_finfo sf;
1094 int fd, ret = -1;
1095 ipc_flock file;
1096
1097 if (semaphore::getinternal (sem, &fd, &sf.hash, &sf.luid, &sf.value) == -1)
1098 return -1;
1099 if (!file.lock (fd, sizeof sf)
1100 && lseek64 (fd, 0LL, SEEK_SET) != (_off64_t) -1
1101 && write (fd, &sf, sizeof sf) == sizeof sf)
1102 ret = do_close ? semaphore::close (sem) : 0;
1103
1104 /* Don't let following function calls change errno */
1105 save_errno save;
1106 file.unlock (fd);
1107 close (fd);
1108
1109 return ret;
1110 }
1111
1112 extern "C" int
1113 sem_close (sem_t *sem)
1114 {
1115 return _sem_close (sem, true);
1116 }
1117
1118 extern "C" int
1119 sem_unlink (const char *name)
1120 {
1121 size_t len = strlen (name);
1122 char semname[ipc_names[semaphore].prefix_len + len];
1123
1124 if (!check_path (semname, semaphore, name, len))
1125 return -1;
1126 if (unlink (semname) == -1)
1127 return -1;
1128 return 0;
1129 }
This page took 0.094247 seconds and 6 git commands to generate.