From c483db11a149a0a4b4b339abb5689e5b132d66a4 Mon Sep 17 00:00:00 2001 From: Felix Lu Date: Fri, 29 Apr 2016 15:50:29 -0400 Subject: [PATCH] monitor mode: remove pipe Save output to monitor ring buffer directly instead of sending it through a pipe. * staprun/mainloop.c: remove pipe fd from fd set * staprun/monitor.c: remove pipe data processing, add mutex for ring buffer * staprun/relay.c: reader_thread - add line processing logic init_relayfs - remove pipe * staprun/staprun.h - remove pipe --- staprun/mainloop.c | 15 ++++------ staprun/monitor.c | 44 +++++---------------------- staprun/relay.c | 75 ++++++++++++++++++++++++---------------------- staprun/staprun.h | 1 - 4 files changed, 53 insertions(+), 82 deletions(-) diff --git a/staprun/mainloop.c b/staprun/mainloop.c index 874fbd8f8..b53bb52a7 100644 --- a/staprun/mainloop.c +++ b/staprun/mainloop.c @@ -217,7 +217,7 @@ void start_cmd(void) /* Close any FDs we still hold, similarly as though this were a program being spawned due to an system("") tapset function. */ closefrom(3); - + /* We could call closefrom() here, to make sure we don't leak any * fds to the target, but it really isn't needed here since * close-on-exec should catch everything. We don't have the @@ -262,7 +262,7 @@ void start_cmd(void) dbug(1, "blocking briefly\n"); alarm(60); /* but not indefinitely */ - + #if WORKAROUND_BZ467568 { /* Wait for the SIGUSR1 */ @@ -658,7 +658,7 @@ int stp_main_loop(void) if (monitor) monitor_setup(); - + /* In monitor mode, we must timeout pselect to poll the monitor interface. In non-monitor mode, we must timeout pselect so that we can handle pending_interrupts. */ @@ -709,12 +709,9 @@ int stp_main_loop(void) FD_ZERO(&fds); FD_SET(control_channel, &fds); maxfd = control_channel; - if (monitor) { - FD_SET(STDIN_FILENO, &fds); - FD_SET(monitor_pfd[0], &fds); - if (monitor_pfd[0] > maxfd) - maxfd = monitor_pfd[0]; - } + // Immediately update screen on input + if (monitor) + FD_SET(STDIN_FILENO, &fds); res = pselect(maxfd + 1, &fds, NULL, NULL, timeout, &mainset); if (res < 0 && errno != EINTR) { diff --git a/staprun/monitor.c b/staprun/monitor.c index 7cdcd5eae..745519da3 100644 --- a/staprun/monitor.c +++ b/staprun/monitor.c @@ -13,7 +13,6 @@ #define MAX_COLS 256 #define MAX_DATA 262144 /* XXX: pass procfs.read().maxsize(NNN) from stap */ #define MAX_HISTORY 8192 -#define MAX_LINELENGTH 4096 #define MIN(X,Y) (((X) < (Y)) ? (X) : (Y)) #define MAX(X,Y) (((X) > (Y)) ? (X) : (Y)) @@ -23,8 +22,6 @@ typedef struct History_Queue { char *lines[MAX_HISTORY]; /* each malloc/strdup'd(). */ - char linebuf[MAX_LINELENGTH]; /* beyond this length, we cut up lines */ - int linebuf_ptr; /* index into linebuf[] where next line piece should be read */ int oldest; int newest; int count; @@ -50,6 +47,7 @@ static enum state } monitor_state; static History_Queue h_queue; +static pthread_mutex_t mutex; static char probe[MAX_INDEX_LEN]; static WINDOW *status = NULL; static WINDOW *output = NULL; @@ -275,6 +273,7 @@ void monitor_winch(__attribute__((unused)) int signum) void monitor_setup(void) { + pthread_mutex_init(&mutex, NULL); probe[0] = '\0'; start_time = time(NULL); initscr(); @@ -290,6 +289,7 @@ void monitor_setup(void) void monitor_cleanup(void) { + pthread_mutex_destroy(&mutex); monitor_end = 1; endwin(); } @@ -312,8 +312,10 @@ void monitor_render(void) /* Render previously recorded output */ wclear(output); + pthread_mutex_lock(&mutex); for (i = 0; i < h_queue.count-output_scroll; i++) wprintw(output, "%s", h_queue.lines[(h_queue.oldest+i) % MAX_HISTORY]); + pthread_mutex_unlock(&mutex); update_panels(); doupdate(); @@ -496,8 +498,10 @@ void monitor_render(void) (void) discard; /* Unused */ } +/* Called in staprun/relay.c */ void monitor_remember_output_line(const char* buf, const size_t bytes) { + pthread_mutex_lock(&mutex); free (h_queue.lines[h_queue.newest]); h_queue.lines[h_queue.newest] = strndup(buf, bytes); h_queue.newest = (h_queue.newest+1) % MAX_HISTORY; @@ -506,6 +510,7 @@ void monitor_remember_output_line(const char* buf, const size_t bytes) h_queue.count++; /* and h_queue.oldest stays at 0 */ else h_queue.oldest = (h_queue.oldest+1) % MAX_HISTORY; + pthread_mutex_unlock(&mutex); } void monitor_exited(void) @@ -522,39 +527,6 @@ void monitor_input(void) int cur_y; int discard; - /* NB: monitor_pfd[0] is the read side, O_NONBLOCK, of the pipe - that collects/serializes all the per-cpu outputs. We can't - use stdio calls. */ - - /* Collect normal systemtap output */ - while (monitor_set) - { - ssize_t bytes = read(monitor_pfd[0], - h_queue.linebuf + h_queue.linebuf_ptr, - MAX_LINELENGTH - h_queue.linebuf_ptr); - if (bytes <= 0) - break; - - /* Start scanning the linebuf[] for lines - \n. - Plop each one found into the h_queue.lines[] ring. */ - char *p = h_queue.linebuf; /* scan position */ - char *p_end = h_queue.linebuf + h_queue.linebuf_ptr + bytes; /* one past last byte */ - char *line = p; - while (p < p_end) - { - if (*p == '\n') /* got a line */ - { - monitor_remember_output_line(line, (p-line)+1); /* strlen, including \n */ - line = p+1; - } - p ++; - } - - /* Flush remaining output */ - monitor_remember_output_line(line, (p_end - line)); - h_queue.linebuf_ptr = 0; - } - getmaxyx(output, max_rows, discard); getyx(status, cur_y, discard); diff --git a/staprun/relay.c b/staprun/relay.c index aa8137014..c87bef4ca 100644 --- a/staprun/relay.c +++ b/staprun/relay.c @@ -13,8 +13,6 @@ #include "staprun.h" int out_fd[NR_CPUS]; -int monitor_pfd[2]; -int monitor_set = 0; int monitor_end = 0; static pthread_t reader[NR_CPUS]; static int relay_fd[NR_CPUS]; @@ -26,6 +24,7 @@ static volatile int stop_threads = 0; static time_t *time_backlog[NR_CPUS]; static int backlog_order=0; #define BACKLOG_MASK ((1 << backlog_order) - 1) +#define MONITORLINELENGTH 4096 #ifdef NEED_PPOLL int ppoll(struct pollfd *fds, nfds_t nfds, @@ -139,7 +138,7 @@ static void *reader_thread(void *data) sigfillset(&sigs); sigdelset(&sigs,SIGUSR2); - + if (bulkmode) { cpu_set_t cpu_mask; CPU_ZERO(&cpu_mask); @@ -160,7 +159,7 @@ static void *reader_thread(void *data) timeout->tv_sec = reader_timeout_ms / 1000; timeout->tv_nsec = (reader_timeout_ms - timeout->tv_sec * 1000) * 1000000; } - + pollfd.fd = relay_fd[cpu]; pollfd.events = POLLIN; @@ -194,7 +193,7 @@ static void *reader_thread(void *data) while ((rc = read(relay_fd[cpu], buf, sizeof(buf))) > 0) { int wbytes = rc; char *wbuf = buf; - + /* Switching file */ pthread_mutex_lock(&mutex[cpu]); if ((fsize_max && ((wsize + rc) > fsize_max)) || @@ -212,15 +211,37 @@ static void *reader_thread(void *data) /* Copy loop. Must repeat write(2) in case of a pipe overflow or other transient fullness. */ while (wbytes > 0) { - rc = write(out_fd[cpu], wbuf, wbytes); - if (rc <= 0) { - perr("Couldn't write to output %d for cpu %d, exiting.", - out_fd[cpu], cpu); - goto error_out; - } - wbytes -= rc; - wbuf += rc; - wsize += rc; + if (monitor) { + ssize_t bytes = wbytes > MONITORLINELENGTH ? MONITORLINELENGTH : wbytes; + /* Start scanning the wbuf[] for lines - \n. + Plop each one found into the h_queue.lines[] ring. */ + char *p = wbuf; /* scan position */ + char *p_end = wbuf + bytes; /* one past last byte */ + char *line = p; + while (p < p_end) { + if (*p == '\n') { /* got a line */ + monitor_remember_output_line(line, (p-line)+1); /* strlen, including \n */ + line = p+1; + } + p++; + } + /* Flush remaining output */ + if (line != p_end) + monitor_remember_output_line(line, (p_end - line)); + wbytes -= bytes; + wbuf += bytes; + wsize += bytes; + } else { + rc = write(out_fd[cpu], wbuf, wbytes); + if (rc <= 0) { + perr("Couldn't write to output %d for cpu %d, exiting.", + out_fd[cpu], cpu); + goto error_out; + } + wbytes -= rc; + wbuf += rc; + wsize += rc; + } } } } while (!stop_threads); @@ -281,7 +302,7 @@ int init_relayfs(void) char rqbuf[128]; char buf[PATH_MAX]; struct sigaction sa; - + dbug(2, "initializing relayfs\n"); reader[0] = (pthread_t)0; @@ -370,7 +391,7 @@ int init_relayfs(void) if (sprintf_chk(buf, "stpd_cpu%d", avail_cpus[i])) return -1; } - + out_fd[avail_cpus[i]] = open_cloexec (buf, O_CREAT|O_TRUNC|O_WRONLY, 0666); if (out_fd[avail_cpus[i]] < 0) { perr("Couldn't open output file %s", buf); @@ -392,24 +413,7 @@ int init_relayfs(void) return -1; } } else - if (monitor) { - if (pipe_cloexec(monitor_pfd)) { - perr("Couldn't create pipe"); - return -1; - } - fcntl(monitor_pfd[0], F_SETFL, O_NONBLOCK); /* read end */ - /* NB: leave write end of pipe normal blocking mode, since - that's the same mode as for STDOUT_FILENO. */ - /* fcntl(monitor_pfd[1], F_SETFL, O_NONBLOCK); */ -#ifdef HAVE_F_SETPIPE_SZ - /* Make it less likely for the pipe to be full. */ - /* fcntl(monitor_pfd[1], F_SETPIPE_SZ, 8*65536); */ -#endif - monitor_set = 1; - out_fd[avail_cpus[0]] = monitor_pfd[1]; - } else { - out_fd[avail_cpus[0]] = STDOUT_FILENO; - } + out_fd[avail_cpus[0]] = STDOUT_FILENO; } memset(&sa, 0, sizeof(sa)); @@ -432,7 +436,7 @@ int init_relayfs(void) return -1; } } - + return 0; } @@ -464,4 +468,3 @@ void close_relayfs(void) } dbug(2, "done\n"); } - diff --git a/staprun/staprun.h b/staprun/staprun.h index 8e0c664c1..c586e37ee 100644 --- a/staprun/staprun.h +++ b/staprun/staprun.h @@ -244,7 +244,6 @@ extern int ncpus; extern int initialized; extern int kernel_ptr_size; extern int monitor_pfd[2]; -extern int monitor_set; extern int monitor_end; /* flags */ -- 2.43.5