]> sourceware.org Git - systemtap.git/commitdiff
Reintroduce timer for transport cmd channel, don't wake_up unconditionally.
authorMark Wielaard <mjw@redhat.com>
Tue, 16 Aug 2011 12:31:29 +0000 (14:31 +0200)
committerMark Wielaard <mjw@redhat.com>
Tue, 16 Aug 2011 13:04:28 +0000 (15:04 +0200)
Revert parts of commit a85c8a "runtime/io.c: Explicitly signal setting of
_stp_exit_flag" and commit 46ac9e "Remove _stp_ctl_work_timer from module
transport layer". Introduce a new test wake_up.exp that shows a deadlock
when sending cmd messages and waking up the reader immediately.

Renamed _stp_ctl_write to _stp_ctl_send, which can be called from
everywhere. Rename _stp_ctl_send to _stp_ctl_send_notify that can be
called from user context in the transport layer itself (this will
immediately notify any readers). Document all places that use
_stp_ctl_send_notify directly to clarify why that is safe.

See http://sourceware.org/ml/systemtap/2011-q3/msg00163.html

runtime/io.c
runtime/print_flush.c
runtime/transport/control.c
runtime/transport/control.h
runtime/transport/transport.c
runtime/transport/transport.h
testsuite/systemtap.base/wakeup.exp [new file with mode: 0644]
testsuite/systemtap.base/wakeup.stp [new file with mode: 0644]

index 94e828d90f7d96e6f547f38ae3c7c51a68b8fff9..64b521bbcbc36d03d3117f56009867d4f0c30c78 100644 (file)
@@ -99,8 +99,10 @@ static void _stp_warn (const char *fmt, ...)
  */
 static void _stp_exit (void)
 {
+       /* Just set the flag since this is possibly called from
+          kprobe context. A timer will come along and call
+          _stp_request_exit() for us.  */
        _stp_exit_flag = 1;
-       wake_up_interruptible(&_stp_ctl_wq);
 }
 
 /** Prints error message and exits.
index a66a305e7d3e6f3170afeded2b96d5f8e2223602..f067609fa421b63f19b3169e389d4d16e78dd0ff 100644 (file)
@@ -99,10 +99,12 @@ void EXPORT_FN(stp_print_flush)(_stp_pbuf *pb)
 #else  /* !STP_BULKMODE */
 
 #if STP_TRANSPORT_VERSION == 1
-       /** STP_TRANSPORT_VERSION == 1 is special, _stp_ctl_send/write will
+       /** STP_TRANSPORT_VERSION == 1 is special, _stp_ctl_send will
            pass through procfs _stp_ctl_write_fs which recognizes
-           STP_REALTIME_DATA as data that needs to be send right away
-           over the .cmd channel instead of being queued.  */
+           STP_REALTIME_DATA as data that can be concatenated if the
+           previous buffer is also of type STP_REALTIME_DATA and there
+           is some room left in that packet instead of creating a new
+           packet to be queued.  */
        if (unlikely(_stp_ctl_send(STP_REALTIME_DATA, pb->buf, len) <= 0))
                atomic_inc (&_stp_transport_failures);
 
index cab2ab5097d417ae97dd961b8af2ca2bd5f6d40d..28a70ab1eab144af379c5d0ca4c06957f739a7e0 100644 (file)
@@ -355,13 +355,12 @@ static void _stp_ctl_free_buffer(struct _stp_buffer *bptr)
        }
 }
 
