Merge branch 'oprofile-for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git...
[sfrench/cifs-2.6.git] / kernel / trace / ring_buffer.c
index 30d57dd01a857968ef4e3234a8aec1e2bf740e56..1d601a7c4587eb075b8fe0100705a6d00ffc9172 100644 (file)
 
 #include "trace.h"
 
-/* Global flag to disable all recording to ring buffers */
-static int ring_buffers_off __read_mostly;
+/*
+ * A fast way to enable or disable all ring buffers is to
+ * call tracing_on or tracing_off. Turning off the ring buffers
+ * prevents all ring buffers from being recorded to.
+ * Turning this switch on makes it OK to write to the
+ * ring buffer, if the ring buffer is enabled itself.
+ *
+ * There are three layers that must be on in order to write
+ * to the ring buffer.
+ *
+ * 1) This global flag must be set.
+ * 2) The ring buffer must be enabled for recording.
+ * 3) The per cpu buffer must be enabled for recording.
+ *
+ * In case of an anomaly, this global flag has a bit set that
+ * will permanently disable all ring buffers.
+ */
+
+/*
+ * Global flag to disable all recording to ring buffers
+ *  This has two bits: ON, DISABLED
+ *
+ *  ON   DISABLED
+ * ---- ----------
+ *   0      0        : ring buffers are off
+ *   1      0        : ring buffers are on
+ *   X      1        : ring buffers are permanently disabled
+ */
+
+enum {
+       RB_BUFFERS_ON_BIT       = 0,
+       RB_BUFFERS_DISABLED_BIT = 1,
+};
+
+enum {
+       RB_BUFFERS_ON           = 1 << RB_BUFFERS_ON_BIT,
+       RB_BUFFERS_DISABLED     = 1 << RB_BUFFERS_DISABLED_BIT,
+};
+
+static long ring_buffer_flags __read_mostly = RB_BUFFERS_ON;
 
 /**
  * tracing_on - enable all tracing buffers
@@ -29,7 +67,7 @@ static int ring_buffers_off __read_mostly;
  */
 void tracing_on(void)
 {
-       ring_buffers_off = 0;
+       set_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags);
 }
 EXPORT_SYMBOL_GPL(tracing_on);
 
@@ -43,10 +81,23 @@ EXPORT_SYMBOL_GPL(tracing_on);
  */
 void tracing_off(void)
 {
-       ring_buffers_off = 1;
+       clear_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags);
 }
 EXPORT_SYMBOL_GPL(tracing_off);
 
+/**
+ * tracing_off_permanent - permanently disable ring buffers
+ *
+ * This function, once called, will disable all ring buffers
+ * permanently.
+ */
+void tracing_off_permanent(void)
+{
+       set_bit(RB_BUFFERS_DISABLED_BIT, &ring_buffer_flags);
+}
+
+#include "trace.h"
+
 /* Up this if you want to test the TIME_EXTENTS and normalization */
 #define DEBUG_SHIFT 0
 
@@ -58,7 +109,7 @@ u64 ring_buffer_time_stamp(int cpu)
        preempt_disable_notrace();
        /* shift to debug/test normalization and TIME_EXTENTS */
        time = sched_clock() << DEBUG_SHIFT;
-       preempt_enable_notrace();
+       preempt_enable_no_resched_notrace();
 
        return time;
 }
@@ -150,20 +201,24 @@ EXPORT_SYMBOL_GPL(ring_buffer_event_data);
 #define TS_MASK                ((1ULL << TS_SHIFT) - 1)
 #define TS_DELTA_TEST  (~TS_MASK)
 
