]> sourceware.org Git - newlib-cygwin.git/blob - winsup/cygwin/posix_ipc.cc
300a3f46ffd9159fb816139766b50d1260abfb5d
[newlib-cygwin.git] / winsup / cygwin / posix_ipc.cc
1 /* posix_ipc.cc: POSIX IPC API for Cygwin.
2
3 This file is part of Cygwin.
4
5 This software is a copyrighted work licensed under the terms of the
6 Cygwin license. Please consult the file "CYGWIN_LICENSE" for
7 details. */
8
9 #include "winsup.h"
10 #include "shared_info.h"
11 #include "thread.h"
12 #include "path.h"
13 #include "cygtls.h"
14 #include "fhandler.h"
15 #include "dtable.h"
16 #include "cygheap.h"
17 #include "sigproc.h"
18 #include "ntdll.h"
19 #include <io.h>
20 #include <sys/mman.h>
21 #include <sys/param.h>
22 #include <stdlib.h>
23 #include <unistd.h>
24 #include <mqueue.h>
25 #include <semaphore.h>
26
27 extern "C" int ftruncate64 (int fd, off_t length);
28
29 /* The prefix_len is the length of the path prefix including trailing "/"
30 (or "/sem." for semaphores) as well as the trailing NUL. */
31 static struct
32 {
33 const char *prefix;
34 const size_t prefix_len;
35 const char *description;
36 } ipc_names[] = {
37 { "/dev/shm", 10, "POSIX shared memory object" },
38 { "/dev/mqueue", 13, "POSIX message queue" },
39 { "/dev/shm", 14, "POSIX semaphore" }
40 };
41
42 enum ipc_type_t
43 {
44 shmem,
45 mqueue,
46 semaphore
47 };
48
49 static bool
50 check_path (char *res_name, ipc_type_t type, const char *name, size_t len)
51 {
52 /* Note that we require the existance of the appropriate /dev subdirectories
53 for POSIX IPC object support, similar to Linux (which supports the
54 directories, but doesn't require to mount them). We don't create
55 these directory here, that's the task of the installer. But we check
56 for existance and give ample warning. */
57 path_conv path (ipc_names[type].prefix, PC_SYM_NOFOLLOW);
58 if (path.error || !path.exists () || !path.isdir ())
59 {
60 small_printf (
61 "Warning: '%s' does not exists or is not a directory.\n\n"
62 "%ss require the existance of this directory.\n"
63 "Create the directory '%s' and set the permissions to 01777.\n"
64 "For instance on the command line: mkdir -m 01777 %s\n",
65 ipc_names[type].prefix, ipc_names[type].description,
66 ipc_names[type].prefix, ipc_names[type].prefix);
67 set_errno (EINVAL);
68 return false;
69 }
70 /* Apart from handling backslash like slash, the naming rules are identical
71 to Linux, including the names and requirements for subdirectories, if
72 the name contains further slashes. */
73 /* Name must not be empty and has to start with a slash (or backslash) */
74 if (!name || !strchr ("/\\", name[0]))
75 {
76 debug_printf ("Invalid %s name '%s'", ipc_names[type].description, name);
77 set_errno (EINVAL);
78 return false;
79 }
80 /* Name must not consist of just a single slash (or backslash) */
81 if (!name[1])
82 {
83 debug_printf ("Invalid %s name '%s'", ipc_names[type].description, name);
84 set_errno (ENOENT);
85 return false;
86 }
87 /* Name must not contain slashes after the leading one */
88 if (strpbrk (name + 1, "/\\"))
89 {
90 debug_printf ("Invalid %s name '%s'", ipc_names[type].description, name);
91 set_errno (EACCES);
92 return false;
93 }
94 /* Length must be less than or equal to NAME_MAX, or NAME_MAX - 4 in
95 case of semaphores, due to the leading "sem." prefix */
96 if (len > NAME_MAX - (type == semaphore ? strlen ("sem.") : 0))
97 {
98 debug_printf ("%s name '%s' too long", ipc_names[type].description, name);
99 set_errno (ENAMETOOLONG);
100 return false;
101 }
102 __small_sprintf (res_name, "%s/%s%s", ipc_names[type].prefix,
103 type == semaphore ? "sem." : "",
104 name + 1);
105 return true;
106 }
107
108 static int
109 ipc_mutex_lock (HANDLE mtx, bool eintr)
110 {
111 switch (cygwait (mtx, cw_infinite, cw_cancel | cw_cancel_self
112 | (eintr ? cw_sig_eintr : cw_sig_restart)))
113 {
114 case WAIT_OBJECT_0:
115 case WAIT_ABANDONED_0:
116 return 0;
117 case WAIT_SIGNALED:
118 set_errno (EINTR);
119 return 1;
120 default:
121 break;
122 }
123 return geterrno_from_win_error ();
124 }
125
126 static inline int
127 ipc_mutex_unlock (HANDLE mtx)
128 {
129 return ReleaseMutex (mtx) ? 0 : geterrno_from_win_error ();
130 }
131
132 static int
133 ipc_cond_timedwait (HANDLE evt, HANDLE mtx, const struct timespec *abstime)
134 {
135 HANDLE w4[4] = { evt, };
136 DWORD cnt = 2;
137 DWORD timer_idx = 0;
138 int ret = 0;
139
140 wait_signal_arrived here (w4[1]);
141 if ((w4[cnt] = pthread::get_cancel_event ()) != NULL)
142 ++cnt;
143 if (abstime)
144 {
145 if (!valid_timespec (*abstime))
146 return EINVAL;
147
148 /* If a timeout is set, we create a waitable timer to wait for.
149 This is the easiest way to handle the absolute timeout value, given
150 that NtSetTimer also takes absolute times and given the double
151 dependency on evt *and* mtx, which requires to call WFMO twice. */
152 NTSTATUS status;
153 LARGE_INTEGER duetime;
154
155 timer_idx = cnt++;
156 status = NtCreateTimer (&w4[timer_idx], TIMER_ALL_ACCESS, NULL,
157 NotificationTimer);
158 if (!NT_SUCCESS (status))
159 return geterrno_from_nt_status (status);
160 timespec_to_filetime (abstime, &duetime);
161 status = NtSetTimer (w4[timer_idx], &duetime, NULL, NULL, FALSE, 0, NULL);
162 if (!NT_SUCCESS (status))
163 {
164 NtClose (w4[timer_idx]);
165 return geterrno_from_nt_status (status);
166 }
167 }
168 ResetEvent (evt);
169 if ((ret = ipc_mutex_unlock (mtx)) != 0)
170 return ret;
171 /* Everything's set up, so now wait for the event to be signalled. */
172 restart1:
173 switch (WaitForMultipleObjects (cnt, w4, FALSE, INFINITE))
174 {
175 case WAIT_OBJECT_0:
176 break;
177 case WAIT_OBJECT_0 + 1:
178 if (_my_tls.call_signal_handler ())
179 goto restart1;
180 ret = EINTR;
181 break;
182 case WAIT_OBJECT_0 + 2:
183 if (timer_idx != 2)
184 pthread::static_cancel_self ();
185 fallthrough;
186 case WAIT_OBJECT_0 + 3:
187 ret = ETIMEDOUT;
188 break;
189 default:
190 ret = geterrno_from_win_error ();
191 break;
192 }
193 if (ret == 0)
194 {
195 /* At this point we need to lock the mutex. The wait is practically
196 the same as before, just that we now wait on the mutex instead of the
197 event. */
198 restart2:
199 w4[0] = mtx;
200 switch (WaitForMultipleObjects (cnt, w4, FALSE, INFINITE))
201 {
202 case WAIT_OBJECT_0:
203 case WAIT_ABANDONED_0:
204 break;
205 case WAIT_OBJECT_0 + 1:
206 if (_my_tls.call_signal_handler ())
207 goto restart2;
208 ret = EINTR;
209 break;
210 case WAIT_OBJECT_0 + 2:
211 if (timer_idx != 2)
212 pthread_testcancel ();
213 fallthrough;
214 case WAIT_OBJECT_0 + 3:
215 ret = ETIMEDOUT;
216 break;
217 default:
218 ret = geterrno_from_win_error ();
219 break;
220 }
221 }
222 if (timer_idx)
223 {
224 if (ret != ETIMEDOUT)
225 NtCancelTimer (w4[timer_idx], NULL);
226 NtClose (w4[timer_idx]);
227 }
228 return ret;
229 }
230
231 static inline void
232 ipc_cond_signal (HANDLE evt)
233 {
234 SetEvent (evt);
235 }
236
237 class ipc_flock
238 {
239 struct flock fl;
240
241 public:
242 ipc_flock () { memset (&fl, 0, sizeof fl); }
243
244 int lock (int fd, size_t size)
245 {
246 fl.l_type = F_WRLCK;
247 fl.l_whence = SEEK_SET;
248 fl.l_start = 0;
249 fl.l_len = size;
250 return fcntl64 (fd, F_SETLKW, &fl);
251 }
252 int unlock (int fd)
253 {
254 if (!fl.l_len)
255 return 0;
256 fl.l_type = F_UNLCK;
257 return fcntl64 (fd, F_SETLKW, &fl);
258 }
259 };
260
261 /* POSIX shared memory object implementation. */
262
263 extern "C" int
264 shm_open (const char *name, int oflag, mode_t mode)
265 {
266 size_t len = strlen (name);
267 char shmname[ipc_names[shmem].prefix_len + len];
268
269 if (!check_path (shmname, shmem, name, len))
270 return -1;
271
272 /* Check for valid flags. */
273 if (((oflag & O_ACCMODE) != O_RDONLY && (oflag & O_ACCMODE) != O_RDWR)
274 || (oflag & ~(O_ACCMODE | O_CREAT | O_EXCL | O_TRUNC)))
275 {
276 debug_printf ("Invalid oflag 0%o", oflag);
277 set_errno (EINVAL);
278 return -1;
279 }
280
281 return open (shmname, oflag | O_CLOEXEC, mode & 0777);
282 }
283
284 extern "C" int
285 shm_unlink (const char *name)
286 {
287 size_t len = strlen (name);
288 char shmname[ipc_names[shmem].prefix_len + len];
289
290 if (!check_path (shmname, shmem, name, len))
291 return -1;
292
293 return unlink (shmname);
294 }
295
296 /* The POSIX message queue implementation is based on W. Richard STEVENS
297 implementation, just tweaked for Cygwin. The main change is
298 the usage of Windows mutexes and events instead of using the pthread
299 synchronization objects. The pathname is massaged so that the
300 files are created under /dev/mqueue. mq_timedsend and mq_timedreceive
301 are implemented additionally. */
302
303 #pragma pack (push, 4)
304 struct msg_hdr
305 {
306 int32_t msg_next; /* index of next on linked list */
307 int32_t msg_len; /* actual length */
308 unsigned int msg_prio; /* priority */
309 };
310 #pragma pack (pop)
311
312 #define MSGSIZE(i) roundup((i), sizeof(long))
313
314 #define MAX_TRIES 10 /* for waiting for initialization */
315
316 struct mq_attr defattr = { 0, 10, 8192, 0 }; /* Linux defaults. */
317
318 extern "C" off_t lseek64 (int, off_t, int);
319 extern "C" void *mmap64 (void *, size_t, int, int, int, off_t);
320
321 extern "C" mqd_t
322 mq_open (const char *name, int oflag, ...)
323 {
324 int i, fd = -1, nonblock, created = 0;
325 long msgsize, index;
326 off_t filesize = 0;
327 va_list ap;
328 mode_t mode;
329 fhandler_mqueue *fh = NULL;
330 struct stat statbuff;
331 int8_t *mptr = NULL;
332 struct mq_hdr *mqhdr;
333 struct msg_hdr *msghdr;
334 struct mq_attr *attr;
335 struct mq_info *mqinfo = NULL;
336
337 size_t len = strlen (name);
338 char mqname[ipc_names[mqueue].prefix_len + len];
339
340 if (!check_path (mqname, mqueue, name, len))
341 return (mqd_t) -1;
342
343 __try
344 {
345 oflag &= (O_CREAT | O_EXCL | O_NONBLOCK);
346 nonblock = oflag & O_NONBLOCK;
347 oflag &= ~O_NONBLOCK;
348
349 again:
350 if (oflag & O_CREAT)
351 {
352 va_start (ap, oflag); /* init ap to final named argument */
353 mode = va_arg (ap, mode_t) & ~S_IXUSR;
354 attr = va_arg (ap, struct mq_attr *);
355 va_end (ap);
356
357 /* Open and specify O_EXCL and user-execute */
358 fd = open (mqname, oflag | O_EXCL | O_RDWR | O_CLOEXEC,
359 mode | S_IXUSR);
360 if (fd < 0)
361 {
362 if (errno == EEXIST && (oflag & O_EXCL) == 0)
363 goto exists; /* already exists, OK */
364 return (mqd_t) -1;
365 }
366 created = 1;
367 /* First one to create the file initializes it */
368 if (attr == NULL)
369 attr = &defattr;
370 /* Check minimum and maximum values. The max values are pretty much
371 arbitrary, taken from the linux mq_overview man page. However,
372 these max values make sure that the internal mq_fattr structure
373 can use 32 bit types. */
374 else if (attr->mq_maxmsg <= 0 || attr->mq_maxmsg > 32768
375 || attr->mq_msgsize <= 0 || attr->mq_msgsize > 1048576)
376 {
377 set_errno (EINVAL);
378 __leave;
379 }
380 /* Calculate and set the file size */
381 msgsize = MSGSIZE (attr->mq_msgsize);
382 filesize = sizeof (struct mq_hdr)
383 + (attr->mq_maxmsg * (sizeof (struct msg_hdr) + msgsize));
384 if (ftruncate64 (fd, filesize) == -1)
385 __leave;
386
387 /* Create file descriptor for mqueue */
388 cygheap_fdnew fdm;
389
390 if (fdm < 0)
391 __leave;
392 fh = (fhandler_mqueue *) build_fh_dev (*mqueue_dev, name);
393 if (!fh)
394 __leave;
395
396 mqinfo = fh->mqinfo_create ((HANDLE) _get_osfhandle (fd), filesize,
397 mode, nonblock);
398 if (!mqinfo)
399 __leave;
400
401 /* Initialize header at beginning of file */
402 /* Create free list with all messages on it */
403 mptr = (int8_t *) mqinfo->mqi_hdr;
404 mqhdr = mqinfo->mqi_hdr;
405 mqhdr->mqh_attr.mq_flags = 0;
406 mqhdr->mqh_attr.mq_maxmsg = attr->mq_maxmsg;
407 mqhdr->mqh_attr.mq_msgsize = attr->mq_msgsize;
408 mqhdr->mqh_attr.mq_curmsgs = 0;
409 mqhdr->mqh_nwait = 0;
410 mqhdr->mqh_pid = 0;
411 mqhdr->mqh_head = 0;
412 mqhdr->mqh_magic = MQI_MAGIC;
413 index = sizeof (struct mq_hdr);
414 mqhdr->mqh_free = index;
415 for (i = 0; i < attr->mq_maxmsg - 1; i++)
416 {
417 msghdr = (struct msg_hdr *) &mptr[index];
418 index += sizeof (struct msg_hdr) + msgsize;
419 msghdr->msg_next = index;
420 }
421 msghdr = (struct msg_hdr *) &mptr[index];
422 msghdr->msg_next = 0; /* end of free list */
423
424 /* Initialization complete, turn off user-execute bit */
425 if (fchmod (fd, mode) == -1)
426 __leave;
427 close (fd);
428 fdm = fh;
429 return (mqd_t) fdm;
430 }
431
432 exists:
433 /* Open the file then memory map */
434 if ((fd = open (mqname, O_RDWR | O_CLOEXEC)) < 0)
435 {
436 if (errno == ENOENT && (oflag & O_CREAT))
437 goto again;
438 __leave;
439 }
440 /* Make certain initialization is complete */
441 for (i = 0; i < MAX_TRIES; i++)
442 {
443 if (stat64 (mqname, &statbuff) == -1)
444 {
445 if (errno == ENOENT && (oflag & O_CREAT))
446 {
447 close (fd);
448 fd = -1;
449 goto again;
450 }
451 __leave;
452 }
453 if ((statbuff.st_mode & S_IXUSR) == 0)
454 break;
455 sleep (1);
456 }
457 if (i == MAX_TRIES)
458 {
459 set_errno (ETIMEDOUT);
460 __leave;
461 }
462
463 /* Create file descriptor for mqueue */
464 cygheap_fdnew fdm;
465
466 if (fdm < 0)
467 __leave;
468 fh = (fhandler_mqueue *) build_fh_dev (*mqueue_dev, name);
469 if (!fh)
470 __leave;
471
472 mqinfo = fh->mqinfo_open ((HANDLE) _get_osfhandle (fd), statbuff.st_size,
473 statbuff.st_mode, nonblock);
474 if (!mqinfo)
475 __leave;
476
477 close (fd);
478 fdm = fh;
479 return (mqd_t) fdm;
480 }
481 __except (EFAULT) {}
482 __endtry
483 /* Don't let following function calls change errno */
484 save_errno save;
485 if (created)
486 unlink (mqname);
487 if (fd >= 0)
488 close (fd);
489 if (fh)
490 {
491 fh->close ();
492 delete fh;
493 }
494 return (mqd_t) -1;
495 }
496
497 static struct mq_info *
498 get_mqinfo (cygheap_fdget &fd)
499 {
500 if (fd >= 0)
501 {
502 fhandler_mqueue *fh = fd->is_mqueue ();
503 if (fh)
504 return fh->mqinfo ();
505 set_errno (EINVAL);
506 }
507 return NULL;
508 }
509
510 extern "C" int
511 mq_getattr (mqd_t mqd, struct mq_attr *mqstat)
512 {
513 int n;
514 struct mq_hdr *mqhdr;
515 struct mq_fattr *attr;
516 struct mq_info *mqinfo;
517
518 __try
519 {
520 cygheap_fdget fd ((int) mqd, true);
521 mqinfo = get_mqinfo (fd);
522 if (mqinfo->mqi_magic != MQI_MAGIC)
523 {
524 set_errno (EBADF);
525 __leave;
526 }
527 mqhdr = mqinfo->mqi_hdr;
528 attr = &mqhdr->mqh_attr;
529 if ((n = ipc_mutex_lock (mqinfo->mqi_lock, false)) != 0)
530 {
531 errno = n;
532 __leave;
533 }
534 mqstat->mq_flags = mqinfo->mqi_flags; /* per-open */
535 mqstat->mq_maxmsg = attr->mq_maxmsg; /* remaining three per-queue */
536 mqstat->mq_msgsize = attr->mq_msgsize;
537 mqstat->mq_curmsgs = attr->mq_curmsgs;
538
539 ipc_mutex_unlock (mqinfo->mqi_lock);
540 return 0;
541 }
542 __except (EBADF) {}
543 __endtry
544 return -1;
545 }
546
547 extern "C" int
548 mq_setattr (mqd_t mqd, const struct mq_attr *mqstat, struct mq_attr *omqstat)
549 {
550 int n;
551 struct mq_hdr *mqhdr;
552 struct mq_fattr *attr;
553 struct mq_info *mqinfo;
554
555 __try
556 {
557 cygheap_fdget fd ((int) mqd, true);
558 mqinfo = get_mqinfo (fd);
559 if (mqinfo->mqi_magic != MQI_MAGIC)
560 {
561 set_errno (EBADF);
562 __leave;
563 }
564 mqhdr = mqinfo->mqi_hdr;
565 attr = &mqhdr->mqh_attr;
566 if ((n = ipc_mutex_lock (mqinfo->mqi_lock, false)) != 0)
567 {
568 errno = n;
569 __leave;
570 }
571
572 if (omqstat != NULL)
573 {
574 omqstat->mq_flags = mqinfo->mqi_flags; /* previous attributes */
575 omqstat->mq_maxmsg = attr->mq_maxmsg;
576 omqstat->mq_msgsize = attr->mq_msgsize;
577 omqstat->mq_curmsgs = attr->mq_curmsgs; /* and current status */
578 }
579
580 if (mqstat->mq_flags & O_NONBLOCK)
581 mqinfo->mqi_flags |= O_NONBLOCK;
582 else
583 mqinfo->mqi_flags &= ~O_NONBLOCK;
584
585 ipc_mutex_unlock (mqinfo->mqi_lock);
586 return 0;
587 }
588 __except (EBADF) {}
589 __endtry
590 return -1;
591 }
592
593 extern "C" int
594 mq_notify (mqd_t mqd, const struct sigevent *notification)
595 {
596 int n;
597 pid_t pid;
598 struct mq_hdr *mqhdr;
599 struct mq_info *mqinfo;
600
601 __try
602 {
603 cygheap_fdget fd ((int) mqd, true);
604 mqinfo = get_mqinfo (fd);
605 if (mqinfo->mqi_magic != MQI_MAGIC)
606 {
607 set_errno (EBADF);
608 __leave;
609 }
610 mqhdr = mqinfo->mqi_hdr;
611 if ((n = ipc_mutex_lock (mqinfo->mqi_lock, false)) != 0)
612 {
613 errno = n;
614 __leave;
615 }
616
617 pid = getpid ();
618 if (!notification)
619 {
620 if (mqhdr->mqh_pid == pid)
621 mqhdr->mqh_pid = 0; /* unregister calling process */
622 }
623 else
624 {
625 if (mqhdr->mqh_pid != 0)
626 {
627 if (kill (mqhdr->mqh_pid, 0) != -1 || errno != ESRCH)
628 {
629 set_errno (EBUSY);
630 ipc_mutex_unlock (mqinfo->mqi_lock);
631 __leave;
632 }
633 }
634 mqhdr->mqh_pid = pid;
635 mqhdr->mqh_event = *notification;
636 }
637 ipc_mutex_unlock (mqinfo->mqi_lock);
638 return 0;
639 }
640 __except (EBADF) {}
641 __endtry
642 return -1;
643 }
644
645 static int
646 _mq_send (mqd_t mqd, const char *ptr, size_t len, unsigned int prio,
647 const struct timespec *abstime)
648 {
649 int n;
650 long index, freeindex;
651 int8_t *mptr;
652 struct sigevent *sigev;
653 struct mq_hdr *mqhdr;
654 struct mq_fattr *attr;
655 struct msg_hdr *msghdr, *nmsghdr, *pmsghdr;
656 struct mq_info *mqinfo = NULL;
657 bool ipc_mutex_locked = false;
658 int ret = -1;
659
660 pthread_testcancel ();
661
662 __try
663 {
664 cygheap_fdget fd ((int) mqd);
665 mqinfo = get_mqinfo (fd);
666 if (mqinfo->mqi_magic != MQI_MAGIC)
667 {
668 set_errno (EBADF);
669 __leave;
670 }
671 if (prio >= MQ_PRIO_MAX)
672 {
673 set_errno (EINVAL);
674 __leave;
675 }
676
677 mqhdr = mqinfo->mqi_hdr; /* struct pointer */
678 mptr = (int8_t *) mqhdr; /* byte pointer */
679 attr = &mqhdr->mqh_attr;
680 if ((n = ipc_mutex_lock (mqinfo->mqi_lock, true)) != 0)
681 {
682 errno = n;
683 __leave;
684 }
685 ipc_mutex_locked = true;
686 if (len > (size_t) attr->mq_msgsize)
687 {
688 set_errno (EMSGSIZE);
689 __leave;
690 }
691 if (attr->mq_curmsgs == 0)
692 {
693 if (mqhdr->mqh_pid != 0 && mqhdr->mqh_nwait == 0)
694 {
695 sigev = &mqhdr->mqh_event;
696 if (sigev->sigev_notify == SIGEV_SIGNAL)
697 sigqueue (mqhdr->mqh_pid, sigev->sigev_signo,
698 sigev->sigev_value);
699 mqhdr->mqh_pid = 0; /* unregister */
700 }
701 }
702 else if (attr->mq_curmsgs >= attr->mq_maxmsg)
703 {
704 /* Queue is full */
705 if (mqinfo->mqi_flags & O_NONBLOCK)
706 {
707 set_errno (EAGAIN);
708 __leave;
709 }
710 /* Wait for room for one message on the queue */
711 while (attr->mq_curmsgs >= attr->mq_maxmsg)
712 {
713 int ret = ipc_cond_timedwait (mqinfo->mqi_waitsend,
714 mqinfo->mqi_lock, abstime);
715 if (ret != 0)
716 {
717 set_errno (ret);
718 __leave;
719 }
720 }
721 }
722
723 /* nmsghdr will point to new message */
724 if ((freeindex = mqhdr->mqh_free) == 0)
725 api_fatal ("mq_send: curmsgs = %ld; free = 0", attr->mq_curmsgs);
726
727 nmsghdr = (struct msg_hdr *) &mptr[freeindex];
728 nmsghdr->msg_prio = prio;
729 nmsghdr->msg_len = len;
730 memcpy (nmsghdr + 1, ptr, len); /* copy message from caller */
731 mqhdr->mqh_free = nmsghdr->msg_next; /* new freelist head */
732
733 /* Find right place for message in linked list */
734 index = mqhdr->mqh_head;
735 pmsghdr = (struct msg_hdr *) &(mqhdr->mqh_head);
736 while (index)
737 {
738 msghdr = (struct msg_hdr *) &mptr[index];
739 if (prio > msghdr->msg_prio)
740 {
741 nmsghdr->msg_next = index;
742 pmsghdr->msg_next = freeindex;
743 break;
744 }
745 index = msghdr->msg_next;
746 pmsghdr = msghdr;
747 }
748 if (index == 0)
749 {
750 /* Queue was empty or new goes at end of list */
751 pmsghdr->msg_next = freeindex;
752 nmsghdr->msg_next = 0;
753 }
754 /* Wake up anyone blocked in mq_receive waiting for a message */
755 if (attr->mq_curmsgs == 0)
756 ipc_cond_signal (mqinfo->mqi_waitrecv);
757 attr->mq_curmsgs++;
758
759 ret = 0;
760 }
761 __except (EBADF) {}
762 __endtry
763 if (ipc_mutex_locked)
764 ipc_mutex_unlock (mqinfo->mqi_lock);
765 return ret;
766 }
767
768 extern "C" int
769 mq_send (mqd_t mqd, const char *ptr, size_t len, unsigned int prio)
770 {
771 return _mq_send (mqd, ptr, len, prio, NULL);
772 }
773
774 extern "C" int
775 mq_timedsend (mqd_t mqd, const char *ptr, size_t len, unsigned int prio,
776 const struct timespec *abstime)
777 {
778 return _mq_send (mqd, ptr, len, prio, abstime);
779 }
780
781 static ssize_t
782 _mq_receive (mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop,
783 const struct timespec *abstime)
784 {
785 int n;
786 long index;
787 int8_t *mptr;
788 ssize_t len = -1;
789 struct mq_hdr *mqhdr;
790 struct mq_fattr *attr;
791 struct msg_hdr *msghdr;
792 struct mq_info *mqinfo;
793 bool ipc_mutex_locked = false;
794
795 pthread_testcancel ();
796
797 __try
798 {
799 cygheap_fdget fd ((int) mqd);
800 mqinfo = get_mqinfo (fd);
801 if (mqinfo->mqi_magic != MQI_MAGIC)
802 {
803 set_errno (EBADF);
804 __leave;
805 }
806 mqhdr = mqinfo->mqi_hdr; /* struct pointer */
807 mptr = (int8_t *) mqhdr; /* byte pointer */
808 attr = &mqhdr->mqh_attr;
809 if ((n = ipc_mutex_lock (mqinfo->mqi_lock, true)) != 0)
810 {
811 errno = n;
812 __leave;
813 }
814 ipc_mutex_locked = true;
815 if (maxlen < (size_t) attr->mq_msgsize)
816 {
817 set_errno (EMSGSIZE);
818 __leave;
819 }
820 if (attr->mq_curmsgs == 0) /* queue is empty */
821 {
822 if (mqinfo->mqi_flags & O_NONBLOCK)
823 {
824 set_errno (EAGAIN);
825 __leave;
826 }
827 /* Wait for a message to be placed onto queue */
828 mqhdr->mqh_nwait++;
829 while (attr->mq_curmsgs == 0)
830 {
831 int ret = ipc_cond_timedwait (mqinfo->mqi_waitrecv,
832 mqinfo->mqi_lock, abstime);
833 if (ret != 0)
834 {
835 set_errno (ret);
836 __leave;
837 }
838 }
839 mqhdr->mqh_nwait--;
840 }
841
842 if ((index = mqhdr->mqh_head) == 0)
843 api_fatal ("mq_receive: curmsgs = %ld; head = 0", attr->mq_curmsgs);
844
845 msghdr = (struct msg_hdr *) &mptr[index];
846 mqhdr->mqh_head = msghdr->msg_next; /* new head of list */
847 len = msghdr->msg_len;
848 memcpy(ptr, msghdr + 1, len); /* copy the message itself */
849 if (priop != NULL)
850 *priop = msghdr->msg_prio;
851
852 /* Just-read message goes to front of free list */
853 msghdr->msg_next = mqhdr->mqh_free;
854 mqhdr->mqh_free = index;
855
856 /* Wake up anyone blocked in mq_send waiting for room */
857 if (attr->mq_curmsgs == attr->mq_maxmsg)
858 ipc_cond_signal (mqinfo->mqi_waitsend);
859 attr->mq_curmsgs--;
860 }
861 __except (EBADF) {}
862 __endtry
863 if (ipc_mutex_locked)
864 ipc_mutex_unlock (mqinfo->mqi_lock);
865 return len;
866 }
867
868 extern "C" ssize_t
869 mq_receive (mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop)
870 {
871 return _mq_receive (mqd, ptr, maxlen, priop, NULL);
872 }
873
874 extern "C" ssize_t
875 mq_timedreceive (mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop,
876 const struct timespec *abstime)
877 {
878 return _mq_receive (mqd, ptr, maxlen, priop, abstime);
879 }
880
881 extern "C" int
882 mq_close (mqd_t mqd)
883 {
884 __try
885 {
886 cygheap_fdget fd ((int) mqd, true);
887 if (!fd->is_mqueue ())
888 {
889 set_errno (EBADF);
890 __leave;
891 }
892
893 if (mq_notify (mqd, NULL)) /* unregister calling process */
894 __leave;
895
896 fd->isclosed (true);
897 fd->close ();
898 fd.release ();
899 return 0;
900 }
901 __except (EBADF) {}
902 __endtry
903 return -1;
904 }
905
906 extern "C" int
907 mq_unlink (const char *name)
908 {
909 size_t len = strlen (name);
910 char mqname[ipc_names[mqueue].prefix_len + len];
911
912 if (!check_path (mqname, mqueue, name, len))
913 return -1;
914 if (unlink (mqname) == -1)
915 return -1;
916 return 0;
917 }
918
919 /* POSIX named semaphore implementation. Loosely based on Richard W. STEPHENS
920 implementation as far as sem_open is concerned, but under the hood using
921 the already existing semaphore class in thread.cc. Using a file backed
922 solution allows to implement kernel persistent named semaphores. */
923
924 struct sem_finfo
925 {
926 unsigned int value;
927 unsigned long long hash;
928 LUID luid;
929 };
930
931 extern "C" sem_t *
932 sem_open (const char *name, int oflag, ...)
933 {
934 int i, fd = -1, created = 0;
935 va_list ap;
936 mode_t mode = 0;
937 unsigned int value = 0;
938 struct stat statbuff;
939 sem_t *sem = SEM_FAILED;
940 sem_finfo sf;
941 bool wasopen = false;
942 ipc_flock file;
943
944 size_t len = strlen (name);
945 char semname[ipc_names[semaphore].prefix_len + len];
946
947 if (!check_path (semname, semaphore, name, len))
948 return SEM_FAILED;
949
950 __try
951 {
952 oflag &= (O_CREAT | O_EXCL);
953
954 again:
955 if (oflag & O_CREAT)
956 {
957 va_start (ap, oflag); /* init ap to final named argument */
958 mode = va_arg (ap, mode_t) & ~S_IXUSR;
959 value = va_arg (ap, unsigned int);
960 va_end (ap);
961
962 /* Open and specify O_EXCL and user-execute */
963 fd = open (semname, oflag | O_EXCL | O_RDWR | O_CLOEXEC,
964 mode | S_IXUSR);
965 if (fd < 0)
966 {
967 if (errno == EEXIST && (oflag & O_EXCL) == 0)
968 goto exists; /* already exists, OK */
969 return SEM_FAILED;
970 }
971 created = 1;
972 /* First one to create the file initializes it. */
973 NtAllocateLocallyUniqueId (&sf.luid);
974 sf.value = value;
975 sf.hash = hash_path_name (0, semname);
976 if (write (fd, &sf, sizeof sf) != sizeof sf)
977 __leave;
978 sem = semaphore::open (sf.hash, sf.luid, fd, oflag, mode, value,
979 wasopen);
980 if (sem == SEM_FAILED)
981 __leave;
982 /* Initialization complete, turn off user-execute bit */
983 if (fchmod (fd, mode) == -1)
984 __leave;
985 /* Don't close (fd); */
986 return sem;
987 }
988
989 exists:
990 /* Open the file and fetch the semaphore name. */
991 if ((fd = open (semname, O_RDWR | O_CLOEXEC)) < 0)
992 {
993 if (errno == ENOENT && (oflag & O_CREAT))
994 goto again;
995 __leave;
996 }
997 /* Make certain initialization is complete */
998 for (i = 0; i < MAX_TRIES; i++)
999 {
1000 if (stat64 (semname, &statbuff) == -1)
1001 {
1002 if (errno == ENOENT && (oflag & O_CREAT))
1003 {
1004 close (fd);
1005 fd = -1;
1006 goto again;
1007 }
1008 __leave;
1009 }
1010 if ((statbuff.st_mode & S_IXUSR) == 0)
1011 break;
1012 sleep (1);
1013 }
1014 if (i == MAX_TRIES)
1015 {
1016 set_errno (ETIMEDOUT);
1017 __leave;
1018 }
1019 if (file.lock (fd, sizeof sf))
1020 __leave;
1021 if (read (fd, &sf, sizeof sf) != sizeof sf)
1022 __leave;
1023 sem = semaphore::open (sf.hash, sf.luid, fd, oflag, mode, sf.value,
1024 wasopen);
1025 file.unlock (fd);
1026 if (sem == SEM_FAILED)
1027 __leave;
1028 /* If wasopen is set, the semaphore was already opened and we already have
1029 an open file descriptor pointing to the file. This means, we have to
1030 close the file descriptor created in this call. It won't be stored
1031 anywhere anyway. */
1032 if (wasopen)
1033 close (fd);
1034 return sem;
1035 }
1036 __except (EFAULT) {}
1037 __endtry
1038 /* Don't let following function calls change errno */
1039 save_errno save;
1040
1041 if (fd >= 0)
1042 file.unlock (fd);
1043 if (created)
1044 unlink (semname);
1045 if (sem != SEM_FAILED)
1046 semaphore::close (sem);
1047 if (fd >= 0)
1048 close (fd);
1049 return SEM_FAILED;
1050 }
1051
1052 int
1053 _sem_close (sem_t *sem, bool do_close)
1054 {
1055 sem_finfo sf;
1056 int fd, ret = -1;
1057 ipc_flock file;
1058
1059 if (semaphore::getinternal (sem, &fd, &sf.hash, &sf.luid, &sf.value) == -1)
1060 return -1;
1061 if (!file.lock (fd, sizeof sf)
1062 && lseek64 (fd, 0LL, SEEK_SET) != (off_t) -1
1063 && write (fd, &sf, sizeof sf) == sizeof sf)
1064 ret = do_close ? semaphore::close (sem) : 0;
1065
1066 /* Don't let following function calls change errno */
1067 save_errno save;
1068 file.unlock (fd);
1069 close (fd);
1070
1071 return ret;
1072 }
1073
1074 extern "C" int
1075 sem_close (sem_t *sem)
1076 {
1077 return _sem_close (sem, true);
1078 }
1079
1080 extern "C" int
1081 sem_unlink (const char *name)
1082 {
1083 size_t len = strlen (name);
1084 char semname[ipc_names[semaphore].prefix_len + len];
1085
1086 if (!check_path (semname, semaphore, name, len))
1087 return -1;
1088 if (unlink (semname) == -1)
1089 return -1;
1090 return 0;
1091 }
This page took 0.082535 seconds and 4 git commands to generate.