-/* Send a message directly (only old_relay) or puts it on the
-   _stp_ctl_ready_q.  Doesn't call wake_up on _stp_ctl_wq (use
-   _stp_ctl_send if you need that behavior).  Returns zero if len
-   was zero, or len > STP_CTL_BUFFER_SIZE.  Returns the the length
-   if the send or stored message on success. Returns a negative
-   error code on failure.  */
-static int _stp_ctl_write(int type, void *data, unsigned len)
+/* Put a message on the _stp_ctl_ready_q.  Safe to call from a probe context.
+   Doesn't call wake_up on _stp_ctl_wq (which would not be safe from all
+   probe context). A timer will come by and pick up the message to notify
+   any readers. Returns the number of bytes queued/send or zero/negative
+   on error. */
+static int _stp_ctl_send(int type, void *data, unsigned len)
 {
        struct _stp_buffer *bptr;
        unsigned long flags;
@@ -371,61 +370,61 @@ static int _stp_ctl_write(int type, void *data, unsigned len)
        _stp_ctl_write_dbug(type, data, len);
 #endif
 
+       /* Give the fs a chance to do something special.
+          Like merging two packets in case the previous buffer
+          still has some room (transport version 1 procfs does  this. */
        hlen = _stp_ctl_write_fs(type, data, len);
        if (hlen > 0)
                return hlen;
 
        /* make sure we won't overflow the buffer */
-       if (unlikely(len > STP_CTL_BUFFER_SIZE))
+       if (unlikely(len > STP_CTL_BUFFER_SIZE)) {
+               /* We could try to send an error message instead? */
+                printk(KERN_ERR "ctl_write_msg type=%d len=%d too large\n",
+                      type, len);
                return 0;
+       }
 
        /* get a buffer from the free pool */
        bptr = _stp_ctl_get_buffer(type, data, len);
-       if (unlikely(bptr == NULL))
+       if (unlikely(bptr == NULL)) {
+               /* Nothing else we can do... */
+                printk(KERN_ERR "ctl_write_msg type=%d len=%d ENOMEM\n",
+                      type, len);
                return -ENOMEM;
+       }
 
-       /* put it on the pool of ready buffers */
+       /* put it on the pool of ready buffers. Even though we are
+          holding a lock, calling list_add_tail() is safe here since
+          it will be totally inlined from the list.h header file so
+          we cannot recursively hit a kprobe inside the lock. */
        spin_lock_irqsave(&_stp_ctl_ready_lock, flags);
        list_add_tail(&bptr->list, &_stp_ctl_ready_q);
        spin_unlock_irqrestore(&_stp_ctl_ready_lock, flags);
 
+       /* It would be nice if we could speed up the notification
+          timer at this point, but calling mod_timer() at this
+          point would bring in more locking issues... */
        return len + sizeof(bptr->type);
 }
 
