]> sourceware.org Git - newlib-cygwin.git/blob - winsup/cygwin/posix_ipc.cc
Cygwin: POSIX msg queues: use queue name as key
[newlib-cygwin.git] / winsup / cygwin / posix_ipc.cc
1 /* posix_ipc.cc: POSIX IPC API for Cygwin.
2
3 This file is part of Cygwin.
4
5 This software is a copyrighted work licensed under the terms of the
6 Cygwin license. Please consult the file "CYGWIN_LICENSE" for
7 details. */
8
9 #include "winsup.h"
10 #include "shared_info.h"
11 #include "thread.h"
12 #include "path.h"
13 #include "cygtls.h"
14 #include "fhandler.h"
15 #include "dtable.h"
16 #include "cygheap.h"
17 #include "sigproc.h"
18 #include "ntdll.h"
19 #include <io.h>
20 #include <sys/mman.h>
21 #include <sys/param.h>
22 #include <stdlib.h>
23 #include <unistd.h>
24 #include <mqueue.h>
25 #include <semaphore.h>
26
27 extern "C" int ftruncate64 (int fd, off_t length);
28
/* Per-object-type naming data, indexed by ipc_type_t.
   prefix_len is the length of the path prefix including the trailing "/"
   (or "/sem." for semaphores) as well as the trailing NUL, i.e. the number
   of bytes needed in addition to the user-supplied name. */
static struct
{
  const char *prefix;           /* directory holding the backing files */
  const size_t prefix_len;      /* see above */
  const char *description;      /* human-readable object type for messages */
} ipc_names[] = {
  { "/dev/shm",    sizeof ("/dev/shm/"),     "POSIX shared memory object" },
  { "/dev/mqueue", sizeof ("/dev/mqueue/"),  "POSIX message queue" },
  { "/dev/shm",    sizeof ("/dev/shm/sem."), "POSIX semaphore" }
};
41
/* Kind of POSIX IPC object.  The values are used as indices into the
   ipc_names table, so the order must match that table. */
