ptr_ring: try vmalloc() when kmalloc() fails
[sfrench/cifs-2.6.git] / include / linux / ptr_ring.h
index d72b2e7dd500ea5224a3afa330f478b3475e0a4e..b884b7794187ee4c007b512107c41b7fe5055516 100644 (file)
@@ -45,9 +45,10 @@ struct ptr_ring {
 };
 
 /* Note: callers invoking this in a loop must use a compiler barrier,
- * for example cpu_relax().  If ring is ever resized, callers must hold
- * producer_lock - see e.g. ptr_ring_full.  Otherwise, if callers don't hold
- * producer_lock, the next call to __ptr_ring_produce may fail.
+ * for example cpu_relax().
+ *
+ * NB: this is unlike __ptr_ring_empty in that callers must hold producer_lock:
+ * see e.g. ptr_ring_full.
  */
 static inline bool __ptr_ring_full(struct ptr_ring *r)
 {
@@ -113,7 +114,7 @@ static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
        /* Pairs with smp_read_barrier_depends in __ptr_ring_consume. */
        smp_wmb();
 
-       r->queue[r->producer++] = ptr;
+       WRITE_ONCE(r->queue[r->producer++], ptr);
        if (unlikely(r->producer >= r->size))
                r->producer = 0;
        return 0;
@@ -169,32 +170,36 @@ static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
        return ret;
 }
 
-/* Note: callers invoking this in a loop must use a compiler barrier,
- * for example cpu_relax(). Callers must take consumer_lock
- * if they dereference the pointer - see e.g. PTR_RING_PEEK_CALL.
- * If ring is never resized, and if the pointer is merely
- * tested, there's no need to take the lock - see e.g.  __ptr_ring_empty.
- * However, if called outside the lock, and if some other CPU
- * consumes ring entries at the same time, the value returned
- * is not guaranteed to be correct.
- * In this case - to avoid incorrectly detecting the ring
- * as empty - the CPU consuming the ring entries is responsible
- * for either consuming all ring entries until the ring is empty,
- * or synchronizing with some other CPU and causing it to
- * execute __ptr_ring_peek and/or consume the ring enteries
- * after the synchronization point.
- */
 static inline void *__ptr_ring_peek(struct ptr_ring *r)
 {
        if (likely(r->size))
-               return r->queue[r->consumer_head];
+               return READ_ONCE(r->queue[r->consumer_head]);
        return NULL;
 }
 
-/* See __ptr_ring_peek above for locking rules. */
+/*
+ * Test ring empty status without taking any locks.
+ *
+ * NB: This is only safe to call if ring is never resized.
+ *
+ * However, if some other CPU consumes ring entries at the same time, the value
+ * returned is not guaranteed to be correct.
+ *
+ * In this case - to avoid incorrectly detecting the ring
+ * as empty - the CPU consuming the ring entries is responsible
+ * for either consuming all ring entries until the ring is empty,
+ * or synchronizing with some other CPU and causing it to
+ * re-test __ptr_ring_empty and/or consume the ring enteries
+ * after the synchronization point.
+ *
+ * Note: callers invoking this in a loop must use a compiler barrier,
+ * for example cpu_relax().
+ */
 static inline bool __ptr_ring_empty(struct ptr_ring *r)
 {
-       return !__ptr_ring_peek(r);
+       if (likely(r->size))
+               return !r->queue[READ_ONCE(r->consumer_head)];
+       return true;
 }
 
 static inline bool ptr_ring_empty(struct ptr_ring *r)
@@ -248,22 +253,28 @@ static inline void __ptr_ring_discard_one(struct ptr_ring *r)
        /* Fundamentally, what we want to do is update consumer
         * index and zero out the entry so producer can reuse it.
         * Doing it naively at each consume would be as simple as:
-        *       r->queue[r->consumer++] = NULL;
-        *       if (unlikely(r->consumer >= r->size))
-        *               r->consumer = 0;
+        *       consumer = r->consumer;
+        *       r->queue[consumer++] = NULL;
+        *       if (unlikely(consumer >= r->size))
+        *               consumer = 0;
+        *       r->consumer = consumer;
         * but that is suboptimal when the ring is full as producer is writing
         * out new entries in the same cache line.  Defer these updates until a
         * batch of entries has been consumed.
         */
