Merge tag 'for-6.9/dm-vdo' of git://git.kernel.org/pub/scm/linux/kernel/git/device...
[sfrench/cifs-2.6.git] / drivers / md / dm-vdo / funnel-queue.c
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * Copyright 2023 Red Hat
4  */
5
6 #include "funnel-queue.h"
7
8 #include "cpu.h"
9 #include "memory-alloc.h"
10 #include "permassert.h"
11
12 int vdo_make_funnel_queue(struct funnel_queue **queue_ptr)
13 {
14         int result;
15         struct funnel_queue *queue;
16
17         result = vdo_allocate(1, struct funnel_queue, "funnel queue", &queue);
18         if (result != VDO_SUCCESS)
19                 return result;
20
21         /*
22          * Initialize the stub entry and put it in the queue, establishing the invariant that
23          * queue->newest and queue->oldest are never null.
24          */
25         queue->stub.next = NULL;
26         queue->newest = &queue->stub;
27         queue->oldest = &queue->stub;
28
29         *queue_ptr = queue;
30         return VDO_SUCCESS;
31 }
32
33 void vdo_free_funnel_queue(struct funnel_queue *queue)
34 {
35         vdo_free(queue);
36 }
37
38 static struct funnel_queue_entry *get_oldest(struct funnel_queue *queue)
39 {
40         /*
41          * Barrier requirements: We need a read barrier between reading a "next" field pointer
42          * value and reading anything it points to. There's an accompanying barrier in
43          * vdo_funnel_queue_put() between its caller setting up the entry and making it visible.
44          */
45         struct funnel_queue_entry *oldest = queue->oldest;
46         struct funnel_queue_entry *next = READ_ONCE(oldest->next);
47
48         if (oldest == &queue->stub) {
49                 /*
50                  * When the oldest entry is the stub and it has no successor, the queue is
51                  * logically empty.
52                  */
53                 if (next == NULL)
54                         return NULL;
55                 /*
56                  * The stub entry has a successor, so the stub can be dequeued and ignored without
57                  * breaking the queue invariants.
58                  */
59                 oldest = next;
60                 queue->oldest = oldest;
61                 next = READ_ONCE(oldest->next);
62         }
63
64         /*
65          * We have a non-stub candidate to dequeue. If it lacks a successor, we'll need to put the
66          * stub entry back on the queue first.
67          */
68         if (next == NULL) {
69                 struct funnel_queue_entry *newest = READ_ONCE(queue->newest);
70
71                 if (oldest != newest) {
72                         /*
73                          * Another thread has already swung queue->newest atomically, but not yet
74                          * assigned previous->next. The queue is really still empty.
75                          */
76                         return NULL;
77                 }
78
79                 /*
80                  * Put the stub entry back on the queue, ensuring a successor will eventually be
81                  * seen.
82                  */
83                 vdo_funnel_queue_put(queue, &queue->stub);
84
85                 /* Check again for a successor. */
86                 next = READ_ONCE(oldest->next);
87                 if (next == NULL) {
88                         /*
89                          * We lost a race with a producer who swapped queue->newest before we did,
90                          * but who hasn't yet updated previous->next. Try again later.
91                          */
92                         return NULL;
93                 }
94         }
95
96         return oldest;
97 }
98
99 /*
100  * Poll a queue, removing the oldest entry if the queue is not empty. This function must only be
101  * called from a single consumer thread.
102  */
103 struct funnel_queue_entry *vdo_funnel_queue_poll(struct funnel_queue *queue)
104 {
105         struct funnel_queue_entry *oldest = get_oldest(queue);
106
107         if (oldest == NULL)
108                 return oldest;
109
110         /*
111          * Dequeue the oldest entry and return it. Only one consumer thread may call this function,
112          * so no locking, atomic operations, or fences are needed; queue->oldest is owned by the
113          * consumer and oldest->next is never used by a producer thread after it is swung from NULL
114          * to non-NULL.
115          */
116         queue->oldest = READ_ONCE(oldest->next);
117         /*
118          * Make sure the caller sees the proper stored data for this entry. Since we've already
119          * fetched the entry pointer we stored in "queue->oldest", this also ensures that on entry
120          * to the next call we'll properly see the dependent data.
121          */
122         smp_rmb();
123         /*
124          * If "oldest" is a very light-weight work item, we'll be looking for the next one very
125          * soon, so prefetch it now.
126          */
127         uds_prefetch_address(queue->oldest, true);
128         WRITE_ONCE(oldest->next, NULL);
129         return oldest;
130 }
131
132 /*
133  * Check whether the funnel queue is empty or not. If the queue is in a transition state with one
134  * or more entries being added such that the list view is incomplete, this function will report the
135  * queue as empty.
136  */
137 bool vdo_is_funnel_queue_empty(struct funnel_queue *queue)
138 {
139         return get_oldest(queue) == NULL;
140 }
141
142 /*
143  * Check whether the funnel queue is idle or not. If the queue has entries available to be
144  * retrieved, it is not idle. If the queue is in a transition state with one or more entries being
145  * added such that the list view is incomplete, it may not be possible to retrieve an entry with
146  * the vdo_funnel_queue_poll() function, but the queue will not be considered idle.
147  */
148 bool vdo_is_funnel_queue_idle(struct funnel_queue *queue)
149 {
150         /*
151          * Oldest is not the stub, so there's another entry, though if next is NULL we can't
152          * retrieve it yet.
153          */
154         if (queue->oldest != &queue->stub)
155                 return false;
156
157         /*
158          * Oldest is the stub, but newest has been updated by _put(); either there's another,
159          * retrievable entry in the list, or the list is officially empty but in the intermediate
160          * state of having an entry added.
161          *
162          * Whether anything is retrievable depends on whether stub.next has been updated and become
163          * visible to us, but for idleness we don't care. And due to memory ordering in _put(), the
164          * update to newest would be visible to us at the same time or sooner.
165          */
166         if (READ_ONCE(queue->newest) != &queue->stub)
167                 return false;
168
169         return true;
170 }