-/*
- * This hack stolen from mm/slob.c.
- * We can store per page timing information in the page frame of the page.
- * Thanks to Peter Zijlstra for suggesting this idea.
- */
-struct buffer_page {
+struct buffer_data_page {
        u64              time_stamp;    /* page time stamp */
-       local_t          write;         /* index for next write */
        local_t          commit;        /* write commited index */
+       unsigned char    data[];        /* data of buffer page */
+};
+
+struct buffer_page {
+       local_t          write;         /* index for next write */
        unsigned         read;          /* index for next read */
        struct list_head list;          /* list of free pages */
-       void *page;                     /* Actual data page */
+       struct buffer_data_page *page;  /* Actual data page */
 };
 
+static void rb_init_page(struct buffer_data_page *bpage)
+{
+       local_set(&bpage->commit, 0);
+}
+
 /*
  * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing
  * this issue out.
@@ -185,7 +240,7 @@ static inline int test_time_stamp(u64 delta)
        return 0;
 }
 
-#define BUF_PAGE_SIZE PAGE_SIZE
+#define BUF_PAGE_SIZE (PAGE_SIZE - sizeof(struct buffer_data_page))
 
 /*
  * head_page == tail_page && head == tail then buffer is empty.
@@ -193,7 +248,8 @@ static inline int test_time_stamp(u64 delta)
 struct ring_buffer_per_cpu {
        int                             cpu;
        struct ring_buffer              *buffer;
-       spinlock_t                      lock;
+       spinlock_t                      reader_lock; /* serialize readers */
+       raw_spinlock_t                  lock;
        struct lock_class_key           lock_key;
        struct list_head                pages;
        struct buffer_page              *head_page;     /* read from head */
@@ -208,7 +264,6 @@ struct ring_buffer_per_cpu {
 };
 
 struct ring_buffer {
-       unsigned long                   size;
        unsigned                        pages;
        unsigned                        flags;
        int                             cpus;
@@ -227,32 +282,16 @@ struct ring_buffer_iter {
        u64                             read_stamp;
 };
 
+/* buffer may be either ring_buffer or ring_buffer_per_cpu */
 #define RB_WARN_ON(buffer, cond)                               \
-       do {                                                    \
-               if (unlikely(cond)) {                           \
-                       atomic_inc(&buffer->record_disabled);   \
-                       WARN_ON(1);                             \
-               }                                               \
-       } while (0)
-
-#define RB_WARN_ON_RET(buffer, cond)                           \
-       do {                                                    \
-               if (unlikely(cond)) {                           \
-                       atomic_inc(&buffer->record_disabled);   \
-                       WARN_ON(1);                             \
-                       return -1;                              \
-               }                                               \
-       } while (0)
-
-#define RB_WARN_ON_ONCE(buffer, cond)                          \
-       do {                                                    \
-               static int once;                                \
-               if (unlikely(cond) && !once) {                  \
-                       once++;                                 \
+       ({                                                      \
+               int _____ret = unlikely(cond);                  \
+               if (_____ret) {                                 \
                        atomic_inc(&buffer->record_disabled);   \
                        WARN_ON(1);                             \
                }                                               \
-       } while (0)
+               _____ret;                                       \
+       })
 
 /**
  * check_pages - integrity check of buffer pages
@@ -264,16 +303,20 @@ struct ring_buffer_iter {
 static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
 {
        struct list_head *head = &cpu_buffer->pages;
-       struct buffer_page *page, *tmp;
+       struct buffer_page *bpage, *tmp;
 
-       RB_WARN_ON_RET(cpu_buffer, head->next->prev != head);
-       RB_WARN_ON_RET(cpu_buffer, head->prev->next != head);
+       if (RB_WARN_ON(cpu_buffer, head->next->prev != head))
+               return -1;
+       if (RB_WARN_ON(cpu_buffer, head->prev->next != head))
+               return -1;
 
-       list_for_each_entry_safe(page, tmp, head, list) {
-               RB_WARN_ON_RET(cpu_buffer,
-                              page->list.next->prev != &page->list);
-               RB_WARN_ON_RET(cpu_buffer,
-                              page->list.prev->next != &page->list);
+       list_for_each_entry_safe(bpage, tmp, head, list) {
+               if (RB_WARN_ON(cpu_buffer,
+                              bpage->list.next->prev != &bpage->list))
+                       return -1;
+               if (RB_WARN_ON(cpu_buffer,
+                              bpage->list.prev->next != &bpage->list))
+                       return -1;
        }
 
        return 0;
@@ -283,22 +326,23 @@ static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
                             unsigned nr_pages)
 {
        struct list_head *head = &cpu_buffer->pages;
-       struct buffer_page *page, *tmp;
+       struct buffer_page *bpage, *tmp;
        unsigned long addr;
        LIST_HEAD(pages);
        unsigned i;
 
        for (i = 0; i < nr_pages; i++) {
-               page = kzalloc_node(ALIGN(sizeof(*page), cache_line_size()),
+               bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
                                    GFP_KERNEL, cpu_to_node(cpu_buffer->cpu));
-               if (!page)
+               if (!bpage)
                        goto free_pages;
-               list_add(&page->list, &pages);
+               list_add(&bpage->list, &pages);
 
                addr = __get_free_page(GFP_KERNEL);
                if (!addr)
                        goto free_pages;
-               page->page = (void *)addr;
+               bpage->page = (void *)addr;
+               rb_init_page(bpage->page);
        }
 
        list_splice(&pages, head);
@@ -308,9 +352,9 @@ static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
        return 0;
 
  free_pages:
-       list_for_each_entry_safe(page, tmp, &pages, list) {
-               list_del_init(&page->list);
-               free_buffer_page(page);
+       list_for_each_entry_safe(bpage, tmp, &pages, list) {
+               list_del_init(&bpage->list);
+               free_buffer_page(bpage);
        }
        return -ENOMEM;
 }
@@ -319,7 +363,7 @@ static struct ring_buffer_per_cpu *
 rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
 {
        struct ring_buffer_per_cpu *cpu_buffer;
-       struct buffer_page *page;
+       struct buffer_page *bpage;
        unsigned long addr;
        int ret;
 
@@ -330,19 +374,21 @@ rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
 
        cpu_buffer->cpu = cpu;
        cpu_buffer->buffer = buffer;
-       spin_lock_init(&cpu_buffer->lock);
+       spin_lock_init(&cpu_buffer->reader_lock);
+       cpu_buffer->lock = (raw_spinlock_t)__RAW_SPIN_LOCK_UNLOCKED;
        INIT_LIST_HEAD(&cpu_buffer->pages);
 
-       page = kzalloc_node(ALIGN(sizeof(*page), cache_line_size()),
+       bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
                            GFP_KERNEL, cpu_to_node(cpu));
-       if (!page)
+       if (!bpage)
                goto fail_free_buffer;
 
-       cpu_buffer->reader_page = page;
+       cpu_buffer->reader_page = bpage;
        addr = __get_free_page(GFP_KERNEL);
        if (!addr)
                goto fail_free_reader;
-       page->page = (void *)addr;
+       bpage->page = (void *)addr;
+       rb_init_page(bpage->page);
 
        INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
 
@@ -367,14 +413,14 @@ rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
 {
        struct list_head *head = &cpu_buffer->pages;
-       struct buffer_page *page, *tmp;
+       struct buffer_page *bpage, *tmp;
 
        list_del_init(&cpu_buffer->reader_page->list);
        free_buffer_page(cpu_buffer->reader_page);
 
-       list_for_each_entry_safe(page, tmp, head, list) {
-               list_del_init(&page->list);
-               free_buffer_page(page);
+       list_for_each_entry_safe(bpage, tmp, head, list) {
+               list_del_init(&bpage->list);
+               free_buffer_page(bpage);
        }
        kfree(cpu_buffer);
 }
@@ -473,7 +519,7 @@ static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);
 static void
 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages)
 {
-       struct buffer_page *page;
+       struct buffer_page *bpage;
        struct list_head *p;
        unsigned i;
 
@@ -481,13 +527,15 @@ rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages)
        synchronize_sched();
 
        for (i = 0; i < nr_pages; i++) {
-               BUG_ON(list_empty(&cpu_buffer->pages));
+               if (RB_WARN_ON(cpu_buffer, list_empty(&cpu_buffer->pages)))
+                       return;
                p = cpu_buffer->pages.next;
-               page = list_entry(p, struct buffer_page, list);
-               list_del_init(&page->list);
-               free_buffer_page(page);
+               bpage = list_entry(p, struct buffer_page, list);
+               list_del_init(&bpage->list);
+               free_buffer_page(bpage);
        }
-       BUG_ON(list_empty(&cpu_buffer->pages));
+       if (RB_WARN_ON(cpu_buffer, list_empty(&cpu_buffer->pages)))
+               return;
 
        rb_reset_cpu(cpu_buffer);
 
@@ -501,7 +549,7 @@ static void
 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
                struct list_head *pages, unsigned nr_pages)
 {
-       struct buffer_page *page;
+       struct buffer_page *bpage;
        struct list_head *p;
        unsigned i;
 
@@ -509,11 +557,12 @@ rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
        synchronize_sched();
 
        for (i = 0; i < nr_pages; i++) {
-               BUG_ON(list_empty(pages));
+               if (RB_WARN_ON(cpu_buffer, list_empty(pages)))
+                       return;
                p = pages->next;
-               page = list_entry(p, struct buffer_page, list);
-               list_del_init(&page->list);
-               list_add_tail(&page->list, &cpu_buffer->pages);
+               bpage = list_entry(p, struct buffer_page, list);
+               list_del_init(&bpage->list);
+               list_add_tail(&bpage->list, &cpu_buffer->pages);
        }
        rb_reset_cpu(cpu_buffer);
 
@@ -540,7 +589,7 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
 {
        struct ring_buffer_per_cpu *cpu_buffer;
        unsigned nr_pages, rm_pages, new_pages;
-       struct buffer_page *page, *tmp;
+       struct buffer_page *bpage, *tmp;
        unsigned long buffer_size;
        unsigned long addr;
        LIST_HEAD(pages);
@@ -570,7 +619,10 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
        if (size < buffer_size) {
 
                /* easy case, just free pages */
-               BUG_ON(nr_pages >= buffer->pages);
+               if (RB_WARN_ON(buffer, nr_pages >= buffer->pages)) {
+                       mutex_unlock(&buffer->mutex);
+                       return -1;
+               }
 
                rm_pages = buffer->pages - nr_pages;
 
@@ -589,21 +641,26 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
         * add these pages to the cpu_buffers. Otherwise we just free
         * them all and return -ENOMEM;
         */
-       BUG_ON(nr_pages <= buffer->pages);
+       if (RB_WARN_ON(buffer, nr_pages <= buffer->pages)) {
+               mutex_unlock(&buffer->mutex);
+               return -1;
+       }
+
        new_pages = nr_pages - buffer->pages;
 
        for_each_buffer_cpu(buffer, cpu) {
                for (i = 0; i < new_pages; i++) {
-                       page = kzalloc_node(ALIGN(sizeof(*page),
+                       bpage = kzalloc_node(ALIGN(sizeof(*bpage),
                                                  cache_line_size()),
                                            GFP_KERNEL, cpu_to_node(cpu));
-                       if (!page)
+                       if (!bpage)
                                goto free_pages;
-                       list_add(&page->list, &pages);
+                       list_add(&bpage->list, &pages);
                        addr = __get_free_page(GFP_KERNEL);
                        if (!addr)
                                goto free_pages;
-                       page->page = (void *)addr;
+                       bpage->page = (void *)addr;
+                       rb_init_page(bpage->page);
                }
        }
 
@@ -612,7 +669,10 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
                rb_insert_pages(cpu_buffer, &pages, new_pages);
        }
 
-       BUG_ON(!list_empty(&pages));
+       if (RB_WARN_ON(buffer, !list_empty(&pages))) {
+               mutex_unlock(&buffer->mutex);
+               return -1;
+       }
 
  out:
        buffer->pages = nr_pages;
@@ -621,9 +681,9 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
        return size;
 
  free_pages:
-       list_for_each_entry_safe(page, tmp, &pages, list) {
-               list_del_init(&page->list);
-               free_buffer_page(page);
+       list_for_each_entry_safe(bpage, tmp, &pages, list) {
+               list_del_init(&bpage->list);
+               free_buffer_page(bpage);
        }
        mutex_unlock(&buffer->mutex);
        return -ENOMEM;
@@ -635,9 +695,15 @@ static inline int rb_null_event(struct ring_buffer_event *event)
        return event->type == RINGBUF_TYPE_PADDING;
 }
 
-static inline void *__rb_page_index(struct buffer_page *page, unsigned index)
+static inline void *
+__rb_data_page_index(struct buffer_data_page *bpage, unsigned index)
+{
+       return bpage->data + index;
+}
+
+static inline void *__rb_page_index(struct buffer_page *bpage, unsigned index)
 {
-       return page->page + index;
+       return bpage->page->data + index;
 }
 
 static inline struct ring_buffer_event *
@@ -667,7 +733,7 @@ static inline unsigned rb_page_write(struct buffer_page *bpage)
 
 static inline unsigned rb_page_commit(struct buffer_page *bpage)
 {
-       return local_read(&bpage->commit);
+       return local_read(&bpage->page->commit);
 }
 
 /* Size is determined by what has been commited */
@@ -702,7 +768,8 @@ static void rb_update_overflow(struct ring_buffer_per_cpu *cpu_buffer)
             head += rb_event_length(event)) {
 
                event = __rb_page_index(cpu_buffer->head_page, head);
-               BUG_ON(rb_null_event(event));
+               if (RB_WARN_ON(cpu_buffer, rb_null_event(event)))
+                       return;
                /* Only count data entries */
                if (event->type != RINGBUF_TYPE_DATA)
                        continue;
@@ -712,14 +779,14 @@ static void rb_update_overflow(struct ring_buffer_per_cpu *cpu_buffer)
 }
 
 static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
-                              struct buffer_page **page)
+                              struct buffer_page **bpage)
 {
-       struct list_head *p = (*page)->list.next;
+       struct list_head *p = (*bpage)->list.next;
 
        if (p == &cpu_buffer->pages)
                p = p->next;
 
-       *page = list_entry(p, struct buffer_page, list);
+       *bpage = list_entry(p, struct buffer_page, list);
 }
 
 static inline unsigned
@@ -755,16 +822,18 @@ rb_set_commit_event(struct ring_buffer_per_cpu *cpu_buffer,
        addr &= PAGE_MASK;
 
        while (cpu_buffer->commit_page->page != (void *)addr) {
-               RB_WARN_ON(cpu_buffer,
-                          cpu_buffer->commit_page == cpu_buffer->tail_page);
-               cpu_buffer->commit_page->commit =
+               if (RB_WARN_ON(cpu_buffer,
+                         cpu_buffer->commit_page == cpu_buffer->tail_page))
+                       return;
+               cpu_buffer->commit_page->page->commit =
                        cpu_buffer->commit_page->write;
                rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
-               cpu_buffer->write_stamp = cpu_buffer->commit_page->time_stamp;
+               cpu_buffer->write_stamp =
+                       cpu_buffer->commit_page->page->time_stamp;
        }
 
        /* Now set the commit to the event's index */
-       local_set(&cpu_buffer->commit_page->commit, index);
+       local_set(&cpu_buffer->commit_page->page->commit, index);
 }
 
 static inline void
@@ -778,25 +847,38 @@ rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
         * back to us). This allows us to do a simple loop to
         * assign the commit to the tail.
         */