enum ipc_type_t
{
  shmem = 0,
  mqueue = 1,
  semaphore = 2
};
48
49 static bool
50 check_path (char *res_name, ipc_type_t type, const char *name, size_t len)
51 {
52 /* Note that we require the existance of the appropriate /dev subdirectories
53 for POSIX IPC object support, similar to Linux (which supports the
54 directories, but doesn't require to mount them). We don't create
55 these directory here, that's the task of the installer. But we check
56 for existance and give ample warning. */
57 path_conv path (ipc_names[type].prefix, PC_SYM_NOFOLLOW);
58 if (path.error || !path.exists () || !path.isdir ())
59 {
60 small_printf (
61 "Warning: '%s' does not exists or is not a directory.\n\n"
62 "%ss require the existance of this directory.\n"
63 "Create the directory '%s' and set the permissions to 01777.\n"
64 "For instance on the command line: mkdir -m 01777 %s\n",
65 ipc_names[type].prefix, ipc_names[type].description,
66 ipc_names[type].prefix, ipc_names[type].prefix);
67 set_errno (EINVAL);
68 return false;
69 }
70 /* Name must not be empty, or just be a single slash, or start with more
71 than one slash. Same for backslash.
72 Apart from handling backslash like slash, the naming rules are identical
73 to Linux, including the names and requirements for subdirectories, if
74 the name contains further slashes. */
75 if (!name || (strchr ("/\\", name[0])
76 && (!name[1] || strchr ("/\\", name[1]))))
77 {
78 debug_printf ("Invalid %s name '%s'", ipc_names[type].description, name);
79 set_errno (EINVAL);
80 return false;
81 }
82 /* Skip leading (back-)slash. */
83 if (strchr ("/\\", name[0]))
84 ++name;
85 if (len > PATH_MAX - ipc_names[type].prefix_len)
86 {
87 debug_printf ("%s name '%s' too long", ipc_names[type].description, name);
88 set_errno (ENAMETOOLONG);
89 return false;
90 }
91 __small_sprintf (res_name, "%s/%s%s", ipc_names[type].prefix,
92 type == semaphore ? "sem." : "",
93 name);
94 return true;
95 }
96
97 static int
98 ipc_mutex_init (HANDLE *pmtx, const char *name)
99 {
100 WCHAR buf[MAX_PATH];
101 UNICODE_STRING uname;
102 OBJECT_ATTRIBUTES attr;
103 NTSTATUS status;
104
105 __small_swprintf (buf, L"mqueue/mtx%s", name);
106 RtlInitUnicodeString (&uname, buf);
107 InitializeObjectAttributes (&attr, &uname, OBJ_OPENIF | OBJ_CASE_INSENSITIVE,
108 get_shared_parent_dir (),
109 everyone_sd (CYG_MUTANT_ACCESS));
110 status = NtCreateMutant (pmtx, CYG_MUTANT_ACCESS, &attr, FALSE);
111 if (!NT_SUCCESS (status))
112 {
113 debug_printf ("NtCreateMutant: %y", status);
114 return geterrno_from_win_error (RtlNtStatusToDosError (status));
115 }
116 return 0;
117 }
118
119 static int
120 ipc_mutex_lock (HANDLE mtx, bool eintr)
121 {
122 switch (cygwait (mtx, cw_infinite, cw_cancel | cw_cancel_self
123 | (eintr ? cw_sig_eintr : cw_sig_restart)))
124 {
125 case WAIT_OBJECT_0:
126 case WAIT_ABANDONED_0:
127 return 0;
128 case WAIT_SIGNALED:
129 set_errno (EINTR);
130 return 1;
131 default:
132 break;
133 }
134 return geterrno_from_win_error ();
135 }
136
137 static inline int
138 ipc_mutex_unlock (HANDLE mtx)
139 {
140 return ReleaseMutex (mtx) ? 0 : geterrno_from_win_error ();
141 }
142
143 static int
144 ipc_cond_init (HANDLE *pevt, const char *name, char sr)
145 {
146 WCHAR buf[MAX_PATH];
147 UNICODE_STRING uname;
148 OBJECT_ATTRIBUTES attr;
149 NTSTATUS status;
150
151 __small_swprintf (buf, L"mqueue/evt%s%c", name, sr);
152 RtlInitUnicodeString (&uname, buf);
153 InitializeObjectAttributes (&attr, &uname, OBJ_OPENIF | OBJ_CASE_INSENSITIVE,
154 get_shared_parent_dir (),
155 everyone_sd (CYG_EVENT_ACCESS));
156 status = NtCreateEvent (pevt, CYG_EVENT_ACCESS, &attr,
157 NotificationEvent, FALSE);
158 if (!NT_SUCCESS (status))
159 {
160 debug_printf ("NtCreateEvent: %y", status);
161 return geterrno_from_win_error (RtlNtStatusToDosError (status));
162 }
163 return 0;
164 }
165
166 static int
167 ipc_cond_timedwait (HANDLE evt, HANDLE mtx, const struct timespec *abstime)
168 {
169 HANDLE w4[4] = { evt, };
170 DWORD cnt = 2;
171 DWORD timer_idx = 0;
172 int ret = 0;
173
174 wait_signal_arrived here (w4[1]);
175 if ((w4[cnt] = pthread::get_cancel_event ()) != NULL)
176 ++cnt;
177 if (abstime)
178 {
179 if (!valid_timespec (*abstime))
180 return EINVAL;
181
182 /* If a timeout is set, we create a waitable timer to wait for.
183 This is the easiest way to handle the absolute timeout value, given
184 that NtSetTimer also takes absolute times and given the double
185 dependency on evt *and* mtx, which requires to call WFMO twice. */
186 NTSTATUS status;
187 LARGE_INTEGER duetime;
188
189 timer_idx = cnt++;
190 status = NtCreateTimer (&w4[timer_idx], TIMER_ALL_ACCESS, NULL,
191 NotificationTimer);
192 if (!NT_SUCCESS (status))
193 return geterrno_from_nt_status (status);
194 timespec_to_filetime (abstime, &duetime);
195 status = NtSetTimer (w4[timer_idx], &duetime, NULL, NULL, FALSE, 0, NULL);
196 if (!NT_SUCCESS (status))
197 {
198 NtClose (w4[timer_idx]);
199 return geterrno_from_nt_status (status);
200 }
201 }
202 ResetEvent (evt);
203 if ((ret = ipc_mutex_unlock (mtx)) != 0)
204 return ret;
205 /* Everything's set up, so now wait for the event to be signalled. */
206 restart1:
207 switch (WaitForMultipleObjects (cnt, w4, FALSE, INFINITE))
208 {
209 case WAIT_OBJECT_0:
210 break;
211 case WAIT_OBJECT_0 + 1:
212 if (_my_tls.call_signal_handler ())
213 goto restart1;
214 ret = EINTR;
215 break;
216 case WAIT_OBJECT_0 + 2:
217 if (timer_idx != 2)
218 pthread::static_cancel_self ();
219 fallthrough;
220 case WAIT_OBJECT_0 + 3:
221 ret = ETIMEDOUT;
222 break;
223 default:
224 ret = geterrno_from_win_error ();
225 break;
226 }
227 if (ret == 0)
228 {
229 /* At this point we need to lock the mutex. The wait is practically
230 the same as before, just that we now wait on the mutex instead of the
231 event. */
232 restart2:
233 w4[0] = mtx;
234 switch (WaitForMultipleObjects (cnt, w4, FALSE, INFINITE))
235 {
236 case WAIT_OBJECT_0:
237 case WAIT_ABANDONED_0:
238 break;
239 case WAIT_OBJECT_0 + 1:
240 if (_my_tls.call_signal_handler ())
241 goto restart2;
242 ret = EINTR;
243 break;
244 case WAIT_OBJECT_0 + 2:
245 if (timer_idx != 2)
246 pthread_testcancel ();
247 fallthrough;
248 case WAIT_OBJECT_0 + 3:
249 ret = ETIMEDOUT;
250 break;
251 default:
252 ret = geterrno_from_win_error ();
253 break;
254 }
255 }
256 if (timer_idx)
257 {
258 if (ret != ETIMEDOUT)
259 NtCancelTimer (w4[timer_idx], NULL);
260 NtClose (w4[timer_idx]);
261 }
262 return ret;
263 }
264
265 static inline void
266 ipc_cond_signal (HANDLE evt)
267 {
268 SetEvent (evt);
269 }
270
/* Small helper wrapping an fcntl write lock on the first SIZE bytes of a
   file.  The lock description is kept in the object so that unlock can
   release exactly the region locked before. */
class ipc_flock
{
  struct flock lk;

public:
  /* Start with an all-zero lock description, i.e. "nothing locked". */
  ipc_flock () { memset (&lk, 0, sizeof lk); }