-/* send commands with timeout and retry (can still fail though).
-   Will call wake_up on _stp_ctl_wq if new data is available for
-   _stp_ctl_read_cmd, so stapio can immediately read it if wanted.
-   Returns zero if the message had zero length or was too big.
-   Returns the length of the final message if sucessfully send or queued.
-   Returns a negative error number on failure.  */
-static int _stp_ctl_send(int type, void *data, int len)
+/* Calls _stp_ctl_send and then calls wake_up on _stp_ctl_wq
+   to immediately notify listeners. DO NOT CALL THIS FROM A (KERNEL)
+   PROBE CONTEXT. This is only safe to call from the transport layer
+   itself when in user context. All code that could be triggered from
+   a probe context should call _stp_ctl_send(). */
+static int _stp_ctl_send_notify(int type, void *data, unsigned len)
 {
-       int err, mesg_on_queue = 0;
-       unsigned long flags;
-       dbug_trans(1, "ctl_send: type=%d len=%d\n", type, len);
-       err = _stp_ctl_write(type, data, len);
-       if (err > 0) {
-               /* A message was queued (or directly written), so wake up
-                  _stp_ctl_read_cmd so stapio can pick it up asap.  */
+       int ret;
+       dbug_trans(1, "_stp_ctl_send_notify: type=%d len=%d\n", type, len);
+       ret = _stp_ctl_send(type, data, len);
+
+       /* A message was queued, so wake up all _stp_ctl_wq listeners
+          so stapio can pick it up asap.  */
+       if (ret > 0)
                wake_up_interruptible(&_stp_ctl_wq);
-        } else {
-                /* printk instead of _stp_error since an error here means
-                  our message or transport is suspect.  */
-                printk(KERN_ERR "ctl_send (type=%d len=%d) failed: %d\n", type, len, err);
 
-               /* If there are pending messages on the queue, then yell
-                  and scream for someone to pick them off quickly.  */
-               spin_lock_irqsave(&_stp_ctl_ready_lock, flags);
-               if (!list_empty(&_stp_ctl_ready_q))
-                       mesg_on_queue = 1;
-               spin_unlock_irqrestore(&_stp_ctl_ready_lock, flags);
-               if (!mesg_on_queue)
-                       printk(KERN_ERR "_stp_ctl_write failed, but no messages on queue\n");
-               else
-                       wake_up_interruptible(&_stp_ctl_wq);
-       }
-       dbug_trans(1, "returning %d\n", err);
-       return err;
+       return ret;
 }
 
 /** Called when someone tries to read from our .cmd file.
@@ -442,12 +441,6 @@ static ssize_t _stp_ctl_read_cmd(struct file *file, char __user *buf,
        spin_lock_irqsave(&_stp_ctl_ready_lock, flags);
        while (list_empty(&_stp_ctl_ready_q)) {
                spin_unlock_irqrestore(&_stp_ctl_ready_lock, flags);
-
-       /* Someone is listening, if exit flag is set AND we have finished
-          with probe_start() we might want them to know.  */
-       if (unlikely(_stp_exit_flag && _stp_probes_started))
-               _stp_request_exit();
-
                if (file->f_flags & O_NONBLOCK)
                        return -EAGAIN;
                if (wait_event_interruptible(_stp_ctl_wq, !list_empty(&_stp_ctl_ready_q)))
@@ -511,10 +504,10 @@ static unsigned _stp_ctl_poll_cmd(struct file *file, poll_table *wait)
 
         poll_wait(file, &_stp_ctl_wq, wait);
 
-        /* If there are messages waiting or we want to exit, then
-           there will (soon) be data available. */
+        /* If there are messages waiting, then there will be
+          data available to read. */
        spin_lock_irqsave(&_stp_ctl_ready_lock, flags);
-       if (! list_empty(&_stp_ctl_ready_q) || _stp_exit_flag)
+       if (! list_empty(&_stp_ctl_ready_q))
                res |= POLLIN | POLLRDNORM;
        spin_unlock_irqrestore(&_stp_ctl_ready_lock, flags);
 
index f1dd13cbf5672d31ab973ad0871df82226f45a94..2a009d502dac85c269c7312f802ab4495ddd8cca 100644 (file)
@@ -32,7 +32,8 @@ struct _stp_buffer {
 
 static struct file_operations _stp_ctl_fops_cmd;
 
-static int _stp_ctl_send(int type, void *data, int len);
+static int _stp_ctl_send(int type, void *data, unsigned len);
+static int _stp_ctl_send_notify(int type, void *data, unsigned len);
 
 static int _stp_ctl_write_fs(int type, void *data, unsigned len);
 
index 5d30a93d67f99779da2af41a99caffb284cd1a21..6f40f05d5219bc763098a9d22c58c1d6d5c43c2b 100644 (file)
@@ -15,6 +15,7 @@
 #define _TRANSPORT_TRANSPORT_C_
 
 #include "transport.h"
+#include "control.h"
 #include <linux/debugfs.h>
 #include <linux/namei.h>
 #include <linux/delay.h>
@@ -33,6 +34,12 @@ static int _stp_probes_started = 0;
 static int _stp_exit_called = 0;
 static DEFINE_MUTEX(_stp_transport_mutex);
 
+#ifndef STP_CTL_TIMER_INTERVAL
+/* ctl timer interval in jiffies (default 20 ms) */
+#define STP_CTL_TIMER_INTERVAL         ((HZ+49)/50)
+#endif
+
+
 // For now, disable transport version 3 (unless STP_USE_RING_BUFFER is
 // defined).
 #if STP_TRANSPORT_VERSION == 3 && !defined(STP_USE_RING_BUFFER)
@@ -66,6 +73,8 @@ MODULE_PARM_DESC(_stp_bufsize, "buffer size");
 static void probe_exit(void);
 static int probe_start(void);
 
+struct timer_list _stp_ctl_work_timer;
+
 /*
  *     _stp_handle_start - handle STP_START
  */
@@ -89,7 +98,10 @@ static void _stp_handle_start(struct _stp_msg_start *st)
                if (st->res >= 0)
                        _stp_probes_started = 1;
 
-               _stp_ctl_send(STP_START, st, sizeof(*st));
+               /* Called from the user context in response to a proc
+                  file write (in _stp_ctl_write_cmd), so may notify
+                  the reader directly. */
+               _stp_ctl_send_notify(STP_START, st, sizeof(*st));
        }
        mutex_unlock(&_stp_transport_mutex);
 }