+ again:
        while (cpu_buffer->commit_page != cpu_buffer->tail_page) {
-               cpu_buffer->commit_page->commit =
+               cpu_buffer->commit_page->page->commit =
                        cpu_buffer->commit_page->write;
                rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
-               cpu_buffer->write_stamp = cpu_buffer->commit_page->time_stamp;
+               cpu_buffer->write_stamp =
+                       cpu_buffer->commit_page->page->time_stamp;
                /* add barrier to keep gcc from optimizing too much */
                barrier();
        }
        while (rb_commit_index(cpu_buffer) !=
               rb_page_write(cpu_buffer->commit_page)) {
-               cpu_buffer->commit_page->commit =
+               cpu_buffer->commit_page->page->commit =
                        cpu_buffer->commit_page->write;
                barrier();
        }
+
+       /* again, keep gcc from optimizing */
+       barrier();
+
+       /*
+        * If an interrupt came in just after the first while loop
+        * and pushed the tail page forward, we will be left with
+        * a dangling commit that will never go forward.
+        */
+       if (unlikely(cpu_buffer->commit_page != cpu_buffer->tail_page))
+               goto again;
 }
 
 static void rb_reset_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
 {
-       cpu_buffer->read_stamp = cpu_buffer->reader_page->time_stamp;
+       cpu_buffer->read_stamp = cpu_buffer->reader_page->page->time_stamp;
        cpu_buffer->reader_page->read = 0;
 }
 