-       int head = r->consumer_head++;
+       /* Note: we must keep consumer_head valid at all times for __ptr_ring_empty
+        * to work correctly.
+        */
+       int consumer_head = r->consumer_head;
+       int head = consumer_head++;
 
        /* Once we have processed enough entries invalidate them in
         * the ring all at once so producer can reuse their space in the ring.
         * We also do this when we reach end of the ring - not mandatory
         * but helps keep the implementation simple.
         */
-       if (unlikely(r->consumer_head - r->consumer_tail >= r->batch ||
-                    r->consumer_head >= r->size)) {
+       if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
+                    consumer_head >= r->size)) {
                /* Zero out entries in the reverse order: this way we touch the
                 * cache line that producer might currently be reading the last;
                 * producer won't make progress and touch other cache lines
@@ -271,12 +282,14 @@ static inline void __ptr_ring_discard_one(struct ptr_ring *r)
                 */
                while (likely(head >= r->consumer_tail))
                        r->queue[head--] = NULL;
-               r->consumer_tail = r->consumer_head;
+               r->consumer_tail = consumer_head;
        }
-       if (unlikely(r->consumer_head >= r->size)) {
-               r->consumer_head = 0;
+       if (unlikely(consumer_head >= r->size)) {
+               consumer_head = 0;
                r->consumer_tail = 0;
        }
+       /* matching READ_ONCE in __ptr_ring_empty for lockless tests */
+       WRITE_ONCE(r->consumer_head, consumer_head);
 }
 
 static inline void *__ptr_ring_consume(struct ptr_ring *r)
@@ -451,9 +464,14 @@ static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
        __PTR_RING_PEEK_CALL_v; \
 })
 
+/* Not all gfp_t flags (besides GFP_KERNEL) are allowed. See
+ * documentation for vmalloc for which of them are legal.
+ */
 static inline void **__ptr_ring_init_queue_alloc(unsigned int size, gfp_t gfp)
 {
-       return kcalloc(size, sizeof(void *), gfp);
+       if (size * sizeof(void *) > KMALLOC_MAX_SIZE)
+               return NULL;
+       return kvmalloc_array(size, sizeof(void *), gfp | __GFP_ZERO);
 }
 
 static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
@@ -527,7 +545,9 @@ static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
                        goto done;
                }
                r->queue[head] = batch[--n];
-               r->consumer_tail = r->consumer_head = head;
+               r->consumer_tail = head;
+               /* matching READ_ONCE in __ptr_ring_empty for lockless tests */
+               WRITE_ONCE(r->consumer_head, head);
        }
 
 done:
@@ -586,7 +606,7 @@ static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp,
        spin_unlock(&(r)->producer_lock);
        spin_unlock_irqrestore(&(r)->consumer_lock, flags);
 
-       kfree(old);
+       kvfree(old);
 
        return 0;
 }
@@ -626,7 +646,7 @@ static inline int ptr_ring_resize_multiple(struct ptr_ring **rings,
        }
 
        for (i = 0; i < nrings; ++i)
-               kfree(queues[i]);
+               kvfree(queues[i]);
 
        kfree(queues);
 
@@ -634,7 +654,7 @@ static inline int ptr_ring_resize_multiple(struct ptr_ring **rings,
 
 nomem:
        while (--i >= 0)
-               kfree(queues[i]);
+               kvfree(queues[i]);
 
        kfree(queues);
 
@@ -649,7 +669,7 @@ static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
        if (destroy)
                while ((ptr = ptr_ring_consume(r)))
                        destroy(ptr);
-       kfree(r->queue);
+       kvfree(r->queue);
 }
 
 #endif /* _LINUX_PTR_RING_H  */