Merge tag 'sound-5.5-rc3' of git://git.kernel.org/pub/scm/linux/kernel/git/tiwai...
[sfrench/cifs-2.6.git] / fs / io-wq.c
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Basic worker thread pool for io_uring
4  *
5  * Copyright (C) 2019 Jens Axboe
6  *
7  */
8 #include <linux/kernel.h>
9 #include <linux/init.h>
10 #include <linux/errno.h>
11 #include <linux/sched/signal.h>
12 #include <linux/mm.h>
13 #include <linux/mmu_context.h>
14 #include <linux/sched/mm.h>
15 #include <linux/percpu.h>
16 #include <linux/slab.h>
17 #include <linux/kthread.h>
18 #include <linux/rculist_nulls.h>
19
20 #include "io-wq.h"
21
22 #define WORKER_IDLE_TIMEOUT     (5 * HZ)
23
24 enum {
25         IO_WORKER_F_UP          = 1,    /* up and active */
26         IO_WORKER_F_RUNNING     = 2,    /* account as running */
27         IO_WORKER_F_FREE        = 4,    /* worker on free list */
28         IO_WORKER_F_EXITING     = 8,    /* worker exiting */
29         IO_WORKER_F_FIXED       = 16,   /* static idle worker */
30         IO_WORKER_F_BOUND       = 32,   /* is doing bounded work */
31 };
32
33 enum {
34         IO_WQ_BIT_EXIT          = 0,    /* wq exiting */
35         IO_WQ_BIT_CANCEL        = 1,    /* cancel work on list */
36         IO_WQ_BIT_ERROR         = 2,    /* error on setup */
37 };
38
39 enum {
40         IO_WQE_FLAG_STALLED     = 1,    /* stalled on hash */
41 };
42
43 /*
44  * One for each thread in a wqe pool
45  */
46 struct io_worker {
47         refcount_t ref;
48         unsigned flags;
49         struct hlist_nulls_node nulls_node;
50         struct list_head all_list;
51         struct task_struct *task;
52         struct io_wqe *wqe;
53
54         struct io_wq_work *cur_work;
55         spinlock_t lock;
56
57         struct rcu_head rcu;
58         struct mm_struct *mm;
59         const struct cred *creds;
60         struct files_struct *restore_files;
61 };
62
63 #if BITS_PER_LONG == 64
64 #define IO_WQ_HASH_ORDER        6
65 #else
66 #define IO_WQ_HASH_ORDER        5
67 #endif
68
69 struct io_wqe_acct {
70         unsigned nr_workers;
71         unsigned max_workers;
72         atomic_t nr_running;
73 };
74
75 enum {
76         IO_WQ_ACCT_BOUND,
77         IO_WQ_ACCT_UNBOUND,
78 };
79
80 /*
81  * Per-node worker thread pool
82  */
83 struct io_wqe {
84         struct {
85                 spinlock_t lock;
86                 struct io_wq_work_list work_list;
87                 unsigned long hash_map;
88                 unsigned flags;
89         } ____cacheline_aligned_in_smp;
90
91         int node;
92         struct io_wqe_acct acct[2];
93
94         struct hlist_nulls_head free_list;
95         struct hlist_nulls_head busy_list;
96         struct list_head all_list;
97
98         struct io_wq *wq;
99 };
100
101 /*
102  * Per io_wq state
103   */
104 struct io_wq {
105         struct io_wqe **wqes;
106         unsigned long state;
107
108         get_work_fn *get_work;
109         put_work_fn *put_work;
110
111         struct task_struct *manager;
112         struct user_struct *user;
113         const struct cred *creds;
114         struct mm_struct *mm;
115         refcount_t refs;
116         struct completion done;
117 };
118
119 static bool io_worker_get(struct io_worker *worker)
120 {
121         return refcount_inc_not_zero(&worker->ref);
122 }
123
124 static void io_worker_release(struct io_worker *worker)
125 {
126         if (refcount_dec_and_test(&worker->ref))
127                 wake_up_process(worker->task);
128 }
129
130 /*
131  * Note: drops the wqe->lock if returning true! The caller must re-acquire
132  * the lock in that case. Some callers need to restart handling if this
133  * happens, so we can't just re-acquire the lock on behalf of the caller.
134  */
135 static bool __io_worker_unuse(struct io_wqe *wqe, struct io_worker *worker)
136 {
137         bool dropped_lock = false;
138
139         if (worker->creds) {
140                 revert_creds(worker->creds);
141                 worker->creds = NULL;
142         }
143
144         if (current->files != worker->restore_files) {
145                 __acquire(&wqe->lock);
146                 spin_unlock_irq(&wqe->lock);
147                 dropped_lock = true;
148
149                 task_lock(current);
150                 current->files = worker->restore_files;
151                 task_unlock(current);
152         }
153
154         /*
155          * If we have an active mm, we need to drop the wq lock before unusing
156          * it. If we do, return true and let the caller retry the idle loop.
157          */
158         if (worker->mm) {
159                 if (!dropped_lock) {
160                         __acquire(&wqe->lock);
161                         spin_unlock_irq(&wqe->lock);
162                         dropped_lock = true;
163                 }
164                 __set_current_state(TASK_RUNNING);
165                 set_fs(KERNEL_DS);
166                 unuse_mm(worker->mm);
167                 mmput(worker->mm);
168                 worker->mm = NULL;
169         }
170
171         return dropped_lock;
172 }
173
174 static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
175                                                    struct io_wq_work *work)
176 {
177         if (work->flags & IO_WQ_WORK_UNBOUND)
178                 return &wqe->acct[IO_WQ_ACCT_UNBOUND];
179
180         return &wqe->acct[IO_WQ_ACCT_BOUND];
181 }
182
183 static inline struct io_wqe_acct *io_wqe_get_acct(struct io_wqe *wqe,
184                                                   struct io_worker *worker)
185 {
186         if (worker->flags & IO_WORKER_F_BOUND)
187                 return &wqe->acct[IO_WQ_ACCT_BOUND];
188
189         return &wqe->acct[IO_WQ_ACCT_UNBOUND];
190 }
191
192 static void io_worker_exit(struct io_worker *worker)
193 {
194         struct io_wqe *wqe = worker->wqe;
195         struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);
196         unsigned nr_workers;
197
198         /*
199          * If we're not at zero, someone else is holding a brief reference
200          * to the worker. Wait for that to go away.
201          */
202         set_current_state(TASK_INTERRUPTIBLE);
203         if (!refcount_dec_and_test(&worker->ref))
204                 schedule();
205         __set_current_state(TASK_RUNNING);
206
207         preempt_disable();
208         current->flags &= ~PF_IO_WORKER;
209         if (worker->flags & IO_WORKER_F_RUNNING)
210                 atomic_dec(&acct->nr_running);
211         if (!(worker->flags & IO_WORKER_F_BOUND))
212                 atomic_dec(&wqe->wq->user->processes);
213         worker->flags = 0;
214         preempt_enable();
215
216         spin_lock_irq(&wqe->lock);
217         hlist_nulls_del_rcu(&worker->nulls_node);
218         list_del_rcu(&worker->all_list);
219         if (__io_worker_unuse(wqe, worker)) {
220                 __release(&wqe->lock);
221                 spin_lock_irq(&wqe->lock);
222         }
223         acct->nr_workers--;
224         nr_workers = wqe->acct[IO_WQ_ACCT_BOUND].nr_workers +
225                         wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers;
226         spin_unlock_irq(&wqe->lock);
227
228         /* all workers gone, wq exit can proceed */
229         if (!nr_workers && refcount_dec_and_test(&wqe->wq->refs))
230                 complete(&wqe->wq->done);
231
232         kfree_rcu(worker, rcu);
233 }
234
235 static inline bool io_wqe_run_queue(struct io_wqe *wqe)
236         __must_hold(wqe->lock)
237 {
238         if (!wq_list_empty(&wqe->work_list) &&
239             !(wqe->flags & IO_WQE_FLAG_STALLED))
240                 return true;
241         return false;
242 }
243
244 /*
245  * Check head of free list for an available worker. If one isn't available,
246  * caller must wake up the wq manager to create one.
247  */
248 static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
249         __must_hold(RCU)
250 {
251         struct hlist_nulls_node *n;
252         struct io_worker *worker;
253
254         n = rcu_dereference(hlist_nulls_first_rcu(&wqe->free_list));
255         if (is_a_nulls(n))
256                 return false;
257
258         worker = hlist_nulls_entry(n, struct io_worker, nulls_node);
259         if (io_worker_get(worker)) {
260                 wake_up_process(worker->task);
261                 io_worker_release(worker);
262                 return true;
263         }
264
265         return false;
266 }
267
268 /*
269  * We need a worker. If we find a free one, we're good. If not, and we're
270  * below the max number of workers, wake up the manager to create one.
271  */
272 static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
273 {
274         bool ret;
275
276         /*
277          * Most likely an attempt to queue unbounded work on an io_wq that
278          * wasn't setup with any unbounded workers.
279          */
280         WARN_ON_ONCE(!acct->max_workers);
281
282         rcu_read_lock();
283         ret = io_wqe_activate_free_worker(wqe);
284         rcu_read_unlock();
285
286         if (!ret && acct->nr_workers < acct->max_workers)
287                 wake_up_process(wqe->wq->manager);
288 }
289
290 static void io_wqe_inc_running(struct io_wqe *wqe, struct io_worker *worker)
291 {
292         struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);
293
294         atomic_inc(&acct->nr_running);
295 }
296
297 static void io_wqe_dec_running(struct io_wqe *wqe, struct io_worker *worker)
298         __must_hold(wqe->lock)
299 {
300         struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);
301
302         if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe))
303                 io_wqe_wake_worker(wqe, acct);
304 }
305
306 static void io_worker_start(struct io_wqe *wqe, struct io_worker *worker)
307 {
308         allow_kernel_signal(SIGINT);
309
310         current->flags |= PF_IO_WORKER;
311
312         worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
313         worker->restore_files = current->files;
314         io_wqe_inc_running(wqe, worker);
315 }
316
317 /*
318  * Worker will start processing some work. Move it to the busy list, if
319  * it's currently on the freelist
320  */
321 static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
322                              struct io_wq_work *work)
323         __must_hold(wqe->lock)
324 {
325         bool worker_bound, work_bound;
326
327         if (worker->flags & IO_WORKER_F_FREE) {
328                 worker->flags &= ~IO_WORKER_F_FREE;
329                 hlist_nulls_del_init_rcu(&worker->nulls_node);
330                 hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->busy_list);
331         }
332
333         /*
334          * If worker is moving from bound to unbound (or vice versa), then
335          * ensure we update the running accounting.
336          */
337         worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
338         work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
339         if (worker_bound != work_bound) {
340                 io_wqe_dec_running(wqe, worker);
341                 if (work_bound) {
342                         worker->flags |= IO_WORKER_F_BOUND;
343                         wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers--;
344                         wqe->acct[IO_WQ_ACCT_BOUND].nr_workers++;
345                         atomic_dec(&wqe->wq->user->processes);
346                 } else {
347                         worker->flags &= ~IO_WORKER_F_BOUND;
348                         wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers++;
349                         wqe->acct[IO_WQ_ACCT_BOUND].nr_workers--;
350                         atomic_inc(&wqe->wq->user->processes);
351                 }
352                 io_wqe_inc_running(wqe, worker);
353          }
354 }
355
356 /*
357  * No work, worker going to sleep. Move to freelist, and unuse mm if we
358  * have one attached. Dropping the mm may potentially sleep, so we drop
359  * the lock in that case and return success. Since the caller has to
360  * retry the loop in that case (we changed task state), we don't regrab
361  * the lock if we return success.
362  */
363 static bool __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
364         __must_hold(wqe->lock)
365 {
366         if (!(worker->flags & IO_WORKER_F_FREE)) {
367                 worker->flags |= IO_WORKER_F_FREE;
368                 hlist_nulls_del_init_rcu(&worker->nulls_node);
369                 hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
370         }
371
372         return __io_worker_unuse(wqe, worker);
373 }
374
375 static struct io_wq_work *io_get_next_work(struct io_wqe *wqe, unsigned *hash)
376         __must_hold(wqe->lock)
377 {
378         struct io_wq_work_node *node, *prev;
379         struct io_wq_work *work;
380
381         wq_list_for_each(node, prev, &wqe->work_list) {
382                 work = container_of(node, struct io_wq_work, list);
383
384                 /* not hashed, can run anytime */
385                 if (!(work->flags & IO_WQ_WORK_HASHED)) {
386                         wq_node_del(&wqe->work_list, node, prev);
387                         return work;
388                 }
389
390                 /* hashed, can run if not already running */
391                 *hash = work->flags >> IO_WQ_HASH_SHIFT;
392                 if (!(wqe->hash_map & BIT_ULL(*hash))) {
393                         wqe->hash_map |= BIT_ULL(*hash);
394                         wq_node_del(&wqe->work_list, node, prev);
395                         return work;
396                 }
397         }
398
399         return NULL;
400 }
401
402 static void io_worker_handle_work(struct io_worker *worker)
403         __releases(wqe->lock)
404 {
405         struct io_wq_work *work, *old_work = NULL, *put_work = NULL;
406         struct io_wqe *wqe = worker->wqe;
407         struct io_wq *wq = wqe->wq;
408
409         do {
410                 unsigned hash = -1U;
411
412                 /*
413                  * If we got some work, mark us as busy. If we didn't, but
414                  * the list isn't empty, it means we stalled on hashed work.
415                  * Mark us stalled so we don't keep looking for work when we
416                  * can't make progress, any work completion or insertion will
417                  * clear the stalled flag.
418                  */
419                 work = io_get_next_work(wqe, &hash);
420                 if (work)
421                         __io_worker_busy(wqe, worker, work);
422                 else if (!wq_list_empty(&wqe->work_list))
423                         wqe->flags |= IO_WQE_FLAG_STALLED;
424
425                 spin_unlock_irq(&wqe->lock);
426                 if (put_work && wq->put_work)
427                         wq->put_work(old_work);
428                 if (!work)
429                         break;
430 next:
431                 /* flush any pending signals before assigning new work */
432                 if (signal_pending(current))
433                         flush_signals(current);
434
435                 spin_lock_irq(&worker->lock);
436                 worker->cur_work = work;
437                 spin_unlock_irq(&worker->lock);
438
439                 if (work->flags & IO_WQ_WORK_CB)
440                         work->func(&work);
441
442                 if ((work->flags & IO_WQ_WORK_NEEDS_FILES) &&
443                     current->files != work->files) {
444                         task_lock(current);
445                         current->files = work->files;
446                         task_unlock(current);
447                 }
448                 if ((work->flags & IO_WQ_WORK_NEEDS_USER) && !worker->mm &&
449                     wq->mm && mmget_not_zero(wq->mm)) {
450                         use_mm(wq->mm);
451                         set_fs(USER_DS);
452                         worker->mm = wq->mm;
453                 }
454                 if (!worker->creds)
455                         worker->creds = override_creds(wq->creds);
456                 if (test_bit(IO_WQ_BIT_CANCEL, &wq->state))
457                         work->flags |= IO_WQ_WORK_CANCEL;
458                 if (worker->mm)
459                         work->flags |= IO_WQ_WORK_HAS_MM;
460
461                 if (wq->get_work && !(work->flags & IO_WQ_WORK_INTERNAL)) {
462                         put_work = work;
463                         wq->get_work(work);
464                 }
465
466                 old_work = work;
467                 work->func(&work);
468
469                 spin_lock_irq(&worker->lock);
470                 worker->cur_work = NULL;
471                 spin_unlock_irq(&worker->lock);
472
473                 spin_lock_irq(&wqe->lock);
474
475                 if (hash != -1U) {
476                         wqe->hash_map &= ~BIT_ULL(hash);
477                         wqe->flags &= ~IO_WQE_FLAG_STALLED;
478                 }
479                 if (work && work != old_work) {
480                         spin_unlock_irq(&wqe->lock);
481
482                         if (put_work && wq->put_work) {
483                                 wq->put_work(put_work);
484                                 put_work = NULL;
485                         }
486
487                         /* dependent work not hashed */
488                         hash = -1U;
489                         goto next;
490                 }
491         } while (1);
492 }
493
494 static inline void io_worker_spin_for_work(struct io_wqe *wqe)
495 {
496         int i = 0;
497
498         while (++i < 1000) {
499                 if (io_wqe_run_queue(wqe))
500                         break;
501                 if (need_resched())
502                         break;
503                 cpu_relax();
504         }
505 }
506
507 static int io_wqe_worker(void *data)
508 {
509         struct io_worker *worker = data;
510         struct io_wqe *wqe = worker->wqe;
511         struct io_wq *wq = wqe->wq;
512         bool did_work;
513
514         io_worker_start(wqe, worker);
515
516         did_work = false;
517         while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
518                 set_current_state(TASK_INTERRUPTIBLE);
519 loop:
520                 if (did_work)
521                         io_worker_spin_for_work(wqe);
522                 spin_lock_irq(&wqe->lock);
523                 if (io_wqe_run_queue(wqe)) {
524                         __set_current_state(TASK_RUNNING);
525                         io_worker_handle_work(worker);
526                         did_work = true;
527                         goto loop;
528                 }
529                 did_work = false;
530                 /* drops the lock on success, retry */
531                 if (__io_worker_idle(wqe, worker)) {
532                         __release(&wqe->lock);
533                         goto loop;
534                 }
535                 spin_unlock_irq(&wqe->lock);
536                 if (signal_pending(current))
537                         flush_signals(current);
538                 if (schedule_timeout(WORKER_IDLE_TIMEOUT))
539                         continue;
540                 /* timed out, exit unless we're the fixed worker */
541                 if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
542                     !(worker->flags & IO_WORKER_F_FIXED))
543                         break;
544         }
545
546         if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
547                 spin_lock_irq(&wqe->lock);
548                 if (!wq_list_empty(&wqe->work_list))
549                         io_worker_handle_work(worker);
550                 else
551                         spin_unlock_irq(&wqe->lock);
552         }
553
554         io_worker_exit(worker);
555         return 0;
556 }
557
558 /*
559  * Called when a worker is scheduled in. Mark us as currently running.
560  */
561 void io_wq_worker_running(struct task_struct *tsk)
562 {
563         struct io_worker *worker = kthread_data(tsk);
564         struct io_wqe *wqe = worker->wqe;
565
566         if (!(worker->flags & IO_WORKER_F_UP))
567                 return;
568         if (worker->flags & IO_WORKER_F_RUNNING)
569                 return;
570         worker->flags |= IO_WORKER_F_RUNNING;
571         io_wqe_inc_running(wqe, worker);
572 }
573
574 /*
575  * Called when worker is going to sleep. If there are no workers currently
576  * running and we have work pending, wake up a free one or have the manager
577  * set one up.
578  */
579 void io_wq_worker_sleeping(struct task_struct *tsk)
580 {
581         struct io_worker *worker = kthread_data(tsk);
582         struct io_wqe *wqe = worker->wqe;
583
584         if (!(worker->flags & IO_WORKER_F_UP))
585                 return;
586         if (!(worker->flags & IO_WORKER_F_RUNNING))
587                 return;
588
589         worker->flags &= ~IO_WORKER_F_RUNNING;
590
591         spin_lock_irq(&wqe->lock);
592         io_wqe_dec_running(wqe, worker);
593         spin_unlock_irq(&wqe->lock);
594 }
595
596 static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
597 {
598         struct io_wqe_acct *acct =&wqe->acct[index];
599         struct io_worker *worker;
600
601         worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
602         if (!worker)
603                 return false;
604
605         refcount_set(&worker->ref, 1);
606         worker->nulls_node.pprev = NULL;
607         worker->wqe = wqe;
608         spin_lock_init(&worker->lock);
609
610         worker->task = kthread_create_on_node(io_wqe_worker, worker, wqe->node,
611                                 "io_wqe_worker-%d/%d", index, wqe->node);
612         if (IS_ERR(worker->task)) {
613                 kfree(worker);
614                 return false;
615         }
616
617         spin_lock_irq(&wqe->lock);
618         hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
619         list_add_tail_rcu(&worker->all_list, &wqe->all_list);
620         worker->flags |= IO_WORKER_F_FREE;
621         if (index == IO_WQ_ACCT_BOUND)
622                 worker->flags |= IO_WORKER_F_BOUND;
623         if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND))
624                 worker->flags |= IO_WORKER_F_FIXED;
625         acct->nr_workers++;
626         spin_unlock_irq(&wqe->lock);
627
628         if (index == IO_WQ_ACCT_UNBOUND)
629                 atomic_inc(&wq->user->processes);
630
631         wake_up_process(worker->task);
632         return true;
633 }
634
635 static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index)
636         __must_hold(wqe->lock)
637 {
638         struct io_wqe_acct *acct = &wqe->acct[index];
639
640         /* if we have available workers or no work, no need */
641         if (!hlist_nulls_empty(&wqe->free_list) || !io_wqe_run_queue(wqe))
642                 return false;
643         return acct->nr_workers < acct->max_workers;
644 }
645
646 /*
647  * Manager thread. Tasked with creating new workers, if we need them.
648  */
649 static int io_wq_manager(void *data)
650 {
651         struct io_wq *wq = data;
652         int workers_to_create = num_possible_nodes();
653         int node;
654
655         /* create fixed workers */
656         refcount_set(&wq->refs, workers_to_create);
657         for_each_node(node) {
658                 if (!create_io_worker(wq, wq->wqes[node], IO_WQ_ACCT_BOUND))
659                         goto err;
660                 workers_to_create--;
661         }
662
663         complete(&wq->done);
664
665         while (!kthread_should_stop()) {
666                 for_each_node(node) {
667                         struct io_wqe *wqe = wq->wqes[node];
668                         bool fork_worker[2] = { false, false };
669
670                         spin_lock_irq(&wqe->lock);
671                         if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND))
672                                 fork_worker[IO_WQ_ACCT_BOUND] = true;
673                         if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND))
674                                 fork_worker[IO_WQ_ACCT_UNBOUND] = true;
675                         spin_unlock_irq(&wqe->lock);
676                         if (fork_worker[IO_WQ_ACCT_BOUND])
677                                 create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND);
678                         if (fork_worker[IO_WQ_ACCT_UNBOUND])
679                                 create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND);
680                 }
681                 set_current_state(TASK_INTERRUPTIBLE);
682                 schedule_timeout(HZ);
683         }
684
685         return 0;
686 err:
687         set_bit(IO_WQ_BIT_ERROR, &wq->state);
688         set_bit(IO_WQ_BIT_EXIT, &wq->state);
689         if (refcount_sub_and_test(workers_to_create, &wq->refs))
690                 complete(&wq->done);
691         return 0;
692 }
693
694 static bool io_wq_can_queue(struct io_wqe *wqe, struct io_wqe_acct *acct,
695                             struct io_wq_work *work)
696 {
697         bool free_worker;
698
699         if (!(work->flags & IO_WQ_WORK_UNBOUND))
700                 return true;
701         if (atomic_read(&acct->nr_running))
702                 return true;
703
704         rcu_read_lock();
705         free_worker = !hlist_nulls_empty(&wqe->free_list);
706         rcu_read_unlock();
707         if (free_worker)
708                 return true;
709
710         if (atomic_read(&wqe->wq->user->processes) >= acct->max_workers &&
711             !(capable(CAP_SYS_RESOURCE) || capable(CAP_SYS_ADMIN)))
712                 return false;
713
714         return true;
715 }
716
717 static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
718 {
719         struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
720         unsigned long flags;
721
722         /*
723          * Do early check to see if we need a new unbound worker, and if we do,
724          * if we're allowed to do so. This isn't 100% accurate as there's a
725          * gap between this check and incrementing the value, but that's OK.
726          * It's close enough to not be an issue, fork() has the same delay.
727          */
728         if (unlikely(!io_wq_can_queue(wqe, acct, work))) {
729                 work->flags |= IO_WQ_WORK_CANCEL;
730                 work->func(&work);
731                 return;
732         }
733
734         spin_lock_irqsave(&wqe->lock, flags);
735         wq_list_add_tail(&work->list, &wqe->work_list);
736         wqe->flags &= ~IO_WQE_FLAG_STALLED;
737         spin_unlock_irqrestore(&wqe->lock, flags);
738
739         if (!atomic_read(&acct->nr_running))
740                 io_wqe_wake_worker(wqe, acct);
741 }
742
743 void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
744 {
745         struct io_wqe *wqe = wq->wqes[numa_node_id()];
746
747         io_wqe_enqueue(wqe, work);
748 }
749
750 /*
751  * Enqueue work, hashed by some key. Work items that hash to the same value
752  * will not be done in parallel. Used to limit concurrent writes, generally
753  * hashed by inode.
754  */
755 void io_wq_enqueue_hashed(struct io_wq *wq, struct io_wq_work *work, void *val)
756 {
757         struct io_wqe *wqe = wq->wqes[numa_node_id()];
758         unsigned bit;
759
760
761         bit = hash_ptr(val, IO_WQ_HASH_ORDER);
762         work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
763         io_wqe_enqueue(wqe, work);
764 }
765
766 static bool io_wqe_worker_send_sig(struct io_worker *worker, void *data)
767 {
768         send_sig(SIGINT, worker->task, 1);
769         return false;
770 }
771
772 /*
773  * Iterate the passed in list and call the specific function for each
774  * worker that isn't exiting
775  */
776 static bool io_wq_for_each_worker(struct io_wqe *wqe,
777                                   bool (*func)(struct io_worker *, void *),
778                                   void *data)
779 {
780         struct io_worker *worker;
781         bool ret = false;
782
783         list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
784                 if (io_worker_get(worker)) {
785                         ret = func(worker, data);
786                         io_worker_release(worker);
787                         if (ret)
788                                 break;
789                 }
790         }
791
792         return ret;
793 }
794
795 void io_wq_cancel_all(struct io_wq *wq)
796 {
797         int node;
798
799         set_bit(IO_WQ_BIT_CANCEL, &wq->state);
800
801         /*
802          * Browse both lists, as there's a gap between handing work off
803          * to a worker and the worker putting itself on the busy_list
804          */
805         rcu_read_lock();
806         for_each_node(node) {
807                 struct io_wqe *wqe = wq->wqes[node];
808
809                 io_wq_for_each_worker(wqe, io_wqe_worker_send_sig, NULL);
810         }
811         rcu_read_unlock();
812 }
813
814 struct io_cb_cancel_data {
815         struct io_wqe *wqe;
816         work_cancel_fn *cancel;
817         void *caller_data;
818 };
819
820 static bool io_work_cancel(struct io_worker *worker, void *cancel_data)
821 {
822         struct io_cb_cancel_data *data = cancel_data;
823         unsigned long flags;
824         bool ret = false;
825
826         /*
827          * Hold the lock to avoid ->cur_work going out of scope, caller
828          * may dereference the passed in work.
829          */
830         spin_lock_irqsave(&worker->lock, flags);
831         if (worker->cur_work &&
832             data->cancel(worker->cur_work, data->caller_data)) {
833                 send_sig(SIGINT, worker->task, 1);
834                 ret = true;
835         }
836         spin_unlock_irqrestore(&worker->lock, flags);
837
838         return ret;
839 }
840
841 static enum io_wq_cancel io_wqe_cancel_cb_work(struct io_wqe *wqe,
842                                                work_cancel_fn *cancel,
843                                                void *cancel_data)
844 {
845         struct io_cb_cancel_data data = {
846                 .wqe = wqe,
847                 .cancel = cancel,
848                 .caller_data = cancel_data,
849         };
850         struct io_wq_work_node *node, *prev;
851         struct io_wq_work *work;
852         unsigned long flags;
853         bool found = false;
854
855         spin_lock_irqsave(&wqe->lock, flags);
856         wq_list_for_each(node, prev, &wqe->work_list) {
857                 work = container_of(node, struct io_wq_work, list);
858
859                 if (cancel(work, cancel_data)) {
860                         wq_node_del(&wqe->work_list, node, prev);
861                         found = true;
862                         break;
863                 }
864         }
865         spin_unlock_irqrestore(&wqe->lock, flags);
866
867         if (found) {
868                 work->flags |= IO_WQ_WORK_CANCEL;
869                 work->func(&work);
870                 return IO_WQ_CANCEL_OK;
871         }
872
873         rcu_read_lock();
874         found = io_wq_for_each_worker(wqe, io_work_cancel, &data);
875         rcu_read_unlock();
876         return found ? IO_WQ_CANCEL_RUNNING : IO_WQ_CANCEL_NOTFOUND;
877 }
878
879 enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
880                                   void *data)
881 {
882         enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND;
883         int node;
884
885         for_each_node(node) {
886                 struct io_wqe *wqe = wq->wqes[node];
887
888                 ret = io_wqe_cancel_cb_work(wqe, cancel, data);
889                 if (ret != IO_WQ_CANCEL_NOTFOUND)
890                         break;
891         }
892
893         return ret;
894 }
895
896 static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
897 {
898         struct io_wq_work *work = data;
899         unsigned long flags;
900         bool ret = false;
901
902         if (worker->cur_work != work)
903                 return false;
904
905         spin_lock_irqsave(&worker->lock, flags);
906         if (worker->cur_work == work) {
907                 send_sig(SIGINT, worker->task, 1);
908                 ret = true;
909         }
910         spin_unlock_irqrestore(&worker->lock, flags);
911
912         return ret;
913 }
914
915 static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe,
916                                             struct io_wq_work *cwork)
917 {
918         struct io_wq_work_node *node, *prev;
919         struct io_wq_work *work;
920         unsigned long flags;
921         bool found = false;
922
923         cwork->flags |= IO_WQ_WORK_CANCEL;
924
925         /*
926          * First check pending list, if we're lucky we can just remove it
927          * from there. CANCEL_OK means that the work is returned as-new,
928          * no completion will be posted for it.
929          */
930         spin_lock_irqsave(&wqe->lock, flags);
931         wq_list_for_each(node, prev, &wqe->work_list) {
932                 work = container_of(node, struct io_wq_work, list);
933
934                 if (work == cwork) {
935                         wq_node_del(&wqe->work_list, node, prev);
936                         found = true;
937                         break;
938                 }
939         }
940         spin_unlock_irqrestore(&wqe->lock, flags);
941
942         if (found) {
943                 work->flags |= IO_WQ_WORK_CANCEL;
944                 work->func(&work);
945                 return IO_WQ_CANCEL_OK;
946         }
947
948         /*
949          * Now check if a free (going busy) or busy worker has the work
950          * currently running. If we find it there, we'll return CANCEL_RUNNING
951          * as an indication that we attempte to signal cancellation. The
952          * completion will run normally in this case.
953          */
954         rcu_read_lock();
955         found = io_wq_for_each_worker(wqe, io_wq_worker_cancel, cwork);
956         rcu_read_unlock();
957         return found ? IO_WQ_CANCEL_RUNNING : IO_WQ_CANCEL_NOTFOUND;
958 }
959
960 enum io_wq_cancel io_wq_cancel_work(struct io_wq *wq, struct io_wq_work *cwork)
961 {
962         enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND;
963         int node;
964
965         for_each_node(node) {
966                 struct io_wqe *wqe = wq->wqes[node];
967
968                 ret = io_wqe_cancel_work(wqe, cwork);
969                 if (ret != IO_WQ_CANCEL_NOTFOUND)
970                         break;
971         }
972
973         return ret;
974 }
975
976 struct io_wq_flush_data {
977         struct io_wq_work work;
978         struct completion done;
979 };
980
981 static void io_wq_flush_func(struct io_wq_work **workptr)
982 {
983         struct io_wq_work *work = *workptr;
984         struct io_wq_flush_data *data;
985
986         data = container_of(work, struct io_wq_flush_data, work);
987         complete(&data->done);
988 }
989
990 /*
991  * Doesn't wait for previously queued work to finish. When this completes,
992  * it just means that previously queued work was started.
993  */
994 void io_wq_flush(struct io_wq *wq)
995 {
996         struct io_wq_flush_data data;
997         int node;
998
999         for_each_node(node) {
1000                 struct io_wqe *wqe = wq->wqes[node];
1001
1002                 init_completion(&data.done);
1003                 INIT_IO_WORK(&data.work, io_wq_flush_func);
1004                 data.work.flags |= IO_WQ_WORK_INTERNAL;
1005                 io_wqe_enqueue(wqe, &data.work);
1006                 wait_for_completion(&data.done);
1007         }
1008 }
1009
1010 struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
1011 {
1012         int ret = -ENOMEM, node;
1013         struct io_wq *wq;
1014
1015         wq = kzalloc(sizeof(*wq), GFP_KERNEL);
1016         if (!wq)
1017                 return ERR_PTR(-ENOMEM);
1018
1019         wq->wqes = kcalloc(nr_node_ids, sizeof(struct io_wqe *), GFP_KERNEL);
1020         if (!wq->wqes) {
1021                 kfree(wq);
1022                 return ERR_PTR(-ENOMEM);
1023         }
1024
1025         wq->get_work = data->get_work;
1026         wq->put_work = data->put_work;
1027
1028         /* caller must already hold a reference to this */
1029         wq->user = data->user;
1030         wq->creds = data->creds;
1031
1032         for_each_node(node) {
1033                 struct io_wqe *wqe;
1034
1035                 wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, node);
1036                 if (!wqe)
1037                         goto err;
1038                 wq->wqes[node] = wqe;
1039                 wqe->node = node;
1040                 wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
1041                 atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
1042                 if (wq->user) {
1043                         wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
1044                                         task_rlimit(current, RLIMIT_NPROC);
1045                 }
1046                 atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
1047                 wqe->node = node;
1048                 wqe->wq = wq;
1049                 spin_lock_init(&wqe->lock);
1050                 INIT_WQ_LIST(&wqe->work_list);
1051                 INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
1052                 INIT_HLIST_NULLS_HEAD(&wqe->busy_list, 1);
1053                 INIT_LIST_HEAD(&wqe->all_list);
1054         }
1055
1056         init_completion(&wq->done);
1057
1058         /* caller must have already done mmgrab() on this mm */
1059         wq->mm = data->mm;
1060
1061         wq->manager = kthread_create(io_wq_manager, wq, "io_wq_manager");
1062         if (!IS_ERR(wq->manager)) {
1063                 wake_up_process(wq->manager);
1064                 wait_for_completion(&wq->done);
1065                 if (test_bit(IO_WQ_BIT_ERROR, &wq->state)) {
1066                         ret = -ENOMEM;
1067                         goto err;
1068                 }
1069                 reinit_completion(&wq->done);
1070                 return wq;
1071         }
1072
1073         ret = PTR_ERR(wq->manager);
1074         complete(&wq->done);
1075 err:
1076         for_each_node(node)
1077                 kfree(wq->wqes[node]);
1078         kfree(wq->wqes);
1079         kfree(wq);
1080         return ERR_PTR(ret);
1081 }
1082
1083 static bool io_wq_worker_wake(struct io_worker *worker, void *data)
1084 {
1085         wake_up_process(worker->task);
1086         return false;
1087 }
1088
1089 void io_wq_destroy(struct io_wq *wq)
1090 {
1091         int node;
1092
1093         set_bit(IO_WQ_BIT_EXIT, &wq->state);
1094         if (wq->manager)
1095                 kthread_stop(wq->manager);
1096
1097         rcu_read_lock();
1098         for_each_node(node)
1099                 io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
1100         rcu_read_unlock();
1101
1102         wait_for_completion(&wq->done);
1103
1104         for_each_node(node)
1105                 kfree(wq->wqes[node]);
1106         kfree(wq->wqes);
1107         kfree(wq);
1108 }