Merge tag 'drm-misc-fixes-2019-11-25' of git://anongit.freedesktop.org/drm/drm-misc...
[sfrench/cifs-2.6.git] / fs / io-wq.c
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Basic worker thread pool for io_uring
4  *
5  * Copyright (C) 2019 Jens Axboe
6  *
7  */
8 #include <linux/kernel.h>
9 #include <linux/init.h>
10 #include <linux/errno.h>
11 #include <linux/sched/signal.h>
12 #include <linux/mm.h>
13 #include <linux/mmu_context.h>
14 #include <linux/sched/mm.h>
15 #include <linux/percpu.h>
16 #include <linux/slab.h>
17 #include <linux/kthread.h>
18 #include <linux/rculist_nulls.h>
19
20 #include "io-wq.h"
21
22 #define WORKER_IDLE_TIMEOUT     (5 * HZ)
23
24 enum {
25         IO_WORKER_F_UP          = 1,    /* up and active */
26         IO_WORKER_F_RUNNING     = 2,    /* account as running */
27         IO_WORKER_F_FREE        = 4,    /* worker on free list */
28         IO_WORKER_F_EXITING     = 8,    /* worker exiting */
29         IO_WORKER_F_FIXED       = 16,   /* static idle worker */
30         IO_WORKER_F_BOUND       = 32,   /* is doing bounded work */
31 };
32
33 enum {
34         IO_WQ_BIT_EXIT          = 0,    /* wq exiting */
35         IO_WQ_BIT_CANCEL        = 1,    /* cancel work on list */
36         IO_WQ_BIT_ERROR         = 2,    /* error on setup */
37 };
38
39 enum {
40         IO_WQE_FLAG_STALLED     = 1,    /* stalled on hash */
41 };
42
43 /*
44  * One for each thread in a wqe pool
45  */
46 struct io_worker {
47         refcount_t ref;
48         unsigned flags;
49         struct hlist_nulls_node nulls_node;
50         struct list_head all_list;
51         struct task_struct *task;
52         wait_queue_head_t wait;
53         struct io_wqe *wqe;
54
55         struct io_wq_work *cur_work;
56         spinlock_t lock;
57
58         struct rcu_head rcu;
59         struct mm_struct *mm;
60         const struct cred *creds;
61         struct files_struct *restore_files;
62 };
63
64 #if BITS_PER_LONG == 64
65 #define IO_WQ_HASH_ORDER        6
66 #else
67 #define IO_WQ_HASH_ORDER        5
68 #endif
69
70 struct io_wqe_acct {
71         unsigned nr_workers;
72         unsigned max_workers;
73         atomic_t nr_running;
74 };
75
76 enum {
77         IO_WQ_ACCT_BOUND,
78         IO_WQ_ACCT_UNBOUND,
79 };
80
81 /*
82  * Per-node worker thread pool
83  */
84 struct io_wqe {
85         struct {
86                 spinlock_t lock;
87                 struct io_wq_work_list work_list;
88                 unsigned long hash_map;
89                 unsigned flags;
90         } ____cacheline_aligned_in_smp;
91
92         int node;
93         struct io_wqe_acct acct[2];
94
95         struct hlist_nulls_head free_list;
96         struct hlist_nulls_head busy_list;
97         struct list_head all_list;
98
99         struct io_wq *wq;
100 };
101
102 /*
103  * Per io_wq state
104   */
105 struct io_wq {
106         struct io_wqe **wqes;
107         unsigned long state;
108
109         get_work_fn *get_work;
110         put_work_fn *put_work;
111
112         struct task_struct *manager;
113         struct user_struct *user;
114         const struct cred *creds;
115         struct mm_struct *mm;
116         refcount_t refs;
117         struct completion done;
118 };
119
120 static bool io_worker_get(struct io_worker *worker)
121 {
122         return refcount_inc_not_zero(&worker->ref);
123 }
124
125 static void io_worker_release(struct io_worker *worker)
126 {
127         if (refcount_dec_and_test(&worker->ref))
128                 wake_up_process(worker->task);
129 }
130
131 /*
132  * Note: drops the wqe->lock if returning true! The caller must re-acquire
133  * the lock in that case. Some callers need to restart handling if this
134  * happens, so we can't just re-acquire the lock on behalf of the caller.
135  */
136 static bool __io_worker_unuse(struct io_wqe *wqe, struct io_worker *worker)
137 {
138         bool dropped_lock = false;
139
140         if (worker->creds) {
141                 revert_creds(worker->creds);
142                 worker->creds = NULL;
143         }
144
145         if (current->files != worker->restore_files) {
146                 __acquire(&wqe->lock);
147                 spin_unlock_irq(&wqe->lock);
148                 dropped_lock = true;
149
150                 task_lock(current);
151                 current->files = worker->restore_files;
152                 task_unlock(current);
153         }
154
155         /*
156          * If we have an active mm, we need to drop the wq lock before unusing
157          * it. If we do, return true and let the caller retry the idle loop.
158          */
159         if (worker->mm) {
160                 if (!dropped_lock) {
161                         __acquire(&wqe->lock);
162                         spin_unlock_irq(&wqe->lock);
163                         dropped_lock = true;
164                 }
165                 __set_current_state(TASK_RUNNING);
166                 set_fs(KERNEL_DS);
167                 unuse_mm(worker->mm);
168                 mmput(worker->mm);
169                 worker->mm = NULL;
170         }
171
172         return dropped_lock;
173 }
174
175 static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
176                                                    struct io_wq_work *work)
177 {
178         if (work->flags & IO_WQ_WORK_UNBOUND)
179                 return &wqe->acct[IO_WQ_ACCT_UNBOUND];
180
181         return &wqe->acct[IO_WQ_ACCT_BOUND];
182 }
183
184 static inline struct io_wqe_acct *io_wqe_get_acct(struct io_wqe *wqe,
185                                                   struct io_worker *worker)
186 {
187         if (worker->flags & IO_WORKER_F_BOUND)
188                 return &wqe->acct[IO_WQ_ACCT_BOUND];
189
190         return &wqe->acct[IO_WQ_ACCT_UNBOUND];
191 }
192
193 static void io_worker_exit(struct io_worker *worker)
194 {
195         struct io_wqe *wqe = worker->wqe;
196         struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);
197         unsigned nr_workers;
198
199         /*
200          * If we're not at zero, someone else is holding a brief reference
201          * to the worker. Wait for that to go away.
202          */
203         set_current_state(TASK_INTERRUPTIBLE);
204         if (!refcount_dec_and_test(&worker->ref))
205                 schedule();
206         __set_current_state(TASK_RUNNING);
207
208         preempt_disable();
209         current->flags &= ~PF_IO_WORKER;
210         if (worker->flags & IO_WORKER_F_RUNNING)
211                 atomic_dec(&acct->nr_running);
212         if (!(worker->flags & IO_WORKER_F_BOUND))
213                 atomic_dec(&wqe->wq->user->processes);
214         worker->flags = 0;
215         preempt_enable();
216
217         spin_lock_irq(&wqe->lock);
218         hlist_nulls_del_rcu(&worker->nulls_node);
219         list_del_rcu(&worker->all_list);
220         if (__io_worker_unuse(wqe, worker)) {
221                 __release(&wqe->lock);
222                 spin_lock_irq(&wqe->lock);
223         }
224         acct->nr_workers--;
225         nr_workers = wqe->acct[IO_WQ_ACCT_BOUND].nr_workers +
226                         wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers;
227         spin_unlock_irq(&wqe->lock);
228
229         /* all workers gone, wq exit can proceed */
230         if (!nr_workers && refcount_dec_and_test(&wqe->wq->refs))
231                 complete(&wqe->wq->done);
232
233         kfree_rcu(worker, rcu);
234 }
235
236 static inline bool io_wqe_run_queue(struct io_wqe *wqe)
237         __must_hold(wqe->lock)
238 {
239         if (!wq_list_empty(&wqe->work_list) &&
240             !(wqe->flags & IO_WQE_FLAG_STALLED))
241                 return true;
242         return false;
243 }
244
245 /*
246  * Check head of free list for an available worker. If one isn't available,
247  * caller must wake up the wq manager to create one.
248  */
249 static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
250         __must_hold(RCU)
251 {
252         struct hlist_nulls_node *n;
253         struct io_worker *worker;
254
255         n = rcu_dereference(hlist_nulls_first_rcu(&wqe->free_list));
256         if (is_a_nulls(n))
257                 return false;
258
259         worker = hlist_nulls_entry(n, struct io_worker, nulls_node);
260         if (io_worker_get(worker)) {
261                 wake_up(&worker->wait);
262                 io_worker_release(worker);
263                 return true;
264         }
265
266         return false;
267 }
268
269 /*
270  * We need a worker. If we find a free one, we're good. If not, and we're
271  * below the max number of workers, wake up the manager to create one.
272  */
273 static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
274 {
275         bool ret;
276
277         /*
278          * Most likely an attempt to queue unbounded work on an io_wq that
279          * wasn't setup with any unbounded workers.
280          */
281         WARN_ON_ONCE(!acct->max_workers);
282
283         rcu_read_lock();
284         ret = io_wqe_activate_free_worker(wqe);
285         rcu_read_unlock();
286
287         if (!ret && acct->nr_workers < acct->max_workers)
288                 wake_up_process(wqe->wq->manager);
289 }
290
291 static void io_wqe_inc_running(struct io_wqe *wqe, struct io_worker *worker)
292 {
293         struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);
294
295         atomic_inc(&acct->nr_running);
296 }
297
298 static void io_wqe_dec_running(struct io_wqe *wqe, struct io_worker *worker)
299         __must_hold(wqe->lock)
300 {
301         struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);
302
303         if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe))
304                 io_wqe_wake_worker(wqe, acct);
305 }
306
307 static void io_worker_start(struct io_wqe *wqe, struct io_worker *worker)
308 {
309         allow_kernel_signal(SIGINT);
310
311         current->flags |= PF_IO_WORKER;
312
313         worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
314         worker->restore_files = current->files;
315         io_wqe_inc_running(wqe, worker);
316 }
317
318 /*
319  * Worker will start processing some work. Move it to the busy list, if
320  * it's currently on the freelist
321  */
322 static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
323                              struct io_wq_work *work)
324         __must_hold(wqe->lock)
325 {
326         bool worker_bound, work_bound;
327
328         if (worker->flags & IO_WORKER_F_FREE) {
329                 worker->flags &= ~IO_WORKER_F_FREE;
330                 hlist_nulls_del_init_rcu(&worker->nulls_node);
331                 hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->busy_list);
332         }
333
334         /*
335          * If worker is moving from bound to unbound (or vice versa), then
336          * ensure we update the running accounting.
337          */
338         worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
339         work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
340         if (worker_bound != work_bound) {
341                 io_wqe_dec_running(wqe, worker);
342                 if (work_bound) {
343                         worker->flags |= IO_WORKER_F_BOUND;
344                         wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers--;
345                         wqe->acct[IO_WQ_ACCT_BOUND].nr_workers++;
346                         atomic_dec(&wqe->wq->user->processes);
347                 } else {
348                         worker->flags &= ~IO_WORKER_F_BOUND;
349                         wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers++;
350                         wqe->acct[IO_WQ_ACCT_BOUND].nr_workers--;
351                         atomic_inc(&wqe->wq->user->processes);
352                 }
353                 io_wqe_inc_running(wqe, worker);
354          }
355 }
356
357 /*
358  * No work, worker going to sleep. Move to freelist, and unuse mm if we
359  * have one attached. Dropping the mm may potentially sleep, so we drop
360  * the lock in that case and return success. Since the caller has to
361  * retry the loop in that case (we changed task state), we don't regrab
362  * the lock if we return success.
363  */
364 static bool __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
365         __must_hold(wqe->lock)
366 {
367         if (!(worker->flags & IO_WORKER_F_FREE)) {
368                 worker->flags |= IO_WORKER_F_FREE;
369                 hlist_nulls_del_init_rcu(&worker->nulls_node);
370                 hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
371         }
372
373         return __io_worker_unuse(wqe, worker);
374 }
375
376 static struct io_wq_work *io_get_next_work(struct io_wqe *wqe, unsigned *hash)
377         __must_hold(wqe->lock)
378 {
379         struct io_wq_work_node *node, *prev;
380         struct io_wq_work *work;
381
382         wq_list_for_each(node, prev, &wqe->work_list) {
383                 work = container_of(node, struct io_wq_work, list);
384
385                 /* not hashed, can run anytime */
386                 if (!(work->flags & IO_WQ_WORK_HASHED)) {
387                         wq_node_del(&wqe->work_list, node, prev);
388                         return work;
389                 }
390
391                 /* hashed, can run if not already running */
392                 *hash = work->flags >> IO_WQ_HASH_SHIFT;
393                 if (!(wqe->hash_map & BIT_ULL(*hash))) {
394                         wqe->hash_map |= BIT_ULL(*hash);
395                         wq_node_del(&wqe->work_list, node, prev);
396                         return work;
397                 }
398         }
399
400         return NULL;
401 }
402
403 static void io_worker_handle_work(struct io_worker *worker)
404         __releases(wqe->lock)
405 {
406         struct io_wq_work *work, *old_work = NULL, *put_work = NULL;
407         struct io_wqe *wqe = worker->wqe;
408         struct io_wq *wq = wqe->wq;
409
410         do {
411                 unsigned hash = -1U;
412
413                 /*
414                  * If we got some work, mark us as busy. If we didn't, but
415                  * the list isn't empty, it means we stalled on hashed work.
416                  * Mark us stalled so we don't keep looking for work when we
417                  * can't make progress, any work completion or insertion will
418                  * clear the stalled flag.
419                  */
420                 work = io_get_next_work(wqe, &hash);
421                 if (work)
422                         __io_worker_busy(wqe, worker, work);
423                 else if (!wq_list_empty(&wqe->work_list))
424                         wqe->flags |= IO_WQE_FLAG_STALLED;
425
426                 spin_unlock_irq(&wqe->lock);
427                 if (put_work && wq->put_work)
428                         wq->put_work(old_work);
429                 if (!work)
430                         break;
431 next:
432                 /* flush any pending signals before assigning new work */
433                 if (signal_pending(current))
434                         flush_signals(current);
435
436                 spin_lock_irq(&worker->lock);
437                 worker->cur_work = work;
438                 spin_unlock_irq(&worker->lock);
439
440                 if (work->flags & IO_WQ_WORK_CB)
441                         work->func(&work);
442
443                 if ((work->flags & IO_WQ_WORK_NEEDS_FILES) &&
444                     current->files != work->files) {
445                         task_lock(current);
446                         current->files = work->files;
447                         task_unlock(current);
448                 }
449                 if ((work->flags & IO_WQ_WORK_NEEDS_USER) && !worker->mm &&
450                     wq->mm && mmget_not_zero(wq->mm)) {
451                         use_mm(wq->mm);
452                         set_fs(USER_DS);
453                         worker->mm = wq->mm;
454                 }
455                 if (!worker->creds)
456                         worker->creds = override_creds(wq->creds);
457                 if (test_bit(IO_WQ_BIT_CANCEL, &wq->state))
458                         work->flags |= IO_WQ_WORK_CANCEL;
459                 if (worker->mm)
460                         work->flags |= IO_WQ_WORK_HAS_MM;
461
462                 if (wq->get_work && !(work->flags & IO_WQ_WORK_INTERNAL)) {
463                         put_work = work;
464                         wq->get_work(work);
465                 }
466
467                 old_work = work;
468                 work->func(&work);
469
470                 spin_lock_irq(&worker->lock);
471                 worker->cur_work = NULL;
472                 spin_unlock_irq(&worker->lock);
473
474                 spin_lock_irq(&wqe->lock);
475
476                 if (hash != -1U) {
477                         wqe->hash_map &= ~BIT_ULL(hash);
478                         wqe->flags &= ~IO_WQE_FLAG_STALLED;
479                 }
480                 if (work && work != old_work) {
481                         spin_unlock_irq(&wqe->lock);
482
483                         if (put_work && wq->put_work) {
484                                 wq->put_work(put_work);
485                                 put_work = NULL;
486                         }
487
488                         /* dependent work not hashed */
489                         hash = -1U;
490                         goto next;
491                 }
492         } while (1);
493 }
494
495 static int io_wqe_worker(void *data)
496 {
497         struct io_worker *worker = data;
498         struct io_wqe *wqe = worker->wqe;
499         struct io_wq *wq = wqe->wq;
500         DEFINE_WAIT(wait);
501
502         io_worker_start(wqe, worker);
503
504         while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
505                 prepare_to_wait(&worker->wait, &wait, TASK_INTERRUPTIBLE);
506
507                 spin_lock_irq(&wqe->lock);
508                 if (io_wqe_run_queue(wqe)) {
509                         __set_current_state(TASK_RUNNING);
510                         io_worker_handle_work(worker);
511                         continue;
512                 }
513                 /* drops the lock on success, retry */
514                 if (__io_worker_idle(wqe, worker)) {
515                         __release(&wqe->lock);
516                         continue;
517                 }
518                 spin_unlock_irq(&wqe->lock);
519                 if (signal_pending(current))
520                         flush_signals(current);
521                 if (schedule_timeout(WORKER_IDLE_TIMEOUT))
522                         continue;
523                 /* timed out, exit unless we're the fixed worker */
524                 if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
525                     !(worker->flags & IO_WORKER_F_FIXED))
526                         break;
527         }
528
529         finish_wait(&worker->wait, &wait);
530
531         if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
532                 spin_lock_irq(&wqe->lock);
533                 if (!wq_list_empty(&wqe->work_list))
534                         io_worker_handle_work(worker);
535                 else
536                         spin_unlock_irq(&wqe->lock);
537         }
538
539         io_worker_exit(worker);
540         return 0;
541 }
542
543 /*
544  * Called when a worker is scheduled in. Mark us as currently running.
545  */
546 void io_wq_worker_running(struct task_struct *tsk)
547 {
548         struct io_worker *worker = kthread_data(tsk);
549         struct io_wqe *wqe = worker->wqe;
550
551         if (!(worker->flags & IO_WORKER_F_UP))
552                 return;
553         if (worker->flags & IO_WORKER_F_RUNNING)
554                 return;
555         worker->flags |= IO_WORKER_F_RUNNING;
556         io_wqe_inc_running(wqe, worker);
557 }
558
559 /*
560  * Called when worker is going to sleep. If there are no workers currently
561  * running and we have work pending, wake up a free one or have the manager
562  * set one up.
563  */
564 void io_wq_worker_sleeping(struct task_struct *tsk)
565 {
566         struct io_worker *worker = kthread_data(tsk);
567         struct io_wqe *wqe = worker->wqe;
568
569         if (!(worker->flags & IO_WORKER_F_UP))
570                 return;
571         if (!(worker->flags & IO_WORKER_F_RUNNING))
572                 return;
573
574         worker->flags &= ~IO_WORKER_F_RUNNING;
575
576         spin_lock_irq(&wqe->lock);
577         io_wqe_dec_running(wqe, worker);
578         spin_unlock_irq(&wqe->lock);
579 }
580
581 static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
582 {
583         struct io_wqe_acct *acct =&wqe->acct[index];
584         struct io_worker *worker;
585
586         worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
587         if (!worker)
588                 return false;
589
590         refcount_set(&worker->ref, 1);
591         worker->nulls_node.pprev = NULL;
592         init_waitqueue_head(&worker->wait);
593         worker->wqe = wqe;
594         spin_lock_init(&worker->lock);
595
596         worker->task = kthread_create_on_node(io_wqe_worker, worker, wqe->node,
597                                 "io_wqe_worker-%d/%d", index, wqe->node);
598         if (IS_ERR(worker->task)) {
599                 kfree(worker);
600                 return false;
601         }
602
603         spin_lock_irq(&wqe->lock);
604         hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
605         list_add_tail_rcu(&worker->all_list, &wqe->all_list);
606         worker->flags |= IO_WORKER_F_FREE;
607         if (index == IO_WQ_ACCT_BOUND)
608                 worker->flags |= IO_WORKER_F_BOUND;
609         if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND))
610                 worker->flags |= IO_WORKER_F_FIXED;
611         acct->nr_workers++;
612         spin_unlock_irq(&wqe->lock);
613
614         if (index == IO_WQ_ACCT_UNBOUND)
615                 atomic_inc(&wq->user->processes);
616
617         wake_up_process(worker->task);
618         return true;
619 }
620
621 static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index)
622         __must_hold(wqe->lock)
623 {
624         struct io_wqe_acct *acct = &wqe->acct[index];
625
626         /* if we have available workers or no work, no need */
627         if (!hlist_nulls_empty(&wqe->free_list) || !io_wqe_run_queue(wqe))
628                 return false;
629         return acct->nr_workers < acct->max_workers;
630 }
631
632 /*
633  * Manager thread. Tasked with creating new workers, if we need them.
634  */
635 static int io_wq_manager(void *data)
636 {
637         struct io_wq *wq = data;
638         int workers_to_create = num_possible_nodes();
639         int node;
640
641         /* create fixed workers */
642         refcount_set(&wq->refs, workers_to_create);
643         for_each_node(node) {
644                 if (!create_io_worker(wq, wq->wqes[node], IO_WQ_ACCT_BOUND))
645                         goto err;
646                 workers_to_create--;
647         }
648
649         complete(&wq->done);
650
651         while (!kthread_should_stop()) {
652                 for_each_node(node) {
653                         struct io_wqe *wqe = wq->wqes[node];
654                         bool fork_worker[2] = { false, false };
655
656                         spin_lock_irq(&wqe->lock);
657                         if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND))
658                                 fork_worker[IO_WQ_ACCT_BOUND] = true;
659                         if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND))
660                                 fork_worker[IO_WQ_ACCT_UNBOUND] = true;
661                         spin_unlock_irq(&wqe->lock);
662                         if (fork_worker[IO_WQ_ACCT_BOUND])
663                                 create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND);
664                         if (fork_worker[IO_WQ_ACCT_UNBOUND])
665                                 create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND);
666                 }
667                 set_current_state(TASK_INTERRUPTIBLE);
668                 schedule_timeout(HZ);
669         }
670
671         return 0;
672 err:
673         set_bit(IO_WQ_BIT_ERROR, &wq->state);
674         set_bit(IO_WQ_BIT_EXIT, &wq->state);
675         if (refcount_sub_and_test(workers_to_create, &wq->refs))
676                 complete(&wq->done);
677         return 0;
678 }
679
680 static bool io_wq_can_queue(struct io_wqe *wqe, struct io_wqe_acct *acct,
681                             struct io_wq_work *work)
682 {
683         bool free_worker;
684
685         if (!(work->flags & IO_WQ_WORK_UNBOUND))
686                 return true;
687         if (atomic_read(&acct->nr_running))
688                 return true;
689
690         rcu_read_lock();
691         free_worker = !hlist_nulls_empty(&wqe->free_list);
692         rcu_read_unlock();
693         if (free_worker)
694                 return true;
695
696         if (atomic_read(&wqe->wq->user->processes) >= acct->max_workers &&
697             !(capable(CAP_SYS_RESOURCE) || capable(CAP_SYS_ADMIN)))
698                 return false;
699
700         return true;
701 }
702
703 static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
704 {
705         struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
706         unsigned long flags;
707
708         /*
709          * Do early check to see if we need a new unbound worker, and if we do,
710          * if we're allowed to do so. This isn't 100% accurate as there's a
711          * gap between this check and incrementing the value, but that's OK.
712          * It's close enough to not be an issue, fork() has the same delay.
713          */
714         if (unlikely(!io_wq_can_queue(wqe, acct, work))) {
715                 work->flags |= IO_WQ_WORK_CANCEL;
716                 work->func(&work);
717                 return;
718         }
719
720         spin_lock_irqsave(&wqe->lock, flags);
721         wq_list_add_tail(&work->list, &wqe->work_list);
722         wqe->flags &= ~IO_WQE_FLAG_STALLED;
723         spin_unlock_irqrestore(&wqe->lock, flags);
724
725         if (!atomic_read(&acct->nr_running))
726                 io_wqe_wake_worker(wqe, acct);
727 }
728
729 void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
730 {
731         struct io_wqe *wqe = wq->wqes[numa_node_id()];
732
733         io_wqe_enqueue(wqe, work);
734 }
735
736 /*
737  * Enqueue work, hashed by some key. Work items that hash to the same value
738  * will not be done in parallel. Used to limit concurrent writes, generally
739  * hashed by inode.
740  */
741 void io_wq_enqueue_hashed(struct io_wq *wq, struct io_wq_work *work, void *val)
742 {
743         struct io_wqe *wqe = wq->wqes[numa_node_id()];
744         unsigned bit;
745
746
747         bit = hash_ptr(val, IO_WQ_HASH_ORDER);
748         work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
749         io_wqe_enqueue(wqe, work);
750 }
751
752 static bool io_wqe_worker_send_sig(struct io_worker *worker, void *data)
753 {
754         send_sig(SIGINT, worker->task, 1);
755         return false;
756 }
757
758 /*
759  * Iterate the passed in list and call the specific function for each
760  * worker that isn't exiting
761  */
762 static bool io_wq_for_each_worker(struct io_wqe *wqe,
763                                   bool (*func)(struct io_worker *, void *),
764                                   void *data)
765 {
766         struct io_worker *worker;
767         bool ret = false;
768
769         list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
770                 if (io_worker_get(worker)) {
771                         ret = func(worker, data);
772                         io_worker_release(worker);
773                         if (ret)
774                                 break;
775                 }
776         }
777
778         return ret;
779 }
780
781 void io_wq_cancel_all(struct io_wq *wq)
782 {
783         int node;
784
785         set_bit(IO_WQ_BIT_CANCEL, &wq->state);
786
787         /*
788          * Browse both lists, as there's a gap between handing work off
789          * to a worker and the worker putting itself on the busy_list
790          */
791         rcu_read_lock();
792         for_each_node(node) {
793                 struct io_wqe *wqe = wq->wqes[node];
794
795                 io_wq_for_each_worker(wqe, io_wqe_worker_send_sig, NULL);
796         }
797         rcu_read_unlock();
798 }
799
800 struct io_cb_cancel_data {
801         struct io_wqe *wqe;
802         work_cancel_fn *cancel;
803         void *caller_data;
804 };
805
806 static bool io_work_cancel(struct io_worker *worker, void *cancel_data)
807 {
808         struct io_cb_cancel_data *data = cancel_data;
809         unsigned long flags;
810         bool ret = false;
811
812         /*
813          * Hold the lock to avoid ->cur_work going out of scope, caller
814          * may dereference the passed in work.
815          */
816         spin_lock_irqsave(&worker->lock, flags);
817         if (worker->cur_work &&
818             data->cancel(worker->cur_work, data->caller_data)) {
819                 send_sig(SIGINT, worker->task, 1);
820                 ret = true;
821         }
822         spin_unlock_irqrestore(&worker->lock, flags);
823
824         return ret;
825 }
826
827 static enum io_wq_cancel io_wqe_cancel_cb_work(struct io_wqe *wqe,
828                                                work_cancel_fn *cancel,
829                                                void *cancel_data)
830 {
831         struct io_cb_cancel_data data = {
832                 .wqe = wqe,
833                 .cancel = cancel,
834                 .caller_data = cancel_data,
835         };
836         struct io_wq_work_node *node, *prev;
837         struct io_wq_work *work;
838         unsigned long flags;
839         bool found = false;
840
841         spin_lock_irqsave(&wqe->lock, flags);
842         wq_list_for_each(node, prev, &wqe->work_list) {
843                 work = container_of(node, struct io_wq_work, list);
844
845                 if (cancel(work, cancel_data)) {
846                         wq_node_del(&wqe->work_list, node, prev);
847                         found = true;
848                         break;
849                 }
850         }
851         spin_unlock_irqrestore(&wqe->lock, flags);
852
853         if (found) {
854                 work->flags |= IO_WQ_WORK_CANCEL;
855                 work->func(&work);
856                 return IO_WQ_CANCEL_OK;
857         }
858
859         rcu_read_lock();
860         found = io_wq_for_each_worker(wqe, io_work_cancel, &data);
861         rcu_read_unlock();
862         return found ? IO_WQ_CANCEL_RUNNING : IO_WQ_CANCEL_NOTFOUND;
863 }
864
865 enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
866                                   void *data)
867 {
868         enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND;
869         int node;
870
871         for_each_node(node) {
872                 struct io_wqe *wqe = wq->wqes[node];
873
874                 ret = io_wqe_cancel_cb_work(wqe, cancel, data);
875                 if (ret != IO_WQ_CANCEL_NOTFOUND)
876                         break;
877         }
878
879         return ret;
880 }
881
882 static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
883 {
884         struct io_wq_work *work = data;
885         unsigned long flags;
886         bool ret = false;
887
888         if (worker->cur_work != work)
889                 return false;
890
891         spin_lock_irqsave(&worker->lock, flags);
892         if (worker->cur_work == work) {
893                 send_sig(SIGINT, worker->task, 1);
894                 ret = true;
895         }
896         spin_unlock_irqrestore(&worker->lock, flags);
897
898         return ret;
899 }
900
901 static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe,
902                                             struct io_wq_work *cwork)
903 {
904         struct io_wq_work_node *node, *prev;
905         struct io_wq_work *work;
906         unsigned long flags;
907         bool found = false;
908
909         cwork->flags |= IO_WQ_WORK_CANCEL;
910
911         /*
912          * First check pending list, if we're lucky we can just remove it
913          * from there. CANCEL_OK means that the work is returned as-new,
914          * no completion will be posted for it.
915          */
916         spin_lock_irqsave(&wqe->lock, flags);
917         wq_list_for_each(node, prev, &wqe->work_list) {
918                 work = container_of(node, struct io_wq_work, list);
919
920                 if (work == cwork) {
921                         wq_node_del(&wqe->work_list, node, prev);
922                         found = true;
923                         break;
924                 }
925         }
926         spin_unlock_irqrestore(&wqe->lock, flags);
927
928         if (found) {
929                 work->flags |= IO_WQ_WORK_CANCEL;
930                 work->func(&work);
931                 return IO_WQ_CANCEL_OK;
932         }
933
934         /*
935          * Now check if a free (going busy) or busy worker has the work
936          * currently running. If we find it there, we'll return CANCEL_RUNNING
937          * as an indication that we attempte to signal cancellation. The
938          * completion will run normally in this case.
939          */
940         rcu_read_lock();
941         found = io_wq_for_each_worker(wqe, io_wq_worker_cancel, cwork);
942         rcu_read_unlock();
943         return found ? IO_WQ_CANCEL_RUNNING : IO_WQ_CANCEL_NOTFOUND;
944 }
945
946 enum io_wq_cancel io_wq_cancel_work(struct io_wq *wq, struct io_wq_work *cwork)
947 {
948         enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND;
949         int node;
950
951         for_each_node(node) {
952                 struct io_wqe *wqe = wq->wqes[node];
953
954                 ret = io_wqe_cancel_work(wqe, cwork);
955                 if (ret != IO_WQ_CANCEL_NOTFOUND)
956                         break;
957         }
958
959         return ret;
960 }
961
962 struct io_wq_flush_data {
963         struct io_wq_work work;
964         struct completion done;
965 };
966
967 static void io_wq_flush_func(struct io_wq_work **workptr)
968 {
969         struct io_wq_work *work = *workptr;
970         struct io_wq_flush_data *data;
971
972         data = container_of(work, struct io_wq_flush_data, work);
973         complete(&data->done);
974 }
975
976 /*
977  * Doesn't wait for previously queued work to finish. When this completes,
978  * it just means that previously queued work was started.
979  */
980 void io_wq_flush(struct io_wq *wq)
981 {
982         struct io_wq_flush_data data;
983         int node;
984
985         for_each_node(node) {
986                 struct io_wqe *wqe = wq->wqes[node];
987
988                 init_completion(&data.done);
989                 INIT_IO_WORK(&data.work, io_wq_flush_func);
990                 data.work.flags |= IO_WQ_WORK_INTERNAL;
991                 io_wqe_enqueue(wqe, &data.work);
992                 wait_for_completion(&data.done);
993         }
994 }
995
996 struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
997 {
998         int ret = -ENOMEM, node;
999         struct io_wq *wq;
1000
1001         wq = kzalloc(sizeof(*wq), GFP_KERNEL);
1002         if (!wq)
1003                 return ERR_PTR(-ENOMEM);
1004
1005         wq->wqes = kcalloc(nr_node_ids, sizeof(struct io_wqe *), GFP_KERNEL);
1006         if (!wq->wqes) {
1007                 kfree(wq);
1008                 return ERR_PTR(-ENOMEM);
1009         }
1010
1011         wq->get_work = data->get_work;
1012         wq->put_work = data->put_work;
1013
1014         /* caller must already hold a reference to this */
1015         wq->user = data->user;
1016         wq->creds = data->creds;
1017
1018         for_each_node(node) {
1019                 struct io_wqe *wqe;
1020
1021                 wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, node);
1022                 if (!wqe)
1023                         goto err;
1024                 wq->wqes[node] = wqe;
1025                 wqe->node = node;
1026                 wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
1027                 atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
1028                 if (wq->user) {
1029                         wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
1030                                         task_rlimit(current, RLIMIT_NPROC);
1031                 }
1032                 atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
1033                 wqe->node = node;
1034                 wqe->wq = wq;
1035                 spin_lock_init(&wqe->lock);
1036                 INIT_WQ_LIST(&wqe->work_list);
1037                 INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
1038                 INIT_HLIST_NULLS_HEAD(&wqe->busy_list, 1);
1039                 INIT_LIST_HEAD(&wqe->all_list);
1040         }
1041
1042         init_completion(&wq->done);
1043
1044         /* caller must have already done mmgrab() on this mm */
1045         wq->mm = data->mm;
1046
1047         wq->manager = kthread_create(io_wq_manager, wq, "io_wq_manager");
1048         if (!IS_ERR(wq->manager)) {
1049                 wake_up_process(wq->manager);
1050                 wait_for_completion(&wq->done);
1051                 if (test_bit(IO_WQ_BIT_ERROR, &wq->state)) {
1052                         ret = -ENOMEM;
1053                         goto err;
1054                 }
1055                 reinit_completion(&wq->done);
1056                 return wq;
1057         }
1058
1059         ret = PTR_ERR(wq->manager);
1060         complete(&wq->done);
1061 err:
1062         for_each_node(node)
1063                 kfree(wq->wqes[node]);
1064         kfree(wq->wqes);
1065         kfree(wq);
1066         return ERR_PTR(ret);
1067 }
1068
1069 static bool io_wq_worker_wake(struct io_worker *worker, void *data)
1070 {
1071         wake_up_process(worker->task);
1072         return false;
1073 }
1074
1075 void io_wq_destroy(struct io_wq *wq)
1076 {
1077         int node;
1078
1079         set_bit(IO_WQ_BIT_EXIT, &wq->state);
1080         if (wq->manager)
1081                 kthread_stop(wq->manager);
1082
1083         rcu_read_lock();
1084         for_each_node(node)
1085                 io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
1086         rcu_read_unlock();
1087
1088         wait_for_completion(&wq->done);
1089
1090         for_each_node(node)
1091                 kfree(wq->wqes[node]);
1092         kfree(wq->wqes);
1093         kfree(wq);
1094 }