btrfs: introduce a bitmap based csum range search function
[sfrench/cifs-2.6.git] / fs / btrfs / async-thread.c
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Copyright (C) 2007 Oracle.  All rights reserved.
4  * Copyright (C) 2014 Fujitsu.  All rights reserved.
5  */
6
7 #include <linux/kthread.h>
8 #include <linux/slab.h>
9 #include <linux/list.h>
10 #include <linux/spinlock.h>
11 #include <linux/freezer.h>
12 #include "async-thread.h"
13 #include "ctree.h"
14
/* Bit numbers used with the bit operations on work->flags in this file. */
enum {
	/* Set by btrfs_work_helper() after work->func() has returned. */
	WORK_DONE_BIT = 0,
	/* Set by run_ordered_work() when it claims the work's ordered_func. */
	WORK_ORDER_DONE_BIT = 1,
};
19
/* Value stored in btrfs_workqueue.thresh when dynamic thresholding is off. */
#define NO_THRESHOLD (-1)
/*
 * Threshold used when the caller of btrfs_alloc_workqueue() passes 0.
 * Requests below this value disable thresholding altogether.
 */
#define DFT_THRESHOLD 32
22
23 struct btrfs_workqueue {
24         struct workqueue_struct *normal_wq;
25
26         /* File system this workqueue services */
27         struct btrfs_fs_info *fs_info;
28
29         /* List head pointing to ordered work list */
30         struct list_head ordered_list;
31
32         /* Spinlock for ordered_list */
33         spinlock_t list_lock;
34
35         /* Thresholding related variants */
36         atomic_t pending;
37
38         /* Up limit of concurrency workers */
39         int limit_active;
40
41         /* Current number of concurrency workers */
42         int current_active;
43
44         /* Threshold to change current_active */
45         int thresh;
46         unsigned int count;
47         spinlock_t thres_lock;
48 };
49
50 struct btrfs_fs_info * __pure btrfs_workqueue_owner(const struct btrfs_workqueue *wq)
51 {
52         return wq->fs_info;
53 }
54
55 struct btrfs_fs_info * __pure btrfs_work_owner(const struct btrfs_work *work)
56 {
57         return work->wq->fs_info;
58 }
59
60 bool btrfs_workqueue_normal_congested(const struct btrfs_workqueue *wq)
61 {
62         /*
63          * We could compare wq->pending with num_online_cpus()
64          * to support "thresh == NO_THRESHOLD" case, but it requires
65          * moving up atomic_inc/dec in thresh_queue/exec_hook. Let's
66          * postpone it until someone needs the support of that case.
67          */
68         if (wq->thresh == NO_THRESHOLD)
69                 return false;
70
71         return atomic_read(&wq->pending) > wq->thresh * 2;
72 }
73
74 struct btrfs_workqueue *btrfs_alloc_workqueue(struct btrfs_fs_info *fs_info,
75                                               const char *name, unsigned int flags,
76                                               int limit_active, int thresh)
77 {
78         struct btrfs_workqueue *ret = kzalloc(sizeof(*ret), GFP_KERNEL);
79
80         if (!ret)
81                 return NULL;
82
83         ret->fs_info = fs_info;
84         ret->limit_active = limit_active;
85         atomic_set(&ret->pending, 0);
86         if (thresh == 0)
87                 thresh = DFT_THRESHOLD;
88         /* For low threshold, disabling threshold is a better choice */
89         if (thresh < DFT_THRESHOLD) {
90                 ret->current_active = limit_active;
91                 ret->thresh = NO_THRESHOLD;
92         } else {
93                 /*
94                  * For threshold-able wq, let its concurrency grow on demand.
95                  * Use minimal max_active at alloc time to reduce resource
96                  * usage.
97                  */
98                 ret->current_active = 1;
99                 ret->thresh = thresh;
100         }
101
102         ret->normal_wq = alloc_workqueue("btrfs-%s", flags, ret->current_active,
103                                          name);
104         if (!ret->normal_wq) {
105                 kfree(ret);
106                 return NULL;
107         }
108
109         INIT_LIST_HEAD(&ret->ordered_list);
110         spin_lock_init(&ret->list_lock);
111         spin_lock_init(&ret->thres_lock);
112         trace_btrfs_workqueue_alloc(ret, name);
113         return ret;
114 }
115
116 /*
117  * Hook for threshold which will be called in btrfs_queue_work.
118  * This hook WILL be called in IRQ handler context,
119  * so workqueue_set_max_active MUST NOT be called in this hook
120  */
121 static inline void thresh_queue_hook(struct btrfs_workqueue *wq)
122 {
123         if (wq->thresh == NO_THRESHOLD)
124                 return;
125         atomic_inc(&wq->pending);
126 }
127
128 /*
129  * Hook for threshold which will be called before executing the work,
130  * This hook is called in kthread content.
131  * So workqueue_set_max_active is called here.
132  */
133 static inline void thresh_exec_hook(struct btrfs_workqueue *wq)
134 {
135         int new_current_active;
136         long pending;
137         int need_change = 0;
138
139         if (wq->thresh == NO_THRESHOLD)
140                 return;
141
142         atomic_dec(&wq->pending);
143         spin_lock(&wq->thres_lock);
144         /*
145          * Use wq->count to limit the calling frequency of
146          * workqueue_set_max_active.
147          */
148         wq->count++;
149         wq->count %= (wq->thresh / 4);
150         if (!wq->count)
151                 goto  out;
152         new_current_active = wq->current_active;
153
154         /*
155          * pending may be changed later, but it's OK since we really
156          * don't need it so accurate to calculate new_max_active.
157          */
158         pending = atomic_read(&wq->pending);
159         if (pending > wq->thresh)
160                 new_current_active++;
161         if (pending < wq->thresh / 2)
162                 new_current_active--;
163         new_current_active = clamp_val(new_current_active, 1, wq->limit_active);
164         if (new_current_active != wq->current_active)  {
165                 need_change = 1;
166                 wq->current_active = new_current_active;
167         }
168 out:
169         spin_unlock(&wq->thres_lock);
170
171         if (need_change) {
172                 workqueue_set_max_active(wq->normal_wq, wq->current_active);
173         }
174 }
175
176 static void run_ordered_work(struct btrfs_workqueue *wq,
177                              struct btrfs_work *self)
178 {
179         struct list_head *list = &wq->ordered_list;
180         struct btrfs_work *work;
181         spinlock_t *lock = &wq->list_lock;
182         unsigned long flags;
183         bool free_self = false;
184
185         while (1) {
186                 spin_lock_irqsave(lock, flags);
187                 if (list_empty(list))
188                         break;
189                 work = list_entry(list->next, struct btrfs_work,
190                                   ordered_list);
191                 if (!test_bit(WORK_DONE_BIT, &work->flags))
192                         break;
193                 /*
194                  * Orders all subsequent loads after reading WORK_DONE_BIT,
195                  * paired with the smp_mb__before_atomic in btrfs_work_helper
196                  * this guarantees that the ordered function will see all
197                  * updates from ordinary work function.
198                  */
199                 smp_rmb();
200
201                 /*
202                  * we are going to call the ordered done function, but
203                  * we leave the work item on the list as a barrier so
204                  * that later work items that are done don't have their
205                  * functions called before this one returns
206                  */
207                 if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
208                         break;
209                 trace_btrfs_ordered_sched(work);
210                 spin_unlock_irqrestore(lock, flags);
211                 work->ordered_func(work);
212
213                 /* now take the lock again and drop our item from the list */
214                 spin_lock_irqsave(lock, flags);
215                 list_del(&work->ordered_list);
216                 spin_unlock_irqrestore(lock, flags);
217
218                 if (work == self) {
219                         /*
220                          * This is the work item that the worker is currently
221                          * executing.
222                          *
223                          * The kernel workqueue code guarantees non-reentrancy
224                          * of work items. I.e., if a work item with the same
225                          * address and work function is queued twice, the second
226                          * execution is blocked until the first one finishes. A
227                          * work item may be freed and recycled with the same
228                          * work function; the workqueue code assumes that the
229                          * original work item cannot depend on the recycled work
230                          * item in that case (see find_worker_executing_work()).
231                          *
232                          * Note that different types of Btrfs work can depend on
233                          * each other, and one type of work on one Btrfs
234                          * filesystem may even depend on the same type of work
235                          * on another Btrfs filesystem via, e.g., a loop device.
236                          * Therefore, we must not allow the current work item to
237                          * be recycled until we are really done, otherwise we
238                          * break the above assumption and can deadlock.
239                          */
240                         free_self = true;
241                 } else {
242                         /*
243                          * We don't want to call the ordered free functions with
244                          * the lock held.
245                          */
246                         work->ordered_free(work);
247                         /* NB: work must not be dereferenced past this point. */
248                         trace_btrfs_all_work_done(wq->fs_info, work);
249                 }
250         }
251         spin_unlock_irqrestore(lock, flags);
252
253         if (free_self) {
254                 self->ordered_free(self);
255                 /* NB: self must not be dereferenced past this point. */
256                 trace_btrfs_all_work_done(wq->fs_info, self);
257         }
258 }
259
260 static void btrfs_work_helper(struct work_struct *normal_work)
261 {
262         struct btrfs_work *work = container_of(normal_work, struct btrfs_work,
263                                                normal_work);
264         struct btrfs_workqueue *wq = work->wq;
265         int need_order = 0;
266
267         /*
268          * We should not touch things inside work in the following cases:
269          * 1) after work->func() if it has no ordered_free
270          *    Since the struct is freed in work->func().
271          * 2) after setting WORK_DONE_BIT
272          *    The work may be freed in other threads almost instantly.
273          * So we save the needed things here.
274          */
275         if (work->ordered_func)
276                 need_order = 1;
277
278         trace_btrfs_work_sched(work);
279         thresh_exec_hook(wq);
280         work->func(work);
281         if (need_order) {
282                 /*
283                  * Ensures all memory accesses done in the work function are
284                  * ordered before setting the WORK_DONE_BIT. Ensuring the thread
285                  * which is going to executed the ordered work sees them.
286                  * Pairs with the smp_rmb in run_ordered_work.
287                  */
288                 smp_mb__before_atomic();
289                 set_bit(WORK_DONE_BIT, &work->flags);
290                 run_ordered_work(wq, work);
291         } else {
292                 /* NB: work must not be dereferenced past this point. */
293                 trace_btrfs_all_work_done(wq->fs_info, work);
294         }
295 }
296
297 void btrfs_init_work(struct btrfs_work *work, btrfs_func_t func,
298                      btrfs_func_t ordered_func, btrfs_func_t ordered_free)
299 {
300         work->func = func;
301         work->ordered_func = ordered_func;
302         work->ordered_free = ordered_free;
303         INIT_WORK(&work->normal_work, btrfs_work_helper);
304         INIT_LIST_HEAD(&work->ordered_list);
305         work->flags = 0;
306 }
307
308 void btrfs_queue_work(struct btrfs_workqueue *wq, struct btrfs_work *work)
309 {
310         unsigned long flags;
311
312         work->wq = wq;
313         thresh_queue_hook(wq);
314         if (work->ordered_func) {
315                 spin_lock_irqsave(&wq->list_lock, flags);
316                 list_add_tail(&work->ordered_list, &wq->ordered_list);
317                 spin_unlock_irqrestore(&wq->list_lock, flags);
318         }
319         trace_btrfs_work_queued(work);
320         queue_work(wq->normal_wq, &work->normal_work);
321 }
322
323 void btrfs_destroy_workqueue(struct btrfs_workqueue *wq)
324 {
325         if (!wq)
326                 return;
327         destroy_workqueue(wq->normal_wq);
328         trace_btrfs_workqueue_destroy(wq);
329         kfree(wq);
330 }
331
332 void btrfs_workqueue_set_max(struct btrfs_workqueue *wq, int limit_active)
333 {
334         if (wq)
335                 wq->limit_active = limit_active;
336 }
337
338 void btrfs_flush_workqueue(struct btrfs_workqueue *wq)
339 {
340         flush_workqueue(wq->normal_wq);
341 }