  /* Take a write lock on bytes [0, size) of FD, waiting if necessary.
     Returns the result of fcntl64. */
  int lock (int fd, size_t size)
  {
    lk.l_type = F_WRLCK;
    lk.l_whence = SEEK_SET;
    lk.l_start = 0;
    lk.l_len = size;
    return fcntl64 (fd, F_SETLKW, &lk);
  }

  /* Release the region recorded by lock.  Does nothing and returns 0 if no
     length has been recorded; otherwise returns the result of fcntl64. */
  int unlock (int fd)
  {
    if (lk.l_len == 0)
      return 0;
    lk.l_type = F_UNLCK;
    return fcntl64 (fd, F_SETLKW, &lk);
  }
};
294
295 /* POSIX shared memory object implementation. */
296
297 extern "C" int
298 shm_open (const char *name, int oflag, mode_t mode)
299 {
300 size_t len = strlen (name);
301 char shmname[ipc_names[shmem].prefix_len + len];
302
303 if (!check_path (shmname, shmem, name, len))
304 return -1;
305
306 /* Check for valid flags. */
307 if (((oflag & O_ACCMODE) != O_RDONLY && (oflag & O_ACCMODE) != O_RDWR)
308 || (oflag & ~(O_ACCMODE | O_CREAT | O_EXCL | O_TRUNC)))
309 {
310 debug_printf ("Invalid oflag 0%o", oflag);
311 set_errno (EINVAL);
312 return -1;
313 }
314
315 return open (shmname, oflag | O_CLOEXEC, mode & 0777);
316 }
317
318 extern "C" int
319 shm_unlink (const char *name)
320 {
321 size_t len = strlen (name);
322 char shmname[ipc_names[shmem].prefix_len + len];
323
324 if (!check_path (shmname, shmem, name, len))
325 return -1;
326
327 return unlink (shmname);
328 }
329
/* The POSIX message queue implementation is based on W. Richard Stevens'
   implementation, just tweaked for Cygwin.  The main change is the use of
   Windows mutexes and events instead of the pthread synchronization
   objects.  The pathname is massaged so that the files are created under
   /dev/mqueue.  mq_timedsend and mq_timedreceive are implemented in
   addition. */
336
/* Header preceding every message slot inside the mapped queue file.  The
   message payload follows the header directly.  msg_next holds a byte
   offset into the mapping; 0 terminates a list.  The structure is packed
   to 4 byte alignment. */
#pragma pack (push, 4)
struct msg_hdr
{
  int32_t      msg_next;        /* index of next on linked list */
  int32_t      msg_len;         /* actual length */
  unsigned int msg_prio;        /* priority */
};
#pragma pack (pop)
345
/* Size of a message payload slot: the message size rounded up to a
   multiple of sizeof (long). */
#define MSGSIZE(i)      roundup((i), sizeof(long))

/* Number of one-second polls while waiting for another process to finish
   initializing a freshly created backing file. */
#define MAX_TRIES       10      /* for waiting for initialization */

/* Attributes used when mq_open creates a queue without explicit
   attributes. */
struct mq_attr defattr = { 0, 10, 8192, 0 };    /* Linux defaults. */

