]> sourceware.org Git - newlib-cygwin.git/blob - winsup/cygwin/posix_ipc.cc
Throughout, call fcntl64 instead of fcntl or fcntl_worker.
[newlib-cygwin.git] / winsup / cygwin / posix_ipc.cc
1 /* posix_ipc.cc: POSIX IPC API for Cygwin.
2
3 Copyright 2007 Red Hat, Inc.
4
5 This file is part of Cygwin.
6
7 This software is a copyrighted work licensed under the terms of the
8 Cygwin license. Please consult the file "CYGWIN_LICENSE" for
9 details. */
10
11 #include "winsup.h"
12 #include "thread.h"
13 #include "path.h"
14 #include "cygerrno.h"
15 #include "cygtls.h"
16 #include "fhandler.h"
17 #include "dtable.h"
18 #include "cygheap.h"
19 #include "security.h"
20 #include "sigproc.h"
21 #include <sys/stat.h>
22 #include <sys/mman.h>
23 #include <sys/param.h>
24 #include <fcntl.h>
25 #include <pwd.h>
26 #include <stdlib.h>
27 #include <limits.h>
28 #include <unistd.h>
29 #include <stdarg.h>
30 #include <mqueue.h>
31 #include <semaphore.h>
32
33 /* The prefix_len is the length of the path prefix ncluding trailing "/"
34 (or "/sem." for semaphores) as well as the trailing NUL. */
35 struct
36 {
37 const char *prefix;
38 const size_t prefix_len;
39 const char *description;
40 } ipc_names[] = {
41 { "/dev/shm", 10, "POSIX shared memory object" },
42 { "/dev/mqueue", 13, "POSIX message queue" },
43 { "/dev/shm", 14, "POSIX semaphore" }
44 };
45
46 enum ipc_type_t
47 {
48 shmem,
49 mqueue,
50 semaphore
51 };
52
53 static bool
54 check_path (char *res_name, ipc_type_t type, const char *name, size_t len)
55 {
56 /* Note that we require the existance of the apprpriate /dev subdirectories
57 for POSIX IPC object support, similar to Linux (which supports the
58 directories, but doesn't require to mount them). We don't create
59 these directory here, that's the task of the installer. But we check
60 for existance and give ample warning. */
61 path_conv path (ipc_names[type].prefix, PC_SYM_NOFOLLOW);
62 if (path.error || !path.exists () || !path.isdir ())
63 {
64 small_printf (
65 "Warning: '%s' does not exists or is not a directory.\n\n"
66 "%ss require the existance of this directory.\n"
67 "Create the directory '%s' and set the permissions to 01777.\n"
68 "For instance on the command line: mkdir -m 01777 %s\n",
69 ipc_names[type].prefix, ipc_names[type].description,
70 ipc_names[type].prefix, ipc_names[type].prefix);
71 set_errno (EINVAL);
72 return false;
73 }
74 /* Name must start with a single slash. */
75 if (!name || name[0] != '/' || name[1] == '/' || !name[1])
76 {
77 debug_printf ("Invalid %s name '%s'", ipc_names[type].description, name);
78 set_errno (EINVAL);
79 return false;
80 }
81 if (len > PATH_MAX - ipc_names[type].prefix_len)
82 {
83 debug_printf ("%s name '%s' too long", ipc_names[type].description, name);
84 set_errno (ENAMETOOLONG);
85 return false;
86 }
87 __small_sprintf (res_name, "%s/%s%s", ipc_names[type].prefix,
88 type == semaphore ? "sem." : "",
89 name + 1);
90 return true;
91 }
92
93 static int
94 ipc_mutex_init (HANDLE *pmtx, const char *name)
95 {
96 char buf[MAX_PATH];
97 __small_sprintf (buf, "%scyg_pmtx/%s", cygheap->shared_prefix, name);
98 *pmtx = CreateMutex (&sec_all, FALSE, buf);
99 if (!*pmtx)
100 debug_printf ("failed: %E\n");
101 return *pmtx ? 0 : geterrno_from_win_error ();
102 }
103
104 static int
105 ipc_mutex_lock (HANDLE mtx)
106 {
107 HANDLE h[2] = { mtx, signal_arrived };
108
109 switch (WaitForMultipleObjects (2, h, FALSE, INFINITE))
110 {
111 case WAIT_OBJECT_0:
112 case WAIT_ABANDONED_0:
113 return 0;
114 case WAIT_OBJECT_0 + 1:
115 set_errno (EINTR);
116 return 1;
117 default:
118 break;
119 }
120 return geterrno_from_win_error ();
121 }
122
123 static inline int
124 ipc_mutex_unlock (HANDLE mtx)
125 {
126 return ReleaseMutex (mtx) ? 0 : geterrno_from_win_error ();
127 }
128
129 static inline int
130 ipc_mutex_close (HANDLE mtx)
131 {
132 return CloseHandle (mtx) ? 0 : geterrno_from_win_error ();
133 }
134
135 static int
136 ipc_cond_init (HANDLE *pevt, const char *name)
137 {
138 char buf[MAX_PATH];
139 __small_sprintf (buf, "%scyg_pevt/%s", cygheap->shared_prefix, name);
140 *pevt = CreateEvent (&sec_all, TRUE, FALSE, buf);
141 if (!*pevt)
142 debug_printf ("failed: %E\n");
143 return *pevt ? 0 : geterrno_from_win_error ();
144 }
145
146 static int
147 ipc_cond_timedwait (HANDLE evt, HANDLE mtx, const struct timespec *abstime)
148 {
149 struct timeval tv;
150 DWORD timeout;
151 HANDLE h[2] = { mtx, evt };
152
153 if (!abstime)
154 timeout = INFINITE;
155 else if (abstime->tv_sec < 0
156 || abstime->tv_nsec < 0
157 || abstime->tv_nsec > 999999999)
158 return EINVAL;
159 else
160 {
161 gettimeofday (&tv, NULL);
162 /* Check for immediate timeout. */
163 if (tv.tv_sec > abstime->tv_sec
164 || (tv.tv_sec == abstime->tv_sec
165 && tv.tv_usec > abstime->tv_nsec / 1000))
166 return ETIMEDOUT;
167 timeout = (abstime->tv_sec - tv.tv_sec) * 1000;
168 timeout += (abstime->tv_nsec / 1000 - tv.tv_usec) / 1000;
169 }
170 if (ipc_mutex_unlock (mtx))
171 return -1;
172 switch (WaitForMultipleObjects (2, h, TRUE, timeout))
173 {
174 case WAIT_OBJECT_0:
175 case WAIT_ABANDONED_0:
176 ResetEvent (evt);
177 return 0;
178 case WAIT_TIMEOUT:
179 ipc_mutex_lock (mtx);
180 return ETIMEDOUT;
181 default:
182 break;
183 }
184 return geterrno_from_win_error ();
185 }
186
187 static inline int
188 ipc_cond_signal (HANDLE evt)
189 {
190 return SetEvent (evt) ? 0 : geterrno_from_win_error ();
191 }
192
193 static inline int
194 ipc_cond_close (HANDLE evt)
195 {
196 return CloseHandle (evt) ? 0 : geterrno_from_win_error ();
197 }
198
199 class ipc_flock
200 {
201 struct __flock64 fl;
202
203 public:
204 ipc_flock () { memset (&fl, 0, sizeof fl); }
205
206 int lock (int fd, size_t size)
207 {
208 fl.l_type = F_WRLCK;
209 fl.l_whence = SEEK_SET;
210 fl.l_start = 0;
211 fl.l_len = size;
212 return fcntl64 (fd, F_SETLKW, &fl);
213 }
214 int unlock (int fd)
215 {
216 if (!fl.l_len)
217 return 0;
218 fl.l_type = F_UNLCK;
219 return fcntl64 (fd, F_SETLKW, &fl);
220 }
221 };
222
223 /* POSIX shared memory object implementation. */
224
225 extern "C" int
226 shm_open (const char *name, int oflag, mode_t mode)
227 {
228 size_t len = strlen (name);
229 char shmname[ipc_names[shmem].prefix_len + len];
230
231 if (!check_path (shmname, shmem, name, len))
232 return -1;
233
234 /* Check for valid flags. */
235 if (((oflag & O_ACCMODE) != O_RDONLY && (oflag & O_ACCMODE) != O_RDWR)
236 || (oflag & ~(O_ACCMODE | O_CREAT | O_EXCL | O_TRUNC)))
237 {
238 debug_printf ("Invalid oflag 0%o", oflag);
239 set_errno (EINVAL);
240 return -1;
241 }
242
243 return open (shmname, oflag, mode & 0777);
244 }
245
246 extern "C" int
247 shm_unlink (const char *name)
248 {
249 size_t len = strlen (name);
250 char shmname[ipc_names[shmem].prefix_len + len];
251
252 if (!check_path (shmname, shmem, name, len))
253 return -1;
254
255 return unlink (shmname);
256 }
257
258 /* The POSIX message queue implementation is based on W. Richard STEVENS
259 implementation, just tweaked for Cygwin. The main change is
260 the usage of Windows mutexes and events instead of using the pthread
261 synchronization objects. The pathname is massaged so that the
262 files are created under /dev/mqueue. mq_timedsend and mq_timedreceive
263 are implemented additionally. */
264
265 struct mq_hdr
266 {
267 struct mq_attr mqh_attr; /* the queue's attributes */
268 long mqh_head; /* index of first message */
269 long mqh_free; /* index of first free message */
270 long mqh_nwait; /* #threads blocked in mq_receive() */
271 pid_t mqh_pid; /* nonzero PID if mqh_event set */
272 char mqh_uname[36]; /* unique name used to identify synchronization
273 objects connected to this queue */
274 struct sigevent mqh_event; /* for mq_notify() */
275 };
276
277 struct msg_hdr
278 {
279 long msg_next; /* index of next on linked list */
280 ssize_t msg_len; /* actual length */
281 unsigned int msg_prio; /* priority */
282 };
283
284 struct mq_info
285 {
286 struct mq_hdr *mqi_hdr; /* start of mmap'ed region */
287 unsigned long mqi_magic; /* magic number if open */
288 int mqi_flags; /* flags for this process */
289 HANDLE mqi_lock; /* mutex lock */
290 HANDLE mqi_wait; /* and condition variable */
291 };
292
293 #define MQI_MAGIC 0x98765432UL
294
295 #define MSGSIZE(i) roundup((i), sizeof(long))
296
297 #define MAX_TRIES 10 /* for waiting for initialization */
298
299 struct mq_attr defattr = { 0, 10, 8192, 0 }; /* Linux defaults. */
300
301 extern "C" _off64_t lseek64 (int, _off64_t, int);
302 extern "C" void *mmap64 (void *, size_t, int, int, int, _off64_t);
303
304 extern "C" mqd_t
305 mq_open (const char *name, int oflag, ...)
306 {
307 int i, fd, nonblock, created;
308 long msgsize, index;
309 _off64_t filesize = 0;
310 va_list ap;
311 mode_t mode;
312 int8_t *mptr;
313 struct __stat64 statbuff;
314 struct mq_hdr *mqhdr;
315 struct msg_hdr *msghdr;
316 struct mq_attr *attr;
317 struct mq_info *mqinfo;
318 LUID luid;
319
320 size_t len = strlen (name);
321 char mqname[ipc_names[mqueue].prefix_len + len];
322
323 if (!check_path (mqname, mqueue, name, len))
324 return (mqd_t) -1;
325
326 myfault efault;
327 if (efault.faulted (EFAULT))
328 return (mqd_t) -1;
329
330 oflag &= (O_CREAT | O_EXCL | O_NONBLOCK);
331 created = 0;
332 nonblock = oflag & O_NONBLOCK;
333 oflag &= ~O_NONBLOCK;
334 mptr = (int8_t *) MAP_FAILED;
335 mqinfo = NULL;
336
337 again:
338 if (oflag & O_CREAT)
339 {
340 va_start (ap, oflag); /* init ap to final named argument */
341 mode = va_arg (ap, mode_t) & ~S_IXUSR;
342 attr = va_arg (ap, struct mq_attr *);
343 va_end (ap);
344
345 /* Open and specify O_EXCL and user-execute */
346 fd = open (mqname, oflag | O_EXCL | O_RDWR, mode | S_IXUSR);
347 if (fd < 0)
348 {
349 if (errno == EEXIST && (oflag & O_EXCL) == 0)
350 goto exists; /* already exists, OK */
351 return (mqd_t) -1;
352 }
353 created = 1;
354 /* First one to create the file initializes it */
355 if (attr == NULL)
356 attr = &defattr;
357 else if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0)
358 {
359 set_errno (EINVAL);
360 goto err;
361 }
362 /* Calculate and set the file size */
363 msgsize = MSGSIZE (attr->mq_msgsize);
364 filesize = sizeof (struct mq_hdr)
365 + (attr->mq_maxmsg * (sizeof (struct msg_hdr) + msgsize));
366 if (lseek64 (fd, filesize - 1, SEEK_SET) == -1)
367 goto err;
368 if (write (fd, "", 1) == -1)
369 goto err;
370
371 /* Memory map the file */
372 mptr = (int8_t *) mmap64 (NULL, (size_t) filesize, PROT_READ | PROT_WRITE,
373 MAP_SHARED, fd, 0);
374 if (mptr == (int8_t *) MAP_FAILED)
375 goto err;
376
377 /* Allocate one mq_info{} for the queue */
378 if (!(mqinfo = (struct mq_info *) malloc (sizeof (struct mq_info))))
379 goto err;
380 mqinfo->mqi_hdr = mqhdr = (struct mq_hdr *) mptr;
381 mqinfo->mqi_magic = MQI_MAGIC;
382 mqinfo->mqi_flags = nonblock;
383
384 /* Initialize header at beginning of file */
385 /* Create free list with all messages on it */
386 mqhdr->mqh_attr.mq_flags = 0;
387 mqhdr->mqh_attr.mq_maxmsg = attr->mq_maxmsg;
388 mqhdr->mqh_attr.mq_msgsize = attr->mq_msgsize;
389 mqhdr->mqh_attr.mq_curmsgs = 0;
390 mqhdr->mqh_nwait = 0;
391 mqhdr->mqh_pid = 0;
392 if (!AllocateLocallyUniqueId (&luid))
393 {
394 __seterrno ();
395 goto err;
396 }
397 __small_sprintf (mqhdr->mqh_uname, "cyg%016X%08x%08x",
398 hash_path_name (0,mqname),
399 luid.HighPart, luid.LowPart);
400 mqhdr->mqh_head = 0;
401 index = sizeof (struct mq_hdr);
402 mqhdr->mqh_free = index;
403 for (i = 0; i < attr->mq_maxmsg - 1; i++)
404 {
405 msghdr = (struct msg_hdr *) &mptr[index];
406 index += sizeof (struct msg_hdr) + msgsize;
407 msghdr->msg_next = index;
408 }
409 msghdr = (struct msg_hdr *) &mptr[index];
410 msghdr->msg_next = 0; /* end of free list */
411
412 /* Initialize mutex & condition variable */
413 i = ipc_mutex_init (&mqinfo->mqi_lock, mqhdr->mqh_uname);
414 if (i != 0)
415 goto pthreaderr;
416
417 i = ipc_cond_init (&mqinfo->mqi_wait, mqhdr->mqh_uname);
418 if (i != 0)
419 goto pthreaderr;
420
421 /* Initialization complete, turn off user-execute bit */
422 if (fchmod (fd, mode) == -1)
423 goto err;
424 close (fd);
425 return ((mqd_t) mqinfo);
426 }
427
428 exists:
429 /* Open the file then memory map */
430 if ((fd = open (mqname, O_RDWR)) < 0)
431 {
432 if (errno == ENOENT && (oflag & O_CREAT))
433 goto again;
434 goto err;
435 }
436 /* Make certain initialization is complete */
437 for (i = 0; i < MAX_TRIES; i++)
438 {
439 if (stat64 (mqname, &statbuff) == -1)
440 {
441 if (errno == ENOENT && (oflag & O_CREAT))
442 {
443 close (fd);
444 goto again;
445 }
446 goto err;
447 }
448 if ((statbuff.st_mode & S_IXUSR) == 0)
449 break;
450 sleep (1);
451 }
452 if (i == MAX_TRIES)
453 {
454 set_errno (ETIMEDOUT);
455 goto err;
456 }
457
458 filesize = statbuff.st_size;
459 mptr = (int8_t *) mmap64 (NULL, (size_t) filesize, PROT_READ | PROT_WRITE,
460 MAP_SHARED, fd, 0);
461 if (mptr == (int8_t *) MAP_FAILED)
462 goto err;
463 close (fd);
464
465 /* Allocate one mq_info{} for each open */
466 if (!(mqinfo = (struct mq_info *) malloc (sizeof (struct mq_info))))
467 goto err;
468 mqinfo->mqi_hdr = mqhdr = (struct mq_hdr *) mptr;
469 mqinfo->mqi_magic = MQI_MAGIC;
470 mqinfo->mqi_flags = nonblock;
471
472 /* Initialize mutex & condition variable */
473 i = ipc_mutex_init (&mqinfo->mqi_lock, mqhdr->mqh_uname);
474 if (i != 0)
475 goto pthreaderr;
476
477 i = ipc_cond_init (&mqinfo->mqi_wait, mqhdr->mqh_uname);
478 if (i != 0)
479 goto pthreaderr;
480
481 return (mqd_t) mqinfo;
482
483 pthreaderr:
484 errno = i;
485 err:
486 /* Don't let following function calls change errno */
487 save_errno save;
488
489 if (created)
490 unlink (mqname);
491 if (mptr != (int8_t *) MAP_FAILED)
492 munmap((void *) mptr, (size_t) filesize);
493 if (mqinfo)
494 free (mqinfo);
495 close (fd);
496 return (mqd_t) -1;
497 }
498
499 extern "C" int
500 mq_getattr (mqd_t mqd, struct mq_attr *mqstat)
501 {
502 int n;
503 struct mq_hdr *mqhdr;
504 struct mq_attr *attr;
505 struct mq_info *mqinfo;
506
507 myfault efault;
508 if (efault.faulted (EBADF))
509 return -1;
510
511 mqinfo = (struct mq_info *) mqd;
512 if (mqinfo->mqi_magic != MQI_MAGIC)
513 {
514 set_errno (EBADF);
515 return -1;
516 }
517 mqhdr = mqinfo->mqi_hdr;
518 attr = &mqhdr->mqh_attr;
519 if ((n = ipc_mutex_lock (mqinfo->mqi_lock)) != 0)
520 {
521 errno = n;
522 return -1;
523 }
524 mqstat->mq_flags = mqinfo->mqi_flags; /* per-open */
525 mqstat->mq_maxmsg = attr->mq_maxmsg; /* remaining three per-queue */
526 mqstat->mq_msgsize = attr->mq_msgsize;
527 mqstat->mq_curmsgs = attr->mq_curmsgs;
528
529 ipc_mutex_unlock (mqinfo->mqi_lock);
530 return 0;
531 }
532
533 extern "C" int
534 mq_setattr (mqd_t mqd, const struct mq_attr *mqstat, struct mq_attr *omqstat)
535 {
536 int n;
537 struct mq_hdr *mqhdr;
538 struct mq_attr *attr;
539 struct mq_info *mqinfo;
540
541 myfault efault;
542 if (efault.faulted (EBADF))
543 return -1;
544
545 mqinfo = (struct mq_info *) mqd;
546 if (mqinfo->mqi_magic != MQI_MAGIC)
547 {
548 set_errno (EBADF);
549 return -1;
550 }
551 mqhdr = mqinfo->mqi_hdr;
552 attr = &mqhdr->mqh_attr;
553 if ((n = ipc_mutex_lock (mqinfo->mqi_lock)) != 0)
554 {
555 errno = n;
556 return -1;
557 }
558
559 if (omqstat != NULL)
560 {
561 omqstat->mq_flags = mqinfo->mqi_flags; /* previous attributes */
562 omqstat->mq_maxmsg = attr->mq_maxmsg;
563 omqstat->mq_msgsize = attr->mq_msgsize;
564 omqstat->mq_curmsgs = attr->mq_curmsgs; /* and current status */
565 }
566
567 if (mqstat->mq_flags & O_NONBLOCK)
568 mqinfo->mqi_flags |= O_NONBLOCK;
569 else
570 mqinfo->mqi_flags &= ~O_NONBLOCK;
571
572 ipc_mutex_unlock (mqinfo->mqi_lock);
573 return 0;
574 }
575
576 extern "C" int
577 mq_notify (mqd_t mqd, const struct sigevent *notification)
578 {
579 int n;
580 pid_t pid;
581 struct mq_hdr *mqhdr;
582 struct mq_info *mqinfo;
583
584 myfault efault;
585 if (efault.faulted (EBADF))
586 return -1;
587
588 mqinfo = (struct mq_info *) mqd;
589 if (mqinfo->mqi_magic != MQI_MAGIC)
590 {
591 set_errno (EBADF);
592 return -1;
593 }
594 mqhdr = mqinfo->mqi_hdr;
595 if ((n = ipc_mutex_lock (mqinfo->mqi_lock)) != 0)
596 {
597 errno = n;
598 return -1;
599 }
600
601 pid = getpid ();
602 if (!notification)
603 {
604 if (mqhdr->mqh_pid == pid)
605 mqhdr->mqh_pid = 0; /* unregister calling process */
606 }
607 else
608 {
609 if (mqhdr->mqh_pid != 0)
610 {
611 if (kill (mqhdr->mqh_pid, 0) != -1 || errno != ESRCH)
612 {
613 set_errno (EBUSY);
614 ipc_mutex_unlock (mqinfo->mqi_lock);
615 return -1;
616 }
617 }
618 mqhdr->mqh_pid = pid;
619 mqhdr->mqh_event = *notification;
620 }
621 ipc_mutex_unlock (mqinfo->mqi_lock);
622 return 0;
623 }
624
625 static int
626 _mq_send (mqd_t mqd, const char *ptr, size_t len, unsigned int prio,
627 const struct timespec *abstime)
628 {
629 int n;
630 long index, freeindex;
631 int8_t *mptr;
632 struct sigevent *sigev;
633 struct mq_hdr *mqhdr;
634 struct mq_attr *attr;
635 struct msg_hdr *msghdr, *nmsghdr, *pmsghdr;
636 struct mq_info *mqinfo;
637
638 myfault efault;
639 if (efault.faulted (EBADF))
640 return -1;
641
642 mqinfo = (struct mq_info *) mqd;
643 if (mqinfo->mqi_magic != MQI_MAGIC)
644 {
645 set_errno (EBADF);
646 return -1;
647 }
648 if (prio > MQ_PRIO_MAX)
649 {
650 set_errno (EINVAL);
651 return -1;
652 }
653
654 mqhdr = mqinfo->mqi_hdr; /* struct pointer */
655 mptr = (int8_t *) mqhdr; /* byte pointer */
656 attr = &mqhdr->mqh_attr;
657 if ((n = ipc_mutex_lock (mqinfo->mqi_lock)) != 0)
658 {
659 errno = n;
660 return -1;
661 }
662
663 if (len > (size_t) attr->mq_msgsize)
664 {
665 set_errno (EMSGSIZE);
666 goto err;
667 }
668 if (attr->mq_curmsgs == 0)
669 {
670 if (mqhdr->mqh_pid != 0 && mqhdr->mqh_nwait == 0)
671 {
672 sigev = &mqhdr->mqh_event;
673 if (sigev->sigev_notify == SIGEV_SIGNAL)
674 sigqueue (mqhdr->mqh_pid, sigev->sigev_signo, sigev->sigev_value);
675 mqhdr->mqh_pid = 0; /* unregister */
676 }
677 }
678 else if (attr->mq_curmsgs >= attr->mq_maxmsg)
679 {
680 /* Queue is full */
681 if (mqinfo->mqi_flags & O_NONBLOCK)
682 {
683 set_errno (EAGAIN);
684 goto err;
685 }
686 /* Wait for room for one message on the queue */
687 while (attr->mq_curmsgs >= attr->mq_maxmsg)
688 ipc_cond_timedwait (mqinfo->mqi_wait, mqinfo->mqi_lock, abstime);
689 }
690
691 /* nmsghdr will point to new message */
692 if ((freeindex = mqhdr->mqh_free) == 0)
693 api_fatal ("mq_send: curmsgs = %ld; free = 0", attr->mq_curmsgs);
694
695 nmsghdr = (struct msg_hdr *) &mptr[freeindex];
696 nmsghdr->msg_prio = prio;
697 nmsghdr->msg_len = len;
698 memcpy (nmsghdr + 1, ptr, len); /* copy message from caller */
699 mqhdr->mqh_free = nmsghdr->msg_next; /* new freelist head */
700
701 /* Find right place for message in linked list */
702 index = mqhdr->mqh_head;
703 pmsghdr = (struct msg_hdr *) &(mqhdr->mqh_head);
704 while (index)
705 {
706 msghdr = (struct msg_hdr *) &mptr[index];
707 if (prio > msghdr->msg_prio)
708 {
709 nmsghdr->msg_next = index;
710 pmsghdr->msg_next = freeindex;
711 break;
712 }
713 index = msghdr->msg_next;
714 pmsghdr = msghdr;
715 }
716 if (index == 0)
717 {
718 /* Queue was empty or new goes at end of list */
719 pmsghdr->msg_next = freeindex;
720 nmsghdr->msg_next = 0;
721 }
722 /* Wake up anyone blocked in mq_receive waiting for a message */
723 if (attr->mq_curmsgs == 0)
724 ipc_cond_signal (mqinfo->mqi_wait);
725 attr->mq_curmsgs++;
726
727 ipc_mutex_unlock (mqinfo->mqi_lock);
728 return 0;
729
730 err:
731 ipc_mutex_unlock (mqinfo->mqi_lock);
732 return -1;
733 }
734
735 extern "C" int
736 mq_send (mqd_t mqd, const char *ptr, size_t len, unsigned int prio)
737 {
738 return _mq_send (mqd, ptr, len, prio, NULL);
739 }
740
741 extern "C" int
742 mq_timedsend (mqd_t mqd, const char *ptr, size_t len, unsigned int prio,
743 const struct timespec *abstime)
744 {
745 return _mq_send (mqd, ptr, len, prio, abstime);
746 }
747
748 static ssize_t
749 _mq_receive (mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop,
750 const struct timespec *abstime)
751 {
752 int n;
753 long index;
754 int8_t *mptr;
755 ssize_t len;
756 struct mq_hdr *mqhdr;
757 struct mq_attr *attr;
758 struct msg_hdr *msghdr;
759 struct mq_info *mqinfo;
760
761 myfault efault;
762 if (efault.faulted (EBADF))
763 return -1;
764
765 mqinfo = (struct mq_info *) mqd;
766 if (mqinfo->mqi_magic != MQI_MAGIC)
767 {
768 set_errno (EBADF);
769 return -1;
770 }
771 mqhdr = mqinfo->mqi_hdr; /* struct pointer */
772 mptr = (int8_t *) mqhdr; /* byte pointer */
773 attr = &mqhdr->mqh_attr;
774 if ((n = ipc_mutex_lock (mqinfo->mqi_lock)) != 0)
775 {
776 errno = n;
777 return -1;
778 }
779
780 if (maxlen < (size_t) attr->mq_msgsize)
781 {
782 set_errno (EMSGSIZE);
783 goto err;
784 }
785 if (attr->mq_curmsgs == 0) /* queue is empty */
786 {
787 if (mqinfo->mqi_flags & O_NONBLOCK)
788 {
789 set_errno (EAGAIN);
790 goto err;
791 }
792 /* Wait for a message to be placed onto queue */
793 mqhdr->mqh_nwait++;
794 while (attr->mq_curmsgs == 0)
795 ipc_cond_timedwait (mqinfo->mqi_wait, mqinfo->mqi_lock, abstime);
796 mqhdr->mqh_nwait--;
797 }
798
799 if ((index = mqhdr->mqh_head) == 0)
800 api_fatal ("mq_receive: curmsgs = %ld; head = 0", attr->mq_curmsgs);
801
802 msghdr = (struct msg_hdr *) &mptr[index];
803 mqhdr->mqh_head = msghdr->msg_next; /* new head of list */
804 len = msghdr->msg_len;
805 memcpy(ptr, msghdr + 1, len); /* copy the message itself */
806 if (priop != NULL)
807 *priop = msghdr->msg_prio;
808
809 /* Just-read message goes to front of free list */
810 msghdr->msg_next = mqhdr->mqh_free;
811 mqhdr->mqh_free = index;
812
813 /* Wake up anyone blocked in mq_send waiting for room */
814 if (attr->mq_curmsgs == attr->mq_maxmsg)
815 ipc_cond_signal (mqinfo->mqi_wait);
816 attr->mq_curmsgs--;
817
818 ipc_mutex_unlock (mqinfo->mqi_lock);
819 return len;
820
821 err:
822 ipc_mutex_unlock (mqinfo->mqi_lock);
823 return -1;
824 }
825
826 extern "C" ssize_t
827 mq_receive (mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop)
828 {
829 return _mq_receive (mqd, ptr, maxlen, priop, NULL);
830 }
831
832 extern "C" ssize_t
833 mq_timedreceive (mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop,
834 const struct timespec *abstime)
835 {
836 return _mq_receive (mqd, ptr, maxlen, priop, abstime);
837 }
838
839 extern "C" int
840 mq_close (mqd_t mqd)
841 {
842 long msgsize, filesize;
843 struct mq_hdr *mqhdr;
844 struct mq_attr *attr;
845 struct mq_info *mqinfo;
846
847 myfault efault;
848 if (efault.faulted (EBADF))
849 return -1;
850
851 mqinfo = (struct mq_info *) mqd;
852 if (mqinfo->mqi_magic != MQI_MAGIC)
853 {
854 set_errno (EBADF);
855 return -1;
856 }
857 mqhdr = mqinfo->mqi_hdr;
858 attr = &mqhdr->mqh_attr;
859
860 if (mq_notify (mqd, NULL)) /* unregister calling process */
861 return -1;
862
863 msgsize = MSGSIZE (attr->mq_msgsize);
864 filesize = sizeof (struct mq_hdr)
865 + (attr->mq_maxmsg * (sizeof (struct msg_hdr) + msgsize));
866 if (munmap (mqinfo->mqi_hdr, filesize) == -1)
867 return -1;
868
869 mqinfo->mqi_magic = 0; /* just in case */
870 ipc_cond_close (mqinfo->mqi_wait);
871 ipc_mutex_close (mqinfo->mqi_lock);
872 free (mqinfo);
873 return 0;
874 }
875
876 extern "C" int
877 mq_unlink (const char *name)
878 {
879 size_t len = strlen (name);
880 char mqname[ipc_names[mqueue].prefix_len + len];
881
882 if (!check_path (mqname, mqueue, name, len))
883 return -1;
884 if (unlink (mqname) == -1)
885 return -1;
886 return 0;
887 }
888
889 /* POSIX named semaphore implementation. Loosely based on Richard W. STEPHENS
890 implementation as far as sem_open is concerned, but under the hood using
891 the already existing semaphore class in thread.cc. Using a file backed
892 solution allows to implement kernel persistent named semaphores. */
893
894 struct sem_finfo
895 {
896 unsigned int value;
897 unsigned long long hash;
898 LUID luid;
899 };
900
901 extern "C" sem_t *
902 sem_open (const char *name, int oflag, ...)
903 {
904 int i, fd, created;
905 va_list ap;
906 mode_t mode = 0;
907 unsigned int value = 0;
908 struct __stat64 statbuff;
909 sem_t *sem = SEM_FAILED;
910 sem_finfo sf;
911 bool wasopen = false;
912 ipc_flock file;
913
914 size_t len = strlen (name);
915 char semname[ipc_names[semaphore].prefix_len + len];
916
917 if (!check_path (semname, semaphore, name, len))
918 return SEM_FAILED;
919
920 myfault efault;
921 if (efault.faulted (EFAULT))
922 return SEM_FAILED;
923
924 created = 0;
925 oflag &= (O_CREAT | O_EXCL);
926
927 again:
928 if (oflag & O_CREAT)
929 {
930 va_start (ap, oflag); /* init ap to final named argument */
931 mode = va_arg (ap, mode_t) & ~S_IXUSR;
932 value = va_arg (ap, unsigned int);
933 va_end (ap);
934
935 /* Open and specify O_EXCL and user-execute */
936 fd = open (semname, oflag | O_EXCL | O_RDWR, mode | S_IXUSR);
937 if (fd < 0)
938 {
939 if (errno == EEXIST && (oflag & O_EXCL) == 0)
940 goto exists; /* already exists, OK */
941 return SEM_FAILED;
942 }
943 created = 1;
944 /* First one to create the file initializes it. */
945 if (!AllocateLocallyUniqueId (&sf.luid))
946 {
947 __seterrno ();
948 goto err;
949 }
950 sf.value = value;
951 sf.hash = hash_path_name (0, semname);
952 if (write (fd, &sf, sizeof sf) != sizeof sf)
953 goto err;
954 sem = semaphore::open (sf.hash, sf.luid, fd, oflag, mode, value, wasopen);
955 if (sem == SEM_FAILED)
956 goto err;
957 /* Initialization complete, turn off user-execute bit */
958 if (fchmod (fd, mode) == -1)
959 goto err;
960 /* Don't close (fd); */
961 return sem;
962 }
963
964 exists:
965 /* Open the file and fetch the semaphore name. */
966 if ((fd = open (semname, O_RDWR)) < 0)
967 {
968 if (errno == ENOENT && (oflag & O_CREAT))
969 goto again;
970 goto err;
971 }
972 /* Make certain initialization is complete */
973 for (i = 0; i < MAX_TRIES; i++)
974 {
975 if (stat64 (semname, &statbuff) == -1)
976 {
977 if (errno == ENOENT && (oflag & O_CREAT))
978 {
979 close (fd);
980 goto again;
981 }
982 goto err;
983 }
984 if ((statbuff.st_mode & S_IXUSR) == 0)
985 break;
986 sleep (1);
987 }
988 if (i == MAX_TRIES)
989 {
990 set_errno (ETIMEDOUT);
991 goto err;
992 }
993 if (file.lock (fd, sizeof sf))
994 goto err;
995 if (read (fd, &sf, sizeof sf) != sizeof sf)
996 goto err;
997 sem = semaphore::open (sf.hash, sf.luid, fd, oflag, mode, sf.value, wasopen);
998 file.unlock (fd);
999 if (sem == SEM_FAILED)
1000 goto err;
1001 /* If wasopen is set, the semaphore was already opened and we already have
1002 an open file descriptor pointing to the file. This means, we have to
1003 close the file descriptor created in this call. It won't be stored
1004 anywhere anyway. */
1005 if (wasopen)
1006 close (fd);
1007 return sem;
1008
1009 err:
1010 /* Don't let following function calls change errno */
1011 save_errno save;
1012
1013 file.unlock (fd);
1014 if (created)
1015 unlink (semname);
1016 if (sem != SEM_FAILED)
1017 semaphore::close (sem);
1018 close (fd);
1019 return SEM_FAILED;
1020 }
1021
1022 int
1023 _sem_close (sem_t *sem, bool do_close)
1024 {
1025 sem_finfo sf;
1026 int fd, ret = -1;
1027 ipc_flock file;
1028
1029 if (semaphore::getinternal (sem, &fd, &sf.hash, &sf.luid, &sf.value) == -1)
1030 return -1;
1031 if (!file.lock (fd, sizeof sf)
1032 && lseek64 (fd, 0LL, SEEK_SET) != (_off64_t) -1
1033 && write (fd, &sf, sizeof sf) == sizeof sf)
1034 ret = do_close ? semaphore::close (sem) : 0;
1035
1036 /* Don't let following function calls change errno */
1037 save_errno save;
1038 file.unlock (fd);
1039 close (fd);
1040
1041 return ret;
1042 }
1043
1044 extern "C" int
1045 sem_close (sem_t *sem)
1046 {
1047 return _sem_close (sem, true);
1048 }
1049
1050 extern "C" int
1051 sem_unlink (const char *name)
1052 {
1053 size_t len = strlen (name);
1054 char semname[ipc_names[semaphore].prefix_len + len];
1055
1056 if (!check_path (semname, semaphore, name, len))
1057 return -1;
1058 if (unlink (semname) == -1)
1059 return -1;
1060 return 0;
1061 }
This page took 0.084097 seconds and 6 git commands to generate.