kernel/trace/ring_buffer.c - "ring-buffer: move page indexes into page headers"
/*
 * Generic ring buffer
 *
 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
 */
#include <linux/ring_buffer.h>
#include <linux/spinlock.h>
#include <linux/debugfs.h>
#include <linux/uaccess.h>
#include <linux/module.h>
#include <linux/percpu.h>
#include <linux/mutex.h>
#include <linux/sched.h>	/* used for sched_clock() (for now) */
#include <linux/init.h>
#include <linux/hash.h>
#include <linux/list.h>
#include <linux/fs.h>

/* Up this if you want to test the TIME_EXTENTS and normalization */
#define DEBUG_SHIFT 0

/* FIXME!!! */
u64 ring_buffer_time_stamp(int cpu)
{
	/* shift to debug/test normalization and TIME_EXTENTS */
	return sched_clock() << DEBUG_SHIFT;
}

void ring_buffer_normalize_time_stamp(int cpu, u64 *ts)
{
	/* Just stupid testing the normalize function and deltas */
	*ts >>= DEBUG_SHIFT;
}

#define RB_EVNT_HDR_SIZE (sizeof(struct ring_buffer_event))
#define RB_ALIGNMENT_SHIFT	2
#define RB_ALIGNMENT		(1 << RB_ALIGNMENT_SHIFT)
#define RB_MAX_SMALL_DATA	28

enum {
	RB_LEN_TIME_EXTEND = 8,
	RB_LEN_TIME_STAMP = 16,
};

/* inline for ring buffer fast paths */
static inline unsigned
rb_event_length(struct ring_buffer_event *event)
{
	unsigned length;

	switch (event->type) {
	case RINGBUF_TYPE_PADDING:
		/* undefined */
		return -1;

	case RINGBUF_TYPE_TIME_EXTEND:
		return RB_LEN_TIME_EXTEND;

	case RINGBUF_TYPE_TIME_STAMP:
		return RB_LEN_TIME_STAMP;

	case RINGBUF_TYPE_DATA:
		if (event->len)
			length = event->len << RB_ALIGNMENT_SHIFT;
		else
			length = event->array[0];
		return length + RB_EVNT_HDR_SIZE;
	default:
		BUG();
	}
	/* not hit */
	return 0;
}

/**
 * ring_buffer_event_length - return the length of the event
 * @event: the event to get the length of
 */
unsigned ring_buffer_event_length(struct ring_buffer_event *event)
{
	return rb_event_length(event);
}

/* inline for ring buffer fast paths */
static inline void *
rb_event_data(struct ring_buffer_event *event)
{
	BUG_ON(event->type != RINGBUF_TYPE_DATA);
	/* If length is in len field, then array[0] has the data */
	if (event->len)
		return (void *)&event->array[0];
	/* Otherwise length is in array[0] and array[1] has the data */
	return (void *)&event->array[1];
}

/**
 * ring_buffer_event_data - return the data of the event
 * @event: the event to get the data from
 */
void *ring_buffer_event_data(struct ring_buffer_event *event)
{
	return rb_event_data(event);
}

#define for_each_buffer_cpu(buffer, cpu)		\
	for_each_cpu_mask(cpu, buffer->cpumask)

#define TS_SHIFT	27
#define TS_MASK		((1ULL << TS_SHIFT) - 1)
#define TS_DELTA_TEST	(~TS_MASK)

/*
 * This hack stolen from mm/slob.c.
 * We can store per page timing information in the page frame of the page.
 * Thanks to Peter Zijlstra for suggesting this idea.
 */
struct buffer_page {
	u64		time_stamp;	/* page time stamp */
	unsigned	size;		/* size of page data */
	unsigned	write;		/* index for next write */
	unsigned	read;		/* index for next read */
	struct list_head list;		/* list of free pages */
	void		*page;		/* Actual data page */
};

/*
 * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing
 * this issue out.
 */
static inline void free_buffer_page(struct buffer_page *bpage)
{
	if (bpage->page)
		__free_page(bpage->page);
	kfree(bpage);
}

/*
 * We need to fit the time_stamp delta into 27 bits.
 */
static inline int test_time_stamp(u64 delta)
{
	if (delta & TS_DELTA_TEST)
		return 1;
	return 0;
}
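
/*
 * Worked example (assuming, as the rest of this file currently does, that
 * time stamps come from sched_clock() and are in nanoseconds): 2^27 ns is
 * roughly 134 ms.  Two events whose time stamps differ by more than that
 * cannot encode the gap in the 27-bit time_delta field, so a separate
 * RINGBUF_TYPE_TIME_EXTEND event is inserted to carry the larger delta.
 */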

#define BUF_PAGE_SIZE PAGE_SIZE

/*
 * head_page == tail_page && head == tail then buffer is empty.
 */
struct ring_buffer_per_cpu {
	int			cpu;
	struct ring_buffer	*buffer;
	spinlock_t		lock;
	struct lock_class_key	lock_key;
	struct list_head	pages;
	struct buffer_page	*head_page;	/* read from head */
	struct buffer_page	*tail_page;	/* write to tail */
	struct buffer_page	*reader_page;
	unsigned long		overrun;
	unsigned long		entries;
	u64			write_stamp;
	u64			read_stamp;
	atomic_t		record_disabled;
};

struct ring_buffer {
	unsigned long		size;
	unsigned		pages;
	unsigned		flags;
	int			cpus;
	cpumask_t		cpumask;
	atomic_t		record_disabled;

	struct mutex		mutex;

	struct ring_buffer_per_cpu **buffers;
};

struct ring_buffer_iter {
	struct ring_buffer_per_cpu	*cpu_buffer;
	unsigned long			head;
	struct buffer_page		*head_page;
	u64				read_stamp;
};

#define RB_WARN_ON(buffer, cond)			\
	if (unlikely(cond)) {				\
		atomic_inc(&buffer->record_disabled);	\
		WARN_ON(1);				\
		return -1;				\
	}

/**
 * rb_check_pages - integrity check of buffer pages
 * @cpu_buffer: CPU buffer with pages to test
 *
 * As a safety measure we check to make sure the data pages have not
 * been corrupted.
 */
static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head = &cpu_buffer->pages;
	struct buffer_page *page, *tmp;

	RB_WARN_ON(cpu_buffer, head->next->prev != head);
	RB_WARN_ON(cpu_buffer, head->prev->next != head);

	list_for_each_entry_safe(page, tmp, head, list) {
		RB_WARN_ON(cpu_buffer, page->list.next->prev != &page->list);
		RB_WARN_ON(cpu_buffer, page->list.prev->next != &page->list);
	}

	return 0;
}

static unsigned rb_head_size(struct ring_buffer_per_cpu *cpu_buffer)
{
	return cpu_buffer->head_page->size;
}

static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
			     unsigned nr_pages)
{
	struct list_head *head = &cpu_buffer->pages;
	struct buffer_page *page, *tmp;
	unsigned long addr;
	LIST_HEAD(pages);
	unsigned i;

	for (i = 0; i < nr_pages; i++) {
		page = kzalloc_node(ALIGN(sizeof(*page), cache_line_size()),
				    GFP_KERNEL, cpu_to_node(cpu_buffer->cpu));
		if (!page)
			goto free_pages;
		list_add(&page->list, &pages);

		addr = __get_free_page(GFP_KERNEL);
		if (!addr)
			goto free_pages;
		page->page = (void *)addr;
	}

	list_splice(&pages, head);

	rb_check_pages(cpu_buffer);

	return 0;

 free_pages:
	list_for_each_entry_safe(page, tmp, &pages, list) {
		list_del_init(&page->list);
		free_buffer_page(page);
	}
	return -ENOMEM;
}

static struct ring_buffer_per_cpu *
rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct buffer_page *page;
	unsigned long addr;
	int ret;

	cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
				  GFP_KERNEL, cpu_to_node(cpu));
	if (!cpu_buffer)
		return NULL;

	cpu_buffer->cpu = cpu;
	cpu_buffer->buffer = buffer;
	spin_lock_init(&cpu_buffer->lock);
	INIT_LIST_HEAD(&cpu_buffer->pages);

	page = kzalloc_node(ALIGN(sizeof(*page), cache_line_size()),
			    GFP_KERNEL, cpu_to_node(cpu));
	if (!page)
		goto fail_free_buffer;

	cpu_buffer->reader_page = page;
	addr = __get_free_page(GFP_KERNEL);
	if (!addr)
		goto fail_free_reader;
	page->page = (void *)addr;

	INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
	cpu_buffer->reader_page->size = 0;

	ret = rb_allocate_pages(cpu_buffer, buffer->pages);
	if (ret < 0)
		goto fail_free_reader;

	cpu_buffer->head_page
		= list_entry(cpu_buffer->pages.next, struct buffer_page, list);
	cpu_buffer->tail_page
		= list_entry(cpu_buffer->pages.next, struct buffer_page, list);

	return cpu_buffer;

 fail_free_reader:
	free_buffer_page(cpu_buffer->reader_page);

 fail_free_buffer:
	kfree(cpu_buffer);
	return NULL;
}

static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head = &cpu_buffer->pages;
	struct buffer_page *page, *tmp;

	list_del_init(&cpu_buffer->reader_page->list);
	free_buffer_page(cpu_buffer->reader_page);

	list_for_each_entry_safe(page, tmp, head, list) {
		list_del_init(&page->list);
		free_buffer_page(page);
	}
	kfree(cpu_buffer);
}

/*
 * Causes compile errors if the struct buffer_page gets bigger
 * than the struct page.
 */
extern int ring_buffer_page_too_big(void);

/**
 * ring_buffer_alloc - allocate a new ring_buffer
 * @size: the size in bytes that is needed.
 * @flags: attributes to set for the ring buffer.
 *
 * Currently the only flag that is available is the RB_FL_OVERWRITE
 * flag. This flag means that the buffer will overwrite old data
 * when the buffer wraps. If this flag is not set, the buffer will
 * drop data when the tail hits the head.
 */
struct ring_buffer *ring_buffer_alloc(unsigned long size, unsigned flags)
{
	struct ring_buffer *buffer;
	int bsize;
	int cpu;

	/* Paranoid! Optimizes out when all is well */
	if (sizeof(struct buffer_page) > sizeof(struct page))
		ring_buffer_page_too_big();


	/* keep it in its own cache line */
	buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
			 GFP_KERNEL);
	if (!buffer)
		return NULL;

	buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
	buffer->flags = flags;

	/* need at least two pages */
	if (buffer->pages == 1)
		buffer->pages++;

	buffer->cpumask = cpu_possible_map;
	buffer->cpus = nr_cpu_ids;

	bsize = sizeof(void *) * nr_cpu_ids;
	buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
				  GFP_KERNEL);
	if (!buffer->buffers)
		goto fail_free_buffer;

	for_each_buffer_cpu(buffer, cpu) {
		buffer->buffers[cpu] =
			rb_allocate_cpu_buffer(buffer, cpu);
		if (!buffer->buffers[cpu])
			goto fail_free_buffers;
	}

	mutex_init(&buffer->mutex);

	return buffer;

 fail_free_buffers:
	for_each_buffer_cpu(buffer, cpu) {
		if (buffer->buffers[cpu])
			rb_free_cpu_buffer(buffer->buffers[cpu]);
	}
	kfree(buffer->buffers);

 fail_free_buffer:
	kfree(buffer);
	return NULL;
}
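
/*
 * Example usage (illustrative sketch only; the example_* helpers are not
 * part of this file): allocate an overwriting buffer of roughly 1 MB and
 * free it again.  The size is rounded up to whole pages, with a minimum
 * of two pages per CPU.
 */
static int example_alloc_and_free(void)
{
	struct ring_buffer *buffer;

	buffer = ring_buffer_alloc(1024 * 1024, RB_FL_OVERWRITE);
	if (!buffer)
		return -ENOMEM;

	/* ... write to and read from the buffer here ... */

	ring_buffer_free(buffer);
	return 0;
}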

/**
 * ring_buffer_free - free a ring buffer.
 * @buffer: the buffer to free.
 */
void
ring_buffer_free(struct ring_buffer *buffer)
{
	int cpu;

	for_each_buffer_cpu(buffer, cpu)
		rb_free_cpu_buffer(buffer->buffers[cpu]);

	kfree(buffer);
}

static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);

static void
rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages)
{
	struct buffer_page *page;
	struct list_head *p;
	unsigned i;

	atomic_inc(&cpu_buffer->record_disabled);
	synchronize_sched();

	for (i = 0; i < nr_pages; i++) {
		BUG_ON(list_empty(&cpu_buffer->pages));
		p = cpu_buffer->pages.next;
		page = list_entry(p, struct buffer_page, list);
		list_del_init(&page->list);
		free_buffer_page(page);
	}
	BUG_ON(list_empty(&cpu_buffer->pages));

	rb_reset_cpu(cpu_buffer);

	rb_check_pages(cpu_buffer);

	atomic_dec(&cpu_buffer->record_disabled);

}

static void
rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
		struct list_head *pages, unsigned nr_pages)
{
	struct buffer_page *page;
	struct list_head *p;
	unsigned i;

	atomic_inc(&cpu_buffer->record_disabled);
	synchronize_sched();

	for (i = 0; i < nr_pages; i++) {
		BUG_ON(list_empty(pages));
		p = pages->next;
		page = list_entry(p, struct buffer_page, list);
		list_del_init(&page->list);
		list_add_tail(&page->list, &cpu_buffer->pages);
	}
	rb_reset_cpu(cpu_buffer);

	rb_check_pages(cpu_buffer);

	atomic_dec(&cpu_buffer->record_disabled);
}

/**
 * ring_buffer_resize - resize the ring buffer
 * @buffer: the buffer to resize.
 * @size: the new size.
 *
 * The tracer is responsible for making sure that the buffer is
 * not being used while changing the size.
 * Note: We may be able to change the above requirement by using
 *  RCU synchronizations.
 *
 * Minimum size is 2 * BUF_PAGE_SIZE.
 *
 * Returns the new size on success, -ENOMEM on failure.
 */
int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned nr_pages, rm_pages, new_pages;
	struct buffer_page *page, *tmp;
	unsigned long buffer_size;
	unsigned long addr;
	LIST_HEAD(pages);
	int i, cpu;

	size = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
	size *= BUF_PAGE_SIZE;
	buffer_size = buffer->pages * BUF_PAGE_SIZE;

	/* we need a minimum of two pages */
	if (size < BUF_PAGE_SIZE * 2)
		size = BUF_PAGE_SIZE * 2;

	if (size == buffer_size)
		return size;

	mutex_lock(&buffer->mutex);

	nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);

	if (size < buffer_size) {

		/* easy case, just free pages */
		BUG_ON(nr_pages >= buffer->pages);

		rm_pages = buffer->pages - nr_pages;

		for_each_buffer_cpu(buffer, cpu) {
			cpu_buffer = buffer->buffers[cpu];
			rb_remove_pages(cpu_buffer, rm_pages);
		}
		goto out;
	}

	/*
	 * This is a bit more difficult. We only want to add pages
	 * when we can allocate enough for all CPUs. We do this
	 * by allocating all the pages and storing them on a local
	 * linked list. If we succeed in our allocation, then we
	 * add these pages to the cpu_buffers. Otherwise we just free
	 * them all and return -ENOMEM;
	 */
	BUG_ON(nr_pages <= buffer->pages);
	new_pages = nr_pages - buffer->pages;

	for_each_buffer_cpu(buffer, cpu) {
		for (i = 0; i < new_pages; i++) {
			page = kzalloc_node(ALIGN(sizeof(*page),
						  cache_line_size()),
					    GFP_KERNEL, cpu_to_node(cpu));
			if (!page)
				goto free_pages;
			list_add(&page->list, &pages);
			addr = __get_free_page(GFP_KERNEL);
			if (!addr)
				goto free_pages;
			page->page = (void *)addr;
		}
	}

	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];
		rb_insert_pages(cpu_buffer, &pages, new_pages);
	}

	BUG_ON(!list_empty(&pages));

 out:
	buffer->pages = nr_pages;
	mutex_unlock(&buffer->mutex);

	return size;

 free_pages:
	list_for_each_entry_safe(page, tmp, &pages, list) {
		list_del_init(&page->list);
		free_buffer_page(page);
	}
	return -ENOMEM;
}
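
/*
 * Example usage (illustrative sketch): grow a buffer to roughly 4 MB.
 * The requested size is rounded up to whole pages and the function
 * returns the size actually installed, so the caller can record it.
 */
static int example_resize(struct ring_buffer *buffer)
{
	int ret;

	ret = ring_buffer_resize(buffer, 4 * 1024 * 1024);
	if (ret < 0)
		return ret;	/* -ENOMEM: the extra pages could not be allocated */

	return 0;
}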

static inline int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
{
	return cpu_buffer->reader_page->read == cpu_buffer->reader_page->size &&
		(cpu_buffer->tail_page == cpu_buffer->reader_page ||
		 (cpu_buffer->tail_page == cpu_buffer->head_page &&
		  cpu_buffer->head_page->read ==
		  cpu_buffer->tail_page->write));
}

static inline int rb_null_event(struct ring_buffer_event *event)
{
	return event->type == RINGBUF_TYPE_PADDING;
}

static inline void *__rb_page_index(struct buffer_page *page, unsigned index)
{
	return page->page + index;
}

static inline struct ring_buffer_event *
rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
{
	return __rb_page_index(cpu_buffer->reader_page,
			       cpu_buffer->reader_page->read);
}

static inline struct ring_buffer_event *
rb_head_event(struct ring_buffer_per_cpu *cpu_buffer)
{
	return __rb_page_index(cpu_buffer->head_page,
			       cpu_buffer->head_page->read);
}

static inline struct ring_buffer_event *
rb_iter_head_event(struct ring_buffer_iter *iter)
{
	return __rb_page_index(iter->head_page, iter->head);
}

/*
 * When the tail hits the head and the buffer is in overwrite mode,
 * the head jumps to the next page and all content on the previous
 * page is discarded. But before doing so, we update the overrun
 * variable of the buffer.
 */
static void rb_update_overflow(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct ring_buffer_event *event;
	unsigned long head;

	for (head = 0; head < rb_head_size(cpu_buffer);
	     head += rb_event_length(event)) {

		event = __rb_page_index(cpu_buffer->head_page, head);
		BUG_ON(rb_null_event(event));
		/* Only count data entries */
		if (event->type != RINGBUF_TYPE_DATA)
			continue;
		cpu_buffer->overrun++;
		cpu_buffer->entries--;
	}
}

static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
			       struct buffer_page **page)
{
	struct list_head *p = (*page)->list.next;

	if (p == &cpu_buffer->pages)
		p = p->next;

	*page = list_entry(p, struct buffer_page, list);
}

static inline void
rb_add_stamp(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts)
{
	cpu_buffer->tail_page->time_stamp = *ts;
	cpu_buffer->write_stamp = *ts;
}

static void rb_reset_head_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	cpu_buffer->head_page->read = 0;
}

static void rb_reset_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	cpu_buffer->read_stamp = cpu_buffer->reader_page->time_stamp;
	cpu_buffer->reader_page->read = 0;
}

static inline void rb_inc_iter(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;

	/*
	 * The iterator could be on the reader page (it starts there).
	 * But the head could have moved, since the reader was
	 * found. Check for this case and assign the iterator
	 * to the head page instead of next.
	 */
	if (iter->head_page == cpu_buffer->reader_page)
		iter->head_page = cpu_buffer->head_page;
	else
		rb_inc_page(cpu_buffer, &iter->head_page);

	iter->read_stamp = iter->head_page->time_stamp;
	iter->head = 0;
}

/**
 * rb_update_event - update event type and data
 * @event: the event to update
 * @type: the type of event
 * @length: the size of the event field in the ring buffer
 *
 * Update the type and data fields of the event. The length
 * is the actual size that is written to the ring buffer,
 * and with this, we can determine what to place into the
 * data field.
 */
static inline void
rb_update_event(struct ring_buffer_event *event,
		unsigned type, unsigned length)
{
	event->type = type;

	switch (type) {

	case RINGBUF_TYPE_PADDING:
		break;

	case RINGBUF_TYPE_TIME_EXTEND:
		event->len =
			(RB_LEN_TIME_EXTEND + (RB_ALIGNMENT-1))
			>> RB_ALIGNMENT_SHIFT;
		break;

	case RINGBUF_TYPE_TIME_STAMP:
		event->len =
			(RB_LEN_TIME_STAMP + (RB_ALIGNMENT-1))
			>> RB_ALIGNMENT_SHIFT;
		break;

	case RINGBUF_TYPE_DATA:
		length -= RB_EVNT_HDR_SIZE;
		if (length > RB_MAX_SMALL_DATA) {
			event->len = 0;
			event->array[0] = length;
		} else
			event->len =
				(length + (RB_ALIGNMENT-1))
				>> RB_ALIGNMENT_SHIFT;
		break;
	default:
		BUG();
	}
}

static inline unsigned rb_calculate_event_length(unsigned length)
{
	struct ring_buffer_event event; /* Used only for sizeof array */

	/* zero length can cause confusions */
	if (!length)
		length = 1;

	if (length > RB_MAX_SMALL_DATA)
		length += sizeof(event.array[0]);

	length += RB_EVNT_HDR_SIZE;
	length = ALIGN(length, RB_ALIGNMENT);

	return length;
}

static struct ring_buffer_event *
__rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
		  unsigned type, unsigned long length, u64 *ts)
{
	struct buffer_page *tail_page, *head_page, *reader_page;
	unsigned long tail;
	struct ring_buffer *buffer = cpu_buffer->buffer;
	struct ring_buffer_event *event;

	tail_page = cpu_buffer->tail_page;
	tail = cpu_buffer->tail_page->write;

	if (tail + length > BUF_PAGE_SIZE) {
		struct buffer_page *next_page = tail_page;

		spin_lock(&cpu_buffer->lock);
		rb_inc_page(cpu_buffer, &next_page);

		head_page = cpu_buffer->head_page;
		reader_page = cpu_buffer->reader_page;

		/* we grabbed the lock before incrementing */
		WARN_ON(next_page == reader_page);

		if (next_page == head_page) {
			if (!(buffer->flags & RB_FL_OVERWRITE)) {
				spin_unlock(&cpu_buffer->lock);
				return NULL;
			}

			/* count overflows */
			rb_update_overflow(cpu_buffer);

			rb_inc_page(cpu_buffer, &head_page);
			cpu_buffer->head_page = head_page;
			rb_reset_head_page(cpu_buffer);
		}

		if (tail != BUF_PAGE_SIZE) {
			event = __rb_page_index(tail_page, tail);
			/* page padding */
			event->type = RINGBUF_TYPE_PADDING;
		}

		tail_page->size = tail;
		tail_page = next_page;
		tail_page->size = 0;
		tail = 0;
		cpu_buffer->tail_page = tail_page;
		cpu_buffer->tail_page->write = tail;
		rb_add_stamp(cpu_buffer, ts);
		spin_unlock(&cpu_buffer->lock);
	}

	BUG_ON(tail + length > BUF_PAGE_SIZE);

	event = __rb_page_index(tail_page, tail);
	rb_update_event(event, type, length);

	return event;
}

static int
rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
		  u64 *ts, u64 *delta)
{
	struct ring_buffer_event *event;
	static int once;

	if (unlikely(*delta > (1ULL << 59) && !once++)) {
		printk(KERN_WARNING "Delta way too big! %llu"
		       " ts=%llu write stamp = %llu\n",
		       *delta, *ts, cpu_buffer->write_stamp);
		WARN_ON(1);
	}

	/*
	 * The delta is too big, we need to add a
	 * new timestamp.
	 */
	event = __rb_reserve_next(cpu_buffer,
				  RINGBUF_TYPE_TIME_EXTEND,
				  RB_LEN_TIME_EXTEND,
				  ts);
	if (!event)
		return -1;

	/* check to see if we went to the next page */
	if (cpu_buffer->tail_page->write) {
		/* Still on same page, update timestamp */
		event->time_delta = *delta & TS_MASK;
		event->array[0] = *delta >> TS_SHIFT;
		/* commit the time event */
		cpu_buffer->tail_page->write +=
			rb_event_length(event);
		cpu_buffer->write_stamp = *ts;
		*delta = 0;
	}

	return 0;
}

static struct ring_buffer_event *
rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
		      unsigned type, unsigned long length)
{
	struct ring_buffer_event *event;
	u64 ts, delta;

	ts = ring_buffer_time_stamp(cpu_buffer->cpu);

	if (cpu_buffer->tail_page->write) {
		delta = ts - cpu_buffer->write_stamp;

		if (test_time_stamp(delta)) {
			int ret;

			ret = rb_add_time_stamp(cpu_buffer, &ts, &delta);
			if (ret < 0)
				return NULL;
		}
	} else {
		spin_lock(&cpu_buffer->lock);
		rb_add_stamp(cpu_buffer, &ts);
		spin_unlock(&cpu_buffer->lock);
		delta = 0;
	}

	event = __rb_reserve_next(cpu_buffer, type, length, &ts);
	if (!event)
		return NULL;

	/* If the reserve went to the next page, our delta is zero */
	if (!cpu_buffer->tail_page->write)
		delta = 0;

	event->time_delta = delta;

	return event;
}

/**
 * ring_buffer_lock_reserve - reserve a part of the buffer
 * @buffer: the ring buffer to reserve from
 * @length: the length of the data to reserve (excluding event header)
 * @flags: a pointer to save the interrupt flags
 *
 * Returns a reserved event on the ring buffer to copy directly to.
 * The user of this interface will need to get the body to write into
 * and can use the ring_buffer_event_data() interface.
 *
 * The length is the length of the data needed, not the event length
 * which also includes the event header.
 *
 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
 * If NULL is returned, then nothing has been allocated or locked.
 */
struct ring_buffer_event *
ring_buffer_lock_reserve(struct ring_buffer *buffer,
			 unsigned long length,
			 unsigned long *flags)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;
	int cpu;

	if (atomic_read(&buffer->record_disabled))
		return NULL;

	local_irq_save(*flags);
	cpu = raw_smp_processor_id();

	if (!cpu_isset(cpu, buffer->cpumask))
		goto out;

	cpu_buffer = buffer->buffers[cpu];

	if (atomic_read(&cpu_buffer->record_disabled))
		goto out;

	length = rb_calculate_event_length(length);
	if (length > BUF_PAGE_SIZE)
		goto out;

	event = rb_reserve_next_event(cpu_buffer, RINGBUF_TYPE_DATA, length);
	if (!event)
		goto out;

	return event;

 out:
	local_irq_restore(*flags);
	return NULL;
}

static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
		      struct ring_buffer_event *event)
{
	cpu_buffer->tail_page->write += rb_event_length(event);
	cpu_buffer->tail_page->size = cpu_buffer->tail_page->write;
	cpu_buffer->write_stamp += event->time_delta;
	cpu_buffer->entries++;
}

/**
 * ring_buffer_unlock_commit - commit a reserved event
 * @buffer: The buffer to commit to
 * @event: The event pointer to commit.
 * @flags: the interrupt flags received from ring_buffer_lock_reserve.
 *
 * This commits the data to the ring buffer, and releases any locks held.
 *
 * Must be paired with ring_buffer_lock_reserve.
 */
int ring_buffer_unlock_commit(struct ring_buffer *buffer,
			      struct ring_buffer_event *event,
			      unsigned long flags)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	int cpu = raw_smp_processor_id();

	cpu_buffer = buffer->buffers[cpu];

	rb_commit(cpu_buffer, event);

	local_irq_restore(flags);

	return 0;
}
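
/*
 * Example usage (illustrative sketch): the reserve/commit pattern for
 * writing a record.  The length passed to ring_buffer_lock_reserve() is
 * the payload size only; the event header is added internally.
 */
static int example_reserve_commit(struct ring_buffer *buffer,
				  const void *data, unsigned long len)
{
	struct ring_buffer_event *event;
	unsigned long flags;
	void *body;

	event = ring_buffer_lock_reserve(buffer, len, &flags);
	if (!event)
		return -EBUSY;	/* recording disabled or buffer full */

	body = ring_buffer_event_data(event);
	memcpy(body, data, len);

	return ring_buffer_unlock_commit(buffer, event, flags);
}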

/**
 * ring_buffer_write - write data to the buffer without reserving
 * @buffer: The ring buffer to write to.
 * @length: The length of the data being written (excluding the event header)
 * @data: The data to write to the buffer.
 *
 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as
 * one function. If you already have the data to write to the buffer, it
 * may be easier to simply call this function.
 *
 * Note, like ring_buffer_lock_reserve, the length is the length of the data
 * and not the length of the event which would hold the header.
 */
int ring_buffer_write(struct ring_buffer *buffer,
		      unsigned long length,
		      void *data)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;
	unsigned long event_length, flags;
	void *body;
	int ret = -EBUSY;
	int cpu;

	if (atomic_read(&buffer->record_disabled))
		return -EBUSY;

	local_irq_save(flags);
	cpu = raw_smp_processor_id();

	if (!cpu_isset(cpu, buffer->cpumask))
		goto out;

	cpu_buffer = buffer->buffers[cpu];

	if (atomic_read(&cpu_buffer->record_disabled))
		goto out;

	event_length = rb_calculate_event_length(length);
	event = rb_reserve_next_event(cpu_buffer,
				      RINGBUF_TYPE_DATA, event_length);
	if (!event)
		goto out;

	body = rb_event_data(event);

	memcpy(body, data, length);

	rb_commit(cpu_buffer, event);

	ret = 0;
 out:
	local_irq_restore(flags);

	return ret;
}
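
/*
 * Example usage (illustrative sketch): ring_buffer_write() copies the
 * record in one call when the data already sits in a local variable.
 * The record layout below is hypothetical and only for illustration.
 */
static int example_write_struct(struct ring_buffer *buffer)
{
	struct {
		unsigned long ip;
		unsigned long parent_ip;
	} rec = { 0, 0 };	/* hypothetical record layout */

	return ring_buffer_write(buffer, sizeof(rec), &rec);
}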

/**
 * ring_buffer_record_disable - stop all writes into the buffer
 * @buffer: The ring buffer to stop writes to.
 *
 * This prevents all writes to the buffer. Any attempt to write
 * to the buffer after this will fail and return NULL.
 *
 * The caller should call synchronize_sched() after this.
 */
void ring_buffer_record_disable(struct ring_buffer *buffer)
{
	atomic_inc(&buffer->record_disabled);
}

/**
 * ring_buffer_record_enable - enable writes to the buffer
 * @buffer: The ring buffer to enable writes
 *
 * Note, multiple disables will need the same number of enables
 * to truly enable the writing (much like preempt_disable).
 */
void ring_buffer_record_enable(struct ring_buffer *buffer)
{
	atomic_dec(&buffer->record_disabled);
}

/**
 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer
 * @buffer: The ring buffer to stop writes to.
 * @cpu: The CPU buffer to stop
 *
 * This prevents all writes to the buffer. Any attempt to write
 * to the buffer after this will fail and return NULL.
 *
 * The caller should call synchronize_sched() after this.
 */
void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	if (!cpu_isset(cpu, buffer->cpumask))
		return;

	cpu_buffer = buffer->buffers[cpu];
	atomic_inc(&cpu_buffer->record_disabled);
}

/**
 * ring_buffer_record_enable_cpu - enable writes to the buffer
 * @buffer: The ring buffer to enable writes
 * @cpu: The CPU to enable.
 *
 * Note, multiple disables will need the same number of enables
 * to truly enable the writing (much like preempt_disable).
 */
void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	if (!cpu_isset(cpu, buffer->cpumask))
		return;

	cpu_buffer = buffer->buffers[cpu];
	atomic_dec(&cpu_buffer->record_disabled);
}

/**
 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer
 * @buffer: The ring buffer
 * @cpu: The per CPU buffer to get the entries from.
 */
unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	if (!cpu_isset(cpu, buffer->cpumask))
		return 0;

	cpu_buffer = buffer->buffers[cpu];
	return cpu_buffer->entries;
}

/**
 * ring_buffer_overrun_cpu - get the number of overruns in a cpu_buffer
 * @buffer: The ring buffer
 * @cpu: The per CPU buffer to get the number of overruns from
 */
unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	if (!cpu_isset(cpu, buffer->cpumask))
		return 0;

	cpu_buffer = buffer->buffers[cpu];
	return cpu_buffer->overrun;
}

/**
 * ring_buffer_entries - get the number of entries in a buffer
 * @buffer: The ring buffer
 *
 * Returns the total number of entries in the ring buffer
 * (all CPU entries)
 */
unsigned long ring_buffer_entries(struct ring_buffer *buffer)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long entries = 0;
	int cpu;

	/* if you care about this being correct, lock the buffer */
	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];
		entries += cpu_buffer->entries;
	}

	return entries;
}

/**
 * ring_buffer_overruns - get the number of overruns in the buffer
 * @buffer: The ring buffer
 *
 * Returns the total number of overruns in the ring buffer
 * (all CPU entries)
 */
unsigned long ring_buffer_overruns(struct ring_buffer *buffer)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long overruns = 0;
	int cpu;

	/* if you care about this being correct, lock the buffer */
	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];
		overruns += cpu_buffer->overrun;
	}

	return overruns;
}

/**
 * ring_buffer_iter_reset - reset an iterator
 * @iter: The iterator to reset
 *
 * Resets the iterator, so that it will start from the beginning
 * again.
 */
void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;

	/* Iterator usage is expected to have record disabled */
	if (list_empty(&cpu_buffer->reader_page->list)) {
		iter->head_page = cpu_buffer->head_page;
		iter->head = cpu_buffer->head_page->read;
	} else {
		iter->head_page = cpu_buffer->reader_page;
		iter->head = cpu_buffer->reader_page->read;
	}
	if (iter->head)
		iter->read_stamp = cpu_buffer->read_stamp;
	else
		iter->read_stamp = iter->head_page->time_stamp;
}

/**
 * ring_buffer_iter_empty - check if an iterator has no more to read
 * @iter: The iterator to check
 */
int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	cpu_buffer = iter->cpu_buffer;

	return iter->head_page == cpu_buffer->tail_page &&
		iter->head == cpu_buffer->tail_page->write;
}

static void
rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
		     struct ring_buffer_event *event)
{
	u64 delta;

	switch (event->type) {
	case RINGBUF_TYPE_PADDING:
		return;

	case RINGBUF_TYPE_TIME_EXTEND:
		delta = event->array[0];
		delta <<= TS_SHIFT;
		delta += event->time_delta;
		cpu_buffer->read_stamp += delta;
		return;

	case RINGBUF_TYPE_TIME_STAMP:
		/* FIXME: not implemented */
		return;

	case RINGBUF_TYPE_DATA:
		cpu_buffer->read_stamp += event->time_delta;
		return;

	default:
		BUG();
	}
	return;
}

static void
rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
			  struct ring_buffer_event *event)
{
	u64 delta;

	switch (event->type) {
	case RINGBUF_TYPE_PADDING:
		return;

	case RINGBUF_TYPE_TIME_EXTEND:
		delta = event->array[0];
		delta <<= TS_SHIFT;
		delta += event->time_delta;
		iter->read_stamp += delta;
		return;

	case RINGBUF_TYPE_TIME_STAMP:
		/* FIXME: not implemented */
		return;

	case RINGBUF_TYPE_DATA:
		iter->read_stamp += event->time_delta;
		return;

	default:
		BUG();
	}
	return;
}

static struct buffer_page *
rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *reader = NULL;
	unsigned long flags;

	spin_lock_irqsave(&cpu_buffer->lock, flags);

 again:
	reader = cpu_buffer->reader_page;

	/* If there's more to read, return this page */
	if (cpu_buffer->reader_page->read < reader->size)
		goto out;

	/* Never should we have an index greater than the size */
	WARN_ON(cpu_buffer->reader_page->read > reader->size);

	/* check if we caught up to the tail */
	reader = NULL;
	if (cpu_buffer->tail_page == cpu_buffer->reader_page)
		goto out;

	/*
	 * Splice the empty reader page into the list around the head.
	 * Reset the reader page to size zero.
	 */

	reader = cpu_buffer->head_page;
	cpu_buffer->reader_page->list.next = reader->list.next;
	cpu_buffer->reader_page->list.prev = reader->list.prev;
	cpu_buffer->reader_page->size = 0;

	/* Make the reader page now replace the head */
	reader->list.prev->next = &cpu_buffer->reader_page->list;
	reader->list.next->prev = &cpu_buffer->reader_page->list;

	/*
	 * If the tail is on the reader, then we must set the head
	 * to the inserted page, otherwise we set it one before.
	 */
	cpu_buffer->head_page = cpu_buffer->reader_page;

	if (cpu_buffer->tail_page != reader)
		rb_inc_page(cpu_buffer, &cpu_buffer->head_page);

	/* Finally update the reader page to the new head */
	cpu_buffer->reader_page = reader;
	rb_reset_reader_page(cpu_buffer);

	goto again;

 out:
	spin_unlock_irqrestore(&cpu_buffer->lock, flags);

	return reader;
}

static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct ring_buffer_event *event;
	struct buffer_page *reader;
	unsigned length;

	reader = rb_get_reader_page(cpu_buffer);

	/* This function should not be called when buffer is empty */
	BUG_ON(!reader);

	event = rb_reader_event(cpu_buffer);

	if (event->type == RINGBUF_TYPE_DATA)
		cpu_buffer->entries--;

	rb_update_read_stamp(cpu_buffer, event);

	length = rb_event_length(event);
	cpu_buffer->reader_page->read += length;
}

static void rb_advance_iter(struct ring_buffer_iter *iter)
{
	struct ring_buffer *buffer;
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;
	unsigned length;

	cpu_buffer = iter->cpu_buffer;
	buffer = cpu_buffer->buffer;

	/*
	 * Check if we are at the end of the buffer.
	 */
	if (iter->head >= iter->head_page->size) {
		BUG_ON(iter->head_page == cpu_buffer->tail_page);
		rb_inc_iter(iter);
		return;
	}

	event = rb_iter_head_event(iter);

	length = rb_event_length(event);

	/*
	 * This should not be called to advance the header if we are
	 * at the tail of the buffer.
	 */
	BUG_ON((iter->head_page == cpu_buffer->tail_page) &&
	       (iter->head + length > cpu_buffer->tail_page->write));

	rb_update_iter_read_stamp(iter, event);

	iter->head += length;

	/* check for end of page padding */
	if ((iter->head >= iter->head_page->size) &&
	    (iter->head_page != cpu_buffer->tail_page))
		rb_advance_iter(iter);
}

/**
 * ring_buffer_peek - peek at the next event to be read
 * @buffer: The ring buffer to read
 * @cpu: The cpu to peek at
 * @ts: The timestamp counter of this event.
 *
 * This will return the event that will be read next, but does
 * not consume the data.
 */
struct ring_buffer_event *
ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;
	struct buffer_page *reader;

	if (!cpu_isset(cpu, buffer->cpumask))
		return NULL;

	cpu_buffer = buffer->buffers[cpu];

 again:
	reader = rb_get_reader_page(cpu_buffer);
	if (!reader)
		return NULL;

	event = rb_reader_event(cpu_buffer);

	switch (event->type) {
	case RINGBUF_TYPE_PADDING:
		WARN_ON(1);
		rb_advance_reader(cpu_buffer);
		return NULL;

	case RINGBUF_TYPE_TIME_EXTEND:
		/* Internal data, OK to advance */
		rb_advance_reader(cpu_buffer);
		goto again;

	case RINGBUF_TYPE_TIME_STAMP:
		/* FIXME: not implemented */
		rb_advance_reader(cpu_buffer);
		goto again;

	case RINGBUF_TYPE_DATA:
		if (ts) {
			*ts = cpu_buffer->read_stamp + event->time_delta;
			ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
		}
		return event;

	default:
		BUG();
	}

	return NULL;
}

/**
 * ring_buffer_iter_peek - peek at the next event to be read
 * @iter: The ring buffer iterator
 * @ts: The timestamp counter of this event.
 *
 * This will return the event that will be read next, but does
 * not increment the iterator.
 */
struct ring_buffer_event *
ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
{
	struct ring_buffer *buffer;
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;

	if (ring_buffer_iter_empty(iter))
		return NULL;

	cpu_buffer = iter->cpu_buffer;
	buffer = cpu_buffer->buffer;

 again:
	if (rb_per_cpu_empty(cpu_buffer))
		return NULL;

	event = rb_iter_head_event(iter);

	switch (event->type) {
	case RINGBUF_TYPE_PADDING:
		rb_inc_iter(iter);
		goto again;

	case RINGBUF_TYPE_TIME_EXTEND:
		/* Internal data, OK to advance */
		rb_advance_iter(iter);
		goto again;

	case RINGBUF_TYPE_TIME_STAMP:
		/* FIXME: not implemented */
		rb_advance_iter(iter);
		goto again;

	case RINGBUF_TYPE_DATA:
		if (ts) {
			*ts = iter->read_stamp + event->time_delta;
			ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
		}
		return event;

	default:
		BUG();
	}

	return NULL;
}

/**
 * ring_buffer_consume - return an event and consume it
 * @buffer: The ring buffer to get the next event from
 * @cpu: The cpu buffer to consume from
 * @ts: The timestamp counter of the event read
 *
 * Returns the next event in the ring buffer, and that event is consumed.
 * Meaning that sequential reads will keep returning a different event,
 * and eventually empty the ring buffer if the producer is slower.
 */
struct ring_buffer_event *
ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;

	if (!cpu_isset(cpu, buffer->cpumask))
		return NULL;

	event = ring_buffer_peek(buffer, cpu, ts);
	if (!event)
		return NULL;

	cpu_buffer = buffer->buffers[cpu];
	rb_advance_reader(cpu_buffer);

	return event;
}
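
/*
 * Example usage (illustrative sketch): drain one CPU's buffer with the
 * consuming read interface.  Each call returns the next event and
 * removes it from the buffer; NULL means the cpu buffer is empty.
 */
static void example_consume_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_event *event;
	u64 ts;

	while ((event = ring_buffer_consume(buffer, cpu, &ts))) {
		void *payload = ring_buffer_event_data(event);

		/* process 'payload'; 'ts' is the normalized time stamp */
		(void)payload;
	}
}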

/**
 * ring_buffer_read_start - start a non consuming read of the buffer
 * @buffer: The ring buffer to read from
 * @cpu: The cpu buffer to iterate over
 *
 * This starts up an iteration through the buffer. It also disables
 * the recording to the buffer until the reading is finished.
 * This prevents the reading from being corrupted. This is not
 * a consuming read, so a producer is not expected.
 *
 * Must be paired with ring_buffer_read_finish.
 */
struct ring_buffer_iter *
ring_buffer_read_start(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_iter *iter;
	unsigned long flags;

	if (!cpu_isset(cpu, buffer->cpumask))
		return NULL;

	iter = kmalloc(sizeof(*iter), GFP_KERNEL);
	if (!iter)
		return NULL;

	cpu_buffer = buffer->buffers[cpu];

	iter->cpu_buffer = cpu_buffer;

	atomic_inc(&cpu_buffer->record_disabled);
	synchronize_sched();

	spin_lock_irqsave(&cpu_buffer->lock, flags);
	ring_buffer_iter_reset(iter);
	spin_unlock_irqrestore(&cpu_buffer->lock, flags);

	return iter;
}

/**
 * ring_buffer_read_finish - finish reading the iterator of the buffer
 * @iter: The iterator retrieved by ring_buffer_read_start
 *
 * This re-enables the recording to the buffer, and frees the
 * iterator.
 */
void
ring_buffer_read_finish(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;

	atomic_dec(&cpu_buffer->record_disabled);
	kfree(iter);
}

/**
 * ring_buffer_read - read the next item in the ring buffer by the iterator
 * @iter: The ring buffer iterator
 * @ts: The time stamp of the event read.
 *
 * This reads the next event in the ring buffer and increments the iterator.
 */
struct ring_buffer_event *
ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
{
	struct ring_buffer_event *event;

	event = ring_buffer_iter_peek(iter, ts);
	if (!event)
		return NULL;

	rb_advance_iter(iter);

	return event;
}
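
/*
 * Example usage (illustrative sketch): a non consuming walk over one
 * CPU's buffer.  Recording on that cpu buffer stays disabled for as
 * long as the iterator exists, so read_start is paired with read_finish.
 */
static void example_iterate_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_iter *iter;
	struct ring_buffer_event *event;
	u64 ts;

	iter = ring_buffer_read_start(buffer, cpu);
	if (!iter)
		return;

	while ((event = ring_buffer_read(iter, &ts))) {
		/* inspect the event without removing it from the buffer */
		(void)ring_buffer_event_data(event);
	}

	ring_buffer_read_finish(iter);
}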

/**
 * ring_buffer_size - return the size of the ring buffer (in bytes)
 * @buffer: The ring buffer.
 */
unsigned long ring_buffer_size(struct ring_buffer *buffer)
{
	return BUF_PAGE_SIZE * buffer->pages;
}

static void
rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
{
	cpu_buffer->head_page
		= list_entry(cpu_buffer->pages.next, struct buffer_page, list);
	cpu_buffer->head_page->size = 0;
	cpu_buffer->tail_page = cpu_buffer->head_page;
	cpu_buffer->tail_page->size = 0;
	INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
	cpu_buffer->reader_page->size = 0;

	cpu_buffer->head_page->read = 0;
	cpu_buffer->tail_page->write = 0;
	cpu_buffer->reader_page->read = 0;

	cpu_buffer->overrun = 0;
	cpu_buffer->entries = 0;
}

/**
 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
 * @buffer: The ring buffer to reset a per cpu buffer of
 * @cpu: The CPU buffer to be reset
 */
void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
	unsigned long flags;

	if (!cpu_isset(cpu, buffer->cpumask))
		return;

	spin_lock_irqsave(&cpu_buffer->lock, flags);

	rb_reset_cpu(cpu_buffer);

	spin_unlock_irqrestore(&cpu_buffer->lock, flags);
}

/**
 * ring_buffer_reset - reset a ring buffer
 * @buffer: The ring buffer to reset all cpu buffers
 */
void ring_buffer_reset(struct ring_buffer *buffer)
{
	int cpu;

	for_each_buffer_cpu(buffer, cpu)
		ring_buffer_reset_cpu(buffer, cpu);
}

/**
 * ring_buffer_empty - is the ring buffer empty?
 * @buffer: The ring buffer to test
 */
int ring_buffer_empty(struct ring_buffer *buffer)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	int cpu;

	/* yes this is racy, but if you don't like the race, lock the buffer */
	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];
		if (!rb_per_cpu_empty(cpu_buffer))
			return 0;
	}
	return 1;
}

/**
 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
 * @buffer: The ring buffer
 * @cpu: The CPU buffer to test
 */
int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	if (!cpu_isset(cpu, buffer->cpumask))
		return 1;

	cpu_buffer = buffer->buffers[cpu];
	return rb_per_cpu_empty(cpu_buffer);
}

/**
 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
 * @buffer_a: One buffer to swap with
 * @buffer_b: The other buffer to swap with
 * @cpu: the CPU of the buffers to swap
 *
 * This function is useful for tracers that want to take a "snapshot"
 * of a CPU buffer and have another backup buffer lying around.
 * It is expected that the tracer handles the cpu buffer not being
 * used at the moment.
 */
int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
			 struct ring_buffer *buffer_b, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer_a;
	struct ring_buffer_per_cpu *cpu_buffer_b;

	if (!cpu_isset(cpu, buffer_a->cpumask) ||
	    !cpu_isset(cpu, buffer_b->cpumask))
		return -EINVAL;

	/* At least make sure the two buffers are somewhat the same */
	if (buffer_a->size != buffer_b->size ||
	    buffer_a->pages != buffer_b->pages)
		return -EINVAL;

	cpu_buffer_a = buffer_a->buffers[cpu];
	cpu_buffer_b = buffer_b->buffers[cpu];

	/*
	 * We can't do a synchronize_sched here because this
	 * function can be called in atomic context.
	 * Normally this will be called from the same CPU as cpu.
	 * If not it's up to the caller to protect this.
	 */
	atomic_inc(&cpu_buffer_a->record_disabled);
	atomic_inc(&cpu_buffer_b->record_disabled);

	buffer_a->buffers[cpu] = cpu_buffer_b;
	buffer_b->buffers[cpu] = cpu_buffer_a;

	cpu_buffer_b->buffer = buffer_a;
	cpu_buffer_a->buffer = buffer_b;

	atomic_dec(&cpu_buffer_a->record_disabled);
	atomic_dec(&cpu_buffer_b->record_disabled);

	return 0;
}
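
/*
 * Example usage (illustrative sketch): the snapshot pattern that
 * ring_buffer_swap_cpu() is meant for.  A spare buffer of the same
 * size is swapped in for one CPU, and the frozen data is then read
 * from the spare at leisure.  Error handling is kept minimal here.
 */
static void example_snapshot_cpu(struct ring_buffer *live,
				 struct ring_buffer *spare, int cpu)
{
	struct ring_buffer_event *event;
	u64 ts;

	if (ring_buffer_swap_cpu(live, spare, cpu))
		return;		/* buffers differ in size or cpu mask */

	/* 'spare' now holds the snapshot of that CPU's events */
	while ((event = ring_buffer_consume(spare, cpu, &ts))) {
		/* process the snapshotted event */
		(void)ring_buffer_event_data(event);
	}
}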