extern "C" off_t lseek64 (int, off_t, int);
extern "C" void *mmap64 (void *, size_t, int, int, int, off_t);
354
355 static inline int
356 _mq_ipc_init (struct mq_info *mqinfo, const char *name)
357 {
358 int ret;
359
360 /* Initialize mutex & condition variable */
361 ret = ipc_mutex_init (&mqinfo->mqi_lock, name);
362 if (ret)
363 return ret;
364 ret = ipc_cond_init (&mqinfo->mqi_waitsend, name, 'S');
365 if (ret)
366 {
367 NtClose (mqinfo->mqi_lock);
368 return ret;
369 }
370 ret = ipc_cond_init (&mqinfo->mqi_waitrecv, name, 'R');
371 if (ret)
372 {
373 NtClose (mqinfo->mqi_waitsend);
374 NtClose (mqinfo->mqi_lock);
375 return ret;
376 }
377 return 0;
378 }
379
380 static int8_t *
381 _map_file (int fd, SIZE_T filesize, HANDLE &secth)
382 {
383 OBJECT_ATTRIBUTES oa;
384 LARGE_INTEGER fsiz = { QuadPart: (LONGLONG) filesize };
385 NTSTATUS status;
386 PVOID addr = NULL;
387
388 secth = NULL;
389 InitializeObjectAttributes (&oa, NULL, 0, NULL, NULL);
390 status = NtCreateSection (&secth, SECTION_ALL_ACCESS, &oa, &fsiz,
391 PAGE_READWRITE, SEC_COMMIT,
392 (HANDLE) _get_osfhandle (fd));
393 if (NT_SUCCESS (status))
394 {
395 status = NtMapViewOfSection (secth, NtCurrentProcess (), &addr, 0,
396 filesize, NULL, &filesize,
397 ViewShare, 0, PAGE_READWRITE);
398 if (!NT_SUCCESS (status))
399 {
400 NtClose (secth);
401 secth = NULL;
402 }
403 }
404 if (!NT_SUCCESS (status))
405 __seterrno_from_nt_status (status);
406 return (int8_t *) addr;
407 }
408
409 extern "C" mqd_t
410 mq_open (const char *name, int oflag, ...)
411 {
412 int i, fd = -1, nonblock, created = 0;
413 long msgsize, index;
414 off_t filesize = 0;
415 va_list ap;
416 mode_t mode;
417 HANDLE secth;
418 int8_t *mptr = NULL;
419 fhandler_mqueue *fh;
420 struct stat statbuff;
421 struct mq_hdr *mqhdr;
422 struct msg_hdr *msghdr;
423 struct mq_attr *attr;
424 struct mq_info *mqinfo = NULL;
425
426 size_t len = strlen (name);
427 char mqname[ipc_names[mqueue].prefix_len + len];
428
429 if (!check_path (mqname, mqueue, name, len))
430 return (mqd_t) -1;
431
432 __try
433 {
434 oflag &= (O_CREAT | O_EXCL | O_NONBLOCK);
435 nonblock = oflag & O_NONBLOCK;
436 oflag &= ~O_NONBLOCK;
437
438 again:
439 if (oflag & O_CREAT)
440 {
441 va_start (ap, oflag); /* init ap to final named argument */
442 mode = va_arg (ap, mode_t) & ~S_IXUSR;
443 attr = va_arg (ap, struct mq_attr *);
444 va_end (ap);
445
446 /* Open and specify O_EXCL and user-execute */
447 fd = open (mqname, oflag | O_EXCL | O_RDWR | O_CLOEXEC,
448 mode | S_IXUSR);
449 if (fd < 0)
450 {
451 if (errno == EEXIST && (oflag & O_EXCL) == 0)
452 goto exists; /* already exists, OK */
453 return (mqd_t) -1;
454 }
455 created = 1;
456 /* First one to create the file initializes it */
457 if (attr == NULL)
458 attr = &defattr;
459 /* Check minimum and maximum values. The max values are pretty much
460 arbitrary, taken from the linux mq_overview man page. However,
461 these max values make sure that the internal mq_fattr structure
462 can use 32 bit types. */
463 else if (attr->mq_maxmsg <= 0 || attr->mq_maxmsg > 32768
464 || attr->mq_msgsize <= 0 || attr->mq_msgsize > 1048576)
465 {
466 set_errno (EINVAL);
467 __leave;
468 }
469 /* Calculate and set the file size */
470 msgsize = MSGSIZE (attr->mq_msgsize);
471 filesize = sizeof (struct mq_hdr)
472 + (attr->mq_maxmsg * (sizeof (struct msg_hdr) + msgsize));
473 if (ftruncate64 (fd, filesize) == -1)
474 __leave;
475
476 /* Memory map the file */
477 mptr = _map_file (fd, filesize, secth);
478 if (!mptr)
479 __leave;
480
481 /* Create file descriptor for mqueue */
482 cygheap_fdnew fdm;
483
484 if (fdm < 0)
485 __leave;
486 fh = (fhandler_mqueue *) build_fh_dev (*mqueue_dev);
487 if (!fh)
488 __leave;
489 fdm = fh;
490
491 mqinfo = fh->mqinfo (name, mptr, secth, filesize, mode, nonblock);
492
493 /* Initialize mutex & condition variables */
494 i = _mq_ipc_init (mqinfo, fh->get_name ());
495 if (i != 0)
496 {
497 set_errno (i);
498 __leave;
499 }
500
501 /* Initialize header at beginning of file */
502 /* Create free list with all messages on it */
503 mqhdr = mqinfo->mqi_hdr;
504 mqhdr->mqh_attr.mq_flags = 0;
505 mqhdr->mqh_attr.mq_maxmsg = attr->mq_maxmsg;
506 mqhdr->mqh_attr.mq_msgsize = attr->mq_msgsize;
507 mqhdr->mqh_attr.mq_curmsgs = 0;
508 mqhdr->mqh_nwait = 0;
509 mqhdr->mqh_pid = 0;
510 mqhdr->mqh_head = 0;
511 mqhdr->mqh_magic = MQI_MAGIC;
512 index = sizeof (struct mq_hdr);
513 mqhdr->mqh_free = index;
514 for (i = 0; i < attr->mq_maxmsg - 1; i++)
515 {
516 msghdr = (struct msg_hdr *) &mptr[index];
517 index += sizeof (struct msg_hdr) + msgsize;
518 msghdr->msg_next = index;
519 }
520 msghdr = (struct msg_hdr *) &mptr[index];
521 msghdr->msg_next = 0; /* end of free list */
522
523 /* Initialization complete, turn off user-execute bit */
524 if (fchmod (fd, mode) == -1)
525 __leave;
526 close (fd);
527 return (mqd_t) fdm;
528 }
529
530 exists:
531 /* Open the file then memory map */
532 if ((fd = open (mqname, O_RDWR | O_CLOEXEC)) < 0)
533 {
534 if (errno == ENOENT && (oflag & O_CREAT))
535 goto again;
536 __leave;
537 }
538 /* Make certain initialization is complete */
539 for (i = 0; i < MAX_TRIES; i++)
540 {
541 if (stat64 (mqname, &statbuff) == -1)
542 {
543 if (errno == ENOENT && (oflag & O_CREAT))
544 {
545 close (fd);
546 fd = -1;
547 goto again;
548 }
549 __leave;
550 }
551 if ((statbuff.st_mode & S_IXUSR) == 0)
552 break;
553 sleep (1);
554 }
555 if (i == MAX_TRIES)
556 {
557 set_errno (ETIMEDOUT);
558 __leave;
559 }
560
561 filesize = statbuff.st_size;
562 mptr = _map_file (fd, filesize, secth);
563 if (!mptr)
564 __leave;
565
566 close (fd);
567 fd = -1;
568
569 mqhdr = (struct mq_hdr *) mptr;
570 if (mqhdr->mqh_magic != MQI_MAGIC)
571 {
572 system_printf (
573 "Old message queue \"%s\" detected!\n"
574 "This file is not usable as message queue anymore due to changes in the "
575 "internal file layout. Please remove the file and try again.", mqname);
576 set_errno (EACCES);
577 __leave;
578 }
579
580 /* Create file descriptor for mqueue */
581 cygheap_fdnew fdm;
582
583 if (fdm < 0)
584 __leave;
585 fh = (fhandler_mqueue *) build_fh_dev (*mqueue_dev);
586 if (!fh)
587 __leave;
588 fdm = fh;
589
590 mqinfo = fh->mqinfo (name, mptr, secth, filesize, statbuff.st_mode,
591 nonblock);
592
593 /* Initialize mutex & condition variable */
594 i = _mq_ipc_init (mqinfo, fh->get_name ());
595 if (i != 0)
596 {
597 set_errno (i);
598 __leave;
599 }
600
601 return (mqd_t) fdm;
602 }
603 __except (EFAULT) {}
604 __endtry
605 /* Don't let following function calls change errno */
606 save_errno save;
607 if (created)
608 unlink (mqname);
609 if (mptr)
610 {
611 NtUnmapViewOfSection (NtCurrentProcess (), mptr);
612 NtClose (secth);
613 }
614 if (mqinfo)
615 {
616 if (mqinfo->mqi_lock)
617 NtClose (mqinfo->mqi_lock);
618 if (mqinfo->mqi_waitsend)
619 NtClose (mqinfo->mqi_waitsend);
620 if (mqinfo->mqi_waitrecv)
621 NtClose (mqinfo->mqi_waitrecv);
622 }
623 if (fd >= 0)
624 close (fd);
625 return (mqd_t) -1;
626 }
627
628 static struct mq_info *
629 get_mqinfo (cygheap_fdget &fd)
630 {
631 if (fd >= 0)
632 {
633 fhandler_mqueue *fh = fd->is_mqueue ();
634 if (!fh)
635 set_errno (EINVAL);
636 else
637 return fh->mqinfo ();
638 }
639 return NULL;
640 }
641
642 extern "C" int
643 mq_getattr (mqd_t mqd, struct mq_attr *mqstat)
644 {
645 int n;
646 struct mq_hdr *mqhdr;
647 struct mq_fattr *attr;
648 struct mq_info *mqinfo;
649
650 __try
651 {
652 cygheap_fdget fd ((int) mqd, true);
653 mqinfo = get_mqinfo (fd);
654 if (mqinfo->mqi_magic != MQI_MAGIC)
655 {
656 set_errno (EBADF);
657 __leave;
658 }
659 mqhdr = mqinfo->mqi_hdr;
660 attr = &mqhdr->mqh_attr;
661 if ((n = ipc_mutex_lock (mqinfo->mqi_lock, false)) != 0)
662 {
663 errno = n;
664 __leave;
665 }
666 mqstat->mq_flags = mqinfo->mqi_flags; /* per-open */
667 mqstat->mq_maxmsg = attr->mq_maxmsg; /* remaining three per-queue */
668 mqstat->mq_msgsize = attr->mq_msgsize;
669 mqstat->mq_curmsgs = attr->mq_curmsgs;
670
671 ipc_mutex_unlock (mqinfo->mqi_lock);
672 return 0;
673 }
674 __except (EBADF) {}
675 __endtry
676 return -1;
677 }
678
679 extern "C" int
680 mq_setattr (mqd_t mqd, const struct mq_attr *mqstat, struct mq_attr *omqstat)
681 {
682 int n;
683 struct mq_hdr *mqhdr;
684 struct mq_fattr *attr;
685 struct mq_info *mqinfo;
686
687 __try
688 {
689 cygheap_fdget fd ((int) mqd, true);
690 mqinfo = get_mqinfo (fd);
691 if (mqinfo->mqi_magic != MQI_MAGIC)
692 {
693 set_errno (EBADF);
694 __leave;
695 }
696 mqhdr = mqinfo->mqi_hdr;
697 attr = &mqhdr->mqh_attr;
698 if ((n = ipc_mutex_lock (mqinfo->mqi_lock, false)) != 0)
699 {
700 errno = n;
701 __leave;
702 }
703
704 if (omqstat != NULL)
705 {
706 omqstat->mq_flags = mqinfo->mqi_flags; /* previous attributes */
707 omqstat->mq_maxmsg = attr->mq_maxmsg;
708 omqstat->mq_msgsize = attr->mq_msgsize;
709 omqstat->mq_curmsgs = attr->mq_curmsgs; /* and current status */
710 }
711
712 if (mqstat->mq_flags & O_NONBLOCK)
713 mqinfo->mqi_flags |= O_NONBLOCK;
714 else
715 mqinfo->mqi_flags &= ~O_NONBLOCK;
716
717 ipc_mutex_unlock (mqinfo->mqi_lock);
718 return 0;
719 }
720 __except (EBADF) {}
721 __endtry
722 return -1;
723 }
724
725 extern "C" int
726 mq_notify (mqd_t mqd, const struct sigevent *notification)
727 {
728 int n;
729 pid_t pid;
730 struct mq_hdr *mqhdr;
731 struct mq_info *mqinfo;
732
733 __try
734 {
735 cygheap_fdget fd ((int) mqd, true);
736 mqinfo = get_mqinfo (fd);
737 if (mqinfo->mqi_magic != MQI_MAGIC)
738 {
739 set_errno (EBADF);
740 __leave;
741 }
742 mqhdr = mqinfo->mqi_hdr;
743 if ((n = ipc_mutex_lock (mqinfo->mqi_lock, false)) != 0)
744 {
745 errno = n;
746 __leave;
747 }
748
749 pid = getpid ();
750 if (!notification)
751 {
752 if (mqhdr->mqh_pid == pid)
753 mqhdr->mqh_pid = 0; /* unregister calling process */
754 }
755 else
756 {
757 if (mqhdr->mqh_pid != 0)
758 {
759 if (kill (mqhdr->mqh_pid, 0) != -1 || errno != ESRCH)
760 {
761 set_errno (EBUSY);
762 ipc_mutex_unlock (mqinfo->mqi_lock);
763 __leave;
764 }
765 }
766 mqhdr->mqh_pid = pid;
767 mqhdr->mqh_event = *notification;
768 }
769 ipc_mutex_unlock (mqinfo->mqi_lock);
770 return 0;
771 }
772 __except (EBADF) {}
773 __endtry
774 return -1;
775 }
776
777 static int
778 _mq_send (mqd_t mqd, const char *ptr, size_t len, unsigned int prio,
779 const struct timespec *abstime)
780 {
781 int n;
782 long index, freeindex;
783 int8_t *mptr;
784 struct sigevent *sigev;
785 struct mq_hdr *mqhdr;
786 struct mq_fattr *attr;
787 struct msg_hdr *msghdr, *nmsghdr, *pmsghdr;
788 struct mq_info *mqinfo = NULL;
789 bool ipc_mutex_locked = false;
790 int ret = -1;
791
792 pthread_testcancel ();
793
794 __try
795 {
796 cygheap_fdget fd ((int) mqd);
797 mqinfo = get_mqinfo (fd);
798 if (mqinfo->mqi_magic != MQI_MAGIC)
799 {
800 set_errno (EBADF);
801 __leave;
802 }
803 if (prio >= MQ_PRIO_MAX)
804 {
805 set_errno (EINVAL);
806 __leave;
807 }
808
809 mqhdr = mqinfo->mqi_hdr; /* struct pointer */
810 mptr = (int8_t *) mqhdr; /* byte pointer */
811 attr = &mqhdr->mqh_attr;
812 if ((n = ipc_mutex_lock (mqinfo->mqi_lock, true)) != 0)
813 {
814 errno = n;
815 __leave;
816 }
817 ipc_mutex_locked = true;
818 if (len > (size_t) attr->mq_msgsize)
819 {
820 set_errno (EMSGSIZE);
821 __leave;
822 }
823 if (attr->mq_curmsgs == 0)
824 {
825 if (mqhdr->mqh_pid != 0 && mqhdr->mqh_nwait == 0)
826 {
827 sigev = &mqhdr->mqh_event;
828 if (sigev->sigev_notify == SIGEV_SIGNAL)
829 sigqueue (mqhdr->mqh_pid, sigev->sigev_signo,
830 sigev->sigev_value);
831 mqhdr->mqh_pid = 0; /* unregister */
832 }
833 }
834 else if (attr->mq_curmsgs >= attr->mq_maxmsg)
835 {
836 /* Queue is full */
837 if (mqinfo->mqi_flags & O_NONBLOCK)
838 {
839 set_errno (EAGAIN);
840 __leave;
841 }
842 /* Wait for room for one message on the queue */
843 while (attr->mq_curmsgs >= attr->mq_maxmsg)
844 {
845 int ret = ipc_cond_timedwait (mqinfo->mqi_waitsend,
846 mqinfo->mqi_lock, abstime);
847 if (ret != 0)
848 {
849 set_errno (ret);
850 __leave;
851 }
852 }
853 }
854
855 /* nmsghdr will point to new message */
856 if ((freeindex = mqhdr->mqh_free) == 0)
857 api_fatal ("mq_send: curmsgs = %ld; free = 0", attr->mq_curmsgs);
858
859 nmsghdr = (struct msg_hdr *) &mptr[freeindex];
860 nmsghdr->msg_prio = prio;
861 nmsghdr->msg_len = len;
862 memcpy (nmsghdr + 1, ptr, len); /* copy message from caller */
863 mqhdr->mqh_free = nmsghdr->msg_next; /* new freelist head */
864
865 /* Find right place for message in linked list */
866 index = mqhdr->mqh_head;
867 pmsghdr = (struct msg_hdr *) &(mqhdr->mqh_head);
868 while (index)
869 {
870 msghdr = (struct msg_hdr *) &mptr[index];
871 if (prio > msghdr->msg_prio)
872 {
873 nmsghdr->msg_next = index;
874 pmsghdr->msg_next = freeindex;
875 break;
876 }
877 index = msghdr->msg_next;
878 pmsghdr = msghdr;
879 }
880 if (index == 0)
881 {
882 /* Queue was empty or new goes at end of list */
883 pmsghdr->msg_next = freeindex;
884 nmsghdr->msg_next = 0;
885 }
886 /* Wake up anyone blocked in mq_receive waiting for a message */
887 if (attr->mq_curmsgs == 0)
888 ipc_cond_signal (mqinfo->mqi_waitrecv);
889 attr->mq_curmsgs++;
890
891 ret = 0;
892 }
893 __except (EBADF) {}
894 __endtry
895 if (ipc_mutex_locked)
896 ipc_mutex_unlock (mqinfo->mqi_lock);
897 return ret;
898 }
899
900 extern "C" int
901 mq_send (mqd_t mqd, const char *ptr, size_t len, unsigned int prio)
902 {
903 return _mq_send (mqd, ptr, len, prio, NULL);
904 }
905
906 extern "C" int
907 mq_timedsend (mqd_t mqd, const char *ptr, size_t len, unsigned int prio,
908 const struct timespec *abstime)
909 {
910 return _mq_send (mqd, ptr, len, prio, abstime);
911 }
912
913 static ssize_t
914 _mq_receive (mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop,
915 const struct timespec *abstime)
916 {
917 int n;
918 long index;
919 int8_t *mptr;
920 ssize_t len = -1;
921 struct mq_hdr *mqhdr;
922 struct mq_fattr *attr;
923 struct msg_hdr *msghdr;
924 struct mq_info *mqinfo;
925 bool ipc_mutex_locked = false;
926
927 pthread_testcancel ();
928
929 __try
930 {
931 cygheap_fdget fd ((int) mqd);
932 mqinfo = get_mqinfo (fd);
933 if (mqinfo->mqi_magic != MQI_MAGIC)
934 {
935 set_errno (EBADF);
936 __leave;
937 }
938 mqhdr = mqinfo->mqi_hdr; /* struct pointer */
939 mptr = (int8_t *) mqhdr; /* byte pointer */
940 attr = &mqhdr->mqh_attr;
941 if ((n = ipc_mutex_lock (mqinfo->mqi_lock, true)) != 0)
942 {
943 errno = n;
944 __leave;
945 }
946 ipc_mutex_locked = true;
947 if (maxlen < (size_t) attr->mq_msgsize)
948 {
949 set_errno (EMSGSIZE);
950 __leave;
951 }
952 if (attr->mq_curmsgs == 0) /* queue is empty */
953 {
954 if (mqinfo->mqi_flags & O_NONBLOCK)
955 {
956 set_errno (EAGAIN);
957 __leave;
958 }
959 /* Wait for a message to be placed onto queue */
960 mqhdr->mqh_nwait++;
961 while (attr->mq_curmsgs == 0)
962 {
963 int ret = ipc_cond_timedwait (mqinfo->mqi_waitrecv,
964 mqinfo->mqi_lock, abstime);
965 if (ret != 0)
966 {
967 set_errno (ret);
968 __leave;
969 }
970 }
971 mqhdr->mqh_nwait--;
972 }
973
974 if ((index = mqhdr->mqh_head) == 0)
975 api_fatal ("mq_receive: curmsgs = %ld; head = 0", attr->mq_curmsgs);
976
977 msghdr = (struct msg_hdr *) &mptr[index];
978 mqhdr->mqh_head = msghdr->msg_next; /* new head of list */
979 len = msghdr->msg_len;
980 memcpy(ptr, msghdr + 1, len); /* copy the message itself */
981 if (priop != NULL)
982 *priop = msghdr->msg_prio;
983
984 /* Just-read message goes to front of free list */
985 msghdr->msg_next = mqhdr->mqh_free;
986 mqhdr->mqh_free = index;
987
988 /* Wake up anyone blocked in mq_send waiting for room */
989 if (attr->mq_curmsgs == attr->mq_maxmsg)
990 ipc_cond_signal (mqinfo->mqi_waitsend);
991 attr->mq_curmsgs--;
992 }
993 __except (EBADF) {}
994 __endtry
995 if (ipc_mutex_locked)
996 ipc_mutex_unlock (mqinfo->mqi_lock);
997 return len;
998 }
999
1000 extern "C" ssize_t
1001 mq_receive (mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop)
1002 {
1003 return _mq_receive (mqd, ptr, maxlen, priop, NULL);
1004 }
1005
1006 extern "C" ssize_t
1007 mq_timedreceive (mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop,
1008 const struct timespec *abstime)
1009 {
1010 return _mq_receive (mqd, ptr, maxlen, priop, abstime);
1011 }
1012
1013 extern "C" int
1014 mq_close (mqd_t mqd)
1015 {
1016 struct mq_info *mqinfo;
1017
1018 __try
1019 {
1020 cygheap_fdget fd ((int) mqd, true);
1021 mqinfo = get_mqinfo (fd);
1022 if (mqinfo->mqi_magic != MQI_MAGIC)
1023 {
1024 set_errno (EBADF);
1025 __leave;
1026 }
1027
1028 if (mq_notify (mqd, NULL)) /* unregister calling process */
1029 __leave;
1030
1031 fd->isclosed (true);
1032 fd->close ();
1033 fd.release ();
1034 return 0;
1035 }
1036 __except (EBADF) {}
1037 __endtry
1038 return -1;
1039 }
1040
1041 extern "C" int
1042 mq_unlink (const char *name)
1043 {
1044 size_t len = strlen (name);
1045 char mqname[ipc_names[mqueue].prefix_len + len];
1046
1047 if (!check_path (mqname, mqueue, name, len))
1048 return -1;
1049 if (unlink (mqname) == -1)
1050 return -1;
1051 return 0;
1052 }
1053
/* POSIX named semaphore implementation.  Loosely based on W. Richard
   Stevens' implementation as far as sem_open is concerned, but under the
   hood using the already existing semaphore class in thread.cc.  Using a
   file-backed solution allows implementing kernel-persistent named
   semaphores. */
