/* runtime/transport/ring_buffer.c - kernel ring_buffer transport for the systemtap runtime */
#include <linux/types.h>
#include <linux/ring_buffer.h>
#include <linux/wait.h>
#include <linux/poll.h>
#include <linux/cpumask.h>

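/* A single trace record as stored in the ring buffer: a length
 * followed by 'len' bytes of payload. */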
struct _stp_data_entry {
	size_t len;
	unsigned char buf[];
};

static struct ring_buffer *__stp_ring_buffer = NULL;

/* _stp_poll_wait is a waitqueue for tasks blocked on
 * _stp_data_poll_trace() */
static DECLARE_WAIT_QUEUE_HEAD(_stp_poll_wait);

/*
 * Trace iterator - used by the printout routines that present trace
 * results to users; those routines might sleep, etc.
 */
struct _stp_ring_buffer_data {
	int cpu;
	u64 ts;
};
static struct _stp_ring_buffer_data _stp_rb_data;

static cpumask_var_t _stp_trace_reader_cpumask;

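/* Release the reader cpumask and the ring buffer itself.  Safe to
 * call even if allocation only partially succeeded. */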
static void __stp_free_ring_buffer(void)
{
	free_cpumask_var(_stp_trace_reader_cpumask);
	if (__stp_ring_buffer)
		ring_buffer_free(__stp_ring_buffer);
	__stp_ring_buffer = NULL;
}

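/* Allocate the ring buffer.  _stp_bufsize (if non-zero) is the total
 * number of bytes requested; it is split evenly across the online
 * cpus before being handed to ring_buffer_alloc(). */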
static int __stp_alloc_ring_buffer(void)
{
	unsigned long buffer_size = _stp_bufsize;

	if (!alloc_cpumask_var(&_stp_trace_reader_cpumask, GFP_KERNEL))
		goto fail;
	cpumask_clear(_stp_trace_reader_cpumask);

	if (buffer_size == 0) {
		dbug_trans(1, "using default buffer size...\n");
		buffer_size = _stp_nsubbufs * _stp_subbuf_size;
	}
	/* The number passed to ring_buffer_alloc() is per cpu.  Our
	 * 'buffer_size' is a total number of bytes to allocate.  So,
	 * we need to divide buffer_size by the number of cpus. */
	buffer_size /= num_online_cpus();
	dbug_trans(1, "%lu\n", buffer_size);
	__stp_ring_buffer = ring_buffer_alloc(buffer_size, 0);
	if (!__stp_ring_buffer)
		goto fail;

	dbug_trans(1, "size = %lu\n", ring_buffer_size(__stp_ring_buffer));
	return 0;

fail:
	__stp_free_ring_buffer();
	return -ENOMEM;
}

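/* Open handler for the per-cpu trace files.  With STP_BULKMODE each
 * cpu's file gets at most one reader; without it a single reader
 * claims the whole buffer. */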
static int _stp_data_open_trace(struct inode *inode, struct file *file)
{
	long cpu_file = (long) inode->i_private;

	/* We only allow for one reader per cpu */
	dbug_trans(1, "trace attach\n");
#ifdef STP_BULKMODE
	if (!cpumask_test_cpu(cpu_file, _stp_trace_reader_cpumask))
		cpumask_set_cpu(cpu_file, _stp_trace_reader_cpumask);
	else {
		dbug_trans(1, "returning EBUSY\n");
		return -EBUSY;
	}
#else
	if (!cpumask_empty(_stp_trace_reader_cpumask)) {
		dbug_trans(1, "returning EBUSY\n");
		return -EBUSY;
	}
	cpumask_setall(_stp_trace_reader_cpumask);
#endif
	file->private_data = inode->i_private;
	return 0;
}

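/* Release handler: give up this reader's claim on the buffer. */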
static int _stp_data_release_trace(struct inode *inode, struct file *file)
{
	long cpu_file = (long) inode->i_private;
	dbug_trans(1, "trace detach\n");
#ifdef STP_BULKMODE
	cpumask_clear_cpu(cpu_file, _stp_trace_reader_cpumask);
#else
	cpumask_clear(_stp_trace_reader_cpumask);
#endif

	return 0;
}

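/* Copy one event's payload out to userspace.  Partial copies are not
 * attempted: if the event does not fit in 'cnt' bytes, -EBUSY is
 * returned and the event is left unconsumed. */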
ssize_t
_stp_event_to_user(struct ring_buffer_event *event, char __user *ubuf,
		   size_t cnt)
{
	int ret;
	struct _stp_data_entry *entry;

	dbug_trans(1, "event(%p), ubuf(%p), cnt(%lu)\n", event, ubuf, cnt);
	if (event == NULL || ubuf == NULL)
		return -EFAULT;

	entry = (struct _stp_data_entry *)ring_buffer_event_data(event);
	if (entry == NULL)
		return -EFAULT;

	/* We don't do partial entries - just fail. */
	if (entry->len > cnt)
		return -EBUSY;

	if (cnt > entry->len)
		cnt = entry->len;
	ret = copy_to_user(ubuf, entry->buf, cnt);
	if (ret)
		return -EFAULT;

	return cnt;
}

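/* Block (politely) until there is data in the ring buffer.  Returns 1
 * when data is available, -EAGAIN for non-blocking readers, or -EINTR
 * if a signal arrived while waiting. */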
static ssize_t tracing_wait_pipe(struct file *filp)
{
	while (ring_buffer_empty(__stp_ring_buffer)) {

		if ((filp->f_flags & O_NONBLOCK)) {
			dbug_trans(1, "returning -EAGAIN\n");
			return -EAGAIN;
		}

		/*
		 * This is a make-shift waitqueue.  The reason we don't use
		 * an actual wait queue is because:
		 * 1) we only ever have one waiter
		 * 2) tracing may trace all functions, and we don't want
		 *    the overhead of calling wake_up and friends
		 *    (and of tracing them too)
		 * Anyway, this is really a very primitive wakeup.
		 */
		set_current_state(TASK_INTERRUPTIBLE);

		/* sleep for 100 msecs, and try again. */
		schedule_timeout(HZ/10);

		if (signal_pending(current)) {
			dbug_trans(1, "returning -EINTR\n");
			return -EINTR;
		}
	}

	dbug_trans(1, "returning 1\n");
	return 1;
}

static struct ring_buffer_event *
peek_next_event(int cpu, u64 *ts)
{
	return ring_buffer_peek(__stp_ring_buffer, cpu, ts);
}

/* Find the next real event */
static struct ring_buffer_event *
_stp_find_next_event(long cpu_file)
{
	struct ring_buffer_event *event;

#ifdef STP_BULKMODE
	/*
	 * If we are reading a per-cpu trace file, don't bother
	 * iterating over all cpus; peek directly at this cpu.
	 */
	if (ring_buffer_empty_cpu(__stp_ring_buffer, (int)cpu_file))
		return NULL;
	event = peek_next_event(cpu_file, &_stp_rb_data.ts);
	_stp_rb_data.cpu = cpu_file;

	return event;
#else
	struct ring_buffer_event *next = NULL;
	u64 next_ts = 0, ts;
	int next_cpu = -1;
	int cpu;

	for_each_possible_cpu(cpu) {

		if (ring_buffer_empty_cpu(__stp_ring_buffer, cpu))
			continue;

		event = peek_next_event(cpu, &ts);

		/*
		 * Pick the event with the smallest timestamp:
		 */
		if (event && (!next || ts < next_ts)) {
			next = event;
			next_cpu = cpu;
			next_ts = ts;
		}
	}

	_stp_rb_data.cpu = next_cpu;
	_stp_rb_data.ts = next_ts;

	return next;
#endif
}


/*
 * Consumer reader: wait for data, then copy whole events to userspace,
 * consuming them from the ring buffer as they are copied.
 */
static ssize_t
_stp_data_read_trace(struct file *filp, char __user *ubuf,
		     size_t cnt, loff_t *ppos)
{
	ssize_t sret;
	struct ring_buffer_event *event;
	long cpu_file = (long) filp->private_data;

	dbug_trans(1, "%lu\n", (unsigned long)cnt);

	sret = tracing_wait_pipe(filp);
	dbug_trans(1, "tracing_wait_pipe returned %ld\n", sret);
	if (sret <= 0)
		goto out;

	/* stop when tracing is finished */
	if (ring_buffer_empty(__stp_ring_buffer)) {
		sret = 0;
		goto out;
	}

	if (cnt >= PAGE_SIZE)
		cnt = PAGE_SIZE - 1;

	dbug_trans(1, "sret = %lu\n", (unsigned long)sret);
	sret = 0;
	while ((event = _stp_find_next_event(cpu_file)) != NULL) {
		ssize_t len;

		len = _stp_event_to_user(event, ubuf, cnt);
		if (len <= 0)
			break;

		ring_buffer_consume(__stp_ring_buffer, _stp_rb_data.cpu,
				    &_stp_rb_data.ts);
		ubuf += len;
		cnt -= len;
		sret += len;
		if (cnt <= 0)
			break;
	}
out:
	return sret;
}


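/* poll() handler: report readable as soon as the ring buffer holds
 * any data, sleeping on _stp_poll_wait otherwise. */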
static unsigned int
_stp_data_poll_trace(struct file *filp, poll_table *poll_table)
{
	dbug_trans(1, "entry\n");
	if (! ring_buffer_empty(__stp_ring_buffer))
		return POLLIN | POLLRDNORM;
	poll_wait(filp, &_stp_poll_wait, poll_table);
	if (! ring_buffer_empty(__stp_ring_buffer))
		return POLLIN | POLLRDNORM;

	dbug_trans(1, "exit\n");
	return 0;
}

static struct file_operations __stp_data_fops = {
	.owner = THIS_MODULE,
	.open = _stp_data_open_trace,
	.release = _stp_data_release_trace,
	.poll = _stp_data_poll_trace,
	.read = _stp_data_read_trace,
#if 0
	.splice_read = tracing_splice_read_pipe,
#endif
};

/*
 * Here's how __STP_MAX_RESERVE_SIZE is figured.  The value of
 * BUF_PAGE_SIZE was taken from the kernel's ring_buffer code.  It
 * is divided by 4, so we waste a maximum of 1/4 of the buffer (in
 * the case of a small reservation).
 */
#define __STP_MAX_RESERVE_SIZE ((/*BUF_PAGE_SIZE*/ 4080 / 4)		\
				- sizeof(struct _stp_data_entry)	\
				- sizeof(struct ring_buffer_event))

/*
 * This function reserves space in the ring buffer for one sample.
 *
 * Up to 'size_request' bytes of payload are reserved (capped at
 * __STP_MAX_RESERVE_SIZE).  On success the opaque reserved event is
 * stored in *entry and the usable payload size is returned; fill the
 * payload via _stp_data_entry_data() and then call
 * _stp_data_write_commit().  If the reservation fails, *entry is set
 * to NULL and 0 is returned.
 */
static size_t
_stp_data_write_reserve(size_t size_request, void **entry)
{
	struct ring_buffer_event *event;
	struct _stp_data_entry *sde;

	if (entry == NULL)
		return -EINVAL;

	if (size_request > __STP_MAX_RESERVE_SIZE) {
		size_request = __STP_MAX_RESERVE_SIZE;
	}

	event = ring_buffer_lock_reserve(__stp_ring_buffer,
					 sizeof(struct _stp_data_entry) + size_request,
					 0);
	if (unlikely(! event)) {
		dbug_trans(1, "event = NULL (%p)?\n", event);
		*entry = NULL;
		return 0;
	}

	sde = (struct _stp_data_entry *)ring_buffer_event_data(event);
	sde->len = size_request;

	*entry = event;
	return size_request;
}

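/* Return a pointer to the payload area of a previously reserved event. */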
static unsigned char *_stp_data_entry_data(void *entry)
{
	struct ring_buffer_event *event = entry;
	struct _stp_data_entry *sde;

	if (event == NULL)
		return NULL;

	sde = (struct _stp_data_entry *)ring_buffer_event_data(event);
	return sde->buf;
}

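/* Commit a previously reserved event and wake up any sleeping reader. */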
static int _stp_data_write_commit(void *entry)
{
	int ret;
	struct ring_buffer_event *event = (struct ring_buffer_event *)entry;

	if (unlikely(! entry)) {
		dbug_trans(1, "entry = NULL, returning -EINVAL\n");
		return -EINVAL;
	}

	ret = ring_buffer_unlock_commit(__stp_ring_buffer, event, 0);
	dbug_trans(1, "after commit, empty returns %d\n",
		   ring_buffer_empty(__stp_ring_buffer));

	wake_up_interruptible(&_stp_poll_wait);
	return ret;
}

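/*
 * Producer-side usage, roughly (an illustrative sketch only; 'src' and
 * 'len' stand in for whatever the caller wants to log):
 *
 *	void *entry;
 *	size_t size = _stp_data_write_reserve(len, &entry);
 *	if (entry && size > 0) {
 *		memcpy(_stp_data_entry_data(entry), src, size);
 *		_stp_data_write_commit(entry);
 *	}
 */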

static struct dentry *__stp_entry[NR_CPUS] = { NULL };

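/* Set up the transport: allocate the ring buffer and create the
 * debugfs 'traceN' file(s) that userspace reads. */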
static int _stp_transport_data_fs_init(void)
{
	int rc;
	long cpu;

	// allocate buffer
	dbug_trans(1, "entry...\n");
	rc = __stp_alloc_ring_buffer();
	if (rc != 0)
		return rc;

	// create file(s)
	for_each_online_cpu(cpu) {
		char cpu_file[9];	/* 5(trace) + 3(XXX) + 1(\0) = 9 */

		if (cpu > 999 || cpu < 0) {
			_stp_transport_data_fs_close();
			return -EINVAL;
		}
		sprintf(cpu_file, "trace%ld", cpu);
		__stp_entry[cpu] = debugfs_create_file(cpu_file, 0600,
						       _stp_get_module_dir(),
						       (void *)cpu,
						       &__stp_data_fops);

		if (!__stp_entry[cpu]) {
			pr_warning("Could not create debugfs 'trace' entry\n");
			__stp_free_ring_buffer();
			return -ENOENT;
		}
		else if (IS_ERR(__stp_entry[cpu])) {
			rc = PTR_ERR(__stp_entry[cpu]);
			pr_warning("Could not create debugfs 'trace' entry\n");
			__stp_free_ring_buffer();
			return rc;
		}

		__stp_entry[cpu]->d_inode->i_uid = _stp_uid;
		__stp_entry[cpu]->d_inode->i_gid = _stp_gid;

#ifndef STP_BULKMODE
		if (cpu != 0)
			break;
#endif
	}

	dbug_trans(1, "returning 0...\n");
	return 0;
}

static void _stp_transport_data_fs_start(void)
{
	/* Do nothing. */
}

static void _stp_transport_data_fs_stop(void)
{
	/* Do nothing. */
}

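/* Tear down the transport: remove the debugfs files and free the ring buffer. */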
static void _stp_transport_data_fs_close(void)
{
	int cpu;

	for_each_possible_cpu(cpu) {
		if (__stp_entry[cpu])
			debugfs_remove(__stp_entry[cpu]);
		__stp_entry[cpu] = NULL;
	}

	__stp_free_ring_buffer();
}