@@ -815,7 +897,7 @@ static inline void rb_inc_iter(struct ring_buffer_iter *iter)
        else
                rb_inc_page(cpu_buffer, &iter->head_page);
 
-       iter->read_stamp = iter->head_page->time_stamp;
+       iter->read_stamp = iter->head_page->page->time_stamp;
        iter->head = 0;
 }
 
@@ -889,12 +971,15 @@ static struct ring_buffer_event *
 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
                  unsigned type, unsigned long length, u64 *ts)
 {
-       struct buffer_page *tail_page, *head_page, *reader_page;
+       struct buffer_page *tail_page, *head_page, *reader_page, *commit_page;
        unsigned long tail, write;
        struct ring_buffer *buffer = cpu_buffer->buffer;
        struct ring_buffer_event *event;
        unsigned long flags;
 
+       commit_page = cpu_buffer->commit_page;
+       /* we just need to protect against interrupts */
+       barrier();
        tail_page = cpu_buffer->tail_page;
        write = local_add_return(length, &tail_page->write);
        tail = write - length;
@@ -903,7 +988,8 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
        if (write > BUF_PAGE_SIZE) {
                struct buffer_page *next_page = tail_page;
 
-               spin_lock_irqsave(&cpu_buffer->lock, flags);
+               local_irq_save(flags);
+               __raw_spin_lock(&cpu_buffer->lock);
 
                rb_inc_page(cpu_buffer, &next_page);
 
@@ -911,14 +997,15 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
                reader_page = cpu_buffer->reader_page;
 
                /* we grabbed the lock before incrementing */
-               RB_WARN_ON(cpu_buffer, next_page == reader_page);
+               if (RB_WARN_ON(cpu_buffer, next_page == reader_page))
+                       goto out_unlock;
 
                /*
                 * If for some reason, we had an interrupt storm that made
                 * it all the way around the buffer, bail, and warn
                 * about it.
                 */
-               if (unlikely(next_page == cpu_buffer->commit_page)) {
+               if (unlikely(next_page == commit_page)) {
                        WARN_ON_ONCE(1);
                        goto out_unlock;
                }
@@ -949,12 +1036,12 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
                 */
                if (tail_page == cpu_buffer->tail_page) {
                        local_set(&next_page->write, 0);
-                       local_set(&next_page->commit, 0);
+                       local_set(&next_page->page->commit, 0);
                        cpu_buffer->tail_page = next_page;
 
                        /* reread the time stamp */
                        *ts = ring_buffer_time_stamp(cpu_buffer->cpu);
-                       cpu_buffer->tail_page->time_stamp = *ts;
+                       cpu_buffer->tail_page->page->time_stamp = *ts;
                }
 
                /*
@@ -979,7 +1066,8 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
                        rb_set_commit_to_write(cpu_buffer);
                }
 
-               spin_unlock_irqrestore(&cpu_buffer->lock, flags);
+               __raw_spin_unlock(&cpu_buffer->lock);
+               local_irq_restore(flags);
 
                /* fail and let the caller try again */
                return ERR_PTR(-EAGAIN);
@@ -987,7 +1075,8 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
 
        /* We reserved something on the buffer */
 
-       BUG_ON(write > BUF_PAGE_SIZE);
+       if (RB_WARN_ON(cpu_buffer, write > BUF_PAGE_SIZE))
+               return NULL;
 
        event = __rb_page_index(tail_page, tail);
        rb_update_event(event, type, length);
@@ -997,12 +1086,13 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
         * this page's time stamp.
         */
        if (!tail && rb_is_commit(cpu_buffer, event))
-               cpu_buffer->commit_page->time_stamp = *ts;
+               cpu_buffer->commit_page->page->time_stamp = *ts;
 
        return event;
 
  out_unlock:
-       spin_unlock_irqrestore(&cpu_buffer->lock, flags);
+       __raw_spin_unlock(&cpu_buffer->lock);
+       local_irq_restore(flags);
        return NULL;
 }
 
@@ -1047,7 +1137,7 @@ rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
                        event->time_delta = *delta & TS_MASK;
                        event->array[0] = *delta >> TS_SHIFT;
                } else {
-                       cpu_buffer->commit_page->time_stamp = *ts;
+                       cpu_buffer->commit_page->page->time_stamp = *ts;
                        event->time_delta = 0;
                        event->array[0] = 0;
                }
@@ -1085,10 +1175,8 @@ rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
         * storm or we have something buggy.
         * Bail!
         */
-       if (unlikely(++nr_loops > 1000)) {
-               RB_WARN_ON(cpu_buffer, 1);
+       if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
                return NULL;
-       }
 
        ts = ring_buffer_time_stamp(cpu_buffer->cpu);
 
@@ -1184,15 +1272,14 @@ ring_buffer_lock_reserve(struct ring_buffer *buffer,
        struct ring_buffer_event *event;
        int cpu, resched;
 
-       if (ring_buffers_off)
+       if (ring_buffer_flags != RB_BUFFERS_ON)
                return NULL;
 
        if (atomic_read(&buffer->record_disabled))
                return NULL;
 
        /* If we are tracing schedule, we don't want to recurse */
-       resched = need_resched();
-       preempt_disable_notrace();
+       resched = ftrace_preempt_disable();
 
        cpu = raw_smp_processor_id();
 
@@ -1223,10 +1310,7 @@ ring_buffer_lock_reserve(struct ring_buffer *buffer,
        return event;
 
  out:
-       if (resched)
-               preempt_enable_no_resched_notrace();
-       else
-               preempt_enable_notrace();
+       ftrace_preempt_enable(resched);
        return NULL;
 }
 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve);
@@ -1269,12 +1353,9 @@ int ring_buffer_unlock_commit(struct ring_buffer *buffer,
        /*
         * Only the last preempt count needs to restore preemption.
         */
-       if (preempt_count() == 1) {
-               if (per_cpu(rb_need_resched, cpu))
-                       preempt_enable_no_resched_notrace();
-               else
-                       preempt_enable_notrace();
-       } else
+       if (preempt_count() == 1)
+               ftrace_preempt_enable(per_cpu(rb_need_resched, cpu));
+       else
                preempt_enable_no_resched_notrace();
 
        return 0;
@@ -1305,14 +1386,13 @@ int ring_buffer_write(struct ring_buffer *buffer,
        int ret = -EBUSY;
        int cpu, resched;
 
-       if (ring_buffers_off)
+       if (ring_buffer_flags != RB_BUFFERS_ON)
                return -EBUSY;
 
        if (atomic_read(&buffer->record_disabled))
                return -EBUSY;
 
-       resched = need_resched();
-       preempt_disable_notrace();
+       resched = ftrace_preempt_disable();
 
        cpu = raw_smp_processor_id();
 
@@ -1338,10 +1418,7 @@ int ring_buffer_write(struct ring_buffer *buffer,
 
        ret = 0;
  out:
-       if (resched)
-               preempt_enable_no_resched_notrace();
-       else
-               preempt_enable_notrace();
+       ftrace_preempt_enable(resched);
 
        return ret;
 }
@@ -1509,14 +1586,7 @@ unsigned long ring_buffer_overruns(struct ring_buffer *buffer)
 }
 EXPORT_SYMBOL_GPL(ring_buffer_overruns);
 
-/**
- * ring_buffer_iter_reset - reset an iterator
- * @iter: The iterator to reset
- *
- * Resets the iterator, so that it will start from the beginning
- * again.
- */
-void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
+static void rb_iter_reset(struct ring_buffer_iter *iter)
 {
        struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
 
@@ -1531,7 +1601,24 @@ void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
        if (iter->head)
                iter->read_stamp = cpu_buffer->read_stamp;
        else
-               iter->read_stamp = iter->head_page->time_stamp;
+               iter->read_stamp = iter->head_page->page->time_stamp;
+}
+
+/**
+ * ring_buffer_iter_reset - reset an iterator
+ * @iter: The iterator to reset
+ *
+ * Resets the iterator, so that it will start from the beginning
+ * again.
+ */
+void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
+{
+       struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
+       unsigned long flags;
+
+       spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
+       rb_iter_reset(iter);
+       spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
 }
 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset);
 
@@ -1619,7 +1706,8 @@ rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
        unsigned long flags;
        int nr_loops = 0;
 
-       spin_lock_irqsave(&cpu_buffer->lock, flags);
+       local_irq_save(flags);
+       __raw_spin_lock(&cpu_buffer->lock);
 
  again:
        /*
@@ -1628,8 +1716,7 @@ rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
         * a case where we will loop three times. There should be no
         * reason to loop four times (that I know of).
         */
-       if (unlikely(++nr_loops > 3)) {
-               RB_WARN_ON(cpu_buffer, 1);
+       if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) {
                reader = NULL;
                goto out;
        }
@@ -1641,8 +1728,9 @@ rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
                goto out;
 
        /* Never should we have an index greater than the size */
-       RB_WARN_ON(cpu_buffer,
-                  cpu_buffer->reader_page->read > rb_page_size(reader));
+       if (RB_WARN_ON(cpu_buffer,
+                      cpu_buffer->reader_page->read > rb_page_size(reader)))
+               goto out;
 
        /* check if we caught up to the tail */
        reader = NULL;
@@ -1659,7 +1747,7 @@ rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
        cpu_buffer->reader_page->list.prev = reader->list.prev;
 
        local_set(&cpu_buffer->reader_page->write, 0);
-       local_set(&cpu_buffer->reader_page->commit, 0);
+       local_set(&cpu_buffer->reader_page->page->commit, 0);
 
        /* Make the reader page now replace the head */
        reader->list.prev->next = &cpu_buffer->reader_page->list;
@@ -1681,7 +1769,8 @@ rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
        goto again;
 
  out:
-       spin_unlock_irqrestore(&cpu_buffer->lock, flags);
+       __raw_spin_unlock(&cpu_buffer->lock);
+       local_irq_restore(flags);
 
        return reader;
 }