1058
1059 struct sem_finfo
1060 {
1061 unsigned int value;
1062 unsigned long long hash;
1063 LUID luid;
1064 };
1065
1066 extern "C" sem_t *
1067 sem_open (const char *name, int oflag, ...)
1068 {
1069 int i, fd = -1, created = 0;
1070 va_list ap;
1071 mode_t mode = 0;
1072 unsigned int value = 0;
1073 struct stat statbuff;
1074 sem_t *sem = SEM_FAILED;
1075 sem_finfo sf;
1076 bool wasopen = false;
1077 ipc_flock file;
1078
1079 size_t len = strlen (name);
1080 char semname[ipc_names[semaphore].prefix_len + len];
1081
1082 if (!check_path (semname, semaphore, name, len))
1083 return SEM_FAILED;
1084
1085 __try
1086 {
1087 oflag &= (O_CREAT | O_EXCL);
1088
1089 again:
1090 if (oflag & O_CREAT)
1091 {
1092 va_start (ap, oflag); /* init ap to final named argument */
1093 mode = va_arg (ap, mode_t) & ~S_IXUSR;
1094 value = va_arg (ap, unsigned int);
1095 va_end (ap);
1096
1097 /* Open and specify O_EXCL and user-execute */
1098 fd = open (semname, oflag | O_EXCL | O_RDWR | O_CLOEXEC,
1099 mode | S_IXUSR);
1100 if (fd < 0)
1101 {
1102 if (errno == EEXIST && (oflag & O_EXCL) == 0)
1103 goto exists; /* already exists, OK */
1104 return SEM_FAILED;
1105 }
1106 created = 1;
1107 /* First one to create the file initializes it. */
1108 NtAllocateLocallyUniqueId (&sf.luid);
1109 sf.value = value;
1110 sf.hash = hash_path_name (0, semname);
1111 if (write (fd, &sf, sizeof sf) != sizeof sf)
1112 __leave;
1113 sem = semaphore::open (sf.hash, sf.luid, fd, oflag, mode, value,
1114 wasopen);
1115 if (sem == SEM_FAILED)
1116 __leave;
1117 /* Initialization complete, turn off user-execute bit */
1118 if (fchmod (fd, mode) == -1)
1119 __leave;
1120 /* Don't close (fd); */
1121 return sem;
1122 }
1123
1124 exists:
1125 /* Open the file and fetch the semaphore name. */
1126 if ((fd = open (semname, O_RDWR | O_CLOEXEC)) < 0)
1127 {
1128 if (errno == ENOENT && (oflag & O_CREAT))
1129 goto again;
1130 __leave;
1131 }
1132 /* Make certain initialization is complete */
1133 for (i = 0; i < MAX_TRIES; i++)
1134 {
1135 if (stat64 (semname, &statbuff) == -1)
1136 {
1137 if (errno == ENOENT && (oflag & O_CREAT))
1138 {
1139 close (fd);
1140 fd = -1;
1141 goto again;
1142 }
1143 __leave;
1144 }
1145 if ((statbuff.st_mode & S_IXUSR) == 0)
1146 break;
1147 sleep (1);
1148 }
1149 if (i == MAX_TRIES)
1150 {
1151 set_errno (ETIMEDOUT);
1152 __leave;
1153 }
1154 if (file.lock (fd, sizeof sf))
1155 __leave;
1156 if (read (fd, &sf, sizeof sf) != sizeof sf)
1157 __leave;
1158 sem = semaphore::open (sf.hash, sf.luid, fd, oflag, mode, sf.value,
1159 wasopen);
1160 file.unlock (fd);
1161 if (sem == SEM_FAILED)
1162 __leave;
1163 /* If wasopen is set, the semaphore was already opened and we already have
1164 an open file descriptor pointing to the file. This means, we have to
1165 close the file descriptor created in this call. It won't be stored
1166 anywhere anyway. */
1167 if (wasopen)
1168 close (fd);
1169 return sem;
1170 }
1171 __except (EFAULT) {}
1172 __endtry
1173 /* Don't let following function calls change errno */
1174 save_errno save;
1175
1176 if (fd >= 0)
1177 file.unlock (fd);
1178 if (created)
1179 unlink (semname);
1180 if (sem != SEM_FAILED)
1181 semaphore::close (sem);
1182 if (fd >= 0)
1183 close (fd);
1184 return SEM_FAILED;
1185 }
1186
1187 int
1188 _sem_close (sem_t *sem, bool do_close)
1189 {
1190 sem_finfo sf;
1191 int fd, ret = -1;
1192 ipc_flock file;
1193
1194 if (semaphore::getinternal (sem, &fd, &sf.hash, &sf.luid, &sf.value) == -1)
1195 return -1;
1196 if (!file.lock (fd, sizeof sf)
1197 && lseek64 (fd, 0LL, SEEK_SET) != (off_t) -1
1198 && write (fd, &sf, sizeof sf) == sizeof sf)
1199 ret = do_close ? semaphore::close (sem) : 0;
1200
1201 /* Don't let following function calls change errno */
1202 save_errno save;
1203 file.unlock (fd);
1204 close (fd);
1205
1206 return ret;
1207 }
1208
1209 extern "C" int
1210 sem_close (sem_t *sem)
1211 {
1212 return _sem_close (sem, true);
1213 }
1214
1215 extern "C" int
1216 sem_unlink (const char *name)
1217 {
1218 size_t len = strlen (name);
1219 char semname[ipc_names[semaphore].prefix_len + len];
1220
1221 if (!check_path (semname, semaphore, name, len))
1222 return -1;
1223 if (unlink (semname) == -1)
1224 return -1;
1225 return 0;
1226 }
This page took 0.095183 seconds and 6 git commands to generate.