]> sourceware.org Git - systemtap.git/commitdiff
monitor mode: remove pipe
authorFelix Lu <flu@redhat.com>
Fri, 29 Apr 2016 19:50:29 +0000 (15:50 -0400)
committerFelix Lu <flu@redhat.com>
Mon, 2 May 2016 15:05:06 +0000 (11:05 -0400)
Save output to monitor ring buffer directly instead of sending it
through a pipe.

* staprun/mainloop.c: remove pipe fd from fd set
* staprun/monitor.c: remove pipe data processing, add mutex for ring
  buffer
* staprun/relay.c:
  reader_thread - add line processing logic
  init_relayfs - remove pipe
* staprun/staprun.h - remove pipe

staprun/mainloop.c
staprun/monitor.c
staprun/relay.c
staprun/staprun.h

index 874fbd8f8a992f200956b99e532ff09228e92e08..b53bb52a7ceff22b29130f6578f69d59b4a631b0 100644 (file)
@@ -217,7 +217,7 @@ void start_cmd(void)
     /* Close any FDs we still hold, similarly as though this were
        a program being spawned due to an system("") tapset function. */
     closefrom(3);
-    
+
     /* We could call closefrom() here, to make sure we don't leak any
      * fds to the target, but it really isn't needed here since
      * close-on-exec should catch everything. We don't have the
@@ -262,7 +262,7 @@ void start_cmd(void)
 
     dbug(1, "blocking briefly\n");
     alarm(60); /* but not indefinitely */
