From fc67febc6733e5803e6883a3757abda6268a953a Mon Sep 17 00:00:00 2001 From: Mark Wielaard Date: Tue, 16 Aug 2011 14:31:29 +0200 Subject: [PATCH] Reintroduce timer for transport cmd channel, don't wake_up unconditionally. Revert parts of commit a85c8a "runtime/io.c: Explicitly signal setting of _stp_exit_flag" and commit 46ac9e "Remove _stp_ctl_work_timer from module transport layer". Introduce a new test wake_up.exp that shows a deadlock when sending cmd messages and waking up the reader immediately. Renamed _stp_ctl_write to _stp_ctl_send, which can be called from everywhere. Rename _stp_ctl_send to _stp_ctl_send_notify that can be called from user context in the transport layer itself (this will immediately notify any readers). Document all places that use _stp_ctl_send_notify directly to clarify why that is safe. See http://sourceware.org/ml/systemtap/2011-q3/msg00163.html --- runtime/io.c | 4 +- runtime/print_flush.c | 8 ++- runtime/transport/control.c | 93 +++++++++++++---------------- runtime/transport/control.h | 3 +- runtime/transport/transport.c | 69 +++++++++++++++++++-- runtime/transport/transport.h | 2 - testsuite/systemtap.base/wakeup.exp | 24 ++++++++ testsuite/systemtap.base/wakeup.stp | 13 ++++ 8 files changed, 153 insertions(+), 63 deletions(-) create mode 100644 testsuite/systemtap.base/wakeup.exp create mode 100644 testsuite/systemtap.base/wakeup.stp diff --git a/runtime/io.c b/runtime/io.c index 94e828d90..64b521bbc 100644 --- a/runtime/io.c +++ b/runtime/io.c @@ -99,8 +99,10 @@ static void _stp_warn (const char *fmt, ...) */ static void _stp_exit (void) { + /* Just set the flag since this is possibly called from + kprobe context. A timer will come along and call + _stp_request_exit() for us. */ _stp_exit_flag = 1; - wake_up_interruptible(&_stp_ctl_wq); } /** Prints error message and exits. diff --git a/runtime/print_flush.c b/runtime/print_flush.c index a66a305e7..f067609fa 100644 --- a/runtime/print_flush.c +++ b/runtime/print_flush.c @@ -99,10 +99,12 @@ void EXPORT_FN(stp_print_flush)(_stp_pbuf *pb) #else /* !STP_BULKMODE */ #if STP_TRANSPORT_VERSION == 1 - /** STP_TRANSPORT_VERSION == 1 is special, _stp_ctl_send/write will + /** STP_TRANSPORT_VERSION == 1 is special, _stp_ctl_send will pass through procfs _stp_ctl_write_fs which recognizes - STP_REALTIME_DATA as data that needs to be send right away - over the .cmd channel instead of being queued. */ + STP_REALTIME_DATA as data that can be concatenated if the + previous buffer is also of type STP_REALTIME_DATA and there + is some room left in that packet instead of creating a new + packet to be queued. */ if (unlikely(_stp_ctl_send(STP_REALTIME_DATA, pb->buf, len) <= 0)) atomic_inc (&_stp_transport_failures); diff --git a/runtime/transport/control.c b/runtime/transport/control.c index cab2ab509..28a70ab1e 100644 --- a/runtime/transport/control.c +++ b/runtime/transport/control.c @@ -355,13 +355,12 @@ static void _stp_ctl_free_buffer(struct _stp_buffer *bptr) } } -/* Send a message directly (only old_relay) or puts it on the - _stp_ctl_ready_q. Doesn't call wake_up on _stp_ctl_wq (use - _stp_ctl_send if you need that behavior). Returns zero if len - was zero, or len > STP_CTL_BUFFER_SIZE. Returns the the length - if the send or stored message on success. Returns a negative - error code on failure. */ -static int _stp_ctl_write(int type, void *data, unsigned len) +/* Put a message on the _stp_ctl_ready_q. Safe to call from a probe context. + Doesn't call wake_up on _stp_ctl_wq (which would not be safe from all + probe context). A timer will come by and pick up the message to notify + any readers. Returns the number of bytes queued/send or zero/negative + on error. */ +static int _stp_ctl_send(int type, void *data, unsigned len) { struct _stp_buffer *bptr; unsigned long flags; @@ -371,61 +370,61 @@ static int _stp_ctl_write(int type, void *data, unsigned len) _stp_ctl_write_dbug(type, data, len); #endif + /* Give the fs a chance to do something special. + Like merging two packets in case the previous buffer + still has some room (transport version 1 procfs does this. */ hlen = _stp_ctl_write_fs(type, data, len); if (hlen > 0) return hlen; /* make sure we won't overflow the buffer */ - if (unlikely(len > STP_CTL_BUFFER_SIZE)) + if (unlikely(len > STP_CTL_BUFFER_SIZE)) { + /* We could try to send an error message instead? */ + printk(KERN_ERR "ctl_write_msg type=%d len=%d too large\n", + type, len); return 0; + } /* get a buffer from the free pool */ bptr = _stp_ctl_get_buffer(type, data, len); - if (unlikely(bptr == NULL)) + if (unlikely(bptr == NULL)) { + /* Nothing else we can do... */ + printk(KERN_ERR "ctl_write_msg type=%d len=%d ENOMEM\n", + type, len); return -ENOMEM; + } - /* put it on the pool of ready buffers */ + /* put it on the pool of ready buffers. Even though we are + holding a lock, calling list_add_tail() is safe here since + it will be totally inlined from the list.h header file so + we cannot recursively hit a kprobe inside the lock. */ spin_lock_irqsave(&_stp_ctl_ready_lock, flags); list_add_tail(&bptr->list, &_stp_ctl_ready_q); spin_unlock_irqrestore(&_stp_ctl_ready_lock, flags); + /* It would be nice if we could speed up the notification + timer at this point, but calling mod_timer() at this + point would bring in more locking issues... */ return len + sizeof(bptr->type); } -/* send commands with timeout and retry (can still fail though). - Will call wake_up on _stp_ctl_wq if new data is available for - _stp_ctl_read_cmd, so stapio can immediately read it if wanted. - Returns zero if the message had zero length or was too big. - Returns the length of the final message if sucessfully send or queued. - Returns a negative error number on failure. */ -static int _stp_ctl_send(int type, void *data, int len) +/* Calls _stp_ctl_send and then calls wake_up on _stp_ctl_wq + to immediately notify listeners. DO NOT CALL THIS FROM A (KERNEL) + PROBE CONTEXT. This is only safe to call from the transport layer + itself when in user context. All code that could be triggered from + a probe context should call _stp_ctl_send(). */ +static int _stp_ctl_send_notify(int type, void *data, unsigned len) { - int err, mesg_on_queue = 0; - unsigned long flags; - dbug_trans(1, "ctl_send: type=%d len=%d\n", type, len); - err = _stp_ctl_write(type, data, len); - if (err > 0) { - /* A message was queued (or directly written), so wake up - _stp_ctl_read_cmd so stapio can pick it up asap. */ + int ret; + dbug_trans(1, "_stp_ctl_send_notify: type=%d len=%d\n", type, len); + ret = _stp_ctl_send(type, data, len); + + /* A message was queued, so wake up all _stp_ctl_wq listeners + so stapio can pick it up asap. */ + if (ret > 0) wake_up_interruptible(&_stp_ctl_wq); - } else { - /* printk instead of _stp_error since an error here means - our message or transport is suspect. */ - printk(KERN_ERR "ctl_send (type=%d len=%d) failed: %d\n", type, len, err); - /* If there are pending messages on the queue, then yell - and scream for someone to pick them off quickly. */ - spin_lock_irqsave(&_stp_ctl_ready_lock, flags); - if (!list_empty(&_stp_ctl_ready_q)) - mesg_on_queue = 1; - spin_unlock_irqrestore(&_stp_ctl_ready_lock, flags); - if (!mesg_on_queue) - printk(KERN_ERR "_stp_ctl_write failed, but no messages on queue\n"); - else - wake_up_interruptible(&_stp_ctl_wq); - } - dbug_trans(1, "returning %d\n", err); - return err; + return ret; } /** Called when someone tries to read from our .cmd file. @@ -442,12 +441,6 @@ static ssize_t _stp_ctl_read_cmd(struct file *file, char __user *buf, spin_lock_irqsave(&_stp_ctl_ready_lock, flags); while (list_empty(&_stp_ctl_ready_q)) { spin_unlock_irqrestore(&_stp_ctl_ready_lock, flags); - - /* Someone is listening, if exit flag is set AND we have finished - with probe_start() we might want them to know. */ - if (unlikely(_stp_exit_flag && _stp_probes_started)) - _stp_request_exit(); - if (file->f_flags & O_NONBLOCK) return -EAGAIN; if (wait_event_interruptible(_stp_ctl_wq, !list_empty(&_stp_ctl_ready_q))) @@ -511,10 +504,10 @@ static unsigned _stp_ctl_poll_cmd(struct file *file, poll_table *wait) poll_wait(file, &_stp_ctl_wq, wait); - /* If there are messages waiting or we want to exit, then - there will (soon) be data available. */ + /* If there are messages waiting, then there will be + data available to read. */ spin_lock_irqsave(&_stp_ctl_ready_lock, flags); - if (! list_empty(&_stp_ctl_ready_q) || _stp_exit_flag) + if (! list_empty(&_stp_ctl_ready_q)) res |= POLLIN | POLLRDNORM; spin_unlock_irqrestore(&_stp_ctl_ready_lock, flags); diff --git a/runtime/transport/control.h b/runtime/transport/control.h index f1dd13cbf..2a009d502 100644 --- a/runtime/transport/control.h +++ b/runtime/transport/control.h @@ -32,7 +32,8 @@ struct _stp_buffer { static struct file_operations _stp_ctl_fops_cmd; -static int _stp_ctl_send(int type, void *data, int len); +static int _stp_ctl_send(int type, void *data, unsigned len); +static int _stp_ctl_send_notify(int type, void *data, unsigned len); static int _stp_ctl_write_fs(int type, void *data, unsigned len); diff --git a/runtime/transport/transport.c b/runtime/transport/transport.c index 5d30a93d6..6f40f05d5 100644 --- a/runtime/transport/transport.c +++ b/runtime/transport/transport.c @@ -15,6 +15,7 @@ #define _TRANSPORT_TRANSPORT_C_ #include "transport.h" +#include "control.h" #include #include #include @@ -33,6 +34,12 @@ static int _stp_probes_started = 0; static int _stp_exit_called = 0; static DEFINE_MUTEX(_stp_transport_mutex); +#ifndef STP_CTL_TIMER_INTERVAL +/* ctl timer interval in jiffies (default 20 ms) */ +#define STP_CTL_TIMER_INTERVAL ((HZ+49)/50) +#endif + + // For now, disable transport version 3 (unless STP_USE_RING_BUFFER is // defined). #if STP_TRANSPORT_VERSION == 3 && !defined(STP_USE_RING_BUFFER) @@ -66,6 +73,8 @@ MODULE_PARM_DESC(_stp_bufsize, "buffer size"); static void probe_exit(void); static int probe_start(void); +struct timer_list _stp_ctl_work_timer; + /* * _stp_handle_start - handle STP_START */ @@ -89,7 +98,10 @@ static void _stp_handle_start(struct _stp_msg_start *st) if (st->res >= 0) _stp_probes_started = 1; - _stp_ctl_send(STP_START, st, sizeof(*st)); + /* Called from the user context in response to a proc + file write (in _stp_ctl_write_cmd), so may notify + the reader directly. */ + _stp_ctl_send_notify(STP_START, st, sizeof(*st)); } mutex_unlock(&_stp_transport_mutex); } @@ -125,8 +137,13 @@ static void _stp_cleanup_and_exit(int send_exit) _stp_transport_data_fs_stop(); dbug_trans(1, "ctl_send STP_EXIT\n"); - if (send_exit) - _stp_ctl_send(STP_EXIT, NULL, 0); + if (send_exit) { + /* send_exit is only set to one if called from + _stp_ctl_write_cmd() in response to a write + to the proc cmd file, so in user context. It + is safe to immediately notify the reader. */ + _stp_ctl_send_notify(STP_EXIT, NULL, 0); + } dbug_trans(1, "done with ctl_send STP_EXIT\n"); } mutex_unlock(&_stp_transport_mutex); @@ -139,7 +156,9 @@ static void _stp_request_exit(void) /* we only want to do this once */ called = 1; dbug_trans(1, "ctl_send STP_REQUEST_EXIT\n"); - _stp_ctl_send(STP_REQUEST_EXIT, NULL, 0); + /* Called from the timer when _stp_exit_flag has been + been set. So safe to immediately notify any readers. */ + _stp_ctl_send_notify(STP_REQUEST_EXIT, NULL, 0); dbug_trans(1, "done with ctl_send STP_REQUEST_EXIT\n"); } } @@ -155,6 +174,7 @@ static void _stp_detach(void) if (!_stp_exit_flag) _stp_transport_data_fs_overwrite(1); + del_timer_sync(&_stp_ctl_work_timer); wake_up_interruptible(&_stp_ctl_wq); } @@ -169,6 +189,41 @@ static void _stp_attach(void) dbug_trans(1, "attach\n"); _stp_pid = current->pid; _stp_transport_data_fs_overwrite(0); + init_timer(&_stp_ctl_work_timer); + _stp_ctl_work_timer.expires = jiffies + STP_CTL_TIMER_INTERVAL; + _stp_ctl_work_timer.function = _stp_ctl_work_callback; + _stp_ctl_work_timer.data= 0; + add_timer(&_stp_ctl_work_timer); +} + +/* + * _stp_ctl_work_callback - periodically check for IO or exit + * This IO comes from control messages like system(), warn(), + * that could potentially have been send from krpobe context, + * so they don't immediately trigger a wake_up of _stp_ctl_wq. + * This is run by a kernel thread and may NOT sleep, but it + * may call wake_up_interruptible on _stp_ctl_wq to notify + * any readers, or send messages itself that are immediately + * notified. Reschedules itself if someone is still attached + * to the cmd channel. + */ +static void _stp_ctl_work_callback(unsigned long val) +{ + int do_io = 0; + unsigned long flags; + + spin_lock_irqsave(&_stp_ctl_ready_lock, flags); + if (!list_empty(&_stp_ctl_ready_q)) + do_io = 1; + spin_unlock_irqrestore(&_stp_ctl_ready_lock, flags); + if (do_io) + wake_up_interruptible(&_stp_ctl_wq); + + /* if exit flag is set AND we have finished with probe_start() */ + if (unlikely(_stp_exit_flag && _stp_probes_started)) + _stp_request_exit(); + if (atomic_read(& _stp_ctl_attached)) + mod_timer (&_stp_ctl_work_timer, jiffies + STP_CTL_TIMER_INTERVAL); } /** @@ -239,8 +294,10 @@ static int _stp_transport_init(void) /* Signal stapio to send us STP_START back. This is an historic convention. This was called STP_TRANSPORT_INFO and had a payload that described the - transport buffering, this is no longer the case. */ - _stp_ctl_send(STP_TRANSPORT, NULL, 0); + transport buffering, this is no longer the case. + Called during module initialization time, so safe to immediately + notify reader we are ready. */ + _stp_ctl_send_notify(STP_TRANSPORT, NULL, 0); dbug_trans(1, "returning 0...\n"); return 0; diff --git a/runtime/transport/transport.h b/runtime/transport/transport.h index c803d459a..1eb1facde 100644 --- a/runtime/transport/transport.h +++ b/runtime/transport/transport.h @@ -11,8 +11,6 @@ /* amount of data a print can send. */ #define STP_BUFFER_SIZE 8192 -static void _stp_request_exit(void); - /* STP_CTL_BUFFER_SIZE is the maximum size of a message */ /* exchanged on the control channel. */ #if STP_TRANSPORT_VERSION == 1 diff --git a/testsuite/systemtap.base/wakeup.exp b/testsuite/systemtap.base/wakeup.exp new file mode 100644 index 000000000..19cc56c5f --- /dev/null +++ b/testsuite/systemtap.base/wakeup.exp @@ -0,0 +1,24 @@ +set test "wakeup" + +if {![installtest_p]} {untested $test; return} + +# There have been issues with kernel probes put into the wake_up path. +# see http://sourceware.org/ml/systemtap/2011-q3/msg00163.html +spawn stap -DMAXERRORS=8 $srcdir/$subdir/$test.stp -c {ls -laR /dev/* /proc/* > /dev/null 2>&1} +set systems 0 +set warns 0 +set errors 0 +set prints 0 +expect { + -timeout 180 + -re {^sleeping\r\n} { incr systems; exp_continue } + -re {^WARNING: wake_up\r\n} { incr warns; exp_continue } + -re {^ERROR: wake_down\r\n} { incr errors; exp_continue } + -re {^count: 25\r\n} { incr prints; exp_continue } + timeout { fail "$test (timeout)" } + eof { } +} +wait + +# WARNINGs get collapsed, so only 1 expected. +if {$systems == 8 && $warns == 1 && $errors == 8 && $prints == 1} { pass "$test" } { fail "$test ($systems,$warns,$errors,$prints)" } diff --git a/testsuite/systemtap.base/wakeup.stp b/testsuite/systemtap.base/wakeup.stp new file mode 100644 index 000000000..253e5de78 --- /dev/null +++ b/testsuite/systemtap.base/wakeup.stp @@ -0,0 +1,13 @@ +global count; +probe kernel.function("__wake_up_common") +{ + count++ + if (count <= 8) + { system("echo sleeping") } + else if (count <= 16) + { warn("wake_up") } + else if (count <= 24) + { error("wake_down") } + else + { printf("count: %d\n", count); exit() } +} -- 2.43.5