Merge remote-tracking branch 'asoc/topic/intel' into asoc-next
[sfrench/cifs-2.6.git] / include / linux / ptr_ring.h
index 6c70444da3b9d3e5c31f04193b96986ff0e4b04b..6b2e0dd88569b13c66ef445609b2f12419fdd3ac 100644 (file)
 struct ptr_ring {
        int producer ____cacheline_aligned_in_smp;
        spinlock_t producer_lock;
-       int consumer ____cacheline_aligned_in_smp;
+       int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */
+       int consumer_tail; /* next entry to invalidate */
        spinlock_t consumer_lock;
        /* Shared consumer/producer data */
        /* Read-only by both the producer and the consumer */
        int size ____cacheline_aligned_in_smp; /* max entries in queue */
+       int batch; /* number of entries to consume in a batch */
        void **queue;
 };
 
@@ -170,7 +172,7 @@ static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
 static inline void *__ptr_ring_peek(struct ptr_ring *r)
 {
        if (likely(r->size))
-               return r->queue[r->consumer];
+               return r->queue[r->consumer_head];
        return NULL;
 }
 
@@ -231,9 +233,38 @@ static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
 /* Must only be called after __ptr_ring_peek returned !NULL */
 static inline void __ptr_ring_discard_one(struct ptr_ring *r)
 {
-       r->queue[r->consumer++] = NULL;
-       if (unlikely(r->consumer >= r->size))
-               r->consumer = 0;
+       /* Fundamentally, what we want to do is update consumer
+        * index and zero out the entry so producer can reuse it.
+        * Doing it naively at each consume would be as simple as:
+        *       r->queue[r->consumer++] = NULL;
+        *       if (unlikely(r->consumer >= r->size))
+        *               r->consumer = 0;
+        * but that is suboptimal when the ring is full as producer is writing
+        * out new entries in the same cache line.  Defer these updates until a
+        * batch of entries has been consumed.
+        */
+       int head = r->consumer_head++;
+
+       /* Once we have processed enough entries invalidate them in
+        * the ring all at once so producer can reuse their space in the ring.
+        * We also do this when we reach end of the ring - not mandatory
+        * but helps keep the implementation simple.
+        */
+       if (unlikely(r->consumer_head - r->consumer_tail >= r->batch ||
+                    r->consumer_head >= r->size)) {
+               /* Zero out entries in the reverse order: this way we touch the
+                * cache line that producer might currently be reading the last;
+                * producer won't make progress and touch other cache lines
+                * besides the first one until we write out all entries.
+                */
+               while (likely(head >= r->consumer_tail))
+                       r->queue[head--] = NULL;
+               r->consumer_tail = r->consumer_head;
+       }
+       if (unlikely(r->consumer_head >= r->size)) {
+               r->consumer_head = 0;
+               r->consumer_tail = 0;
+       }
 }
 
 static inline void *__ptr_ring_consume(struct ptr_ring *r)
@@ -345,14 +376,27 @@ static inline void **__ptr_ring_init_queue_alloc(int size, gfp_t gfp)
        return kzalloc(ALIGN(size * sizeof(void *), SMP_CACHE_BYTES), gfp);
 }
 
+static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
+{
+       r->size = size;
+       r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue));
+       /* We need to set batch at least to 1 to make logic
+        * in __ptr_ring_discard_one work correctly.
+        * Batching too much (because ring is small) would cause a lot of
+        * burstiness. Needs tuning, for now disable batching.
+        */
+       if (r->batch > r->size / 2 || !r->batch)
+               r->batch = 1;
+}
+
 static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
 {
        r->queue = __ptr_ring_init_queue_alloc(size, gfp);
        if (!r->queue)
                return -ENOMEM;
 
-       r->size = size;
-       r->producer = r->consumer = 0;
+       __ptr_ring_set_size(r, size);
+       r->producer = r->consumer_head = r->consumer_tail = 0;
        spin_lock_init(&r->producer_lock);
        spin_lock_init(&r->consumer_lock);
 
@@ -373,9 +417,10 @@ static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
                else if (destroy)
                        destroy(ptr);
 
-       r->size = size;
+       __ptr_ring_set_size(r, size);
        r->producer = producer;
-       r->consumer = 0;
+       r->consumer_head = 0;
+       r->consumer_tail = 0;
        old = r->queue;
        r->queue = queue;