/* Close any FDs we still hold, similarly as though this were
a program being spawned due to an system("") tapset function. */
closefrom(3);
-
+
/* We could call closefrom() here, to make sure we don't leak any
* fds to the target, but it really isn't needed here since
* close-on-exec should catch everything. We don't have the
dbug(1, "blocking briefly\n");
alarm(60); /* but not indefinitely */
-
+
#if WORKAROUND_BZ467568
{
/* Wait for the SIGUSR1 */
if (monitor)
monitor_setup();
-
+
/* In monitor mode, we must timeout pselect to poll the monitor
interface. In non-monitor mode, we must timeout pselect so that
we can handle pending_interrupts. */
FD_ZERO(&fds);
FD_SET(control_channel, &fds);
maxfd = control_channel;
- if (monitor) {
- FD_SET(STDIN_FILENO, &fds);
- FD_SET(monitor_pfd[0], &fds);
- if (monitor_pfd[0] > maxfd)
- maxfd = monitor_pfd[0];
- }
+ // Immediately update screen on input
+ if (monitor)
+ FD_SET(STDIN_FILENO, &fds);
res = pselect(maxfd + 1, &fds, NULL, NULL, timeout, &mainset);
if (res < 0 && errno != EINTR)
{
#define MAX_COLS 256
#define MAX_DATA 262144 /* XXX: pass procfs.read().maxsize(NNN) from stap */
#define MAX_HISTORY 8192
-#define MAX_LINELENGTH 4096
#define MIN(X,Y) (((X) < (Y)) ? (X) : (Y))
#define MAX(X,Y) (((X) > (Y)) ? (X) : (Y))
typedef struct History_Queue
{
char *lines[MAX_HISTORY]; /* each malloc/strdup'd(). */
- char linebuf[MAX_LINELENGTH]; /* beyond this length, we cut up lines */
- int linebuf_ptr; /* index into linebuf[] where next line piece should be read */
int oldest;
int newest;
int count;
} monitor_state;
static History_Queue h_queue;
+static pthread_mutex_t mutex;
static char probe[MAX_INDEX_LEN];
static WINDOW *status = NULL;
static WINDOW *output = NULL;
void monitor_setup(void)
{
+ pthread_mutex_init(&mutex, NULL);
probe[0] = '\0';
start_time = time(NULL);
initscr();
void monitor_cleanup(void)
{
+ pthread_mutex_destroy(&mutex);
monitor_end = 1;
endwin();
}
/* Render previously recorded output */
wclear(output);
+ pthread_mutex_lock(&mutex);
for (i = 0; i < h_queue.count-output_scroll; i++)
wprintw(output, "%s", h_queue.lines[(h_queue.oldest+i) % MAX_HISTORY]);
+ pthread_mutex_unlock(&mutex);
update_panels();
doupdate();
(void) discard; /* Unused */
}
+/* Called in staprun/relay.c */
void monitor_remember_output_line(const char* buf, const size_t bytes)
{
+ pthread_mutex_lock(&mutex);
free (h_queue.lines[h_queue.newest]);
h_queue.lines[h_queue.newest] = strndup(buf, bytes);
h_queue.newest = (h_queue.newest+1) % MAX_HISTORY;
h_queue.count++; /* and h_queue.oldest stays at 0 */
else
h_queue.oldest = (h_queue.oldest+1) % MAX_HISTORY;
+ pthread_mutex_unlock(&mutex);
}
void monitor_exited(void)
int cur_y;
int discard;
- /* NB: monitor_pfd[0] is the read side, O_NONBLOCK, of the pipe
- that collects/serializes all the per-cpu outputs. We can't
- use stdio calls. */
-
- /* Collect normal systemtap output */
- while (monitor_set)
- {
- ssize_t bytes = read(monitor_pfd[0],
- h_queue.linebuf + h_queue.linebuf_ptr,
- MAX_LINELENGTH - h_queue.linebuf_ptr);
- if (bytes <= 0)
- break;
-
- /* Start scanning the linebuf[] for lines - \n.
- Plop each one found into the h_queue.lines[] ring. */
- char *p = h_queue.linebuf; /* scan position */
- char *p_end = h_queue.linebuf + h_queue.linebuf_ptr + bytes; /* one past last byte */
- char *line = p;
- while (p < p_end)
- {
- if (*p == '\n') /* got a line */
- {
- monitor_remember_output_line(line, (p-line)+1); /* strlen, including \n */
- line = p+1;
- }
- p ++;
- }
-
- /* Flush remaining output */
- monitor_remember_output_line(line, (p_end - line));
- h_queue.linebuf_ptr = 0;
- }
-
getmaxyx(output, max_rows, discard);
getyx(status, cur_y, discard);
#include "staprun.h"
int out_fd[NR_CPUS];
-int monitor_pfd[2];
-int monitor_set = 0;
int monitor_end = 0;
static pthread_t reader[NR_CPUS];
static int relay_fd[NR_CPUS];
static time_t *time_backlog[NR_CPUS];
static int backlog_order=0;
#define BACKLOG_MASK ((1 << backlog_order) - 1)
+#define MONITORLINELENGTH 4096
#ifdef NEED_PPOLL
int ppoll(struct pollfd *fds, nfds_t nfds,
sigfillset(&sigs);
sigdelset(&sigs,SIGUSR2);
-
+
if (bulkmode) {
cpu_set_t cpu_mask;
CPU_ZERO(&cpu_mask);
timeout->tv_sec = reader_timeout_ms / 1000;
timeout->tv_nsec = (reader_timeout_ms - timeout->tv_sec * 1000) * 1000000;
}
-
+
pollfd.fd = relay_fd[cpu];
pollfd.events = POLLIN;
while ((rc = read(relay_fd[cpu], buf, sizeof(buf))) > 0) {
int wbytes = rc;
char *wbuf = buf;
-
+
/* Switching file */
pthread_mutex_lock(&mutex[cpu]);
if ((fsize_max && ((wsize + rc) > fsize_max)) ||
/* Copy loop. Must repeat write(2) in case of a pipe overflow
or other transient fullness. */
while (wbytes > 0) {
- rc = write(out_fd[cpu], wbuf, wbytes);
- if (rc <= 0) {
- perr("Couldn't write to output %d for cpu %d, exiting.",
- out_fd[cpu], cpu);
- goto error_out;
- }
- wbytes -= rc;
- wbuf += rc;
- wsize += rc;
+ if (monitor) {
+ ssize_t bytes = wbytes > MONITORLINELENGTH ? MONITORLINELENGTH : wbytes;
+ /* Start scanning the wbuf[] for lines - \n.
+ Plop each one found into the h_queue.lines[] ring. */
+ char *p = wbuf; /* scan position */
+ char *p_end = wbuf + bytes; /* one past last byte */
+ char *line = p;
+ while (p < p_end) {
+ if (*p == '\n') { /* got a line */
+ monitor_remember_output_line(line, (p-line)+1); /* strlen, including \n */
+ line = p+1;
+ }
+ p++;
+ }
+ /* Flush remaining output */
+ if (line != p_end)
+ monitor_remember_output_line(line, (p_end - line));
+ wbytes -= bytes;
+ wbuf += bytes;
+ wsize += bytes;
+ } else {
+ rc = write(out_fd[cpu], wbuf, wbytes);
+ if (rc <= 0) {
+ perr("Couldn't write to output %d for cpu %d, exiting.",
+ out_fd[cpu], cpu);
+ goto error_out;
+ }
+ wbytes -= rc;
+ wbuf += rc;
+ wsize += rc;
+ }
}
}
} while (!stop_threads);
char rqbuf[128];
char buf[PATH_MAX];
struct sigaction sa;
-
+
dbug(2, "initializing relayfs\n");
reader[0] = (pthread_t)0;
if (sprintf_chk(buf, "stpd_cpu%d", avail_cpus[i]))
return -1;
}
-
+
out_fd[avail_cpus[i]] = open_cloexec (buf, O_CREAT|O_TRUNC|O_WRONLY, 0666);
if (out_fd[avail_cpus[i]] < 0) {
perr("Couldn't open output file %s", buf);
return -1;
}
} else
- if (monitor) {
- if (pipe_cloexec(monitor_pfd)) {
- perr("Couldn't create pipe");
- return -1;
- }
- fcntl(monitor_pfd[0], F_SETFL, O_NONBLOCK); /* read end */
- /* NB: leave write end of pipe normal blocking mode, since
- that's the same mode as for STDOUT_FILENO. */
- /* fcntl(monitor_pfd[1], F_SETFL, O_NONBLOCK); */
-#ifdef HAVE_F_SETPIPE_SZ
- /* Make it less likely for the pipe to be full. */
- /* fcntl(monitor_pfd[1], F_SETPIPE_SZ, 8*65536); */
-#endif
- monitor_set = 1;
- out_fd[avail_cpus[0]] = monitor_pfd[1];
- } else {
- out_fd[avail_cpus[0]] = STDOUT_FILENO;
- }
+ out_fd[avail_cpus[0]] = STDOUT_FILENO;
}
memset(&sa, 0, sizeof(sa));
return -1;
}
}
-
+
return 0;
}
}
dbug(2, "done\n");
}
-
extern int initialized;
extern int kernel_ptr_size;
extern int monitor_pfd[2];
-extern int monitor_set;
extern int monitor_end;
/* flags */