/*
 * Generic ring buffer
 *
 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
 */
#include <linux/ring_buffer.h>
#include <linux/spinlock.h>
#include <linux/debugfs.h>
#include <linux/uaccess.h>
#include <linux/module.h>
#include <linux/percpu.h>
#include <linux/mutex.h>
#include <linux/sched.h>	/* used for sched_clock() (for now) */
#include <linux/init.h>
#include <linux/hash.h>
#include <linux/list.h>
#include <linux/fs.h>

#include "trace.h"

/* Up this if you want to test the TIME_EXTENTS and normalization */
#define DEBUG_SHIFT 0

/* FIXME!!! */
u64 ring_buffer_time_stamp(int cpu)
{
	/* shift to debug/test normalization and TIME_EXTENTS */
	return sched_clock() << DEBUG_SHIFT;
}

void ring_buffer_normalize_time_stamp(int cpu, u64 *ts)
{
	/* Just stupid testing the normalize function and deltas */
	*ts >>= DEBUG_SHIFT;
}

#define RB_EVNT_HDR_SIZE (sizeof(struct ring_buffer_event))
#define RB_ALIGNMENT_SHIFT 2
#define RB_ALIGNMENT (1 << RB_ALIGNMENT_SHIFT)
#define RB_MAX_SMALL_DATA 28

enum {
	RB_LEN_TIME_EXTEND = 8,
	RB_LEN_TIME_STAMP = 16,
};

/* inline for ring buffer fast paths */
static inline unsigned
rb_event_length(struct ring_buffer_event *event)
{
	unsigned length;

	switch (event->type) {
	case RINGBUF_TYPE_PADDING:
		/* undefined */
		return -1;

	case RINGBUF_TYPE_TIME_EXTEND:
		return RB_LEN_TIME_EXTEND;

	case RINGBUF_TYPE_TIME_STAMP:
		return RB_LEN_TIME_STAMP;

	case RINGBUF_TYPE_DATA:
		if (event->len)
			length = event->len << RB_ALIGNMENT_SHIFT;
		else
			length = event->array[0];
		return length + RB_EVNT_HDR_SIZE;
	default:
		BUG();
	}
	/* not hit */
	return 0;
}

/**
 * ring_buffer_event_length - return the length of the event
 * @event: the event to get the length of
 */
unsigned ring_buffer_event_length(struct ring_buffer_event *event)
{
	return rb_event_length(event);
}

/* inline for ring buffer fast paths */
static inline void *
rb_event_data(struct ring_buffer_event *event)
{
	BUG_ON(event->type != RINGBUF_TYPE_DATA);
	/* If length is in len field, then array[0] has the data */
	if (event->len)
		return (void *)&event->array[0];
	/* Otherwise length is in array[0] and array[1] has the data */
	return (void *)&event->array[1];
}

/**
 * ring_buffer_event_data - return the data of the event
 * @event: the event to get the data from
 */
void *ring_buffer_event_data(struct ring_buffer_event *event)
{
	return rb_event_data(event);
}

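/*
 * Illustrative sketch added by the editor (not part of the original file):
 * how a caller typically inspects a data event with the two accessors
 * above.  The helper name example_show_event is hypothetical.
 */
static inline void example_show_event(struct ring_buffer_event *event)
{
	/* Total event length as stored in the buffer (header included) */
	unsigned len = ring_buffer_event_length(event);
	/* Payload pointer: array[0], or array[1] when array[0] holds the length */
	void *body = ring_buffer_event_data(event);

	printk(KERN_DEBUG "event length %u, payload at %p\n", len, body);
}
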
#define for_each_buffer_cpu(buffer, cpu)		\
	for_each_cpu_mask(cpu, buffer->cpumask)

#define TS_SHIFT	27
#define TS_MASK		((1ULL << TS_SHIFT) - 1)
#define TS_DELTA_TEST	(~TS_MASK)

/*
 * This hack stolen from mm/slob.c.
 * We can store per page timing information in the page frame of the page.
 * Thanks to Peter Zijlstra for suggesting this idea.
 */
struct buffer_page {
	u64		 time_stamp;	/* page time stamp */
	local_t		 write;		/* index for next write */
	local_t		 commit;	/* write committed index */
	unsigned	 read;		/* index for next read */
	struct list_head list;		/* list of free pages */
	void		*page;		/* Actual data page */
};

/*
 * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing
 * this issue out.
 */
static inline void free_buffer_page(struct buffer_page *bpage)
{
	if (bpage->page)
		free_page((unsigned long)bpage->page);
	kfree(bpage);
}

/*
 * We need to fit the time_stamp delta into 27 bits.
 */
static inline int test_time_stamp(u64 delta)
{
	if (delta & TS_DELTA_TEST)
		return 1;
	return 0;
}

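/*
 * Editor's note, added for clarity: with TS_SHIFT == 27 a delta of up to
 * 2^27 - 1 (134217727) clock units fits directly in an event's 27-bit
 * time_delta field.  Anything larger makes test_time_stamp() return 1,
 * and the write path then inserts a TIME_EXTEND event instead.
 */
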
#define BUF_PAGE_SIZE PAGE_SIZE

/*
 * head_page == tail_page && head == tail then buffer is empty.
 */
struct ring_buffer_per_cpu {
	int			cpu;
	struct ring_buffer	*buffer;
	spinlock_t		lock;
	struct lock_class_key	lock_key;
	struct list_head	pages;
	struct buffer_page	*head_page;	/* read from head */
	struct buffer_page	*tail_page;	/* write to tail */
	struct buffer_page	*commit_page;	/* committed pages */
	struct buffer_page	*reader_page;
	unsigned long		overrun;
	unsigned long		entries;
	u64			write_stamp;
	u64			read_stamp;
	atomic_t		record_disabled;
};

struct ring_buffer {
	unsigned long	size;
	unsigned	pages;
	unsigned	flags;
	int		cpus;
	cpumask_t	cpumask;
	atomic_t	record_disabled;

	struct mutex	mutex;

	struct ring_buffer_per_cpu **buffers;
};

struct ring_buffer_iter {
	struct ring_buffer_per_cpu	*cpu_buffer;
	unsigned long			head;
	struct buffer_page		*head_page;
	u64				read_stamp;
};

#define RB_WARN_ON(buffer, cond)				\
	do {							\
		if (unlikely(cond)) {				\
			atomic_inc(&buffer->record_disabled);	\
			WARN_ON(1);				\
		}						\
	} while (0)

#define RB_WARN_ON_RET(buffer, cond)				\
	do {							\
		if (unlikely(cond)) {				\
			atomic_inc(&buffer->record_disabled);	\
			WARN_ON(1);				\
			return -1;				\
		}						\
	} while (0)

#define RB_WARN_ON_ONCE(buffer, cond)				\
	do {							\
		static int once;				\
		if (unlikely(cond) && !once) {			\
			once++;					\
			atomic_inc(&buffer->record_disabled);	\
			WARN_ON(1);				\
		}						\
	} while (0)

/**
 * rb_check_pages - integrity check of buffer pages
 * @cpu_buffer: CPU buffer with pages to test
 *
 * As a safety measure we check to make sure the data pages have not
 * been corrupted.
 */
static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head = &cpu_buffer->pages;
	struct buffer_page *page, *tmp;

	RB_WARN_ON_RET(cpu_buffer, head->next->prev != head);
	RB_WARN_ON_RET(cpu_buffer, head->prev->next != head);

	list_for_each_entry_safe(page, tmp, head, list) {
		RB_WARN_ON_RET(cpu_buffer,
			       page->list.next->prev != &page->list);
		RB_WARN_ON_RET(cpu_buffer,
			       page->list.prev->next != &page->list);
	}

	return 0;
}

static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
			     unsigned nr_pages)
{
	struct list_head *head = &cpu_buffer->pages;
	struct buffer_page *page, *tmp;
	unsigned long addr;
	LIST_HEAD(pages);
	unsigned i;

	for (i = 0; i < nr_pages; i++) {
		page = kzalloc_node(ALIGN(sizeof(*page), cache_line_size()),
				    GFP_KERNEL, cpu_to_node(cpu_buffer->cpu));
		if (!page)
			goto free_pages;
		list_add(&page->list, &pages);

		addr = __get_free_page(GFP_KERNEL);
		if (!addr)
			goto free_pages;
		page->page = (void *)addr;
	}

	list_splice(&pages, head);

	rb_check_pages(cpu_buffer);

	return 0;

 free_pages:
	list_for_each_entry_safe(page, tmp, &pages, list) {
		list_del_init(&page->list);
		free_buffer_page(page);
	}
	return -ENOMEM;
}

static struct ring_buffer_per_cpu *
rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct buffer_page *page;
	unsigned long addr;
	int ret;

	cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
				  GFP_KERNEL, cpu_to_node(cpu));
	if (!cpu_buffer)
		return NULL;

	cpu_buffer->cpu = cpu;
	cpu_buffer->buffer = buffer;
	spin_lock_init(&cpu_buffer->lock);
	INIT_LIST_HEAD(&cpu_buffer->pages);

	page = kzalloc_node(ALIGN(sizeof(*page), cache_line_size()),
			    GFP_KERNEL, cpu_to_node(cpu));
	if (!page)
		goto fail_free_buffer;

	cpu_buffer->reader_page = page;
	addr = __get_free_page(GFP_KERNEL);
	if (!addr)
		goto fail_free_reader;
	page->page = (void *)addr;

	INIT_LIST_HEAD(&cpu_buffer->reader_page->list);

	ret = rb_allocate_pages(cpu_buffer, buffer->pages);
	if (ret < 0)
		goto fail_free_reader;

	cpu_buffer->head_page
		= list_entry(cpu_buffer->pages.next, struct buffer_page, list);
	cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;

	return cpu_buffer;

 fail_free_reader:
	free_buffer_page(cpu_buffer->reader_page);

 fail_free_buffer:
	kfree(cpu_buffer);
	return NULL;
}

static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head = &cpu_buffer->pages;
	struct buffer_page *page, *tmp;

	list_del_init(&cpu_buffer->reader_page->list);
	free_buffer_page(cpu_buffer->reader_page);

	list_for_each_entry_safe(page, tmp, head, list) {
		list_del_init(&page->list);
		free_buffer_page(page);
	}
	kfree(cpu_buffer);
}

/*
 * Causes compile errors if the struct buffer_page gets bigger
 * than the struct page.
 */
extern int ring_buffer_page_too_big(void);

/**
 * ring_buffer_alloc - allocate a new ring_buffer
 * @size: the size in bytes that is needed.
 * @flags: attributes to set for the ring buffer.
 *
 * Currently the only flag that is available is the RB_FL_OVERWRITE
 * flag. This flag means that the buffer will overwrite old data
 * when the buffer wraps. If this flag is not set, the buffer will
 * drop data when the tail hits the head.
 */
struct ring_buffer *ring_buffer_alloc(unsigned long size, unsigned flags)
{
	struct ring_buffer *buffer;
	int bsize;
	int cpu;

	/* Paranoid! Optimizes out when all is well */
	if (sizeof(struct buffer_page) > sizeof(struct page))
		ring_buffer_page_too_big();


	/* keep it in its own cache line */
	buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
			 GFP_KERNEL);
	if (!buffer)
		return NULL;

	buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
	buffer->flags = flags;

	/* need at least two pages */
	if (buffer->pages == 1)
		buffer->pages++;

	buffer->cpumask = cpu_possible_map;
	buffer->cpus = nr_cpu_ids;

	bsize = sizeof(void *) * nr_cpu_ids;
	buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
				  GFP_KERNEL);
	if (!buffer->buffers)
		goto fail_free_buffer;

	for_each_buffer_cpu(buffer, cpu) {
		buffer->buffers[cpu] =
			rb_allocate_cpu_buffer(buffer, cpu);
		if (!buffer->buffers[cpu])
			goto fail_free_buffers;
	}

	mutex_init(&buffer->mutex);

	return buffer;

 fail_free_buffers:
	for_each_buffer_cpu(buffer, cpu) {
		if (buffer->buffers[cpu])
			rb_free_cpu_buffer(buffer->buffers[cpu]);
	}
	kfree(buffer->buffers);

 fail_free_buffer:
	kfree(buffer);
	return NULL;
}

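/*
 * Illustrative usage sketch added by the editor (not part of the original
 * file): allocate a 1 MB overwriting buffer and free it again.  The helper
 * name example_alloc_and_free is hypothetical.
 */
static inline int example_alloc_and_free(void)
{
	struct ring_buffer *buffer;

	/* Size is rounded up to BUF_PAGE_SIZE; at least two pages are kept */
	buffer = ring_buffer_alloc(1024 * 1024, RB_FL_OVERWRITE);
	if (!buffer)
		return -ENOMEM;

	/* ... write and read events here ... */

	ring_buffer_free(buffer);
	return 0;
}
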
/**
 * ring_buffer_free - free a ring buffer.
 * @buffer: the buffer to free.
 */
void
ring_buffer_free(struct ring_buffer *buffer)
{
	int cpu;

	for_each_buffer_cpu(buffer, cpu)
		rb_free_cpu_buffer(buffer->buffers[cpu]);

	kfree(buffer);
}

static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);

static void
rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages)
{
	struct buffer_page *page;
	struct list_head *p;
	unsigned i;

	atomic_inc(&cpu_buffer->record_disabled);
	synchronize_sched();

	for (i = 0; i < nr_pages; i++) {
		BUG_ON(list_empty(&cpu_buffer->pages));
		p = cpu_buffer->pages.next;
		page = list_entry(p, struct buffer_page, list);
		list_del_init(&page->list);
		free_buffer_page(page);
	}
	BUG_ON(list_empty(&cpu_buffer->pages));

	rb_reset_cpu(cpu_buffer);

	rb_check_pages(cpu_buffer);

	atomic_dec(&cpu_buffer->record_disabled);

}

static void
rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
		struct list_head *pages, unsigned nr_pages)
{
	struct buffer_page *page;
	struct list_head *p;
	unsigned i;

	atomic_inc(&cpu_buffer->record_disabled);
	synchronize_sched();

	for (i = 0; i < nr_pages; i++) {
		BUG_ON(list_empty(pages));
		p = pages->next;
		page = list_entry(p, struct buffer_page, list);
		list_del_init(&page->list);
		list_add_tail(&page->list, &cpu_buffer->pages);
	}
	rb_reset_cpu(cpu_buffer);

	rb_check_pages(cpu_buffer);

	atomic_dec(&cpu_buffer->record_disabled);
}

/**
 * ring_buffer_resize - resize the ring buffer
 * @buffer: the buffer to resize.
 * @size: the new size.
 *
 * The tracer is responsible for making sure that the buffer is
 * not being used while changing the size.
 * Note: We may be able to change the above requirement by using
 *  RCU synchronizations.
 *
 * Minimum size is 2 * BUF_PAGE_SIZE.
 *
 * Returns -1 on failure.
 */
int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned nr_pages, rm_pages, new_pages;
	struct buffer_page *page, *tmp;
	unsigned long buffer_size;
	unsigned long addr;
	LIST_HEAD(pages);
	int i, cpu;

	size = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
	size *= BUF_PAGE_SIZE;
	buffer_size = buffer->pages * BUF_PAGE_SIZE;

	/* we need a minimum of two pages */
	if (size < BUF_PAGE_SIZE * 2)
		size = BUF_PAGE_SIZE * 2;

	if (size == buffer_size)
		return size;

	mutex_lock(&buffer->mutex);

	nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);

	if (size < buffer_size) {

		/* easy case, just free pages */
		BUG_ON(nr_pages >= buffer->pages);

		rm_pages = buffer->pages - nr_pages;

		for_each_buffer_cpu(buffer, cpu) {
			cpu_buffer = buffer->buffers[cpu];
			rb_remove_pages(cpu_buffer, rm_pages);
		}
		goto out;
	}

	/*
	 * This is a bit more difficult. We only want to add pages
	 * when we can allocate enough for all CPUs. We do this
	 * by allocating all the pages and storing them on a local
	 * linked list. If we succeed in our allocation, then we
	 * add these pages to the cpu_buffers. Otherwise we just free
	 * them all and return -ENOMEM;
	 */
	BUG_ON(nr_pages <= buffer->pages);
	new_pages = nr_pages - buffer->pages;

	for_each_buffer_cpu(buffer, cpu) {
		for (i = 0; i < new_pages; i++) {
			page = kzalloc_node(ALIGN(sizeof(*page),
						  cache_line_size()),
					    GFP_KERNEL, cpu_to_node(cpu));
			if (!page)
				goto free_pages;
			list_add(&page->list, &pages);
			addr = __get_free_page(GFP_KERNEL);
			if (!addr)
				goto free_pages;
			page->page = (void *)addr;
		}
	}

	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];
		rb_insert_pages(cpu_buffer, &pages, new_pages);
	}

	BUG_ON(!list_empty(&pages));

 out:
	buffer->pages = nr_pages;
	mutex_unlock(&buffer->mutex);

	return size;

 free_pages:
	list_for_each_entry_safe(page, tmp, &pages, list) {
		list_del_init(&page->list);
		free_buffer_page(page);
	}
	return -ENOMEM;
}

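/*
 * Illustrative sketch added by the editor (not in the original): grow a
 * buffer to 2 MB.  ring_buffer_resize() returns the new size in bytes on
 * success and a negative error if the extra pages could not be allocated.
 * The helper name example_resize is hypothetical.
 */
static inline int example_resize(struct ring_buffer *buffer)
{
	int ret;

	/* The caller must ensure the buffer is not in use while resizing */
	ret = ring_buffer_resize(buffer, 2 * 1024 * 1024);
	if (ret < 0)
		return ret;

	return 0;
}
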
static inline int rb_null_event(struct ring_buffer_event *event)
{
	return event->type == RINGBUF_TYPE_PADDING;
}

static inline void *__rb_page_index(struct buffer_page *page, unsigned index)
{
	return page->page + index;
}

static inline struct ring_buffer_event *
rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
{
	return __rb_page_index(cpu_buffer->reader_page,
			       cpu_buffer->reader_page->read);
}

static inline struct ring_buffer_event *
rb_head_event(struct ring_buffer_per_cpu *cpu_buffer)
{
	return __rb_page_index(cpu_buffer->head_page,
			       cpu_buffer->head_page->read);
}

static inline struct ring_buffer_event *
rb_iter_head_event(struct ring_buffer_iter *iter)
{
	return __rb_page_index(iter->head_page, iter->head);
}

static inline unsigned rb_page_write(struct buffer_page *bpage)
{
	return local_read(&bpage->write);
}

static inline unsigned rb_page_commit(struct buffer_page *bpage)
{
	return local_read(&bpage->commit);
}

/* Size is determined by what has been committed */
static inline unsigned rb_page_size(struct buffer_page *bpage)
{
	return rb_page_commit(bpage);
}

static inline unsigned
rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer)
{
	return rb_page_commit(cpu_buffer->commit_page);
}

static inline unsigned rb_head_size(struct ring_buffer_per_cpu *cpu_buffer)
{
	return rb_page_commit(cpu_buffer->head_page);
}

/*
 * When the tail hits the head and the buffer is in overwrite mode,
 * the head jumps to the next page and all content on the previous
 * page is discarded. But before doing so, we update the overrun
 * variable of the buffer.
 */
static void rb_update_overflow(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct ring_buffer_event *event;
	unsigned long head;

	for (head = 0; head < rb_head_size(cpu_buffer);
	     head += rb_event_length(event)) {

		event = __rb_page_index(cpu_buffer->head_page, head);
		BUG_ON(rb_null_event(event));
		/* Only count data entries */
		if (event->type != RINGBUF_TYPE_DATA)
			continue;
		cpu_buffer->overrun++;
		cpu_buffer->entries--;
	}
}

static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
			       struct buffer_page **page)
{
	struct list_head *p = (*page)->list.next;

	if (p == &cpu_buffer->pages)
		p = p->next;

	*page = list_entry(p, struct buffer_page, list);
}

static inline unsigned
rb_event_index(struct ring_buffer_event *event)
{
	unsigned long addr = (unsigned long)event;

	return (addr & ~PAGE_MASK) - (PAGE_SIZE - BUF_PAGE_SIZE);
}

static inline int
rb_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
	     struct ring_buffer_event *event)
{
	unsigned long addr = (unsigned long)event;
	unsigned long index;

	index = rb_event_index(event);
	addr &= PAGE_MASK;

	return cpu_buffer->commit_page->page == (void *)addr &&
		rb_commit_index(cpu_buffer) == index;
}

static inline void
rb_set_commit_event(struct ring_buffer_per_cpu *cpu_buffer,
		    struct ring_buffer_event *event)
{
	unsigned long addr = (unsigned long)event;
	unsigned long index;

	index = rb_event_index(event);
	addr &= PAGE_MASK;

	while (cpu_buffer->commit_page->page != (void *)addr) {
		RB_WARN_ON(cpu_buffer,
			   cpu_buffer->commit_page == cpu_buffer->tail_page);
		cpu_buffer->commit_page->commit =
			cpu_buffer->commit_page->write;
		rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
		cpu_buffer->write_stamp = cpu_buffer->commit_page->time_stamp;
	}

	/* Now set the commit to the event's index */
	local_set(&cpu_buffer->commit_page->commit, index);
}

static inline void
rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
{
	/*
	 * We only race with interrupts and NMIs on this CPU.
	 * If we own the commit event, then we can commit
	 * all others that interrupted us, since the interruptions
	 * are in stack format (they finish before they come
	 * back to us). This allows us to do a simple loop to
	 * assign the commit to the tail.
	 */
	while (cpu_buffer->commit_page != cpu_buffer->tail_page) {
		cpu_buffer->commit_page->commit =
			cpu_buffer->commit_page->write;
		rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
		cpu_buffer->write_stamp = cpu_buffer->commit_page->time_stamp;
		/* add barrier to keep gcc from optimizing too much */
		barrier();
	}
	while (rb_commit_index(cpu_buffer) !=
	       rb_page_write(cpu_buffer->commit_page)) {
		cpu_buffer->commit_page->commit =
			cpu_buffer->commit_page->write;
		barrier();
	}
}

static void rb_reset_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	cpu_buffer->read_stamp = cpu_buffer->reader_page->time_stamp;
	cpu_buffer->reader_page->read = 0;
}

static inline void rb_inc_iter(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;

	/*
	 * The iterator could be on the reader page (it starts there).
	 * But the head could have moved, since the reader was
	 * found. Check for this case and assign the iterator
	 * to the head page instead of next.
	 */
	if (iter->head_page == cpu_buffer->reader_page)
		iter->head_page = cpu_buffer->head_page;
	else
		rb_inc_page(cpu_buffer, &iter->head_page);

	iter->read_stamp = iter->head_page->time_stamp;
	iter->head = 0;
}

/**
 * rb_update_event - update event type and data
 * @event: the event to update
 * @type: the type of event
 * @length: the size of the event field in the ring buffer
 *
 * Update the type and data fields of the event. The length
 * is the actual size that is written to the ring buffer,
 * and with this, we can determine what to place into the
 * data field.
 */
static inline void
rb_update_event(struct ring_buffer_event *event,
		unsigned type, unsigned length)
{
	event->type = type;

	switch (type) {

	case RINGBUF_TYPE_PADDING:
		break;

	case RINGBUF_TYPE_TIME_EXTEND:
		event->len =
			(RB_LEN_TIME_EXTEND + (RB_ALIGNMENT-1))
			>> RB_ALIGNMENT_SHIFT;
		break;

	case RINGBUF_TYPE_TIME_STAMP:
		event->len =
			(RB_LEN_TIME_STAMP + (RB_ALIGNMENT-1))
			>> RB_ALIGNMENT_SHIFT;
		break;

	case RINGBUF_TYPE_DATA:
		length -= RB_EVNT_HDR_SIZE;
		if (length > RB_MAX_SMALL_DATA) {
			event->len = 0;
			event->array[0] = length;
		} else
			event->len =
				(length + (RB_ALIGNMENT-1))
				>> RB_ALIGNMENT_SHIFT;
		break;
	default:
		BUG();
	}
}

static inline unsigned rb_calculate_event_length(unsigned length)
{
	struct ring_buffer_event event; /* Used only for sizeof array */

	/* zero length can cause confusions */
	if (!length)
		length = 1;

	if (length > RB_MAX_SMALL_DATA)
		length += sizeof(event.array[0]);

	length += RB_EVNT_HDR_SIZE;
	length = ALIGN(length, RB_ALIGNMENT);

	return length;
}

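/*
 * Worked example added by the editor, assuming the 4 byte
 * struct ring_buffer_event header from linux/ring_buffer.h: a 6 byte
 * payload becomes 6 + 4 = 10 bytes, rounded up to 12 by RB_ALIGNMENT;
 * a payload larger than RB_MAX_SMALL_DATA (28 bytes) additionally pays
 * sizeof(event.array[0]) because its length is stored in array[0].
 */
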
static struct ring_buffer_event *
__rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
		  unsigned type, unsigned long length, u64 *ts)
{
	struct buffer_page *tail_page, *head_page, *reader_page;
	unsigned long tail, write;
	struct ring_buffer *buffer = cpu_buffer->buffer;
	struct ring_buffer_event *event;
	unsigned long flags;

	tail_page = cpu_buffer->tail_page;
	write = local_add_return(length, &tail_page->write);
	tail = write - length;

	/* See if we shot past the end of this buffer page */
	if (write > BUF_PAGE_SIZE) {
		struct buffer_page *next_page = tail_page;

		spin_lock_irqsave(&cpu_buffer->lock, flags);

		rb_inc_page(cpu_buffer, &next_page);

		head_page = cpu_buffer->head_page;
		reader_page = cpu_buffer->reader_page;

		/* we grabbed the lock before incrementing */
		RB_WARN_ON(cpu_buffer, next_page == reader_page);

		/*
		 * If for some reason, we had an interrupt storm that made
		 * it all the way around the buffer, bail, and warn
		 * about it.
		 */
		if (unlikely(next_page == cpu_buffer->commit_page)) {
			WARN_ON_ONCE(1);
			goto out_unlock;
		}

		if (next_page == head_page) {
			if (!(buffer->flags & RB_FL_OVERWRITE)) {
				/* reset write */
				if (tail <= BUF_PAGE_SIZE)
					local_set(&tail_page->write, tail);
				goto out_unlock;
			}

			/* tail_page has not moved yet? */
			if (tail_page == cpu_buffer->tail_page) {
				/* count overflows */
				rb_update_overflow(cpu_buffer);

				rb_inc_page(cpu_buffer, &head_page);
				cpu_buffer->head_page = head_page;
				cpu_buffer->head_page->read = 0;
			}
		}

		/*
		 * If the tail page is still the same as what we think
		 * it is, then it is up to us to update the tail
		 * pointer.
		 */
		if (tail_page == cpu_buffer->tail_page) {
			local_set(&next_page->write, 0);
			local_set(&next_page->commit, 0);
			cpu_buffer->tail_page = next_page;

			/* reread the time stamp */
			*ts = ring_buffer_time_stamp(cpu_buffer->cpu);
			cpu_buffer->tail_page->time_stamp = *ts;
		}

		/*
		 * The actual tail page has moved forward.
		 */
		if (tail < BUF_PAGE_SIZE) {
			/* Mark the rest of the page with padding */
			event = __rb_page_index(tail_page, tail);
			event->type = RINGBUF_TYPE_PADDING;
		}

		if (tail <= BUF_PAGE_SIZE)
			/* Set the write back to the previous setting */
			local_set(&tail_page->write, tail);

		/*
		 * If this was a commit entry that failed,
		 * increment that too
		 */
		if (tail_page == cpu_buffer->commit_page &&
		    tail == rb_commit_index(cpu_buffer)) {
			rb_set_commit_to_write(cpu_buffer);
		}

		spin_unlock_irqrestore(&cpu_buffer->lock, flags);

		/* fail and let the caller try again */
		return ERR_PTR(-EAGAIN);
	}

	/* We reserved something on the buffer */

	BUG_ON(write > BUF_PAGE_SIZE);

	event = __rb_page_index(tail_page, tail);
	rb_update_event(event, type, length);

	/*
	 * If this is a commit and the tail is zero, then update
	 * this page's time stamp.
	 */
	if (!tail && rb_is_commit(cpu_buffer, event))
		cpu_buffer->commit_page->time_stamp = *ts;

	return event;

 out_unlock:
	spin_unlock_irqrestore(&cpu_buffer->lock, flags);
	return NULL;
}

static int
rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
		  u64 *ts, u64 *delta)
{
	struct ring_buffer_event *event;
	static int once;
	int ret;

	if (unlikely(*delta > (1ULL << 59) && !once++)) {
		printk(KERN_WARNING "Delta way too big! %llu"
		       " ts=%llu write stamp = %llu\n",
		       (unsigned long long)*delta,
		       (unsigned long long)*ts,
		       (unsigned long long)cpu_buffer->write_stamp);
		WARN_ON(1);
	}

	/*
	 * The delta is too big, we need to add a
	 * new timestamp.
	 */
	event = __rb_reserve_next(cpu_buffer,
				  RINGBUF_TYPE_TIME_EXTEND,
				  RB_LEN_TIME_EXTEND,
				  ts);
	if (!event)
		return -EBUSY;

	if (PTR_ERR(event) == -EAGAIN)
		return -EAGAIN;

	/* Only a committed time event can update the write stamp */
	if (rb_is_commit(cpu_buffer, event)) {
		/*
		 * If this is the first on the page, then we need to
		 * update the page itself, and just put in a zero.
		 */
		if (rb_event_index(event)) {
			event->time_delta = *delta & TS_MASK;
			event->array[0] = *delta >> TS_SHIFT;
		} else {
			cpu_buffer->commit_page->time_stamp = *ts;
			event->time_delta = 0;
			event->array[0] = 0;
		}
		cpu_buffer->write_stamp = *ts;
		/* let the caller know this was the commit */
		ret = 1;
	} else {
		/* Darn, this is just wasted space */
		event->time_delta = 0;
		event->array[0] = 0;
		ret = 0;
	}

	*delta = 0;

	return ret;
}

static struct ring_buffer_event *
rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
		      unsigned type, unsigned long length)
{
	struct ring_buffer_event *event;
	u64 ts, delta;
	int commit = 0;

 again:
	ts = ring_buffer_time_stamp(cpu_buffer->cpu);

	/*
	 * Only the first commit can update the timestamp.
	 * Yes there is a race here. If an interrupt comes in
	 * just after the conditional and it traces too, then it
	 * will also check the deltas. More than one timestamp may
	 * also be made. But only the entry that did the actual
	 * commit will be something other than zero.
	 */
	if (cpu_buffer->tail_page == cpu_buffer->commit_page &&
	    rb_page_write(cpu_buffer->tail_page) ==
	    rb_commit_index(cpu_buffer)) {

		delta = ts - cpu_buffer->write_stamp;

		/* make sure this delta is calculated here */
		barrier();

		/* Did the write stamp get updated already? */
		if (unlikely(ts < cpu_buffer->write_stamp))
			goto again;

		if (test_time_stamp(delta)) {

			commit = rb_add_time_stamp(cpu_buffer, &ts, &delta);

			if (commit == -EBUSY)
				return NULL;

			if (commit == -EAGAIN)
				goto again;

			RB_WARN_ON(cpu_buffer, commit < 0);
		}
	} else
		/* Non commits have zero deltas */
		delta = 0;

	event = __rb_reserve_next(cpu_buffer, type, length, &ts);
	if (PTR_ERR(event) == -EAGAIN)
		goto again;

	if (!event) {
		if (unlikely(commit))
			/*
			 * Ouch! We needed a timestamp and it was committed. But
			 * we didn't get our event reserved.
			 */
			rb_set_commit_to_write(cpu_buffer);
		return NULL;
	}

	/*
	 * If the timestamp was committed, make the commit our entry
	 * now so that we will update it when needed.
	 */
	if (commit)
		rb_set_commit_event(cpu_buffer, event);
	else if (!rb_is_commit(cpu_buffer, event))
		delta = 0;

	event->time_delta = delta;

	return event;
}

static DEFINE_PER_CPU(int, rb_need_resched);

/**
 * ring_buffer_lock_reserve - reserve a part of the buffer
 * @buffer: the ring buffer to reserve from
 * @length: the length of the data to reserve (excluding event header)
 * @flags: a pointer to save the interrupt flags
 *
 * Returns a reserved event on the ring buffer to copy directly to.
 * The user of this interface will need to get the body to write into
 * and can use the ring_buffer_event_data() interface.
 *
 * The length is the length of the data needed, not the event length
 * which also includes the event header.
 *
 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
 * If NULL is returned, then nothing has been allocated or locked.
 */
struct ring_buffer_event *
ring_buffer_lock_reserve(struct ring_buffer *buffer,
			 unsigned long length,
			 unsigned long *flags)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;
	int cpu, resched;

	if (atomic_read(&buffer->record_disabled))
		return NULL;

	/* If we are tracing schedule, we don't want to recurse */
	resched = ftrace_preempt_disable();

	cpu = raw_smp_processor_id();

	if (!cpu_isset(cpu, buffer->cpumask))
		goto out;

	cpu_buffer = buffer->buffers[cpu];

	if (atomic_read(&cpu_buffer->record_disabled))
		goto out;

	length = rb_calculate_event_length(length);
	if (length > BUF_PAGE_SIZE)
		goto out;

	event = rb_reserve_next_event(cpu_buffer, RINGBUF_TYPE_DATA, length);
	if (!event)
		goto out;

	/*
	 * Need to store resched state on this cpu.
	 * Only the first needs to.
	 */

	if (preempt_count() == 1)
		per_cpu(rb_need_resched, cpu) = resched;

	return event;

 out:
	ftrace_preempt_enable(resched);
	return NULL;
}

static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
		      struct ring_buffer_event *event)
{
	cpu_buffer->entries++;

	/* Only process further if we own the commit */
	if (!rb_is_commit(cpu_buffer, event))
		return;

	cpu_buffer->write_stamp += event->time_delta;

	rb_set_commit_to_write(cpu_buffer);
}

/**
 * ring_buffer_unlock_commit - commit a reserved event
 * @buffer: The buffer to commit to
 * @event: The event pointer to commit.
 * @flags: the interrupt flags received from ring_buffer_lock_reserve.
 *
 * This commits the data to the ring buffer, and releases any locks held.
 *
 * Must be paired with ring_buffer_lock_reserve.
 */
int ring_buffer_unlock_commit(struct ring_buffer *buffer,
			      struct ring_buffer_event *event,
			      unsigned long flags)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	int cpu = raw_smp_processor_id();

	cpu_buffer = buffer->buffers[cpu];

	rb_commit(cpu_buffer, event);

	/*
	 * Only the last preempt count needs to restore preemption.
	 */
	if (preempt_count() == 1)
		ftrace_preempt_enable(per_cpu(rb_need_resched, cpu));
	else
		preempt_enable_no_resched_notrace();

	return 0;
}

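/*
 * Illustrative usage sketch added by the editor (not part of the original
 * file): reserve space for a small record, fill it in, then commit it.
 * The helper name example_trace_value is hypothetical.
 */
static inline int example_trace_value(struct ring_buffer *buffer, u64 val)
{
	struct ring_buffer_event *event;
	unsigned long flags;
	void *body;

	/* Length is the payload size only; the event header is added inside */
	event = ring_buffer_lock_reserve(buffer, sizeof(val), &flags);
	if (!event)
		return -EBUSY;

	body = ring_buffer_event_data(event);
	memcpy(body, &val, sizeof(val));

	/* Must be paired with the reserve above */
	return ring_buffer_unlock_commit(buffer, event, flags);
}
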
/**
 * ring_buffer_write - write data to the buffer without reserving
 * @buffer: The ring buffer to write to.
 * @length: The length of the data being written (excluding the event header)
 * @data: The data to write to the buffer.
 *
 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as
 * one function. If you already have the data to write to the buffer, it
 * may be easier to simply call this function.
 *
 * Note, like ring_buffer_lock_reserve, the length is the length of the data
 * and not the length of the event which would hold the header.
 */
int ring_buffer_write(struct ring_buffer *buffer,
		      unsigned long length,
		      void *data)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;
	unsigned long event_length;
	void *body;
	int ret = -EBUSY;
	int cpu, resched;

	if (atomic_read(&buffer->record_disabled))
		return -EBUSY;

	resched = ftrace_preempt_disable();

	cpu = raw_smp_processor_id();

	if (!cpu_isset(cpu, buffer->cpumask))
		goto out;

	cpu_buffer = buffer->buffers[cpu];

	if (atomic_read(&cpu_buffer->record_disabled))
		goto out;

	event_length = rb_calculate_event_length(length);
	event = rb_reserve_next_event(cpu_buffer,
				      RINGBUF_TYPE_DATA, event_length);
	if (!event)
		goto out;

	body = rb_event_data(event);

	memcpy(body, data, length);

	rb_commit(cpu_buffer, event);

	ret = 0;
 out:
	ftrace_preempt_enable(resched);

	return ret;
}

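/*
 * Illustrative sketch added by the editor (not in the original): when the
 * record is already laid out in memory, a single ring_buffer_write() call
 * replaces the reserve/commit pair shown above.  The struct example_record
 * type and helper name are hypothetical.
 */
struct example_record {
	int	id;
	u64	value;
};

static inline int example_write_record(struct ring_buffer *buffer,
				       struct example_record *rec)
{
	/* Returns 0 on success, -EBUSY if the event could not be reserved */
	return ring_buffer_write(buffer, sizeof(*rec), rec);
}
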
static inline int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *reader = cpu_buffer->reader_page;
	struct buffer_page *head = cpu_buffer->head_page;
	struct buffer_page *commit = cpu_buffer->commit_page;

	return reader->read == rb_page_commit(reader) &&
		(commit == reader ||
		 (commit == head &&
		  head->read == rb_page_commit(commit)));
}

/**
 * ring_buffer_record_disable - stop all writes into the buffer
 * @buffer: The ring buffer to stop writes to.
 *
 * This prevents all writes to the buffer. Any attempt to write
 * to the buffer after this will fail and return NULL.
 *
 * The caller should call synchronize_sched() after this.
 */
void ring_buffer_record_disable(struct ring_buffer *buffer)
{
	atomic_inc(&buffer->record_disabled);
}

/**
 * ring_buffer_record_enable - enable writes to the buffer
 * @buffer: The ring buffer to enable writes
 *
 * Note, multiple disables will need the same number of enables
 * to truly enable the writing (much like preempt_disable).
 */
void ring_buffer_record_enable(struct ring_buffer *buffer)
{
	atomic_dec(&buffer->record_disabled);
}

/**
 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer
 * @buffer: The ring buffer to stop writes to.
 * @cpu: The CPU buffer to stop
 *
 * This prevents all writes to the buffer. Any attempt to write
 * to the buffer after this will fail and return NULL.
 *
 * The caller should call synchronize_sched() after this.
 */
void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	if (!cpu_isset(cpu, buffer->cpumask))
		return;

	cpu_buffer = buffer->buffers[cpu];
	atomic_inc(&cpu_buffer->record_disabled);
}

/**
 * ring_buffer_record_enable_cpu - enable writes to the buffer
 * @buffer: The ring buffer to enable writes
 * @cpu: The CPU to enable.
 *
 * Note, multiple disables will need the same number of enables
 * to truly enable the writing (much like preempt_disable).
 */
void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	if (!cpu_isset(cpu, buffer->cpumask))
		return;

	cpu_buffer = buffer->buffers[cpu];
	atomic_dec(&cpu_buffer->record_disabled);
}

/**
 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer
 * @buffer: The ring buffer
 * @cpu: The per CPU buffer to get the entries from.
 */
unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	if (!cpu_isset(cpu, buffer->cpumask))
		return 0;

	cpu_buffer = buffer->buffers[cpu];
	return cpu_buffer->entries;
}

/**
 * ring_buffer_overrun_cpu - get the number of overruns in a cpu_buffer
 * @buffer: The ring buffer
 * @cpu: The per CPU buffer to get the number of overruns from
 */
unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	if (!cpu_isset(cpu, buffer->cpumask))
		return 0;

	cpu_buffer = buffer->buffers[cpu];
	return cpu_buffer->overrun;
}

/**
 * ring_buffer_entries - get the number of entries in a buffer
 * @buffer: The ring buffer
 *
 * Returns the total number of entries in the ring buffer
 * (all CPU entries)
 */
unsigned long ring_buffer_entries(struct ring_buffer *buffer)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long entries = 0;
	int cpu;

	/* if you care about this being correct, lock the buffer */
	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];
		entries += cpu_buffer->entries;
	}

	return entries;
}

/**
 * ring_buffer_overruns - get the number of overruns in the buffer
 * @buffer: The ring buffer
 *
 * Returns the total number of overruns in the ring buffer
 * (all CPU entries)
 */
unsigned long ring_buffer_overruns(struct ring_buffer *buffer)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long overruns = 0;
	int cpu;

	/* if you care about this being correct, lock the buffer */
	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];
		overruns += cpu_buffer->overrun;
	}

	return overruns;
}

/**
 * ring_buffer_iter_reset - reset an iterator
 * @iter: The iterator to reset
 *
 * Resets the iterator, so that it will start from the beginning
 * again.
 */
void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;

	/* Iterator usage is expected to have record disabled */
	if (list_empty(&cpu_buffer->reader_page->list)) {
		iter->head_page = cpu_buffer->head_page;
		iter->head = cpu_buffer->head_page->read;
	} else {
		iter->head_page = cpu_buffer->reader_page;
		iter->head = cpu_buffer->reader_page->read;
	}
	if (iter->head)
		iter->read_stamp = cpu_buffer->read_stamp;
	else
		iter->read_stamp = iter->head_page->time_stamp;
}

/**
 * ring_buffer_iter_empty - check if an iterator has no more to read
 * @iter: The iterator to check
 */
int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	cpu_buffer = iter->cpu_buffer;

	return iter->head_page == cpu_buffer->commit_page &&
		iter->head == rb_commit_index(cpu_buffer);
}

static void
rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
		     struct ring_buffer_event *event)
{
	u64 delta;

	switch (event->type) {
	case RINGBUF_TYPE_PADDING:
		return;

	case RINGBUF_TYPE_TIME_EXTEND:
		delta = event->array[0];
		delta <<= TS_SHIFT;
		delta += event->time_delta;
		cpu_buffer->read_stamp += delta;
		return;

	case RINGBUF_TYPE_TIME_STAMP:
		/* FIXME: not implemented */
		return;

	case RINGBUF_TYPE_DATA:
		cpu_buffer->read_stamp += event->time_delta;
		return;

	default:
		BUG();
	}
	return;
}

static void
rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
			  struct ring_buffer_event *event)
{
	u64 delta;

	switch (event->type) {
	case RINGBUF_TYPE_PADDING:
		return;

	case RINGBUF_TYPE_TIME_EXTEND:
		delta = event->array[0];
		delta <<= TS_SHIFT;
		delta += event->time_delta;
		iter->read_stamp += delta;
		return;

	case RINGBUF_TYPE_TIME_STAMP:
		/* FIXME: not implemented */
		return;

	case RINGBUF_TYPE_DATA:
		iter->read_stamp += event->time_delta;
		return;

	default:
		BUG();
	}
	return;
}

static struct buffer_page *
rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *reader = NULL;
	unsigned long flags;

	spin_lock_irqsave(&cpu_buffer->lock, flags);

 again:
	reader = cpu_buffer->reader_page;

	/* If there's more to read, return this page */
	if (cpu_buffer->reader_page->read < rb_page_size(reader))
		goto out;

	/* Never should we have an index greater than the size */
	RB_WARN_ON(cpu_buffer,
		   cpu_buffer->reader_page->read > rb_page_size(reader));

	/* check if we caught up to the tail */
	reader = NULL;
	if (cpu_buffer->commit_page == cpu_buffer->reader_page)
		goto out;

	/*
	 * Splice the empty reader page into the list around the head.
	 * Reset the reader page to size zero.
	 */

	reader = cpu_buffer->head_page;
	cpu_buffer->reader_page->list.next = reader->list.next;
	cpu_buffer->reader_page->list.prev = reader->list.prev;

	local_set(&cpu_buffer->reader_page->write, 0);
	local_set(&cpu_buffer->reader_page->commit, 0);

	/* Make the reader page now replace the head */
	reader->list.prev->next = &cpu_buffer->reader_page->list;
	reader->list.next->prev = &cpu_buffer->reader_page->list;

	/*
	 * If the tail is on the reader, then we must set the head
	 * to the inserted page, otherwise we set it one before.
	 */
	cpu_buffer->head_page = cpu_buffer->reader_page;

	if (cpu_buffer->commit_page != reader)
		rb_inc_page(cpu_buffer, &cpu_buffer->head_page);

	/* Finally update the reader page to the new head */
	cpu_buffer->reader_page = reader;
	rb_reset_reader_page(cpu_buffer);

	goto again;

 out:
	spin_unlock_irqrestore(&cpu_buffer->lock, flags);

	return reader;
}

static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct ring_buffer_event *event;
	struct buffer_page *reader;
	unsigned length;

	reader = rb_get_reader_page(cpu_buffer);

	/* This function should not be called when buffer is empty */
	BUG_ON(!reader);

	event = rb_reader_event(cpu_buffer);

	if (event->type == RINGBUF_TYPE_DATA)
		cpu_buffer->entries--;

	rb_update_read_stamp(cpu_buffer, event);

	length = rb_event_length(event);
	cpu_buffer->reader_page->read += length;
}

static void rb_advance_iter(struct ring_buffer_iter *iter)
{
	struct ring_buffer *buffer;
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;
	unsigned length;

	cpu_buffer = iter->cpu_buffer;
	buffer = cpu_buffer->buffer;

	/*
	 * Check if we are at the end of the buffer.
	 */
	if (iter->head >= rb_page_size(iter->head_page)) {
		BUG_ON(iter->head_page == cpu_buffer->commit_page);
		rb_inc_iter(iter);
		return;
	}

	event = rb_iter_head_event(iter);

	length = rb_event_length(event);

	/*
	 * This should not be called to advance the header if we are
	 * at the tail of the buffer.
	 */
	BUG_ON((iter->head_page == cpu_buffer->commit_page) &&
	       (iter->head + length > rb_commit_index(cpu_buffer)));

	rb_update_iter_read_stamp(iter, event);

	iter->head += length;

	/* check for end of page padding */
	if ((iter->head >= rb_page_size(iter->head_page)) &&
	    (iter->head_page != cpu_buffer->commit_page))
		rb_advance_iter(iter);
}

/**
 * ring_buffer_peek - peek at the next event to be read
 * @buffer: The ring buffer to read
 * @cpu: The cpu to peek at
 * @ts: The timestamp counter of this event.
 *
 * This will return the event that will be read next, but does
 * not consume the data.
 */
struct ring_buffer_event *
ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;
	struct buffer_page *reader;

	if (!cpu_isset(cpu, buffer->cpumask))
		return NULL;

	cpu_buffer = buffer->buffers[cpu];

 again:
	reader = rb_get_reader_page(cpu_buffer);
	if (!reader)
		return NULL;

	event = rb_reader_event(cpu_buffer);

	switch (event->type) {
	case RINGBUF_TYPE_PADDING:
		RB_WARN_ON(cpu_buffer, 1);
		rb_advance_reader(cpu_buffer);
		return NULL;

	case RINGBUF_TYPE_TIME_EXTEND:
		/* Internal data, OK to advance */
		rb_advance_reader(cpu_buffer);
		goto again;

	case RINGBUF_TYPE_TIME_STAMP:
		/* FIXME: not implemented */
		rb_advance_reader(cpu_buffer);
		goto again;

	case RINGBUF_TYPE_DATA:
		if (ts) {
			*ts = cpu_buffer->read_stamp + event->time_delta;
			ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
		}
		return event;

	default:
		BUG();
	}

	return NULL;
}

/**
 * ring_buffer_iter_peek - peek at the next event to be read
 * @iter: The ring buffer iterator
 * @ts: The timestamp counter of this event.
 *
 * This will return the event that will be read next, but does
 * not increment the iterator.
 */
struct ring_buffer_event *
ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
{
	struct ring_buffer *buffer;
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;

	if (ring_buffer_iter_empty(iter))
		return NULL;

	cpu_buffer = iter->cpu_buffer;
	buffer = cpu_buffer->buffer;

 again:
	if (rb_per_cpu_empty(cpu_buffer))
		return NULL;

	event = rb_iter_head_event(iter);

	switch (event->type) {
	case RINGBUF_TYPE_PADDING:
		rb_inc_iter(iter);
		goto again;

	case RINGBUF_TYPE_TIME_EXTEND:
		/* Internal data, OK to advance */
		rb_advance_iter(iter);
		goto again;

	case RINGBUF_TYPE_TIME_STAMP:
		/* FIXME: not implemented */
		rb_advance_iter(iter);
		goto again;

	case RINGBUF_TYPE_DATA:
		if (ts) {
			*ts = iter->read_stamp + event->time_delta;
			ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
		}
		return event;

	default:
		BUG();
	}

	return NULL;
}

/**
 * ring_buffer_consume - return an event and consume it
 * @buffer: The ring buffer to get the next event from
 * @cpu: The cpu to read the buffer from
 * @ts: A variable to store the event's timestamp in (may be NULL)
 *
 * Returns the next event in the ring buffer, and that event is consumed.
 * Meaning, that sequential reads will keep returning a different event,
 * and eventually empty the ring buffer if the producer is slower.
 */
struct ring_buffer_event *
ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;

	if (!cpu_isset(cpu, buffer->cpumask))
		return NULL;

	event = ring_buffer_peek(buffer, cpu, ts);
	if (!event)
		return NULL;

	cpu_buffer = buffer->buffers[cpu];
	rb_advance_reader(cpu_buffer);

	return event;
}

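/*
 * Illustrative sketch added by the editor: drain everything currently
 * recorded on one CPU.  It assumes the records were written by the
 * hypothetical example_trace_value() helper above, i.e. each payload is
 * a single u64.
 */
static inline void example_drain_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_event *event;
	u64 ts, val;

	/* Each call returns the next data event and advances the reader */
	while ((event = ring_buffer_consume(buffer, cpu, &ts)) != NULL) {
		memcpy(&val, ring_buffer_event_data(event), sizeof(val));
		printk(KERN_DEBUG "cpu %d ts %llu val %llu\n", cpu,
		       (unsigned long long)ts, (unsigned long long)val);
	}
}
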
/**
 * ring_buffer_read_start - start a non consuming read of the buffer
 * @buffer: The ring buffer to read from
 * @cpu: The cpu buffer to iterate over
 *
 * This starts up an iteration through the buffer. It also disables
 * the recording to the buffer until the reading is finished.
 * This prevents the reading from being corrupted. This is not
 * a consuming read, so a producer is not expected.
 *
 * Must be paired with ring_buffer_read_finish.
 */
struct ring_buffer_iter *
ring_buffer_read_start(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_iter *iter;
	unsigned long flags;

	if (!cpu_isset(cpu, buffer->cpumask))
		return NULL;

	iter = kmalloc(sizeof(*iter), GFP_KERNEL);
	if (!iter)
		return NULL;

	cpu_buffer = buffer->buffers[cpu];

	iter->cpu_buffer = cpu_buffer;

	atomic_inc(&cpu_buffer->record_disabled);
	synchronize_sched();

	spin_lock_irqsave(&cpu_buffer->lock, flags);
	ring_buffer_iter_reset(iter);
	spin_unlock_irqrestore(&cpu_buffer->lock, flags);

	return iter;
}

/**
 * ring_buffer_read_finish - finish reading the iterator of the buffer
 * @iter: The iterator retrieved by ring_buffer_read_start
 *
 * This re-enables the recording to the buffer, and frees the
 * iterator.
 */
void
ring_buffer_read_finish(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;

	atomic_dec(&cpu_buffer->record_disabled);
	kfree(iter);
}

/**
 * ring_buffer_read - read the next item in the ring buffer by the iterator
 * @iter: The ring buffer iterator
 * @ts: The time stamp of the event read.
 *
 * This reads the next event in the ring buffer and increments the iterator.
 */
struct ring_buffer_event *
ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
{
	struct ring_buffer_event *event;

	event = ring_buffer_iter_peek(iter, ts);
	if (!event)
		return NULL;

	rb_advance_iter(iter);

	return event;
}

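/*
 * Illustrative sketch added by the editor (not in the original): a
 * non-consuming dump of one CPU buffer using the iterator API.  Writers
 * are disabled for the whole walk, and ring_buffer_read_finish()
 * re-enables them.  The helper name example_dump_cpu is hypothetical.
 */
static inline void example_dump_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_iter *iter;
	struct ring_buffer_event *event;
	u64 ts;

	iter = ring_buffer_read_start(buffer, cpu);
	if (!iter)
		return;

	while ((event = ring_buffer_read(iter, &ts)) != NULL)
		printk(KERN_DEBUG "cpu %d ts %llu len %u\n", cpu,
		       (unsigned long long)ts,
		       ring_buffer_event_length(event));

	ring_buffer_read_finish(iter);
}
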
/**
 * ring_buffer_size - return the size of the ring buffer (in bytes)
 * @buffer: The ring buffer.
 */
unsigned long ring_buffer_size(struct ring_buffer *buffer)
{
	return BUF_PAGE_SIZE * buffer->pages;
}

static void
rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
{
	cpu_buffer->head_page
		= list_entry(cpu_buffer->pages.next, struct buffer_page, list);
	local_set(&cpu_buffer->head_page->write, 0);
	local_set(&cpu_buffer->head_page->commit, 0);

	cpu_buffer->head_page->read = 0;

	cpu_buffer->tail_page = cpu_buffer->head_page;
	cpu_buffer->commit_page = cpu_buffer->head_page;

	INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
	local_set(&cpu_buffer->reader_page->write, 0);
	local_set(&cpu_buffer->reader_page->commit, 0);
	cpu_buffer->reader_page->read = 0;

	cpu_buffer->overrun = 0;
	cpu_buffer->entries = 0;
}

/**
 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
 * @buffer: The ring buffer to reset a per cpu buffer of
 * @cpu: The CPU buffer to be reset
 */
void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
	unsigned long flags;

	if (!cpu_isset(cpu, buffer->cpumask))
		return;

	spin_lock_irqsave(&cpu_buffer->lock, flags);

	rb_reset_cpu(cpu_buffer);

	spin_unlock_irqrestore(&cpu_buffer->lock, flags);
}

/**
 * ring_buffer_reset - reset a ring buffer
 * @buffer: The ring buffer to reset all cpu buffers
 */
void ring_buffer_reset(struct ring_buffer *buffer)
{
	int cpu;

	for_each_buffer_cpu(buffer, cpu)
		ring_buffer_reset_cpu(buffer, cpu);
}

/**
 * ring_buffer_empty - is the ring buffer empty?
 * @buffer: The ring buffer to test
 */
int ring_buffer_empty(struct ring_buffer *buffer)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	int cpu;

	/* yes this is racy, but if you don't like the race, lock the buffer */
	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];
		if (!rb_per_cpu_empty(cpu_buffer))
			return 0;
	}
	return 1;
}

/**
 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
 * @buffer: The ring buffer
 * @cpu: The CPU buffer to test
 */
int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	if (!cpu_isset(cpu, buffer->cpumask))
		return 1;

	cpu_buffer = buffer->buffers[cpu];
	return rb_per_cpu_empty(cpu_buffer);
}

/**
 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
 * @buffer_a: One buffer to swap with
 * @buffer_b: The other buffer to swap with
 * @cpu: The CPU buffer to swap
 *
 * This function is useful for tracers that want to take a "snapshot"
 * of a CPU buffer and have another backup buffer lying around.
 * It is expected that the tracer handles the cpu buffer not being
 * used at the moment.
 */
int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
			 struct ring_buffer *buffer_b, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer_a;
	struct ring_buffer_per_cpu *cpu_buffer_b;

	if (!cpu_isset(cpu, buffer_a->cpumask) ||
	    !cpu_isset(cpu, buffer_b->cpumask))
		return -EINVAL;

	/* At least make sure the two buffers are somewhat the same */
	if (buffer_a->size != buffer_b->size ||
	    buffer_a->pages != buffer_b->pages)
		return -EINVAL;

	cpu_buffer_a = buffer_a->buffers[cpu];
	cpu_buffer_b = buffer_b->buffers[cpu];

	/*
	 * We can't do a synchronize_sched here because this
	 * function can be called in atomic context.
	 * Normally this will be called from the same CPU as cpu.
	 * If not it's up to the caller to protect this.
	 */
	atomic_inc(&cpu_buffer_a->record_disabled);
	atomic_inc(&cpu_buffer_b->record_disabled);

	buffer_a->buffers[cpu] = cpu_buffer_b;
	buffer_b->buffers[cpu] = cpu_buffer_a;

	cpu_buffer_b->buffer = buffer_a;
	cpu_buffer_a->buffer = buffer_b;

	atomic_dec(&cpu_buffer_a->record_disabled);
	atomic_dec(&cpu_buffer_b->record_disabled);

	return 0;
}