ptr_ring: keep consumer_head valid at all times
Author:     Michael S. Tsirkin <mst@redhat.com>
AuthorDate: Thu, 25 Jan 2018 23:36:27 +0000 (01:36 +0200)
Commit:     David S. Miller <davem@davemloft.net>
CommitDate: Mon, 29 Jan 2018 17:02:53 +0000 (12:02 -0500)
The comment near __ptr_ring_peek says:

 * If ring is never resized, and if the pointer is merely
 * tested, there's no need to take the lock - see e.g.  __ptr_ring_empty.

but in fact this was never safe, since consumer_head would sometimes
point outside the ring. Refactor the code so that consumer_head always
points within the ring.

Fixes: c5ad119fb6c09 ("net: sched: pfifo_fast use skb_array")
Signed-off-by: Michael S. Tsirkin <mst@redhat.com>
Acked-by: John Fastabend <john.fastabend@gmail.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
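
[Editor's note: the following is a minimal userspace sketch, not the kernel's
code, illustrating why the lockless empty test depends on consumer_head staying
in range. The struct and function names (ring_model, ring_model_peek,
ring_model_empty) are illustrative; the sketch omits kernel details such as the
zero-size check and compiler/annotation macros.]

    #include <stdbool.h>

    /* Simplified model of a lockless "is the ring empty?" test.  The peek
     * indexes queue[consumer_head] without taking the consumer lock, so
     * consumer_head must always name a valid slot: if it were ever left
     * equal to size (as could happen before this patch), the read would
     * fall outside the queue array.
     */
    struct ring_model {
            void **queue;           /* size entries; NULL marks an empty slot */
            int size;
            int consumer_head;      /* next slot to consume; must stay in [0, size) */
    };

    static void *ring_model_peek(struct ring_model *r)
    {
            return r->queue[r->consumer_head];      /* no lock taken here */
    }

    static bool ring_model_empty(struct ring_model *r)
    {
            return !ring_model_peek(r);
    }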
include/linux/ptr_ring.h

index 9ca1726ff963d433a05c1c838e20f4283551259d..5ebcdd40df997c8deb7a4b92bcd6581e02d41519 100644
@@ -248,22 +248,28 @@ static inline void __ptr_ring_discard_one(struct ptr_ring *r)
        /* Fundamentally, what we want to do is update consumer
         * index and zero out the entry so producer can reuse it.
         * Doing it naively at each consume would be as simple as:
-        *       r->queue[r->consumer++] = NULL;
-        *       if (unlikely(r->consumer >= r->size))
-        *               r->consumer = 0;
+        *       consumer = r->consumer;
+        *       r->queue[consumer++] = NULL;
+        *       if (unlikely(consumer >= r->size))
+        *               consumer = 0;
+        *       r->consumer = consumer;
         * but that is suboptimal when the ring is full as producer is writing
         * out new entries in the same cache line.  Defer these updates until a
         * batch of entries has been consumed.
         */
-       int head = r->consumer_head++;
+       /* Note: we must keep consumer_head valid at all times for __ptr_ring_empty
+        * to work correctly.
+        */
+       int consumer_head = r->consumer_head;
+       int head = consumer_head++;
 
        /* Once we have processed enough entries invalidate them in
         * the ring all at once so producer can reuse their space in the ring.
         * We also do this when we reach end of the ring - not mandatory
         * but helps keep the implementation simple.
         */
-       if (unlikely(r->consumer_head - r->consumer_tail >= r->batch ||
-                    r->consumer_head >= r->size)) {
+       if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
+                    consumer_head >= r->size)) {
                /* Zero out entries in the reverse order: this way we touch the
                 * cache line that producer might currently be reading the last;
                 * producer won't make progress and touch other cache lines
@@ -271,12 +277,13 @@ static inline void __ptr_ring_discard_one(struct ptr_ring *r)
                 */
                while (likely(head >= r->consumer_tail))
                        r->queue[head--] = NULL;
-               r->consumer_tail = r->consumer_head;
+               r->consumer_tail = consumer_head;
        }
-       if (unlikely(r->consumer_head >= r->size)) {
-               r->consumer_head = 0;
+       if (unlikely(consumer_head >= r->size)) {
+               consumer_head = 0;
                r->consumer_tail = 0;
        }
+       r->consumer_head = consumer_head;
 }
 
 static inline void *__ptr_ring_consume(struct ptr_ring *r)
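
[Editor's note: below is a hedged userspace sketch mirroring the patched
discard logic shown in the diff. Names (ring_model, ring_model_discard_one)
and the simplified struct are illustrative assumptions, not the kernel's
definitions; likely()/unlikely() hints are omitted.]

    #include <stddef.h>

    /* Userspace model of the refactored discard step.  All arithmetic is
     * done on a local copy of the consumer index; r->consumer_head is
     * written exactly once at the end, so a concurrent lockless peek never
     * observes an out-of-range value.
     */
    struct ring_model {
            void **queue;
            int size;               /* number of slots in queue */
            int batch;              /* consumed entries to accumulate before zeroing */
            int consumer_head;      /* next slot to consume */
            int consumer_tail;      /* first consumed-but-not-yet-zeroed slot */
    };

    static void ring_model_discard_one(struct ring_model *r)
    {
            int consumer_head = r->consumer_head;
            int head = consumer_head++;

            /* Once a batch has been consumed, or we hit the end of the ring,
             * zero the consumed entries in reverse order so the producer can
             * reuse them.
             */
            if (consumer_head - r->consumer_tail >= r->batch ||
                consumer_head >= r->size) {
                    while (head >= r->consumer_tail)
                            r->queue[head--] = NULL;
                    r->consumer_tail = consumer_head;
            }
            if (consumer_head >= r->size) {         /* wrap around */
                    consumer_head = 0;
                    r->consumer_tail = 0;
            }
            /* Publish the new, always in-range, consumer index last. */
            r->consumer_head = consumer_head;
    }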