@@ -1695,7 +1784,8 @@ static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
        reader = rb_get_reader_page(cpu_buffer);
 
        /* This function should not be called when buffer is empty */
-       BUG_ON(!reader);
+       if (RB_WARN_ON(cpu_buffer, !reader))
+               return;
 
        event = rb_reader_event(cpu_buffer);
 
@@ -1722,7 +1812,9 @@ static void rb_advance_iter(struct ring_buffer_iter *iter)
         * Check if we are at the end of the buffer.
         */
        if (iter->head >= rb_page_size(iter->head_page)) {
-               BUG_ON(iter->head_page == cpu_buffer->commit_page);
+               if (RB_WARN_ON(buffer,
+                              iter->head_page == cpu_buffer->commit_page))
+                       return;
                rb_inc_iter(iter);
                return;
        }
@@ -1735,8 +1827,10 @@ static void rb_advance_iter(struct ring_buffer_iter *iter)
         * This should not be called to advance the header if we are
         * at the tail of the buffer.
         */
-       BUG_ON((iter->head_page == cpu_buffer->commit_page) &&
-              (iter->head + length > rb_commit_index(cpu_buffer)));
+       if (RB_WARN_ON(cpu_buffer,
+                      (iter->head_page == cpu_buffer->commit_page) &&
+                      (iter->head + length > rb_commit_index(cpu_buffer))))
+               return;
 
        rb_update_iter_read_stamp(iter, event);
 