-    
+
 #if WORKAROUND_BZ467568
     {
       /* Wait for the SIGUSR1 */
@@ -658,7 +658,7 @@ int stp_main_loop(void)
 
   if (monitor)
       monitor_setup();
-  
+
   /* In monitor mode, we must timeout pselect to poll the monitor
      interface. In non-monitor mode, we must timeout pselect so that
      we can handle pending_interrupts. */
@@ -709,12 +709,9 @@ int stp_main_loop(void)
        FD_ZERO(&fds);
        FD_SET(control_channel, &fds);
         maxfd = control_channel;
-       if (monitor) {
-         FD_SET(STDIN_FILENO, &fds);
-         FD_SET(monitor_pfd[0], &fds);
-         if (monitor_pfd[0] > maxfd)
-           maxfd = monitor_pfd[0];
-       }
+        // Immediately update screen on input
+        if (monitor)
+          FD_SET(STDIN_FILENO, &fds);
        res = pselect(maxfd + 1, &fds, NULL, NULL, timeout, &mainset);
        if (res < 0 && errno != EINTR)
          {
index 7cdcd5eae2a4ed2154bd025dc4325a907f4482dd..745519da3895cf9950ead9cba66c49c6e4514eb7 100644 (file)
@@ -13,7 +13,6 @@
 #define MAX_COLS 256
 #define MAX_DATA 262144 /* XXX: pass procfs.read().maxsize(NNN) from stap */
 #define MAX_HISTORY 8192
-#define MAX_LINELENGTH 4096
 
 #define MIN(X,Y) (((X) < (Y)) ? (X) : (Y))
 #define MAX(X,Y) (((X) > (Y)) ? (X) : (Y))
@@ -23,8 +22,6 @@
 typedef struct History_Queue
 {
   char *lines[MAX_HISTORY]; /* each malloc/strdup'd(). */
-  char linebuf[MAX_LINELENGTH]; /* beyond this length, we cut up lines */
-  int linebuf_ptr; /* index into linebuf[] where next line piece should be read */
   int oldest;
   int newest;
   int count;
@@ -50,6 +47,7 @@ static enum state
 } monitor_state;
 
 static History_Queue h_queue;
+static pthread_mutex_t mutex;
 static char probe[MAX_INDEX_LEN];
 static WINDOW *status = NULL;
 static WINDOW *output = NULL;
@@ -275,6 +273,7 @@ void monitor_winch(__attribute__((unused)) int signum)
 
 void monitor_setup(void)
 {
+  pthread_mutex_init(&mutex, NULL);
   probe[0] = '\0';
   start_time = time(NULL);
   initscr();
@@ -290,6 +289,7 @@ void monitor_setup(void)
 
 void monitor_cleanup(void)
 {
+  pthread_mutex_destroy(&mutex);
   monitor_end = 1;
   endwin();
 }
@@ -312,8 +312,10 @@ void monitor_render(void)
 
   /* Render previously recorded output */
   wclear(output);
+  pthread_mutex_lock(&mutex);
   for (i = 0; i < h_queue.count-output_scroll; i++)
     wprintw(output, "%s", h_queue.lines[(h_queue.oldest+i) % MAX_HISTORY]);
+  pthread_mutex_unlock(&mutex);
   update_panels();
   doupdate();
 
@@ -496,8 +498,10 @@ void monitor_render(void)
   (void) discard; /* Unused */
 }
 
+/* Called in staprun/relay.c */
 void monitor_remember_output_line(const char* buf, const size_t bytes)
 {
+  pthread_mutex_lock(&mutex);
   free (h_queue.lines[h_queue.newest]);
   h_queue.lines[h_queue.newest] = strndup(buf, bytes);
   h_queue.newest = (h_queue.newest+1) % MAX_HISTORY;
@@ -506,6 +510,7 @@ void monitor_remember_output_line(const char* buf, const size_t bytes)
     h_queue.count++; /* and h_queue.oldest stays at 0 */
   else
     h_queue.oldest = (h_queue.oldest+1) % MAX_HISTORY;
+  pthread_mutex_unlock(&mutex);
 }
 
 void monitor_exited(void)
@@ -522,39 +527,6 @@ void monitor_input(void)
   int cur_y;
   int discard;
 
-  /* NB: monitor_pfd[0] is the read side, O_NONBLOCK, of the pipe
-     that collects/serializes all the per-cpu outputs.  We can't
-     use stdio calls. */
-
-  /* Collect normal systemtap output */
-  while (monitor_set)
-    {
-      ssize_t bytes = read(monitor_pfd[0],
-                           h_queue.linebuf + h_queue.linebuf_ptr,
-                           MAX_LINELENGTH - h_queue.linebuf_ptr);
-      if (bytes <= 0)
-        break;
-
-      /* Start scanning the linebuf[] for lines - \n.
-         Plop each one found into the h_queue.lines[] ring. */
-      char *p = h_queue.linebuf; /* scan position */
-      char *p_end = h_queue.linebuf + h_queue.linebuf_ptr + bytes; /* one past last byte */
-      char *line = p;
-      while (p < p_end)
-        {
-          if (*p == '\n') /* got a line */
-            {
-              monitor_remember_output_line(line, (p-line)+1); /* strlen, including \n */
-              line = p+1;
-            }
-          p ++;
-        }
-
-      /* Flush remaining output */
-      monitor_remember_output_line(line, (p_end - line));
-      h_queue.linebuf_ptr = 0;
-    }
-
   getmaxyx(output, max_rows, discard);
   getyx(status, cur_y, discard);
 
index aa81370142ed11714462705992dcf3a2e651bd04..c87bef4cad14b8d10eddc9671bedffdbc215a6cf 100644 (file)
@@ -13,8 +13,6 @@
 #include "staprun.h"
 
 int out_fd[NR_CPUS];
-int monitor_pfd[2];
-int monitor_set = 0;
 int monitor_end = 0;
 static pthread_t reader[NR_CPUS];
 static int relay_fd[NR_CPUS];
@@ -26,6 +24,7 @@ static volatile int stop_threads = 0;
 static time_t *time_backlog[NR_CPUS];
 static int backlog_order=0;
 #define BACKLOG_MASK ((1 << backlog_order) - 1)
+#define MONITORLINELENGTH 4096
 
 #ifdef NEED_PPOLL
 int ppoll(struct pollfd *fds, nfds_t nfds,
@@ -139,7 +138,7 @@ static void *reader_thread(void *data)
 
        sigfillset(&sigs);
        sigdelset(&sigs,SIGUSR2);
-       
+
        if (bulkmode) {
                cpu_set_t cpu_mask;
                CPU_ZERO(&cpu_mask);
@@ -160,7 +159,7 @@ static void *reader_thread(void *data)
                 timeout->tv_sec = reader_timeout_ms / 1000;
                 timeout->tv_nsec = (reader_timeout_ms - timeout->tv_sec * 1000) * 1000000;
         }
-       
+
        pollfd.fd = relay_fd[cpu];
        pollfd.events = POLLIN;
 
@@ -194,7 +193,7 @@ static void *reader_thread(void *data)
                while ((rc = read(relay_fd[cpu], buf, sizeof(buf))) > 0) {
                         int wbytes = rc;
                         char *wbuf = buf;
-                        
+
                        /* Switching file */
                        pthread_mutex_lock(&mutex[cpu]);
                        if ((fsize_max && ((wsize + rc) > fsize_max)) ||
@@ -212,15 +211,37 @@ static void *reader_thread(void *data)
                         /* Copy loop.  Must repeat write(2) in case of a pipe overflow
                            or other transient fullness. */
                         while (wbytes > 0) {
-                                rc = write(out_fd[cpu], wbuf, wbytes);
-                                if (rc <= 0) {
-                                       perr("Couldn't write to output %d for cpu %d, exiting.",
-                                             out_fd[cpu], cpu);
-                                        goto error_out;
-                                }
-                                wbytes -= rc;
-                                wbuf += rc;
-                                wsize += rc;
+                               if (monitor) {
+                                       ssize_t bytes = wbytes > MONITORLINELENGTH ? MONITORLINELENGTH : wbytes;
+                                       /* Start scanning the wbuf[] for lines - \n.
+                                         Plop each one found into the h_queue.lines[] ring. */
+                                       char *p = wbuf; /* scan position */
+                                       char *p_end = wbuf + bytes; /* one past last byte */
+                                       char *line = p;
+                                       while (p < p_end) {
+                                               if (*p == '\n') { /* got a line */
+                                                       monitor_remember_output_line(line, (p-line)+1); /* strlen, including \n */
+                                                       line = p+1;
+                                               }
+                                               p++;
+                                       }
+                                       /* Flush remaining output */
+                                       if (line != p_end)
+                                               monitor_remember_output_line(line, (p_end - line));
+                                       wbytes -= bytes;
+                                       wbuf += bytes;
+                                       wsize += bytes;
+                               } else {
+                                       rc = write(out_fd[cpu], wbuf, wbytes);
+                                       if (rc <= 0) {
+                                               perr("Couldn't write to output %d for cpu %d, exiting.",
+                                                    out_fd[cpu], cpu);
+                                               goto error_out;
+                                       }
+                                       wbytes -= rc;
+                                       wbuf += rc;
+                                       wsize += rc;
+                               }
                         }
                }
         } while (!stop_threads);
@@ -281,7 +302,7 @@ int init_relayfs(void)
        char rqbuf[128];
         char buf[PATH_MAX];
         struct sigaction sa;
-        
+
        dbug(2, "initializing relayfs\n");
 
        reader[0] = (pthread_t)0;
@@ -370,7 +391,7 @@ int init_relayfs(void)
                                if (sprintf_chk(buf, "stpd_cpu%d", avail_cpus[i]))
                                        return -1;
                        }
-                       
+
                        out_fd[avail_cpus[i]] = open_cloexec (buf, O_CREAT|O_TRUNC|O_WRONLY, 0666);
                        if (out_fd[avail_cpus[i]] < 0) {
                                perr("Couldn't open output file %s", buf);
@@ -392,24 +413,7 @@ int init_relayfs(void)
                                return -1;
                        }
                } else
-                       if (monitor) {
-                               if (pipe_cloexec(monitor_pfd)) {
-                                       perr("Couldn't create pipe");
-                                       return -1;
-                               }
-                               fcntl(monitor_pfd[0], F_SETFL, O_NONBLOCK); /* read end */
-                                /* NB: leave write end of pipe normal blocking mode, since
-                                   that's the same mode as for STDOUT_FILENO. */
-                               /* fcntl(monitor_pfd[1], F_SETFL, O_NONBLOCK); */ 
-#ifdef HAVE_F_SETPIPE_SZ
-                                /* Make it less likely for the pipe to be full. */
-                               /* fcntl(monitor_pfd[1], F_SETPIPE_SZ, 8*65536); */
-#endif
-                               monitor_set = 1;
-                               out_fd[avail_cpus[0]] = monitor_pfd[1];
-                       } else {
-                               out_fd[avail_cpus[0]] = STDOUT_FILENO;
-                       }
+                       out_fd[avail_cpus[0]] = STDOUT_FILENO;
        }
 
         memset(&sa, 0, sizeof(sa));
@@ -432,7 +436,7 @@ int init_relayfs(void)
                         return -1;
                 }
         }
-       
+
        return 0;
 }
 
@@ -464,4 +468,3 @@ void close_relayfs(void)
        }
        dbug(2, "done\n");
 }
-
index 8e0c664c1ad2beed779347623e593e653ce37967..c586e37eec05877fca79df7ffef5f8ac1293d6b0 100644 (file)
@@ -244,7 +244,6 @@ extern int ncpus;
 extern int initialized;
 extern int kernel_ptr_size;
 extern int monitor_pfd[2];
-extern int monitor_set;
 extern int monitor_end;
 
 /* flags */
This page took 0.035316 seconds and 5 git commands to generate.