@@ -125,8 +137,13 @@ static void _stp_cleanup_and_exit(int send_exit)
                _stp_transport_data_fs_stop();
 
                dbug_trans(1, "ctl_send STP_EXIT\n");
-               if (send_exit)
-                       _stp_ctl_send(STP_EXIT, NULL, 0);
+               if (send_exit) {
+                       /* send_exit is only set to one if called from
+                          _stp_ctl_write_cmd() in response to a write
+                          to the proc cmd file, so in user context. It
+                          is safe to immediately notify the reader.  */
+                       _stp_ctl_send_notify(STP_EXIT, NULL, 0);
+               }
                dbug_trans(1, "done with ctl_send STP_EXIT\n");
        }
        mutex_unlock(&_stp_transport_mutex);
@@ -139,7 +156,9 @@ static void _stp_request_exit(void)
                /* we only want to do this once */
                called = 1;
                dbug_trans(1, "ctl_send STP_REQUEST_EXIT\n");
-               _stp_ctl_send(STP_REQUEST_EXIT, NULL, 0);
+               /* Called from the timer when _stp_exit_flag has been
+                  been set. So safe to immediately notify any readers. */
+               _stp_ctl_send_notify(STP_REQUEST_EXIT, NULL, 0);
                dbug_trans(1, "done with ctl_send STP_REQUEST_EXIT\n");
        }
 }
@@ -155,6 +174,7 @@ static void _stp_detach(void)
        if (!_stp_exit_flag)
                _stp_transport_data_fs_overwrite(1);
 
+        del_timer_sync(&_stp_ctl_work_timer);
        wake_up_interruptible(&_stp_ctl_wq);
 }
 
@@ -169,6 +189,41 @@ static void _stp_attach(void)
        dbug_trans(1, "attach\n");
        _stp_pid = current->pid;
        _stp_transport_data_fs_overwrite(0);