@@ -1748,17 +1842,8 @@ static void rb_advance_iter(struct ring_buffer_iter *iter)
                rb_advance_iter(iter);
 }
 
-/**
- * ring_buffer_peek - peek at the next event to be read
- * @buffer: The ring buffer to read
- * @cpu: The cpu to peak at
- * @ts: The timestamp counter of this event.
- *
- * This will return the event that will be read next, but does
- * not consume the data.
- */
-struct ring_buffer_event *
-ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
+static struct ring_buffer_event *
+rb_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
 {
        struct ring_buffer_per_cpu *cpu_buffer;
        struct ring_buffer_event *event;
@@ -1779,10 +1864,8 @@ ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
         * can have.  Nesting 10 deep of interrupts is clearly
         * an anomaly.
         */
-       if (unlikely(++nr_loops > 10)) {
-               RB_WARN_ON(cpu_buffer, 1);
+       if (RB_WARN_ON(cpu_buffer, ++nr_loops > 10))
                return NULL;
-       }
 
        reader = rb_get_reader_page(cpu_buffer);
        if (!reader)
@@ -1821,16 +1904,8 @@ ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
 }
 EXPORT_SYMBOL_GPL(ring_buffer_peek);
 
-/**
- * ring_buffer_iter_peek - peek at the next event to be read
- * @iter: The ring buffer iterator
- * @ts: The timestamp counter of this event.
- *
- * This will return the event that will be read next, but does
- * not increment the iterator.
- */
-struct ring_buffer_event *
-ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
+static struct ring_buffer_event *
+rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
 {
        struct ring_buffer *buffer;
        struct ring_buffer_per_cpu *cpu_buffer;
@@ -1852,10 +1927,8 @@ ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
         * can have. Nesting 10 deep of interrupts is clearly
         * an anomaly.
         */
-       if (unlikely(++nr_loops > 10)) {
-               RB_WARN_ON(cpu_buffer, 1);
+       if (RB_WARN_ON(cpu_buffer, ++nr_loops > 10))
                return NULL;
-       }
 
        if (rb_per_cpu_empty(cpu_buffer))
                return NULL;
@@ -1892,6 +1965,51 @@ ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
 }
 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek);
 
