]> sourceware.org Git - systemtap.git/blob - runtime/dyninst/transport.c
8cffe1a5647923ddba44c91023b5b5a55888222b
[systemtap.git] / runtime / dyninst / transport.c
1 /* -*- linux-c -*-
2 * Transport Functions
3 * Copyright (C) 2013 Red Hat Inc.
4 *
5 * This file is part of systemtap, and is free software. You can
6 * redistribute it and/or modify it under the terms of the GNU General
7 * Public License (GPL); either version 2, or (at your option) any
8 * later version.
9 */
10
11 #ifndef _STAPDYN_TRANSPORT_C_
12 #define _STAPDYN_TRANSPORT_C_
13
14 #include <time.h>
15 #include <unistd.h>
16 #include <sys/types.h>
17 #include <spawn.h>
18
19 #include <sys/syscall.h>
20
21 #include <errno.h>
22 #include <string.h>
23 #include <search.h>
24 #include <signal.h>
25
26 #include "transport.h"
27
28 ////////////////////////////////////////
29 //
30 // GENERAL TRANSPORT OVERVIEW
31 //
32 // Each context structure has a '_stp_transport_context_data'
33 // structure (described in more detail later) in it, which contains
34 // that context's print and log (warning/error) buffers. There is a
35 // session-wide double-buffered queue (stored in the
36 // '_stp_transport_session_data' structure) where each probe can send
37 // print/control messages to a fairly simple consumer thread (see
38 // _stp_dyninst_transport_thread_func() for details). The consumer
39 // thread swaps the read/write queues, then handles each request.
40 //
41 // Note that there is as little as possible data copying going on. A
42 // probe adds data to a print/log buffer stored in shared memory, then
43 // the consumer queue outputs the data from that same buffer.
44 //
45 //
46 // QUEUE OVERVIEW
47 //
48 // See the session-wide queue's definition in transport.h. It is
49 // composed of the '_stp_transport_queue_item', '_stp_transport_queue'
50 // and '_stp_transport_session_data' structures.
51 //
52 // The queue is double-buffered and stored in shared memory. Because
53 // it is session-wide, and multiple threads can be trying to add data
54 // to it simultaneously, the 'queue_mutex' is used to serialize
55 // access. Probes write to the write queue. When the consumer thread
56 // realizes data is available, it swaps the read/write queues (by
57 // changing the 'write_queue' value) and then processes each
58 // '_stp_transport_queue_item' on the read queue.
59 //
60 // If the queue is full, probes will wait on the 'queue_space_avail'
61 // condition variable for more space. The consumer thread sets
62 // 'queue_space_avail' when it swaps the read/write queues.
63 //
64 // The consumer thread waits on the 'queue_data_avail' condition
65 // variable to know when more items are available. When probes add
66 // items to the queue (using __stp_dyninst_transport_queue_add()),
67 // 'queue_data_avail' gets set.
68 //
69 //
70 // LOG BUFFER OVERVIEW
71 //
72 // See the context-specific log buffer's (struct
73 // _stp_transport_context_data) definition in transport.h.
74 //
75 // The log buffer, used for warning/error messages, is stored in
76 // shared memory. Each context structure has its own log buffer. Each
77 // log buffer logically contains '_STP_LOG_BUF_ENTRIES' buffers of
78 // length 'STP_LOG_BUF_LEN'. In other words, the log buffer allocation
79 // is done in chunks of size 'STP_LOG_BUF_LEN'. The log buffer is
80 // circular, and the indices use an extra most significant bit to
81 // indicate wrapping.
82 //
// Only the consumer thread removes items from the log buffer.
86 //
87 // If the log buffer is full, probes will wait on the
88 // 'log_space_avail' condition variable for more space. The consumer
89 // thread sets 'log_space_avail' after finishing with a particular log
90 // buffer chunk.
91 //
92 // Note that the read index 'log_start' is only written to by the
93 // consumer thread and that the write index 'log_end' is only written
94 // to by the probes (with a locked context).
95 //
96 //
97 // PRINT BUFFER OVERVIEW
98 //
99 // See the context-specific print buffer definition (struct
100 // _stp_transport_context_data) in transport.h.
101 //
102 // The print buffer is stored in shared memory. Each context structure
103 // has its own print buffer. The print buffer really isn't a true
// circular buffer, it is more like a "semi-circular" buffer. If a
105 // reservation request won't fit after the write offset, we go ahead
106 // and wrap around to the beginning (if available), leaving an unused
107 // gap at the end of the buffer. This is done to not break up
108 // reservation requests. Like a circular buffer, the offsets use an
109 // extra most significant bit to indicate wrapping.
110 //
111 // Only the consumer thread (normally) removes items from the print
112 // buffer. It is possible to 'unreserve' bytes using
113 // _stp_dyninst_transport_unreserve_bytes() if the bytes haven't been
114 // flushed.
115 //
116 // If the print buffer doesn't have enough bytes available, probes
117 // will flush any reserved bytes earlier than normal, then wait on the
118 // 'print_space_avail' condition variable for more space to become
119 // available. The consumer thread sets 'print_space_avail' after
120 // finishing with a particular print buffer segment.
121 //
122 // Note that the read index 'read_offset' is only written to by the
123 // consumer thread and that the write index 'write_offset' (and number
// of bytes to write 'write_bytes') is only written to by the probes
125 // (with a locked context).
126 //
127 ////////////////////////////////////////
128
129 static pthread_t _stp_transport_thread;
130 static int _stp_transport_thread_started = 0;
131
132 #ifndef STP_DYNINST_TIMEOUT_SECS
133 #define STP_DYNINST_TIMEOUT_SECS 5
134 #endif
135
// When we're converting a circular buffer/index into a pointer
137 // value, we need the "normalized" value (i.e. one without the extra
138 // msb possibly set).
139 #define _STP_D_T_LOG_NORM(x) ((x) & (_STP_LOG_BUF_ENTRIES - 1))
140 #define _STP_D_T_PRINT_NORM(x) ((x) & (_STP_DYNINST_BUFFER_SIZE - 1))
141
142 // Define a macro to generically add circular buffer
// offsets/indices.
144 #define __STP_D_T_ADD(offset, increment, buffer_size) \
145 (((offset) + (increment)) & (2 * (buffer_size) - 1))
146
147 // Using __STP_D_T_ADD(), define a specific macro for each circular
148 // buffer.
149 #define _STP_D_T_LOG_INC(offset) \
150 __STP_D_T_ADD((offset), 1, _STP_LOG_BUF_ENTRIES)
151 #define _STP_D_T_PRINT_ADD(offset, increment) \
152 __STP_D_T_ADD((offset), (increment), _STP_DYNINST_BUFFER_SIZE)
153
154 // Return a pointer to the session's current write queue.
155 #define _STP_D_T_WRITE_QUEUE(sess_data) \
156 (&((sess_data)->queues[(sess_data)->write_queue]))
157
158 // Limit remembered strings in __stp_d_t_eliminate_duplicate_warnings
159 #define MAX_STORED_WARNINGS 1024
160
161
162 // If the transport has an error or debug message to print, it can't very well
163 // recurse on itself, so we just print to the local stderr and hope...
static void _stp_transport_err (const char *fmt, ...)
	__attribute ((format (printf, 1, 2)));