+       init_timer(&_stp_ctl_work_timer);
+       _stp_ctl_work_timer.expires = jiffies + STP_CTL_TIMER_INTERVAL;
+       _stp_ctl_work_timer.function = _stp_ctl_work_callback;
+       _stp_ctl_work_timer.data= 0;
+       add_timer(&_stp_ctl_work_timer);
+}
+
+/*
+ *     _stp_ctl_work_callback - periodically check for IO or exit
+ *     This IO comes from control messages like system(), warn(),
+ *     that could potentially have been send from krpobe context,
+ *     so they don't immediately trigger a wake_up of _stp_ctl_wq.
+ *     This is run by a kernel thread and may NOT sleep, but it
+ *     may call wake_up_interruptible on _stp_ctl_wq to notify
+ *     any readers, or send messages itself that are immediately
+ *     notified. Reschedules itself if someone is still attached
+ *     to the cmd channel.
+ */
+static void _stp_ctl_work_callback(unsigned long val)
+{
+       int do_io = 0;
+       unsigned long flags;
+
+       spin_lock_irqsave(&_stp_ctl_ready_lock, flags);
+       if (!list_empty(&_stp_ctl_ready_q))
+               do_io = 1;
+       spin_unlock_irqrestore(&_stp_ctl_ready_lock, flags);
+       if (do_io)
+               wake_up_interruptible(&_stp_ctl_wq);
+
+       /* if exit flag is set AND we have finished with probe_start() */
+       if (unlikely(_stp_exit_flag && _stp_probes_started))
+               _stp_request_exit();
+       if (atomic_read(& _stp_ctl_attached))
+                mod_timer (&_stp_ctl_work_timer, jiffies + STP_CTL_TIMER_INTERVAL);
 }
 
 /**
@@ -239,8 +294,10 @@ static int _stp_transport_init(void)
         /* Signal stapio to send us STP_START back.
            This is an historic convention. This was called
           STP_TRANSPORT_INFO and had a payload that described the
-          transport buffering, this is no longer the case.  */
-       _stp_ctl_send(STP_TRANSPORT, NULL, 0);
+          transport buffering, this is no longer the case.
+          Called during module initialization time, so safe to immediately
+          notify reader we are ready.  */
+       _stp_ctl_send_notify(STP_TRANSPORT, NULL, 0);
 
        dbug_trans(1, "returning 0...\n");
        return 0;
index c803d459afe2d640166fbe95f2a97561b2f3e32e..1eb1facde12e52aeb5ff9904c5ca922cd1bbe587 100644 (file)
@@ -11,8 +11,6 @@
 /* amount of data a print can send. */
 #define STP_BUFFER_SIZE 8192
 
-static void _stp_request_exit(void);
-
 /* STP_CTL_BUFFER_SIZE is the maximum size of a message */
 /* exchanged on the control channel. */
 #if STP_TRANSPORT_VERSION == 1
diff --git a/testsuite/systemtap.base/wakeup.exp b/testsuite/systemtap.base/wakeup.exp
new file mode 100644 (file)
index 0000000..19cc56c
--- /dev/null
@@ -0,0 +1,24 @@
+set test "wakeup"
+
+if {![installtest_p]} {untested $test; return}
+
+# There have been issues with kernel probes put into the wake_up path.
+# see http://sourceware.org/ml/systemtap/2011-q3/msg00163.html
+spawn stap -DMAXERRORS=8 $srcdir/$subdir/$test.stp -c {ls -laR /dev/* /proc/* > /dev/null 2>&1}
+set systems 0
+set warns 0
+set errors 0
+set prints 0
+expect {
+    -timeout 180
+    -re {^sleeping\r\n} { incr systems; exp_continue }
+    -re {^WARNING: wake_up\r\n} { incr warns; exp_continue }
+    -re {^ERROR: wake_down\r\n} { incr errors; exp_continue }
+    -re {^count: 25\r\n} { incr prints; exp_continue }
+    timeout { fail "$test (timeout)" }
+    eof { }
+}
+wait
+
+# WARNINGs get collapsed, so only 1 expected.
+if {$systems == 8 && $warns == 1 && $errors == 8 && $prints == 1} { pass "$test" } { fail "$test ($systems,$warns,$errors,$prints)" }
diff --git a/testsuite/systemtap.base/wakeup.stp b/testsuite/systemtap.base/wakeup.stp
new file mode 100644 (file)
index 0000000..253e5de
--- /dev/null
@@ -0,0 +1,13 @@
+global count;
+probe kernel.function("__wake_up_common")
+{
+  count++
+  if (count <= 8)
+    { system("echo sleeping") }
+  else if (count <= 16)
+    { warn("wake_up") }
+  else if (count <= 24)
+    { error("wake_down") }
+  else
+    { printf("count: %d\n", count); exit() }
+}
This page took 0.042639 seconds and 5 git commands to generate.