+/**
+ * ring_buffer_peek - peek at the next event to be read
+ * @buffer: The ring buffer to read
+ * @cpu: The cpu to peek at
+ * @ts: The timestamp counter of this event.
+ *
+ * This will return the event that will be read next, but does
+ * not consume the data. The lookup itself is done by rb_buffer_peek()
+ * while holding the per cpu reader_lock with interrupts saved.
+ *
+ * Returns NULL if @cpu is not set in the buffer's cpumask; otherwise
+ * returns whatever rb_buffer_peek() returns for that cpu.
+ */
+struct ring_buffer_event *
+ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
+{
+       struct ring_buffer_per_cpu *cpu_buffer;
+       struct ring_buffer_event *event;
+       unsigned long flags;
+
+       /*
+        * Validate the cpu before touching its per cpu buffer, the same
+        * way ring_buffer_consume does before it takes the reader_lock.
+        * NOTE(review): presumably no per cpu buffer is set up for cpus
+        * outside the mask, so the lock below must not be reached.
+        */
+       if (!cpu_isset(cpu, buffer->cpumask))
+               return NULL;
+
+       cpu_buffer = buffer->buffers[cpu];
+
+       spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
+       event = rb_buffer_peek(buffer, cpu, ts);
+       spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
+
+       return event;
+}
+
+/**
+ * ring_buffer_iter_peek - look at the next event an iterator will read
+ * @iter: The ring buffer iterator
+ * @ts: The timestamp counter of this event.
+ *
+ * Hands back the event that would be read next through @iter. The
+ * iterator is not incremented. The lookup is delegated to
+ * rb_iter_peek() and runs under the per cpu reader_lock with
+ * interrupts saved.
+ */
+struct ring_buffer_event *
+ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
+{
+       struct ring_buffer_per_cpu *percpu = iter->cpu_buffer;
+       struct ring_buffer_event *next;
+       unsigned long irq_flags;
+
+       spin_lock_irqsave(&percpu->reader_lock, irq_flags);
+       next = rb_iter_peek(iter, ts);
+       spin_unlock_irqrestore(&percpu->reader_lock, irq_flags);
+
+       return next;
+}
+
 /**
  * ring_buffer_consume - return an event and consume it
  * @buffer: The ring buffer to get the next event from
@@ -1903,19 +2021,24 @@ EXPORT_SYMBOL_GPL(ring_buffer_iter_peek);
 struct ring_buffer_event *
 ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
 {
-       struct ring_buffer_per_cpu *cpu_buffer;
+       struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
        struct ring_buffer_event *event;
+       unsigned long flags;
 
        if (!cpu_isset(cpu, buffer->cpumask))
                return NULL;
 
-       event = ring_buffer_peek(buffer, cpu, ts);
+       spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
+
+       event = rb_buffer_peek(buffer, cpu, ts);
        if (!event)
-               return NULL;
+               goto out;
 
-       cpu_buffer = buffer->buffers[cpu];
        rb_advance_reader(cpu_buffer);
 
+ out:
+       spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
+
        return event;
 }
 EXPORT_SYMBOL_GPL(ring_buffer_consume);
@@ -1953,9 +2076,11 @@ ring_buffer_read_start(struct ring_buffer *buffer, int cpu)
        atomic_inc(&cpu_buffer->record_disabled);
        synchronize_sched();
 
-       spin_lock_irqsave(&cpu_buffer->lock, flags);
-       ring_buffer_iter_reset(iter);
-       spin_unlock_irqrestore(&cpu_buffer->lock, flags);
+       spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
+       __raw_spin_lock(&cpu_buffer->lock);
+       rb_iter_reset(iter);
+       __raw_spin_unlock(&cpu_buffer->lock);
+       spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
 
        return iter;
 }
@@ -1989,12 +2114,17 @@ struct ring_buffer_event *
 ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
 {
        struct ring_buffer_event *event;
+       struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
+       unsigned long flags;
 
-       event = ring_buffer_iter_peek(iter, ts);
+       spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
+       event = rb_iter_peek(iter, ts);
        if (!event)
-               return NULL;
+               goto out;
 
        rb_advance_iter(iter);
+ out:
+       spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
 
        return event;
 }
@@ -2016,7 +2146,7 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
        cpu_buffer->head_page
                = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
        local_set(&cpu_buffer->head_page->write, 0);
-       local_set(&cpu_buffer->head_page->commit, 0);
+       local_set(&cpu_buffer->head_page->page->commit, 0);
 
        cpu_buffer->head_page->read = 0;
 
@@ -2025,7 +2155,7 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
 
        INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
        local_set(&cpu_buffer->reader_page->write, 0);
-       local_set(&cpu_buffer->reader_page->commit, 0);
+       local_set(&cpu_buffer->reader_page->page->commit, 0);
        cpu_buffer->reader_page->read = 0;
 
        cpu_buffer->overrun = 0;
@@ -2045,11 +2175,15 @@ void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
        if (!cpu_isset(cpu, buffer->cpumask))
                return;
 
-       spin_lock_irqsave(&cpu_buffer->lock, flags);
+       spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
+
+       __raw_spin_lock(&cpu_buffer->lock);
 
        rb_reset_cpu(cpu_buffer);
 
-       spin_unlock_irqrestore(&cpu_buffer->lock, flags);
+       __raw_spin_unlock(&cpu_buffer->lock);
+
+       spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
 }
 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu);
 
@@ -2123,8 +2257,7 @@ int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
                return -EINVAL;
 
        /* At least make sure the two buffers are somewhat the same */
-       if (buffer_a->size != buffer_b->size ||
-           buffer_a->pages != buffer_b->pages)
+       if (buffer_a->pages != buffer_b->pages)
                return -EINVAL;
 
        cpu_buffer_a = buffer_a->buffers[cpu];
@@ -2152,16 +2285,178 @@ int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
 }
 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu);
 