static void _stp_transport_err (const char *fmt, ...)
{
	/* Format the message and send it straight to our own stderr;
	 * see the note above about why we can't use the transport. */
	va_list ap;

	va_start(ap, fmt);
	vfprintf(stderr, fmt, ap);
	va_end(ap);
}
173
174 #ifdef DEBUG_TRANS
175 #define _stp_transport_debug(fmt, ...) \
176 _stp_transport_err("%s:%d - " fmt, __FUNCTION__, __LINE__, ##__VA_ARGS__)
177 #else
178 #define _stp_transport_debug(fmt, ...) do { } while(0)
179 #endif
180
181
/*
 * Append one item to the session-wide write queue.
 *
 * Blocks (on 'queue_space_avail') while the write queue is full, then
 * fills in the next '_stp_transport_queue_item' and signals
 * 'queue_data_avail' so the consumer thread wakes up.
 *
 * 'type' is an STP_DYN_* message type; 'data_index' identifies the
 * originating context; 'offset'/'bytes' locate the payload in that
 * context's print or log buffer.
 */
static void
__stp_dyninst_transport_queue_add(unsigned type, int data_index,
				  size_t offset, size_t bytes)
{
	struct _stp_transport_session_data *sess_data = stp_transport_data();

	// No session data means no queue to add to.
	if (sess_data == NULL)
		return;

	pthread_mutex_lock(&(sess_data->queue_mutex));
	// While the write queue is full, wait.
	while (_STP_D_T_WRITE_QUEUE(sess_data)->items
	       == (STP_DYNINST_QUEUE_ITEMS - 1)) {
		pthread_cond_wait(&(sess_data->queue_space_avail),
				  &(sess_data->queue_mutex));
	}
	// Re-evaluate the write queue pointer here: the consumer may
	// have swapped the queues while we were waiting above.
	struct _stp_transport_queue *q = _STP_D_T_WRITE_QUEUE(sess_data);
	struct _stp_transport_queue_item *item = &(q->queue[q->items]);
	q->items++;
	item->type = type;
	item->data_index = data_index;
	item->offset = offset;
	item->bytes = bytes;
	// Signal while still holding the mutex so the consumer can't
	// miss the update.
	pthread_cond_signal(&(sess_data->queue_data_avail));
	pthread_mutex_unlock(&(sess_data->queue_mutex));
}
208
209 /* Handle duplicate warning elimination. Returns 0 if we've seen this
210 * warning (and should be eliminated), 1 otherwise. */
211 static int
212 __stp_d_t_eliminate_duplicate_warnings(char *data, size_t bytes)
213 {
214 static void *seen = 0;
215 static unsigned seen_count = 0;
216 char *dupstr = strndup (data, bytes);
217 char *retval;
218 int rc = 1;
219
220 if (! dupstr) {
221 /* OOM, should not happen. */
222 return 1;
223 }
224
225 retval = tfind (dupstr, &seen,
226 (int (*)(const void*, const void*))strcmp);
227 if (! retval) { /* new message */
228 /* We set a maximum for stored warning messages, to
229 * prevent a misbehaving script/environment from
230 * emitting countless _stp_warn()s, and overflow
231 * staprun's memory. */
232 if (seen_count++ == MAX_STORED_WARNINGS) {
233 _stp_transport_err("WARNING deduplication table full\n");
234 free (dupstr);
235 }
236 else if (seen_count > MAX_STORED_WARNINGS) {
237 /* Be quiet in the future, but stop counting
238 * to preclude overflow. */
239 free (dupstr);
240 seen_count = MAX_STORED_WARNINGS + 1;
241 }
242 else if (seen_count < MAX_STORED_WARNINGS) {
243 /* NB: don't free dupstr; it's going into the tree. */
244 retval = tsearch (dupstr, & seen,
245 (int (*)(const void*, const void*))strcmp);
246 if (retval == 0) {
247 /* OOM, should not happen. Next time
248 * we should get the 'full'
249 * message. */
250 free (dupstr);
251 seen_count = MAX_STORED_WARNINGS;
252 }
253 }
254 }
255 else { /* old message */
256 free (dupstr);
257 rc = 0;
258 }
259 return rc;
260 }
261
/* Spawn "sh -c <command>" without waiting for it to finish. */
static void
__stp_d_t_run_command(char *command)
{
	/*
	 * FIXME: We'll need to make sure the output from system goes
	 * to the correct file descriptor. We may need some posix file
	 * actions to pass to posix_spawnp().
	 */
	char *argv[] = { "sh", "-c", command, NULL };
	int err = posix_spawnp(NULL, "sh", NULL, NULL, argv, NULL);

	if (err != 0)
		_stp_transport_err("ERROR: %s : %s\n", command, strerror(err));
	/* Notice we're not waiting on the resulting process to finish. */
}
277
/* Ask stapdyn to shut this module down, by signalling ourselves. */
static void
__stp_d_t_request_exit(void)
{
	/*
	 * We want stapdyn to trigger this module's exit code from outside. It
	 * knows to do this on receipt of signals, so we must kill ourselves.
	 * The signal handler will forward that to the main thread.
	 *
	 * NB: If the target process was created rather than attached, SIGTERM
	 * waits for it to exit. SIGQUIT always exits immediately. It's
	 * somewhat debatable which is most appropriate here...
	 */
	pthread_kill(pthread_self(), SIGTERM);
}
292
/*
 * write() the whole buffer, retrying on EINTR and on short writes.
 *
 * Returns 'count' on success, or the failing write()'s (negative)
 * return value, with errno set, on error.
 */
static ssize_t
_stp_write_retry(int fd, const void *buf, size_t count)
{
	/* Advance through the buffer with a char pointer; arithmetic
	 * on 'void *' is a GNU extension, not standard C. */
	const char *p = buf;
	size_t remaining = count;

	while (remaining > 0) {
		ssize_t ret = write(fd, p, remaining);
		if (ret >= 0) {
			p += ret;
			remaining -= ret;
		}
		else if (errno != EINTR) {
			return ret;
		}
	}
	return count;
}
309
/*
 * Expand strftime() escapes in 'fmt' against local time 't' into
 * 'buf' (capacity 'max'). Returns the formatted length, or -EINVAL
 * on bad arguments or formatting failure.
 */
static int
stap_strfloctime(char *buf, size_t max, const char *fmt, time_t t)
{
	struct tm local;
	size_t len;

	if (buf == NULL || fmt == NULL || max <= 1)
		return -EINVAL;

	localtime_r(&t, &local);
	/* NB: this following invocation is the reason for stapdyn's
	   being built with -Wno-format-nonliteral. strftime parsing
	   does not have security implications AFAIK, but gcc still
	   wants to check them. */
	len = strftime(buf, max, fmt, &local);
	return (len == 0) ? -EINVAL : (int)len;
}
327
328 static void *
329 _stp_dyninst_transport_thread_func(void *arg __attribute((unused)))
330 {
331 int stopping = 0;
332 int out_fd, err_fd;
333 struct _stp_transport_session_data *sess_data = stp_transport_data();
334
335 if (sess_data == NULL)
336 return NULL;
337
338 if (strlen(stp_session_attributes()->outfile_name)) {
339 char buf[PATH_MAX];
340 int rc;
341
342 rc = stap_strfloctime(buf, PATH_MAX,
343 stp_session_attributes()->outfile_name,
344 time(NULL));
345 if (rc < 0) {
346 _stp_transport_err("Invalid FILE name format\n");
347 return NULL;
348 }
349 out_fd = open (buf, O_CREAT|O_TRUNC|O_WRONLY|O_CLOEXEC, 0666);
350 if (out_fd < 0) {
351 _stp_transport_err("ERROR: Couldn't open output file %s: %s\n",
352 buf, strerror(rc));
353 return NULL;
354 }
355 }
356 else
357 out_fd = STDOUT_FILENO;
358 err_fd = STDERR_FILENO;
359 if (out_fd < 0 || err_fd < 0)
360 return NULL;
361
362 while (! stopping) {
363 struct _stp_transport_queue *q;
364 struct _stp_transport_queue_item *item;
365 struct context *c;
366 struct _stp_transport_context_data *data;
367 void *read_ptr;
368
369 pthread_mutex_lock(&(sess_data->queue_mutex));
370 // While there are no queue entries, wait.
371 q = _STP_D_T_WRITE_QUEUE(sess_data);
372 while (q->items == 0) {
373 // Mutex is locked. It is automatically
374 // unlocked while we are waiting.
375 pthread_cond_wait(&(sess_data->queue_data_avail),
376 &(sess_data->queue_mutex));
377 // Mutex is locked again.
378 }
379
380 // We've got data. Swap the queues and let any waiters
381 // know there is more space available.
382 sess_data->write_queue ^= 1;
383 pthread_cond_broadcast(&(sess_data->queue_space_avail));
384 pthread_mutex_unlock(&(sess_data->queue_mutex));
385
386 // Note that we're processing the read queue with no
387 // locking. This is possible since no other thread
388 // will be accessing it until we're finished with it
389 // (and we make it the write queue).
390
391 // Process the queue twice. First handle the OOB data types.
392 for (size_t i = 0; i < q->items; i++) {
393 int write_data = 1;
394 item = &(q->queue[i]);
395 if (! (item->type & STP_DYN_OOB_DATA_MASK))
396 continue;
397
398 c = stp_session_context(item->data_index);
399 data = &c->transport_data;
400 read_ptr = data->log_buf + item->offset;
401
402 switch (item->type) {
403 case STP_DYN_OOB_DATA:
404 _stp_transport_debug(
405 "STP_DYN_OOB_DATA (%ld bytes at offset %ld)\n",
406 item->bytes, item->offset);
407
408 /* Note that "WARNING:" should not be
409 * translated, since it is part of the
410 * module cmd protocol. */
411 if (strncmp(read_ptr, "WARNING:", 7) == 0) {
412 if (stp_session_attributes()->suppress_warnings) {
413 write_data = 0;
414 }
415 /* If we're not verbose, eliminate
416 * duplicate warning messages. */
417 else if (stp_session_attributes()->log_level
418 == 0) {
419 write_data = __stp_d_t_eliminate_duplicate_warnings(read_ptr, item->bytes);
420 }
421 }
422 /* "ERROR:" also should not be translated. */
423 else if (strncmp(read_ptr, "ERROR:", 5) == 0) {
424 if (_stp_exit_status == 0)
425 _stp_exit_status = 1;
426 }
427
428 if (! write_data) {
429 break;
430 }
431
432 if (_stp_write_retry(err_fd, read_ptr, item->bytes) < 0)
433 _stp_transport_err(
434 "couldn't write %ld bytes OOB data: %s\n",
435 (long)item->bytes, strerror(errno));
436 break;
437
438 case STP_DYN_SYSTEM:
439 _stp_transport_debug("STP_DYN_SYSTEM (%.*s) %d bytes\n",
440 (int)item->bytes, (char *)read_ptr,
441 (int)item->bytes);
442 /*
443 * Note that the null character is
444 * already included in the system
445 * string.
446 */
447 __stp_d_t_run_command(read_ptr);
448 break;
449 default:
450 _stp_transport_err(
451 "Error - unknown OOB item type %d\n",
452 item->type);
453 break;
454 }
455
456 // Signal there is a log buffer available to
457 // any waiters.
458 data->log_start = _STP_D_T_LOG_INC(data->log_start);
459 pthread_mutex_lock(&(data->log_mutex));
460 pthread_cond_signal(&(data->log_space_avail));
461 pthread_mutex_unlock(&(data->log_mutex));
462 }
463
464 // Handle the non-OOB data.
465 for (size_t i = 0; i < q->items; i++) {
466 item = &(q->queue[i]);
467
468 switch (item->type) {
469 case STP_DYN_NORMAL_DATA:
470 _stp_transport_debug("STP_DYN_NORMAL_DATA"
471 " (%ld bytes at offset %ld)\n",
472 item->bytes, item->offset);
473 c = stp_session_context(item->data_index);
474 data = &c->transport_data;
475 read_ptr = (data->print_buf
476 + _STP_D_T_PRINT_NORM(item->offset));
477 if (_stp_write_retry(out_fd, read_ptr, item->bytes) < 0)
478 _stp_transport_err(
479 "couldn't write %ld bytes data: %s\n",
480 (long)item->bytes, strerror(errno));
481
482 pthread_mutex_lock(&(data->print_mutex));
483
484 // Now we need to update the read
485 // pointer, using the data_index we
486 // received. Note that we're doing
487 // this with or without that context
488 // locked, but the print_mutex is
489 // locked.
490 data->read_offset = _STP_D_T_PRINT_ADD(item->offset, item->bytes);
491
492 // Signal more bytes available to any waiters.
493 pthread_cond_signal(&(data->print_space_avail));
494 pthread_mutex_unlock(&(data->print_mutex));
495
496 _stp_transport_debug(
497 "STP_DYN_NORMAL_DATA flushed,"
498 " read_offset %ld, write_offset %ld)\n",
499 data->read_offset, data->write_offset);
500 break;
501
502 case STP_DYN_EXIT:
503 _stp_transport_debug("STP_DYN_EXIT\n");
504 stopping = 1;
505 break;
506
507 case STP_DYN_REQUEST_EXIT:
508 _stp_transport_debug("STP_DYN_REQUEST_EXIT\n");
509 __stp_d_t_request_exit();
510 break;
511
512 default:
513 if (! (item->type & STP_DYN_OOB_DATA_MASK)) {
514 _stp_transport_err(
515 "Error - unknown item type"
516 " %d\n", item->type);
517 }
518 break;
519 }
520 }
521
522 // We're now finished with the read queue. Clear it
523 // out.
524 q->items = 0;
525 }
526 return NULL;
527 }
528
/*
 * Send a control message from probe context to the consumer thread.
 *
 * Only STP_SYSTEM is handled: the payload is copied into one of this
 * context's log buffer chunks and queued as STP_DYN_SYSTEM so the
 * consumer thread will spawn the command.
 *
 * Returns 'len' when queued, 0 for unsupported types or when no log
 * chunk is available, EINVAL when no context exists.
 */
static int _stp_ctl_send(int type, void *data, unsigned len)
{
	_stp_transport_debug("type 0x%x data %p len %d\n",
			     type, data, len);

	// This thread should already have a context structure.
	struct context* c = _stp_runtime_get_context();
	if (c == NULL)
		return EINVAL;

	// Currently, we're only handling 'STP_SYSTEM' control
	// messages, converting it to a STP_DYN_SYSTEM message.
	if (type != STP_SYSTEM)
		return 0;

	// Grab a free STP_LOG_BUF_LEN-sized chunk (may block until
	// one is available).
	char *buffer = _stp_dyninst_transport_log_buffer();
	if (buffer == NULL)
		return 0;

	// NOTE(review): 'len' is copied unchecked; presumably callers
	// guarantee len <= STP_LOG_BUF_LEN — verify, since a larger
	// message would overrun this chunk.
	memcpy(buffer, data, len);
	// Queue the chunk's offset within this context's log buffer.
	size_t offset = buffer - c->transport_data.log_buf;
	__stp_dyninst_transport_queue_add(STP_DYN_SYSTEM,
					  c->data_index, offset, len);
	return len;
}
554
/* Queue an STP_DYN_EXIT item telling the consumer thread to stop. */
static void _stp_dyninst_transport_signal_exit(void)
{
	__stp_dyninst_transport_queue_add(STP_DYN_EXIT, 0, 0, 0);
}
559
/* Queue an STP_DYN_REQUEST_EXIT item asking the consumer thread to
 * initiate full module shutdown (see __stp_d_t_request_exit()). */
static void _stp_dyninst_transport_request_exit(void)
{
	__stp_dyninst_transport_queue_add(STP_DYN_REQUEST_EXIT, 0, 0, 0);
}
564
/*
 * Initialize the transport synchronization objects: the session-wide
 * queue mutex/condition variables, plus each context's print and log
 * mutexes/condition variables. All of these live in shared memory,
 * hence the process-shared init wrappers.
 *
 * Returns 0 on success, or the failing pthread init error code.
 */
static int _stp_dyninst_transport_session_init(void)
{
	int rc;

	// Set up the transport session data.
	struct _stp_transport_session_data *sess_data = stp_transport_data();
	if (sess_data != NULL) {
		rc = stp_pthread_mutex_init_shared(&(sess_data->queue_mutex));
		if (rc != 0) {
			_stp_error("transport queue mutex initialization"
				   " failed");
			return rc;
		}
		rc = stp_pthread_cond_init_shared(&(sess_data->queue_space_avail));
		if (rc != 0) {
			_stp_error("transport queue space avail cond variable"
				   " initialization failed");
			return rc;
		}
		rc = stp_pthread_cond_init_shared(&(sess_data->queue_data_avail));
		if (rc != 0) {
			_stp_error("transport queue empty cond variable"
				   " initialization failed");
			return rc;
		}
	}

	// Set up each context's transport data.
	int i;
	for_each_possible_cpu(i) {
		struct context *c;
		struct _stp_transport_context_data *data;
		c = stp_session_context(i);
		if (c == NULL)
			continue;
		data = &c->transport_data;
		rc = stp_pthread_mutex_init_shared(&(data->print_mutex));
		if (rc != 0) {
			_stp_error("transport mutex initialization failed");
			return rc;
		}

		rc = stp_pthread_cond_init_shared(&(data->print_space_avail));
		if (rc != 0) {
			_stp_error("transport cond variable initialization failed");
			return rc;
		}

		rc = stp_pthread_mutex_init_shared(&(data->log_mutex));
		if (rc != 0) {
			_stp_error("transport log mutex initialization failed");
			return rc;
		}

		rc = stp_pthread_cond_init_shared(&(data->log_space_avail));
		if (rc != 0) {
			_stp_error("transport log cond variable initialization failed");
			return rc;
		}
	}

	return 0;
}
628
629 static int _stp_dyninst_transport_session_start(void)
630 {
631 int rc;
632
633 // Start the thread.
634 rc = pthread_create(&_stp_transport_thread, NULL,
635 &_stp_dyninst_transport_thread_func, NULL);
636 if (rc != 0) {
637 _stp_error("transport thread creation failed (%d)", rc);
638 return rc;
639 }
640 _stp_transport_thread_started = 1;
641 return 0;
642 }
643
/*
 * Queue previously-written OOB (warning/error) data for output.
 *
 * 'buffer' must be a chunk inside this context's log buffer (obtained
 * from _stp_dyninst_transport_log_buffer()); 'bytes' is the amount of
 * data in it. Returns 0 on success, EINVAL when no context exists.
 */
static int
_stp_dyninst_transport_write_oob_data(char *buffer, size_t bytes)
{
	// This thread should already have a context structure.
	struct context* c = _stp_runtime_get_context();
	if (c == NULL)
		return EINVAL;

	// Queue the chunk's offset within this context's log buffer.
	size_t offset = buffer - c->transport_data.log_buf;
	__stp_dyninst_transport_queue_add(STP_DYN_OOB_DATA,
					  c->data_index, offset, bytes);
	return 0;
}
657
/*
 * Flush this context's reserved-but-unflushed print bytes by queueing
 * an STP_DYN_NORMAL_DATA item for the consumer thread.
 *
 * Always returns 0; it is a no-op when there is no context or nothing
 * to flush.
 */
static int _stp_dyninst_transport_write(void)
{
	// This thread should already have a context structure.
	struct context* c = _stp_runtime_get_context();
	if (c == NULL)
		return 0;
	struct _stp_transport_context_data *data = &c->transport_data;
	size_t bytes = data->write_bytes;

	if (bytes == 0)
		return 0;

	// This should be thread-safe without using any additional
	// locking. This probe is the only one using this context and
	// the transport thread (the consumer) only writes to
	// 'read_offset'. Any concurrent-running probe will be using a
	// different context.
	_stp_transport_debug(
		"read_offset %ld, write_offset %ld, write_bytes %ld\n",
		data->read_offset, data->write_offset, data->write_bytes);

	// Notice we're not normalizing 'write_offset'. The consumer
	// thread needs "raw" offsets.
	size_t saved_write_offset = data->write_offset;
	data->write_bytes = 0;

	// Note that if we're writing all remaining bytes in the
	// buffer, it can wrap (but only to either "high" or "low"
	// 0).
	data->write_offset = _STP_D_T_PRINT_ADD(data->write_offset, bytes);

	__stp_dyninst_transport_queue_add(STP_DYN_NORMAL_DATA,
					  c->data_index,
					  saved_write_offset, bytes);
	return 0;
}
694
695 static void _stp_dyninst_transport_shutdown(void)
696 {
697 // If we started the thread, tear everything down.
698 if (_stp_transport_thread_started != 1) {
699 return;
700 }
701
702 // Signal the thread to stop.
703 _stp_dyninst_transport_signal_exit();
704
705 // Wait for thread to quit...
706 pthread_join(_stp_transport_thread, NULL);
707 _stp_transport_thread_started = 0;
708
709 // Tear down the transport session data.
710 struct _stp_transport_session_data *sess_data = stp_transport_data();
711 if (sess_data != NULL) {
712 pthread_mutex_destroy(&(sess_data->queue_mutex));
713 pthread_cond_destroy(&(sess_data->queue_space_avail));
714 pthread_cond_destroy(&(sess_data->queue_data_avail));
715 }
716
717 // Tear down each context's transport data.
718 int i;
719 for_each_possible_cpu(i) {
720 struct context *c;
721 struct _stp_transport_context_data *data;
722 c = stp_session_context(i);
723 if (c == NULL)
724 continue;
725 data = &c->transport_data;
726 pthread_mutex_destroy(&(data->print_mutex));
727 pthread_cond_destroy(&(data->print_space_avail));
728 pthread_mutex_destroy(&(data->log_mutex));
729 pthread_cond_destroy(&(data->log_space_avail));
730 }
731 }
732
733 static int
734 _stp_dyninst_transport_log_buffer_full(struct _stp_transport_context_data *data)
735 {
736 // This inverts the most significant bit of 'log_start' before
737 // comparison.
738 return (data->log_end == (data->log_start ^ _STP_LOG_BUF_ENTRIES));
739 }
740
741
/*
 * Reserve one STP_LOG_BUF_LEN-sized chunk of this context's circular
 * log buffer, blocking until a chunk is free.
 *
 * Returns a pointer to the chunk, or NULL when no context is
 * available. The consumer thread releases chunks (by advancing
 * 'log_start') after writing them out.
 */
static char *_stp_dyninst_transport_log_buffer(void)
{
	// This thread should already have a context structure.
	struct context* c = _stp_runtime_get_context();
	if (c == NULL)
		return NULL;

	// Note that the context structure is locked, so only one
	// probe at a time can be operating on it.
	struct _stp_transport_context_data *data = &c->transport_data;

	// If there isn't an available log buffer, wait.
	if (_stp_dyninst_transport_log_buffer_full(data)) {
		pthread_mutex_lock(&(data->log_mutex));
		while (_stp_dyninst_transport_log_buffer_full(data)) {
			pthread_cond_wait(&(data->log_space_avail),
					  &(data->log_mutex));
		}
		pthread_mutex_unlock(&(data->log_mutex));
	}

	// Note that we're taking 'log_end' and normalizing it to start
	// at 0 to get the proper entry number. We then multiply it by
	// STP_LOG_BUF_LEN to find the proper buffer offset.
	//
	// Every "allocation" here is done in STP_LOG_BUF_LEN-sized
	// chunks.
	char *ptr = &data->log_buf[_STP_D_T_LOG_NORM(data->log_end)
				   * STP_LOG_BUF_LEN];

	// Increment 'log_end'. Only probes write 'log_end' (with the
	// context locked), so no extra locking is needed here.
	data->log_end = _STP_D_T_LOG_INC(data->log_end);
	return ptr;
}
776
777 static size_t
778 __stp_d_t_space_before(struct _stp_transport_context_data *data,
779 size_t read_offset)
780 {
781 // If the offsets have differing most significant bits, then
782 // the write offset has wrapped, so there isn't any available
783 // space before the write offset.
784 if ((read_offset & _STP_DYNINST_BUFFER_SIZE)
785 != (data->write_offset & _STP_DYNINST_BUFFER_SIZE)) {
786 return 0;
787 }
788
789 return (_STP_D_T_PRINT_NORM(read_offset));
790 }
791
792 static size_t
793 __stp_d_t_space_after(struct _stp_transport_context_data *data,
794 size_t read_offset)
795 {
796 // We have to worry about wraparound here, in the case of a
797 // full buffer.
798 size_t write_end_offset = _STP_D_T_PRINT_ADD(data->write_offset,
799 data->write_bytes);
800
801 // If the offsets have differing most significant bits, then
802 // the write offset has wrapped, so the only available space
803 // after the write offset is between the (normalized) write
804 // offset and the (normalized) read offset.
805 if ((read_offset & _STP_DYNINST_BUFFER_SIZE)
806 != (write_end_offset & _STP_DYNINST_BUFFER_SIZE)) {
807 return (_STP_D_T_PRINT_NORM(read_offset)
808 - _STP_D_T_PRINT_NORM(write_end_offset));
809 }
810
811 return (_STP_DYNINST_BUFFER_SIZE
812 - _STP_D_T_PRINT_NORM(write_end_offset));
813 }
814
/*
 * Reserve 'numbytes' contiguous bytes in this context's print buffer.
 *
 * Prefers space after the current write offset (keeping writes
 * contiguous); otherwise wraps to the space before the read offset,
 * flushing pending bytes first. When no space is available it flushes
 * and/or waits (bounded by STP_DYNINST_TIMEOUT_SECS) for the consumer
 * thread to drain the buffer.
 *
 * Returns a pointer to the reserved region, or NULL when no context
 * exists or space never became available.
 *
 * NOTE(review): 'numbytes' is signed but compared against size_t
 * space counts below; a non-positive value would convert to a huge
 * unsigned number — presumably callers always pass > 0; verify.
 */
static void *_stp_dyninst_transport_reserve_bytes(int numbytes)
{
	void *ret;

	// This thread should already have a context structure.
	struct context* c = _stp_runtime_get_context();
	if (c == NULL) {
		_stp_transport_debug("NULL context!\n");
		return NULL;
	}

	struct _stp_transport_context_data *data = &c->transport_data;
	size_t space_before, space_after, read_offset;

recheck:
	pthread_mutex_lock(&(data->print_mutex));

	// If the buffer is empty, reset everything to the
	// beginning. This cuts down on fragmentation.
	if (data->write_bytes == 0 && data->read_offset == data->write_offset
	    && data->read_offset != 0) {
		data->read_offset = 0;
		data->write_offset = 0;
	}
	// We cache the read_offset value to get a consistent view of
	// the buffer (between calls to get the space before/after).
	read_offset = data->read_offset;
	pthread_mutex_unlock(&(data->print_mutex));

	space_before = __stp_d_t_space_before(data, read_offset);
	space_after = __stp_d_t_space_after(data, read_offset);

	// If we don't have enough space, try to get more space by
	// flushing and/or waiting.
	if (space_before < numbytes && space_after < numbytes) {
		// First, lock the mutex.
		pthread_mutex_lock(&(data->print_mutex));

		// There is a race condition here. We've checked for
		// available free space, then locked the mutex. It is
		// possible for more free space to have become
		// available between the time we checked and the time
		// we locked the mutex. Recheck the available free
		// space.
		read_offset = data->read_offset;
		space_before = __stp_d_t_space_before(data, read_offset);
		space_after = __stp_d_t_space_after(data, read_offset);

		// If we still don't have enough space and we have
		// data we haven't flushed, go ahead and flush to free
		// up space.
		if (space_before < numbytes && space_after < numbytes
		    && data->write_bytes != 0) {
			// Flush the buffer. We have to do this while
			// the mutex is locked, so that we can't miss
			// the condition change. (If we did flush
			// without the mutex locked, it would be
			// possible for the consumer thread to signal
			// the condition variable before we were
			// waiting on it.)
			_stp_dyninst_transport_write();

			// Mutex is locked. It is automatically
			// unlocked while we are waiting.
			pthread_cond_wait(&(data->print_space_avail),
					  &(data->print_mutex));
			// Mutex is locked again.

			// Recheck available free space.
			read_offset = data->read_offset;
			space_before = __stp_d_t_space_before(data,
							      read_offset);
			space_after = __stp_d_t_space_after(data, read_offset);
		}

		// If we don't have enough bytes available, do a timed
		// wait for more bytes to become available. This might
		// fail if there isn't anything in the queue for this
		// context structure.
		if (space_before < numbytes && space_after < numbytes) {
			_stp_transport_debug(
				"waiting for more space, numbytes %d,"
				" before %ld, after %ld\n",
				numbytes, space_before, space_after);

			// Setup a timeout for
			// STP_DYNINST_TIMEOUT_SECS seconds into the
			// future.
			struct timespec ts;
			clock_gettime(CLOCK_REALTIME, &ts);
			ts.tv_sec += STP_DYNINST_TIMEOUT_SECS;

			// Mutex is locked. It is automatically
			// unlocked while we are waiting.
			pthread_cond_timedwait(&(data->print_space_avail),
					       &(data->print_mutex),
					       &ts);
			// When pthread_cond_timedwait() returns, the
			// mutex has been (re)locked.

			// Now see if we've got more bytes available.
			read_offset = data->read_offset;
			space_before = __stp_d_t_space_before(data,
							      read_offset);
			space_after = __stp_d_t_space_after(data, read_offset);
		}
		// We're finished with the mutex.
		pthread_mutex_unlock(&(data->print_mutex));

		// If we *still* don't have enough space available,
		// quit. We've done all we can do.
		if (space_before < numbytes && space_after < numbytes) {
			_stp_transport_debug(
				"not enough space available,"
				" numbytes %d, before %ld, after %ld,"
				" read_offset %ld, write_offset %ld\n",
				numbytes, space_before, space_after,
				read_offset, data->write_offset);
			return NULL;
		}
	}

	// OK, now we have enough space, either before or after the
	// current write offset.
	//
	// We prefer using the size after the current write, which
	// will help keep writes contiguous.
	if (space_after >= numbytes) {
		ret = (data->print_buf
		       + _STP_D_T_PRINT_NORM(data->write_offset)
		       + data->write_bytes);
		data->write_bytes += numbytes;
		_stp_transport_debug(
			"reserve %d bytes after, bytes available"
			" (%ld, %ld) read_offset %ld, write_offset %ld,"
			" write_bytes %ld\n",
			numbytes, space_before, space_after, data->read_offset,
			data->write_offset, data->write_bytes);
		return ret;
	}

	// OK, now we know we need to use the space before the write
	// offset. If we've got existing bytes that haven't been
	// flushed, flush them now.
	if (data->write_bytes != 0) {
		_stp_dyninst_transport_write();
		// Flushing the buffer updates the write_offset, which
		// could have caused it to wrap. Start all over.
		_stp_transport_debug(
			"rechecking available bytes after a flush...\n");
		goto recheck;
	}

	// Wrap the offset around by inverting the most significant
	// bit, then clearing out the lower bits.
	data->write_offset = ((data->write_offset ^ _STP_DYNINST_BUFFER_SIZE)
			      & _STP_DYNINST_BUFFER_SIZE);
	ret = data->print_buf;
	data->write_bytes += numbytes;
	_stp_transport_debug(
		"reserve %d bytes before, bytes available"
		" (%ld, %ld) read_offset %ld, write_offset %ld,"
		" write_bytes %ld\n",
		numbytes, space_before, space_after, data->read_offset,
		data->write_offset, data->write_bytes);
	return ret;
}
982
983 static void _stp_dyninst_transport_unreserve_bytes(int numbytes)
984 {
985 // This thread should already have a context structure.
986 struct context* c = _stp_runtime_get_context();
987 if (c == NULL)
988 return;
989
990 struct _stp_transport_context_data *data = &c->transport_data;
991 if (unlikely(numbytes <= 0 || numbytes > data->write_bytes))
992 return;
993
994 data->write_bytes -= numbytes;
995 }
996 #endif /* _STAPDYN_TRANSPORT_C_ */
This page took 0.08018 seconds and 4 git commands to generate.