+/*
+ * rb_remove_entries - drop a page's data events from the entry counter
+ * @cpu_buffer: the per cpu buffer whose entries counter is adjusted
+ * @bpage: the data page that has just been handed to a reader
+ *
+ * Walks every event from offset 0 up to the commit index of @bpage
+ * and decrements cpu_buffer->entries once per RINGBUF_TYPE_DATA event.
+ * The walk is done under cpu_buffer->lock. This function does not
+ * touch the interrupt state itself; the caller visible here,
+ * ring_buffer_read_page, calls it with reader_lock held and
+ * interrupts saved.
+ */
+static void rb_remove_entries(struct ring_buffer_per_cpu *cpu_buffer,
+                             struct buffer_data_page *bpage)
+{
+       struct ring_buffer_event *event;
+       unsigned long head;
+
+       __raw_spin_lock(&cpu_buffer->lock);
+       for (head = 0; head < local_read(&bpage->commit);
+            head += rb_event_length(event)) {
+
+               event = __rb_data_page_index(bpage, head);
+               /*
+                * A null event ends the walk. Leave through the unlock
+                * below: returning here would keep cpu_buffer->lock held.
+                */
+               if (RB_WARN_ON(cpu_buffer, rb_null_event(event)))
+                       break;
+               /* Only count data entries */
+               if (event->type != RINGBUF_TYPE_DATA)
+                       continue;
+               cpu_buffer->entries--;
+       }
+       __raw_spin_unlock(&cpu_buffer->lock);
+}
+
+/**
+ * ring_buffer_alloc_read_page - allocate a page to read from buffer
+ * @buffer: the buffer to allocate for (not used by the allocation itself)
+ *
+ * Companion of ring_buffer_read_page. A caller that wants to pull
+ * whole pages out of the ring buffer first allocates one or more
+ * pages here, then passes the address of such a page to
+ * ring_buffer_read_page, which may swap it with the read page of
+ * the buffer. The page is obtained with GFP_KERNEL and its contents
+ * are not initialized by this function.
+ *
+ * Returns:
+ *  The page allocated, or NULL on error.
+ */
+void *ring_buffer_alloc_read_page(struct ring_buffer *buffer)
+{
+       unsigned long page = __get_free_page(GFP_KERNEL);
+
+       /* a zero address means the allocation failed */
+       return page ? (void *)page : NULL;
+}
+
+/**
+ * ring_buffer_free_read_page - free an allocated read page
+ * @buffer: the buffer the page was allocated for (not used here)
+ * @data: the page to free
+ *
+ * Releases a page that was obtained from ring_buffer_alloc_read_page.
+ */
+void ring_buffer_free_read_page(struct ring_buffer *buffer, void *data)
+{
+       unsigned long addr = (unsigned long)data;
+
+       free_page(addr);
+}
+
+/**
+ * ring_buffer_read_page - extract a page from the ring buffer
+ * @buffer: buffer to extract from
+ * @data_page: the page to use allocated from ring_buffer_alloc_read_page
+ * @cpu: the cpu of the buffer to extract
+ * @full: should the extraction only happen when the page is full.
+ *
+ * This function will pull out a page from the ring buffer and consume it.
+ * @data_page must be the address of the variable that was returned
+ * from ring_buffer_alloc_read_page. This is because the page might be used
+ * to swap with a page in the ring buffer.
+ *
+ * for example:
+ *     rpage = ring_buffer_alloc_read_page(buffer);
+ *     if (!rpage)
+ *             return error;
+ *     ret = ring_buffer_read_page(buffer, &rpage, cpu, 0);
+ *     if (ret)
+ *             process_page(rpage);
+ *
+ * When @full is set, the function will not return true unless
+ * the writer is off the reader page.
+ *
+ * When the writer is still on the reader page, the unread part of the
+ * reader page is copied to the start of the given page and the commit
+ * index of the given page is set to the number of bytes copied.
+ *
+ * Note: it is up to the calling functions to handle sleeps and wakeups.
+ *  The ring buffer can be used anywhere in the kernel and can not
+ *  blindly call wake_up. The layer that uses the ring buffer must be
+ *  responsible for that.
+ *
+ * Returns:
+ *  1 if data has been transferred
+ *  0 if no data has been transferred (this includes a NULL @data_page,
+ *    a NULL page, and a @cpu that is not in the buffer's cpumask).
+ */
+int ring_buffer_read_page(struct ring_buffer *buffer,
+                           void **data_page, int cpu, int full)
+{
+       struct ring_buffer_per_cpu *cpu_buffer;
+       struct ring_buffer_event *event;
+       struct buffer_data_page *bpage;
+       unsigned long flags;
+       int ret = 0;
+
+       if (!data_page)
+               return 0;
+
+       bpage = *data_page;
+       if (!bpage)
+               return 0;
+
+       /*
+        * Validate the cpu before touching its per cpu buffer, the same
+        * way ring_buffer_consume does before it takes the reader_lock.
+        */
+       if (!cpu_isset(cpu, buffer->cpumask))
+               return 0;
+
+       cpu_buffer = buffer->buffers[cpu];
+
+       spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
+
+       /*
+        * rb_buffer_peek will get the next ring buffer if
+        * the current reader page is empty.
+        */
+       event = rb_buffer_peek(buffer, cpu, NULL);
+       if (!event)
+               goto out;
+
+       /* check for data */
+       if (!local_read(&cpu_buffer->reader_page->page->commit))
+               goto out;
+       /*
+        * If the writer is already off of the read page, then simply
+        * switch the read page with the given page. Otherwise
+        * we need to copy the data from the reader page into the
+        * given page.
+        */
+       if (cpu_buffer->reader_page == cpu_buffer->commit_page) {
+               struct buffer_data_page *rpage = cpu_buffer->reader_page->page;
+               unsigned int read = cpu_buffer->reader_page->read;
+               unsigned long commit;
+
+               if (full)
+                       goto out;
+
+               /*
+                * The writer is still on the reader page, we must copy.
+                * Sample the commit index once so that the length copied,
+                * the commit index stored in the caller's page and the
+                * new read index all agree even if the writer commits
+                * more data meanwhile.
+                */
+               commit = local_read(&rpage->commit);
+
+               /*
+                * Copy into the caller's page (bpage still points at
+                * *data_page here), not back onto the reader page.
+                * NOTE(review): only data and commit are filled in; any
+                * other header state of the caller's page (such as a
+                * page level time stamp) is left untouched - confirm
+                * what consumers of the copied page rely on.
+                */
+               memcpy(bpage->data, rpage->data + read, commit - read);
+               local_set(&bpage->commit, commit - read);
+
+               /* consume what was read */
+               cpu_buffer->reader_page->read = commit;
+
+       } else {
+               /*
+                * swap the pages
+                * NOTE(review): the whole reader page is handed over and
+                * counted below even if part of it was already consumed
+                * (read != 0) - confirm that case cannot double count.
+                */
+               rb_init_page(bpage);
+               bpage = cpu_buffer->reader_page->page;
+               cpu_buffer->reader_page->page = *data_page;
+               cpu_buffer->reader_page->read = 0;
+               *data_page = bpage;
+       }
+       ret = 1;
+
+       /* update the entry counter for the events now owned by the caller */
+       rb_remove_entries(cpu_buffer, bpage);
+ out:
+       spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
+
+       return ret;
+}
+
 static ssize_t
 rb_simple_read(struct file *filp, char __user *ubuf,
               size_t cnt, loff_t *ppos)
 {
-       int *p = filp->private_data;
+       long *p = filp->private_data;
        char buf[64];
        int r;
 
-       /* !ring_buffers_off == tracing_on */
-       r = sprintf(buf, "%d\n", !*p);
+       if (test_bit(RB_BUFFERS_DISABLED_BIT, p))
+               r = sprintf(buf, "permanently disabled\n");
+       else
+               r = sprintf(buf, "%d\n", test_bit(RB_BUFFERS_ON_BIT, p));
 
        return simple_read_from_buffer(ubuf, cnt, ppos, buf, r);
 }
@@ -2170,7 +2465,7 @@ static ssize_t
 rb_simple_write(struct file *filp, const char __user *ubuf,
                size_t cnt, loff_t *ppos)
 {
-       int *p = filp->private_data;
+       long *p = filp->private_data;
        char buf[64];
        long val;
        int ret;
@@ -2187,8 +2482,10 @@ rb_simple_write(struct file *filp, const char __user *ubuf,
        if (ret < 0)
                return ret;
 
-       /* !ring_buffers_off == tracing_on */
-       *p = !val;
+       if (val)
+               set_bit(RB_BUFFERS_ON_BIT, p);
+       else
+               clear_bit(RB_BUFFERS_ON_BIT, p);
 
        (*ppos)++;
 
@@ -2210,7 +2507,7 @@ static __init int rb_init_debugfs(void)
        d_tracer = tracing_init_dentry();
 
        entry = debugfs_create_file("tracing_on", 0644, d_tracer,
-                                   &ring_buffers_off, &rb_simple_fops);
+                                   &ring_buffer_flags, &rb_simple_fops);
        if (!entry)
                pr_warning("Could not create debugfs 'tracing_on' entry\n");