Merge tag 'kbuild-v5.2' of git://git.kernel.org/pub/scm/linux/kernel/git/masahiroy...
[sfrench/cifs-2.6.git] / fs / io_uring.c
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Shared application/kernel submission and completion ring pairs, for
4  * supporting fast/efficient IO.
5  *
6  * A note on the read/write ordering memory barriers that are matched between
7  * the application and kernel side.
8  *
9  * After the application reads the CQ ring tail, it must use an
10  * appropriate smp_rmb() to pair with the smp_wmb() the kernel uses
11  * before writing the tail (using smp_load_acquire to read the tail will
12  * do). It also needs a smp_mb() before updating CQ head (ordering the
13  * entry load(s) with the head store), pairing with an implicit barrier
14  * through a control-dependency in io_get_cqring (smp_store_release to
15  * store head will do). Failure to do so could lead to reading invalid
16  * CQ entries.
17  *
18  * Likewise, the application must use an appropriate smp_wmb() before
19  * writing the SQ tail (ordering SQ entry stores with the tail store),
20  * which pairs with smp_load_acquire in io_get_sqring (smp_store_release
21  * to store the tail will do). And it needs a barrier ordering the SQ
22  * head load before writing new SQ entries (smp_load_acquire to read
23  * head will do).
24  *
25  * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application
26  * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after*
27  * updating the SQ tail; a full memory barrier smp_mb() is needed
28  * between.
29  *
30  * Also see the examples in the liburing library:
31  *
32  *      git://git.kernel.dk/liburing
33  *
34  * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens
35  * from data shared between the kernel and application. This is done both
36  * for ordering purposes, but also to ensure that once a value is loaded from
37  * data that the application could potentially modify, it remains stable.
38  *
39  * Copyright (C) 2018-2019 Jens Axboe
40  * Copyright (c) 2018-2019 Christoph Hellwig
41  */
42 #include <linux/kernel.h>
43 #include <linux/init.h>
44 #include <linux/errno.h>
45 #include <linux/syscalls.h>
46 #include <linux/compat.h>
47 #include <linux/refcount.h>
48 #include <linux/uio.h>
49
50 #include <linux/sched/signal.h>
51 #include <linux/fs.h>
52 #include <linux/file.h>
53 #include <linux/fdtable.h>
54 #include <linux/mm.h>
55 #include <linux/mman.h>
56 #include <linux/mmu_context.h>
57 #include <linux/percpu.h>
58 #include <linux/slab.h>
59 #include <linux/workqueue.h>
60 #include <linux/kthread.h>
61 #include <linux/blkdev.h>
62 #include <linux/bvec.h>
63 #include <linux/net.h>
64 #include <net/sock.h>
65 #include <net/af_unix.h>
66 #include <net/scm.h>
67 #include <linux/anon_inodes.h>
68 #include <linux/sched/mm.h>
69 #include <linux/uaccess.h>
70 #include <linux/nospec.h>
71 #include <linux/sizes.h>
72 #include <linux/hugetlb.h>
73
74 #include <uapi/linux/io_uring.h>
75
76 #include "internal.h"
77
78 #define IORING_MAX_ENTRIES      4096
79 #define IORING_MAX_FIXED_FILES  1024
80
81 struct io_uring {
82         u32 head ____cacheline_aligned_in_smp;
83         u32 tail ____cacheline_aligned_in_smp;
84 };
85
86 /*
87  * This data is shared with the application through the mmap at offset
88  * IORING_OFF_SQ_RING.
89  *
90  * The offsets to the member fields are published through struct
91  * io_sqring_offsets when calling io_uring_setup.
92  */
93 struct io_sq_ring {
94         /*
95          * Head and tail offsets into the ring; the offsets need to be
96          * masked to get valid indices.
97          *
98          * The kernel controls head and the application controls tail.
99          */
100         struct io_uring         r;
101         /*
102          * Bitmask to apply to head and tail offsets (constant, equals
103          * ring_entries - 1)
104          */
105         u32                     ring_mask;
106         /* Ring size (constant, power of 2) */
107         u32                     ring_entries;
108         /*
109          * Number of invalid entries dropped by the kernel due to
110          * invalid index stored in array
111          *
112          * Written by the kernel, shouldn't be modified by the
113          * application (i.e. get number of "new events" by comparing to
114          * cached value).
115          *
116          * After a new SQ head value was read by the application this
117          * counter includes all submissions that were dropped reaching
118          * the new SQ head (and possibly more).
119          */
120         u32                     dropped;
121         /*
122          * Runtime flags
123          *
124          * Written by the kernel, shouldn't be modified by the
125          * application.
126          *
127          * The application needs a full memory barrier before checking
128          * for IORING_SQ_NEED_WAKEUP after updating the sq tail.
129          */
130         u32                     flags;
131         /*
132          * Ring buffer of indices into array of io_uring_sqe, which is
133          * mmapped by the application using the IORING_OFF_SQES offset.
134          *
135          * This indirection could e.g. be used to assign fixed
136          * io_uring_sqe entries to operations and only submit them to
137          * the queue when needed.
138          *
139          * The kernel modifies neither the indices array nor the entries
140          * array.
141          */
142         u32                     array[];
143 };
144
145 /*
146  * This data is shared with the application through the mmap at offset
147  * IORING_OFF_CQ_RING.
148  *
149  * The offsets to the member fields are published through struct
150  * io_cqring_offsets when calling io_uring_setup.
151  */
152 struct io_cq_ring {
153         /*
154          * Head and tail offsets into the ring; the offsets need to be
155          * masked to get valid indices.
156          *
157          * The application controls head and the kernel tail.
158          */
159         struct io_uring         r;
160         /*
161          * Bitmask to apply to head and tail offsets (constant, equals
162          * ring_entries - 1)
163          */
164         u32                     ring_mask;
165         /* Ring size (constant, power of 2) */
166         u32                     ring_entries;
167         /*
168          * Number of completion events lost because the queue was full;
169          * this should be avoided by the application by making sure
170          * there are not more requests pending thatn there is space in
171          * the completion queue.
172          *
173          * Written by the kernel, shouldn't be modified by the
174          * application (i.e. get number of "new events" by comparing to
175          * cached value).
176          *
177          * As completion events come in out of order this counter is not
178          * ordered with any other data.
179          */
180         u32                     overflow;
181         /*
182          * Ring buffer of completion events.
183          *
184          * The kernel writes completion events fresh every time they are
185          * produced, so the application is allowed to modify pending
186          * entries.
187          */
188         struct io_uring_cqe     cqes[];
189 };
190
191 struct io_mapped_ubuf {
192         u64             ubuf;
193         size_t          len;
194         struct          bio_vec *bvec;
195         unsigned int    nr_bvecs;
196 };
197
198 struct async_list {
199         spinlock_t              lock;
200         atomic_t                cnt;
201         struct list_head        list;
202
203         struct file             *file;
204         off_t                   io_end;
205         size_t                  io_pages;
206 };
207
208 struct io_ring_ctx {
209         struct {
210                 struct percpu_ref       refs;
211         } ____cacheline_aligned_in_smp;
212
213         struct {
214                 unsigned int            flags;
215                 bool                    compat;
216                 bool                    account_mem;
217
218                 /* SQ ring */
219                 struct io_sq_ring       *sq_ring;
220                 unsigned                cached_sq_head;
221                 unsigned                sq_entries;
222                 unsigned                sq_mask;
223                 unsigned                sq_thread_idle;
224                 struct io_uring_sqe     *sq_sqes;
225
226                 struct list_head        defer_list;
227         } ____cacheline_aligned_in_smp;
228
229         /* IO offload */
230         struct workqueue_struct *sqo_wq;
231         struct task_struct      *sqo_thread;    /* if using sq thread polling */
232         struct mm_struct        *sqo_mm;
233         wait_queue_head_t       sqo_wait;
234         unsigned                sqo_stop;
235
236         struct {
237                 /* CQ ring */
238                 struct io_cq_ring       *cq_ring;
239                 unsigned                cached_cq_tail;
240                 unsigned                cq_entries;
241                 unsigned                cq_mask;
242                 struct wait_queue_head  cq_wait;
243                 struct fasync_struct    *cq_fasync;
244                 struct eventfd_ctx      *cq_ev_fd;
245         } ____cacheline_aligned_in_smp;
246
247         /*
248          * If used, fixed file set. Writers must ensure that ->refs is dead,
249          * readers must ensure that ->refs is alive as long as the file* is
250          * used. Only updated through io_uring_register(2).
251          */
252         struct file             **user_files;
253         unsigned                nr_user_files;
254
255         /* if used, fixed mapped user buffers */
256         unsigned                nr_user_bufs;
257         struct io_mapped_ubuf   *user_bufs;
258
259         struct user_struct      *user;
260
261         struct completion       ctx_done;
262
263         struct {
264                 struct mutex            uring_lock;
265                 wait_queue_head_t       wait;
266         } ____cacheline_aligned_in_smp;
267
268         struct {
269                 spinlock_t              completion_lock;
270                 bool                    poll_multi_file;
271                 /*
272                  * ->poll_list is protected by the ctx->uring_lock for
273                  * io_uring instances that don't use IORING_SETUP_SQPOLL.
274                  * For SQPOLL, only the single threaded io_sq_thread() will
275                  * manipulate the list, hence no extra locking is needed there.
276                  */
277                 struct list_head        poll_list;
278                 struct list_head        cancel_list;
279         } ____cacheline_aligned_in_smp;
280
281         struct async_list       pending_async[2];
282
283 #if defined(CONFIG_UNIX)
284         struct socket           *ring_sock;
285 #endif
286 };
287
288 struct sqe_submit {
289         const struct io_uring_sqe       *sqe;
290         unsigned short                  index;
291         bool                            has_user;
292         bool                            needs_lock;
293         bool                            needs_fixed_file;
294 };
295
296 /*
297  * First field must be the file pointer in all the
298  * iocb unions! See also 'struct kiocb' in <linux/fs.h>
299  */
300 struct io_poll_iocb {
301         struct file                     *file;
302         struct wait_queue_head          *head;
303         __poll_t                        events;
304         bool                            done;
305         bool                            canceled;
306         struct wait_queue_entry         wait;
307 };
308
309 /*
310  * NOTE! Each of the iocb union members has the file pointer
311  * as the first entry in their struct definition. So you can
312  * access the file pointer through any of the sub-structs,
313  * or directly as just 'ki_filp' in this struct.
314  */
315 struct io_kiocb {
316         union {
317                 struct file             *file;
318                 struct kiocb            rw;
319                 struct io_poll_iocb     poll;
320         };
321
322         struct sqe_submit       submit;
323
324         struct io_ring_ctx      *ctx;
325         struct list_head        list;
326         unsigned int            flags;
327         refcount_t              refs;
328 #define REQ_F_NOWAIT            1       /* must not punt to workers */
329 #define REQ_F_IOPOLL_COMPLETED  2       /* polled IO has completed */
330 #define REQ_F_FIXED_FILE        4       /* ctx owns file */
331 #define REQ_F_SEQ_PREV          8       /* sequential with previous */
332 #define REQ_F_PREPPED           16      /* prep already done */
333 #define REQ_F_IO_DRAIN          32      /* drain existing IO first */
334 #define REQ_F_IO_DRAINED        64      /* drain done */
335         u64                     user_data;
336         u32                     error;  /* iopoll result from callback */
337         u32                     sequence;
338
339         struct work_struct      work;
340 };
341
342 #define IO_PLUG_THRESHOLD               2
343 #define IO_IOPOLL_BATCH                 8
344
345 struct io_submit_state {
346         struct blk_plug         plug;
347
348         /*
349          * io_kiocb alloc cache
350          */
351         void                    *reqs[IO_IOPOLL_BATCH];
352         unsigned                int free_reqs;
353         unsigned                int cur_req;
354
355         /*
356          * File reference cache
357          */
358         struct file             *file;
359         unsigned int            fd;
360         unsigned int            has_refs;
361         unsigned int            used_refs;
362         unsigned int            ios_left;
363 };
364
365 static void io_sq_wq_submit_work(struct work_struct *work);
366
367 static struct kmem_cache *req_cachep;
368
369 static const struct file_operations io_uring_fops;
370
371 struct sock *io_uring_get_socket(struct file *file)
372 {
373 #if defined(CONFIG_UNIX)
374         if (file->f_op == &io_uring_fops) {
375                 struct io_ring_ctx *ctx = file->private_data;
376
377                 return ctx->ring_sock->sk;
378         }
379 #endif
380         return NULL;
381 }
382 EXPORT_SYMBOL(io_uring_get_socket);
383
384 static void io_ring_ctx_ref_free(struct percpu_ref *ref)
385 {
386         struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs);
387
388         complete(&ctx->ctx_done);
389 }
390
391 static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
392 {
393         struct io_ring_ctx *ctx;
394         int i;
395
396         ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
397         if (!ctx)
398                 return NULL;
399
400         if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free, 0, GFP_KERNEL)) {
401                 kfree(ctx);
402                 return NULL;
403         }
404
405         ctx->flags = p->flags;
406         init_waitqueue_head(&ctx->cq_wait);
407         init_completion(&ctx->ctx_done);
408         mutex_init(&ctx->uring_lock);
409         init_waitqueue_head(&ctx->wait);
410         for (i = 0; i < ARRAY_SIZE(ctx->pending_async); i++) {
411                 spin_lock_init(&ctx->pending_async[i].lock);
412                 INIT_LIST_HEAD(&ctx->pending_async[i].list);
413                 atomic_set(&ctx->pending_async[i].cnt, 0);
414         }
415         spin_lock_init(&ctx->completion_lock);
416         INIT_LIST_HEAD(&ctx->poll_list);
417         INIT_LIST_HEAD(&ctx->cancel_list);
418         INIT_LIST_HEAD(&ctx->defer_list);
419         return ctx;
420 }
421
422 static inline bool io_sequence_defer(struct io_ring_ctx *ctx,
423                                      struct io_kiocb *req)
424 {
425         if ((req->flags & (REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) != REQ_F_IO_DRAIN)
426                 return false;
427
428         return req->sequence > ctx->cached_cq_tail + ctx->sq_ring->dropped;
429 }
430
431 static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx)
432 {
433         struct io_kiocb *req;
434
435         if (list_empty(&ctx->defer_list))
436                 return NULL;
437
438         req = list_first_entry(&ctx->defer_list, struct io_kiocb, list);
439         if (!io_sequence_defer(ctx, req)) {
440                 list_del_init(&req->list);
441                 return req;
442         }
443
444         return NULL;
445 }
446
447 static void __io_commit_cqring(struct io_ring_ctx *ctx)
448 {
449         struct io_cq_ring *ring = ctx->cq_ring;
450
451         if (ctx->cached_cq_tail != READ_ONCE(ring->r.tail)) {
452                 /* order cqe stores with ring update */
453                 smp_store_release(&ring->r.tail, ctx->cached_cq_tail);
454
455                 if (wq_has_sleeper(&ctx->cq_wait)) {
456                         wake_up_interruptible(&ctx->cq_wait);
457                         kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN);
458                 }
459         }
460 }
461
462 static void io_commit_cqring(struct io_ring_ctx *ctx)
463 {
464         struct io_kiocb *req;
465
466         __io_commit_cqring(ctx);
467
468         while ((req = io_get_deferred_req(ctx)) != NULL) {
469                 req->flags |= REQ_F_IO_DRAINED;
470                 queue_work(ctx->sqo_wq, &req->work);
471         }
472 }
473
474 static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
475 {
476         struct io_cq_ring *ring = ctx->cq_ring;
477         unsigned tail;
478
479         tail = ctx->cached_cq_tail;
480         /*
481          * writes to the cq entry need to come after reading head; the
482          * control dependency is enough as we're using WRITE_ONCE to
483          * fill the cq entry
484          */
485         if (tail - READ_ONCE(ring->r.head) == ring->ring_entries)
486                 return NULL;
487
488         ctx->cached_cq_tail++;
489         return &ring->cqes[tail & ctx->cq_mask];
490 }
491
492 static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
493                                  long res, unsigned ev_flags)
494 {
495         struct io_uring_cqe *cqe;
496
497         /*
498          * If we can't get a cq entry, userspace overflowed the
499          * submission (by quite a lot). Increment the overflow count in
500          * the ring.
501          */
502         cqe = io_get_cqring(ctx);
503         if (cqe) {
504                 WRITE_ONCE(cqe->user_data, ki_user_data);
505                 WRITE_ONCE(cqe->res, res);
506                 WRITE_ONCE(cqe->flags, ev_flags);
507         } else {
508                 unsigned overflow = READ_ONCE(ctx->cq_ring->overflow);
509
510                 WRITE_ONCE(ctx->cq_ring->overflow, overflow + 1);
511         }
512 }
513
514 static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
515 {
516         if (waitqueue_active(&ctx->wait))
517                 wake_up(&ctx->wait);
518         if (waitqueue_active(&ctx->sqo_wait))
519                 wake_up(&ctx->sqo_wait);
520         if (ctx->cq_ev_fd)
521                 eventfd_signal(ctx->cq_ev_fd, 1);
522 }
523
524 static void io_cqring_add_event(struct io_ring_ctx *ctx, u64 user_data,
525                                 long res, unsigned ev_flags)
526 {
527         unsigned long flags;
528
529         spin_lock_irqsave(&ctx->completion_lock, flags);
530         io_cqring_fill_event(ctx, user_data, res, ev_flags);
531         io_commit_cqring(ctx);
532         spin_unlock_irqrestore(&ctx->completion_lock, flags);
533
534         io_cqring_ev_posted(ctx);
535 }
536
537 static void io_ring_drop_ctx_refs(struct io_ring_ctx *ctx, unsigned refs)
538 {
539         percpu_ref_put_many(&ctx->refs, refs);
540
541         if (waitqueue_active(&ctx->wait))
542                 wake_up(&ctx->wait);
543 }
544
545 static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx,
546                                    struct io_submit_state *state)
547 {
548         gfp_t gfp = GFP_KERNEL | __GFP_NOWARN;
549         struct io_kiocb *req;
550
551         if (!percpu_ref_tryget(&ctx->refs))
552                 return NULL;
553
554         if (!state) {
555                 req = kmem_cache_alloc(req_cachep, gfp);
556                 if (unlikely(!req))
557                         goto out;
558         } else if (!state->free_reqs) {
559                 size_t sz;
560                 int ret;
561
562                 sz = min_t(size_t, state->ios_left, ARRAY_SIZE(state->reqs));
563                 ret = kmem_cache_alloc_bulk(req_cachep, gfp, sz, state->reqs);
564
565                 /*
566                  * Bulk alloc is all-or-nothing. If we fail to get a batch,
567                  * retry single alloc to be on the safe side.
568                  */
569                 if (unlikely(ret <= 0)) {
570                         state->reqs[0] = kmem_cache_alloc(req_cachep, gfp);
571                         if (!state->reqs[0])
572                                 goto out;
573                         ret = 1;
574                 }
575                 state->free_reqs = ret - 1;
576                 state->cur_req = 1;
577                 req = state->reqs[0];
578         } else {
579                 req = state->reqs[state->cur_req];
580                 state->free_reqs--;
581                 state->cur_req++;
582         }
583
584         req->ctx = ctx;
585         req->flags = 0;
586         /* one is dropped after submission, the other at completion */
587         refcount_set(&req->refs, 2);
588         return req;
589 out:
590         io_ring_drop_ctx_refs(ctx, 1);
591         return NULL;
592 }
593
594 static void io_free_req_many(struct io_ring_ctx *ctx, void **reqs, int *nr)
595 {
596         if (*nr) {
597                 kmem_cache_free_bulk(req_cachep, *nr, reqs);
598                 io_ring_drop_ctx_refs(ctx, *nr);
599                 *nr = 0;
600         }
601 }
602
603 static void io_free_req(struct io_kiocb *req)
604 {
605         if (req->file && !(req->flags & REQ_F_FIXED_FILE))
606                 fput(req->file);
607         io_ring_drop_ctx_refs(req->ctx, 1);
608         kmem_cache_free(req_cachep, req);
609 }
610
611 static void io_put_req(struct io_kiocb *req)
612 {
613         if (refcount_dec_and_test(&req->refs))
614                 io_free_req(req);
615 }
616
617 /*
618  * Find and free completed poll iocbs
619  */
620 static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
621                                struct list_head *done)
622 {
623         void *reqs[IO_IOPOLL_BATCH];
624         struct io_kiocb *req;
625         int to_free;
626
627         to_free = 0;
628         while (!list_empty(done)) {
629                 req = list_first_entry(done, struct io_kiocb, list);
630                 list_del(&req->list);
631
632                 io_cqring_fill_event(ctx, req->user_data, req->error, 0);
633                 (*nr_events)++;
634
635                 if (refcount_dec_and_test(&req->refs)) {
636                         /* If we're not using fixed files, we have to pair the
637                          * completion part with the file put. Use regular
638                          * completions for those, only batch free for fixed
639                          * file.
640                          */
641                         if (req->flags & REQ_F_FIXED_FILE) {
642                                 reqs[to_free++] = req;
643                                 if (to_free == ARRAY_SIZE(reqs))
644                                         io_free_req_many(ctx, reqs, &to_free);
645                         } else {
646                                 io_free_req(req);
647                         }
648                 }
649         }
650
651         io_commit_cqring(ctx);
652         io_free_req_many(ctx, reqs, &to_free);
653 }
654
655 static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,
656                         long min)
657 {
658         struct io_kiocb *req, *tmp;
659         LIST_HEAD(done);
660         bool spin;
661         int ret;
662
663         /*
664          * Only spin for completions if we don't have multiple devices hanging
665          * off our complete list, and we're under the requested amount.
666          */
667         spin = !ctx->poll_multi_file && *nr_events < min;
668
669         ret = 0;
670         list_for_each_entry_safe(req, tmp, &ctx->poll_list, list) {
671                 struct kiocb *kiocb = &req->rw;
672
673                 /*
674                  * Move completed entries to our local list. If we find a
675                  * request that requires polling, break out and complete
676                  * the done list first, if we have entries there.
677                  */
678                 if (req->flags & REQ_F_IOPOLL_COMPLETED) {
679                         list_move_tail(&req->list, &done);
680                         continue;
681                 }
682                 if (!list_empty(&done))
683                         break;
684
685                 ret = kiocb->ki_filp->f_op->iopoll(kiocb, spin);
686                 if (ret < 0)
687                         break;
688
689                 if (ret && spin)
690                         spin = false;
691                 ret = 0;
692         }
693
694         if (!list_empty(&done))
695                 io_iopoll_complete(ctx, nr_events, &done);
696
697         return ret;
698 }
699
700 /*
701  * Poll for a mininum of 'min' events. Note that if min == 0 we consider that a
702  * non-spinning poll check - we'll still enter the driver poll loop, but only
703  * as a non-spinning completion check.
704  */
705 static int io_iopoll_getevents(struct io_ring_ctx *ctx, unsigned int *nr_events,
706                                 long min)
707 {
708         while (!list_empty(&ctx->poll_list)) {
709                 int ret;
710
711                 ret = io_do_iopoll(ctx, nr_events, min);
712                 if (ret < 0)
713                         return ret;
714                 if (!min || *nr_events >= min)
715                         return 0;
716         }
717
718         return 1;
719 }
720
721 /*
722  * We can't just wait for polled events to come to us, we have to actively
723  * find and complete them.
724  */
725 static void io_iopoll_reap_events(struct io_ring_ctx *ctx)
726 {
727         if (!(ctx->flags & IORING_SETUP_IOPOLL))
728                 return;
729
730         mutex_lock(&ctx->uring_lock);
731         while (!list_empty(&ctx->poll_list)) {
732                 unsigned int nr_events = 0;
733
734                 io_iopoll_getevents(ctx, &nr_events, 1);
735         }
736         mutex_unlock(&ctx->uring_lock);
737 }
738
739 static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
740                            long min)
741 {
742         int ret = 0;
743
744         do {
745                 int tmin = 0;
746
747                 if (*nr_events < min)
748                         tmin = min - *nr_events;
749
750                 ret = io_iopoll_getevents(ctx, nr_events, tmin);
751                 if (ret <= 0)
752                         break;
753                 ret = 0;
754         } while (min && !*nr_events && !need_resched());
755
756         return ret;
757 }
758
759 static void kiocb_end_write(struct kiocb *kiocb)
760 {
761         if (kiocb->ki_flags & IOCB_WRITE) {
762                 struct inode *inode = file_inode(kiocb->ki_filp);
763
764                 /*
765                  * Tell lockdep we inherited freeze protection from submission
766                  * thread.
767                  */
768                 if (S_ISREG(inode->i_mode))
769                         __sb_writers_acquired(inode->i_sb, SB_FREEZE_WRITE);
770                 file_end_write(kiocb->ki_filp);
771         }
772 }
773
774 static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
775 {
776         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
777
778         kiocb_end_write(kiocb);
779
780         io_cqring_add_event(req->ctx, req->user_data, res, 0);
781         io_put_req(req);
782 }
783
784 static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2)
785 {
786         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
787
788         kiocb_end_write(kiocb);
789
790         req->error = res;
791         if (res != -EAGAIN)
792                 req->flags |= REQ_F_IOPOLL_COMPLETED;
793 }
794
795 /*
796  * After the iocb has been issued, it's safe to be found on the poll list.
797  * Adding the kiocb to the list AFTER submission ensures that we don't
798  * find it from a io_iopoll_getevents() thread before the issuer is done
799  * accessing the kiocb cookie.
800  */
801 static void io_iopoll_req_issued(struct io_kiocb *req)
802 {
803         struct io_ring_ctx *ctx = req->ctx;
804
805         /*
806          * Track whether we have multiple files in our lists. This will impact
807          * how we do polling eventually, not spinning if we're on potentially
808          * different devices.
809          */
810         if (list_empty(&ctx->poll_list)) {
811                 ctx->poll_multi_file = false;
812         } else if (!ctx->poll_multi_file) {
813                 struct io_kiocb *list_req;
814
815                 list_req = list_first_entry(&ctx->poll_list, struct io_kiocb,
816                                                 list);
817                 if (list_req->rw.ki_filp != req->rw.ki_filp)
818                         ctx->poll_multi_file = true;
819         }
820
821         /*
822          * For fast devices, IO may have already completed. If it has, add
823          * it to the front so we find it first.
824          */
825         if (req->flags & REQ_F_IOPOLL_COMPLETED)
826                 list_add(&req->list, &ctx->poll_list);
827         else
828                 list_add_tail(&req->list, &ctx->poll_list);
829 }
830
831 static void io_file_put(struct io_submit_state *state)
832 {
833         if (state->file) {
834                 int diff = state->has_refs - state->used_refs;
835
836                 if (diff)
837                         fput_many(state->file, diff);
838                 state->file = NULL;
839         }
840 }
841
842 /*
843  * Get as many references to a file as we have IOs left in this submission,
844  * assuming most submissions are for one file, or at least that each file
845  * has more than one submission.
846  */
847 static struct file *io_file_get(struct io_submit_state *state, int fd)
848 {
849         if (!state)
850                 return fget(fd);
851
852         if (state->file) {
853                 if (state->fd == fd) {
854                         state->used_refs++;
855                         state->ios_left--;
856                         return state->file;
857                 }
858                 io_file_put(state);
859         }
860         state->file = fget_many(fd, state->ios_left);
861         if (!state->file)
862                 return NULL;
863
864         state->fd = fd;
865         state->has_refs = state->ios_left;
866         state->used_refs = 1;
867         state->ios_left--;
868         return state->file;
869 }
870
871 /*
872  * If we tracked the file through the SCM inflight mechanism, we could support
873  * any file. For now, just ensure that anything potentially problematic is done
874  * inline.
875  */
876 static bool io_file_supports_async(struct file *file)
877 {
878         umode_t mode = file_inode(file)->i_mode;
879
880         if (S_ISBLK(mode) || S_ISCHR(mode))
881                 return true;
882         if (S_ISREG(mode) && file->f_op != &io_uring_fops)
883                 return true;
884
885         return false;
886 }
887
888 static int io_prep_rw(struct io_kiocb *req, const struct sqe_submit *s,
889                       bool force_nonblock)
890 {
891         const struct io_uring_sqe *sqe = s->sqe;
892         struct io_ring_ctx *ctx = req->ctx;
893         struct kiocb *kiocb = &req->rw;
894         unsigned ioprio;
895         int ret;
896
897         if (!req->file)
898                 return -EBADF;
899         /* For -EAGAIN retry, everything is already prepped */
900         if (req->flags & REQ_F_PREPPED)
901                 return 0;
902
903         if (force_nonblock && !io_file_supports_async(req->file))
904                 force_nonblock = false;
905
906         kiocb->ki_pos = READ_ONCE(sqe->off);
907         kiocb->ki_flags = iocb_flags(kiocb->ki_filp);
908         kiocb->ki_hint = ki_hint_validate(file_write_hint(kiocb->ki_filp));
909
910         ioprio = READ_ONCE(sqe->ioprio);
911         if (ioprio) {
912                 ret = ioprio_check_cap(ioprio);
913                 if (ret)
914                         return ret;
915
916                 kiocb->ki_ioprio = ioprio;
917         } else
918                 kiocb->ki_ioprio = get_current_ioprio();
919
920         ret = kiocb_set_rw_flags(kiocb, READ_ONCE(sqe->rw_flags));
921         if (unlikely(ret))
922                 return ret;
923
924         /* don't allow async punt if RWF_NOWAIT was requested */
925         if (kiocb->ki_flags & IOCB_NOWAIT)
926                 req->flags |= REQ_F_NOWAIT;
927
928         if (force_nonblock)
929                 kiocb->ki_flags |= IOCB_NOWAIT;
930
931         if (ctx->flags & IORING_SETUP_IOPOLL) {
932                 if (!(kiocb->ki_flags & IOCB_DIRECT) ||
933                     !kiocb->ki_filp->f_op->iopoll)
934                         return -EOPNOTSUPP;
935
936                 req->error = 0;
937                 kiocb->ki_flags |= IOCB_HIPRI;
938                 kiocb->ki_complete = io_complete_rw_iopoll;
939         } else {
940                 if (kiocb->ki_flags & IOCB_HIPRI)
941                         return -EINVAL;
942                 kiocb->ki_complete = io_complete_rw;
943         }
944         req->flags |= REQ_F_PREPPED;
945         return 0;
946 }
947
948 static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
949 {
950         switch (ret) {
951         case -EIOCBQUEUED:
952                 break;
953         case -ERESTARTSYS:
954         case -ERESTARTNOINTR:
955         case -ERESTARTNOHAND:
956         case -ERESTART_RESTARTBLOCK:
957                 /*
958                  * We can't just restart the syscall, since previously
959                  * submitted sqes may already be in progress. Just fail this
960                  * IO with EINTR.
961                  */
962                 ret = -EINTR;
963                 /* fall through */
964         default:
965                 kiocb->ki_complete(kiocb, ret, 0);
966         }
967 }
968
969 static int io_import_fixed(struct io_ring_ctx *ctx, int rw,
970                            const struct io_uring_sqe *sqe,
971                            struct iov_iter *iter)
972 {
973         size_t len = READ_ONCE(sqe->len);
974         struct io_mapped_ubuf *imu;
975         unsigned index, buf_index;
976         size_t offset;
977         u64 buf_addr;
978
979         /* attempt to use fixed buffers without having provided iovecs */
980         if (unlikely(!ctx->user_bufs))
981                 return -EFAULT;
982
983         buf_index = READ_ONCE(sqe->buf_index);
984         if (unlikely(buf_index >= ctx->nr_user_bufs))
985                 return -EFAULT;
986
987         index = array_index_nospec(buf_index, ctx->nr_user_bufs);
988         imu = &ctx->user_bufs[index];
989         buf_addr = READ_ONCE(sqe->addr);
990
991         /* overflow */
992         if (buf_addr + len < buf_addr)
993                 return -EFAULT;
994         /* not inside the mapped region */
995         if (buf_addr < imu->ubuf || buf_addr + len > imu->ubuf + imu->len)
996                 return -EFAULT;
997
998         /*
999          * May not be a start of buffer, set size appropriately
1000          * and advance us to the beginning.
1001          */
1002         offset = buf_addr - imu->ubuf;
1003         iov_iter_bvec(iter, rw, imu->bvec, imu->nr_bvecs, offset + len);
1004         if (offset)
1005                 iov_iter_advance(iter, offset);
1006
1007         /* don't drop a reference to these pages */
1008         iter->type |= ITER_BVEC_FLAG_NO_REF;
1009         return 0;
1010 }
1011
1012 static int io_import_iovec(struct io_ring_ctx *ctx, int rw,
1013                            const struct sqe_submit *s, struct iovec **iovec,
1014                            struct iov_iter *iter)
1015 {
1016         const struct io_uring_sqe *sqe = s->sqe;
1017         void __user *buf = u64_to_user_ptr(READ_ONCE(sqe->addr));
1018         size_t sqe_len = READ_ONCE(sqe->len);
1019         u8 opcode;
1020
1021         /*
1022          * We're reading ->opcode for the second time, but the first read
1023          * doesn't care whether it's _FIXED or not, so it doesn't matter
1024          * whether ->opcode changes concurrently. The first read does care
1025          * about whether it is a READ or a WRITE, so we don't trust this read
1026          * for that purpose and instead let the caller pass in the read/write
1027          * flag.
1028          */
1029         opcode = READ_ONCE(sqe->opcode);
1030         if (opcode == IORING_OP_READ_FIXED ||
1031             opcode == IORING_OP_WRITE_FIXED) {
1032                 int ret = io_import_fixed(ctx, rw, sqe, iter);
1033                 *iovec = NULL;
1034                 return ret;
1035         }
1036
1037         if (!s->has_user)
1038                 return -EFAULT;
1039
1040 #ifdef CONFIG_COMPAT
1041         if (ctx->compat)
1042                 return compat_import_iovec(rw, buf, sqe_len, UIO_FASTIOV,
1043                                                 iovec, iter);
1044 #endif
1045
1046         return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter);
1047 }
1048
1049 /*
1050  * Make a note of the last file/offset/direction we punted to async
1051  * context. We'll use this information to see if we can piggy back a
1052  * sequential request onto the previous one, if it's still hasn't been
1053  * completed by the async worker.
1054  */
1055 static void io_async_list_note(int rw, struct io_kiocb *req, size_t len)
1056 {
1057         struct async_list *async_list = &req->ctx->pending_async[rw];
1058         struct kiocb *kiocb = &req->rw;
1059         struct file *filp = kiocb->ki_filp;
1060         off_t io_end = kiocb->ki_pos + len;
1061
1062         if (filp == async_list->file && kiocb->ki_pos == async_list->io_end) {
1063                 unsigned long max_pages;
1064
1065                 /* Use 8x RA size as a decent limiter for both reads/writes */
1066                 max_pages = filp->f_ra.ra_pages;
1067                 if (!max_pages)
1068                         max_pages = VM_READAHEAD_PAGES;
1069                 max_pages *= 8;
1070
1071                 /* If max pages are exceeded, reset the state */
1072                 len >>= PAGE_SHIFT;
1073                 if (async_list->io_pages + len <= max_pages) {
1074                         req->flags |= REQ_F_SEQ_PREV;
1075                         async_list->io_pages += len;
1076                 } else {
1077                         io_end = 0;
1078                         async_list->io_pages = 0;
1079                 }
1080         }
1081
1082         /* New file? Reset state. */
1083         if (async_list->file != filp) {
1084                 async_list->io_pages = 0;
1085                 async_list->file = filp;
1086         }
1087         async_list->io_end = io_end;
1088 }
1089
1090 static int io_read(struct io_kiocb *req, const struct sqe_submit *s,
1091                    bool force_nonblock)
1092 {
1093         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
1094         struct kiocb *kiocb = &req->rw;
1095         struct iov_iter iter;
1096         struct file *file;
1097         size_t iov_count;
1098         int ret;
1099
1100         ret = io_prep_rw(req, s, force_nonblock);
1101         if (ret)
1102                 return ret;
1103         file = kiocb->ki_filp;
1104
1105         if (unlikely(!(file->f_mode & FMODE_READ)))
1106                 return -EBADF;
1107         if (unlikely(!file->f_op->read_iter))
1108                 return -EINVAL;
1109
1110         ret = io_import_iovec(req->ctx, READ, s, &iovec, &iter);
1111         if (ret)
1112                 return ret;
1113
1114         iov_count = iov_iter_count(&iter);
1115         ret = rw_verify_area(READ, file, &kiocb->ki_pos, iov_count);
1116         if (!ret) {
1117                 ssize_t ret2;
1118
1119                 /* Catch -EAGAIN return for forced non-blocking submission */
1120                 ret2 = call_read_iter(file, kiocb, &iter);
1121                 if (!force_nonblock || ret2 != -EAGAIN) {
1122                         io_rw_done(kiocb, ret2);
1123                 } else {
1124                         /*
1125                          * If ->needs_lock is true, we're already in async
1126                          * context.
1127                          */
1128                         if (!s->needs_lock)
1129                                 io_async_list_note(READ, req, iov_count);
1130                         ret = -EAGAIN;
1131                 }
1132         }
1133         kfree(iovec);
1134         return ret;
1135 }
1136
1137 static int io_write(struct io_kiocb *req, const struct sqe_submit *s,
1138                     bool force_nonblock)
1139 {
1140         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
1141         struct kiocb *kiocb = &req->rw;
1142         struct iov_iter iter;
1143         struct file *file;
1144         size_t iov_count;
1145         int ret;
1146
1147         ret = io_prep_rw(req, s, force_nonblock);
1148         if (ret)
1149                 return ret;
1150
1151         file = kiocb->ki_filp;
1152         if (unlikely(!(file->f_mode & FMODE_WRITE)))
1153                 return -EBADF;
1154         if (unlikely(!file->f_op->write_iter))
1155                 return -EINVAL;
1156
1157         ret = io_import_iovec(req->ctx, WRITE, s, &iovec, &iter);
1158         if (ret)
1159                 return ret;
1160
1161         iov_count = iov_iter_count(&iter);
1162
1163         ret = -EAGAIN;
1164         if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT)) {
1165                 /* If ->needs_lock is true, we're already in async context. */
1166                 if (!s->needs_lock)
1167                         io_async_list_note(WRITE, req, iov_count);
1168                 goto out_free;
1169         }
1170
1171         ret = rw_verify_area(WRITE, file, &kiocb->ki_pos, iov_count);
1172         if (!ret) {
1173                 ssize_t ret2;
1174
1175                 /*
1176                  * Open-code file_start_write here to grab freeze protection,
1177                  * which will be released by another thread in
1178                  * io_complete_rw().  Fool lockdep by telling it the lock got
1179                  * released so that it doesn't complain about the held lock when
1180                  * we return to userspace.
1181                  */
1182                 if (S_ISREG(file_inode(file)->i_mode)) {
1183                         __sb_start_write(file_inode(file)->i_sb,
1184                                                 SB_FREEZE_WRITE, true);
1185                         __sb_writers_release(file_inode(file)->i_sb,
1186                                                 SB_FREEZE_WRITE);
1187                 }
1188                 kiocb->ki_flags |= IOCB_WRITE;
1189
1190                 ret2 = call_write_iter(file, kiocb, &iter);
1191                 if (!force_nonblock || ret2 != -EAGAIN) {
1192                         io_rw_done(kiocb, ret2);
1193                 } else {
1194                         /*
1195                          * If ->needs_lock is true, we're already in async
1196                          * context.
1197                          */
1198                         if (!s->needs_lock)
1199                                 io_async_list_note(WRITE, req, iov_count);
1200                         ret = -EAGAIN;
1201                 }
1202         }
1203 out_free:
1204         kfree(iovec);
1205         return ret;
1206 }
1207
1208 /*
1209  * IORING_OP_NOP just posts a completion event, nothing else.
1210  */
1211 static int io_nop(struct io_kiocb *req, u64 user_data)
1212 {
1213         struct io_ring_ctx *ctx = req->ctx;
1214         long err = 0;
1215
1216         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1217                 return -EINVAL;
1218
1219         io_cqring_add_event(ctx, user_data, err, 0);
1220         io_put_req(req);
1221         return 0;
1222 }
1223
1224 static int io_prep_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1225 {
1226         struct io_ring_ctx *ctx = req->ctx;
1227
1228         if (!req->file)
1229                 return -EBADF;
1230         /* Prep already done (EAGAIN retry) */
1231         if (req->flags & REQ_F_PREPPED)
1232                 return 0;
1233
1234         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1235                 return -EINVAL;
1236         if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
1237                 return -EINVAL;
1238
1239         req->flags |= REQ_F_PREPPED;
1240         return 0;
1241 }
1242
1243 static int io_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1244                     bool force_nonblock)
1245 {
1246         loff_t sqe_off = READ_ONCE(sqe->off);
1247         loff_t sqe_len = READ_ONCE(sqe->len);
1248         loff_t end = sqe_off + sqe_len;
1249         unsigned fsync_flags;
1250         int ret;
1251
1252         fsync_flags = READ_ONCE(sqe->fsync_flags);
1253         if (unlikely(fsync_flags & ~IORING_FSYNC_DATASYNC))
1254                 return -EINVAL;
1255
1256         ret = io_prep_fsync(req, sqe);
1257         if (ret)
1258                 return ret;
1259
1260         /* fsync always requires a blocking context */
1261         if (force_nonblock)
1262                 return -EAGAIN;
1263
1264         ret = vfs_fsync_range(req->rw.ki_filp, sqe_off,
1265                                 end > 0 ? end : LLONG_MAX,
1266                                 fsync_flags & IORING_FSYNC_DATASYNC);
1267
1268         io_cqring_add_event(req->ctx, sqe->user_data, ret, 0);
1269         io_put_req(req);
1270         return 0;
1271 }
1272
1273 static int io_prep_sfr(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1274 {
1275         struct io_ring_ctx *ctx = req->ctx;
1276         int ret = 0;
1277
1278         if (!req->file)
1279                 return -EBADF;
1280         /* Prep already done (EAGAIN retry) */
1281         if (req->flags & REQ_F_PREPPED)
1282                 return 0;
1283
1284         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1285                 return -EINVAL;
1286         if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
1287                 return -EINVAL;
1288
1289         req->flags |= REQ_F_PREPPED;
1290         return ret;
1291 }
1292
1293 static int io_sync_file_range(struct io_kiocb *req,
1294                               const struct io_uring_sqe *sqe,
1295                               bool force_nonblock)
1296 {
1297         loff_t sqe_off;
1298         loff_t sqe_len;
1299         unsigned flags;
1300         int ret;
1301
1302         ret = io_prep_sfr(req, sqe);
1303         if (ret)
1304                 return ret;
1305
1306         /* sync_file_range always requires a blocking context */
1307         if (force_nonblock)
1308                 return -EAGAIN;
1309
1310         sqe_off = READ_ONCE(sqe->off);
1311         sqe_len = READ_ONCE(sqe->len);
1312         flags = READ_ONCE(sqe->sync_range_flags);
1313
1314         ret = sync_file_range(req->rw.ki_filp, sqe_off, sqe_len, flags);
1315
1316         io_cqring_add_event(req->ctx, sqe->user_data, ret, 0);
1317         io_put_req(req);
1318         return 0;
1319 }
1320
1321 static void io_poll_remove_one(struct io_kiocb *req)
1322 {
1323         struct io_poll_iocb *poll = &req->poll;
1324
1325         spin_lock(&poll->head->lock);
1326         WRITE_ONCE(poll->canceled, true);
1327         if (!list_empty(&poll->wait.entry)) {
1328                 list_del_init(&poll->wait.entry);
1329                 queue_work(req->ctx->sqo_wq, &req->work);
1330         }
1331         spin_unlock(&poll->head->lock);
1332
1333         list_del_init(&req->list);
1334 }
1335
1336 static void io_poll_remove_all(struct io_ring_ctx *ctx)
1337 {
1338         struct io_kiocb *req;
1339
1340         spin_lock_irq(&ctx->completion_lock);
1341         while (!list_empty(&ctx->cancel_list)) {
1342                 req = list_first_entry(&ctx->cancel_list, struct io_kiocb,list);
1343                 io_poll_remove_one(req);
1344         }
1345         spin_unlock_irq(&ctx->completion_lock);
1346 }
1347
1348 /*
1349  * Find a running poll command that matches one specified in sqe->addr,
1350  * and remove it if found.
1351  */
1352 static int io_poll_remove(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1353 {
1354         struct io_ring_ctx *ctx = req->ctx;
1355         struct io_kiocb *poll_req, *next;
1356         int ret = -ENOENT;
1357
1358         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
1359                 return -EINVAL;
1360         if (sqe->ioprio || sqe->off || sqe->len || sqe->buf_index ||
1361             sqe->poll_events)
1362                 return -EINVAL;
1363
1364         spin_lock_irq(&ctx->completion_lock);
1365         list_for_each_entry_safe(poll_req, next, &ctx->cancel_list, list) {
1366                 if (READ_ONCE(sqe->addr) == poll_req->user_data) {
1367                         io_poll_remove_one(poll_req);
1368                         ret = 0;
1369                         break;
1370                 }
1371         }
1372         spin_unlock_irq(&ctx->completion_lock);
1373
1374         io_cqring_add_event(req->ctx, sqe->user_data, ret, 0);
1375         io_put_req(req);
1376         return 0;
1377 }
1378
1379 static void io_poll_complete(struct io_ring_ctx *ctx, struct io_kiocb *req,
1380                              __poll_t mask)
1381 {
1382         req->poll.done = true;
1383         io_cqring_fill_event(ctx, req->user_data, mangle_poll(mask), 0);
1384         io_commit_cqring(ctx);
1385 }
1386
1387 static void io_poll_complete_work(struct work_struct *work)
1388 {
1389         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
1390         struct io_poll_iocb *poll = &req->poll;
1391         struct poll_table_struct pt = { ._key = poll->events };
1392         struct io_ring_ctx *ctx = req->ctx;
1393         __poll_t mask = 0;
1394
1395         if (!READ_ONCE(poll->canceled))
1396                 mask = vfs_poll(poll->file, &pt) & poll->events;
1397
1398         /*
1399          * Note that ->ki_cancel callers also delete iocb from active_reqs after
1400          * calling ->ki_cancel.  We need the ctx_lock roundtrip here to
1401          * synchronize with them.  In the cancellation case the list_del_init
1402          * itself is not actually needed, but harmless so we keep it in to
1403          * avoid further branches in the fast path.
1404          */
1405         spin_lock_irq(&ctx->completion_lock);
1406         if (!mask && !READ_ONCE(poll->canceled)) {
1407                 add_wait_queue(poll->head, &poll->wait);
1408                 spin_unlock_irq(&ctx->completion_lock);
1409                 return;
1410         }
1411         list_del_init(&req->list);
1412         io_poll_complete(ctx, req, mask);
1413         spin_unlock_irq(&ctx->completion_lock);
1414
1415         io_cqring_ev_posted(ctx);
1416         io_put_req(req);
1417 }
1418
1419 static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
1420                         void *key)
1421 {
1422         struct io_poll_iocb *poll = container_of(wait, struct io_poll_iocb,
1423                                                         wait);
1424         struct io_kiocb *req = container_of(poll, struct io_kiocb, poll);
1425         struct io_ring_ctx *ctx = req->ctx;
1426         __poll_t mask = key_to_poll(key);
1427         unsigned long flags;
1428
1429         /* for instances that support it check for an event match first: */
1430         if (mask && !(mask & poll->events))
1431                 return 0;
1432
1433         list_del_init(&poll->wait.entry);
1434
1435         if (mask && spin_trylock_irqsave(&ctx->completion_lock, flags)) {
1436                 list_del(&req->list);
1437                 io_poll_complete(ctx, req, mask);
1438                 spin_unlock_irqrestore(&ctx->completion_lock, flags);
1439
1440                 io_cqring_ev_posted(ctx);
1441                 io_put_req(req);
1442         } else {
1443                 queue_work(ctx->sqo_wq, &req->work);
1444         }
1445
1446         return 1;
1447 }
1448
1449 struct io_poll_table {
1450         struct poll_table_struct pt;
1451         struct io_kiocb *req;
1452         int error;
1453 };
1454
1455 static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head,
1456                                struct poll_table_struct *p)
1457 {
1458         struct io_poll_table *pt = container_of(p, struct io_poll_table, pt);
1459
1460         if (unlikely(pt->req->poll.head)) {
1461                 pt->error = -EINVAL;
1462                 return;
1463         }
1464
1465         pt->error = 0;
1466         pt->req->poll.head = head;
1467         add_wait_queue(head, &pt->req->poll.wait);
1468 }
1469
1470 static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1471 {
1472         struct io_poll_iocb *poll = &req->poll;
1473         struct io_ring_ctx *ctx = req->ctx;
1474         struct io_poll_table ipt;
1475         bool cancel = false;
1476         __poll_t mask;
1477         u16 events;
1478
1479         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
1480                 return -EINVAL;
1481         if (sqe->addr || sqe->ioprio || sqe->off || sqe->len || sqe->buf_index)
1482                 return -EINVAL;
1483         if (!poll->file)
1484                 return -EBADF;
1485
1486         INIT_WORK(&req->work, io_poll_complete_work);
1487         events = READ_ONCE(sqe->poll_events);
1488         poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP;
1489
1490         poll->head = NULL;
1491         poll->done = false;
1492         poll->canceled = false;
1493
1494         ipt.pt._qproc = io_poll_queue_proc;
1495         ipt.pt._key = poll->events;
1496         ipt.req = req;
1497         ipt.error = -EINVAL; /* same as no support for IOCB_CMD_POLL */
1498
1499         /* initialized the list so that we can do list_empty checks */
1500         INIT_LIST_HEAD(&poll->wait.entry);
1501         init_waitqueue_func_entry(&poll->wait, io_poll_wake);
1502
1503         mask = vfs_poll(poll->file, &ipt.pt) & poll->events;
1504
1505         spin_lock_irq(&ctx->completion_lock);
1506         if (likely(poll->head)) {
1507                 spin_lock(&poll->head->lock);
1508                 if (unlikely(list_empty(&poll->wait.entry))) {
1509                         if (ipt.error)
1510                                 cancel = true;
1511                         ipt.error = 0;
1512                         mask = 0;
1513                 }
1514                 if (mask || ipt.error)
1515                         list_del_init(&poll->wait.entry);
1516                 else if (cancel)
1517                         WRITE_ONCE(poll->canceled, true);
1518                 else if (!poll->done) /* actually waiting for an event */
1519                         list_add_tail(&req->list, &ctx->cancel_list);
1520                 spin_unlock(&poll->head->lock);
1521         }
1522         if (mask) { /* no async, we'd stolen it */
1523                 ipt.error = 0;
1524                 io_poll_complete(ctx, req, mask);
1525         }
1526         spin_unlock_irq(&ctx->completion_lock);
1527
1528         if (mask) {
1529                 io_cqring_ev_posted(ctx);
1530                 io_put_req(req);
1531         }
1532         return ipt.error;
1533 }
1534
1535 static int io_req_defer(struct io_ring_ctx *ctx, struct io_kiocb *req,
1536                         const struct io_uring_sqe *sqe)
1537 {
1538         struct io_uring_sqe *sqe_copy;
1539
1540         if (!io_sequence_defer(ctx, req) && list_empty(&ctx->defer_list))
1541                 return 0;
1542
1543         sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL);
1544         if (!sqe_copy)
1545                 return -EAGAIN;
1546
1547         spin_lock_irq(&ctx->completion_lock);
1548         if (!io_sequence_defer(ctx, req) && list_empty(&ctx->defer_list)) {
1549                 spin_unlock_irq(&ctx->completion_lock);
1550                 kfree(sqe_copy);
1551                 return 0;
1552         }
1553
1554         memcpy(sqe_copy, sqe, sizeof(*sqe_copy));
1555         req->submit.sqe = sqe_copy;
1556
1557         INIT_WORK(&req->work, io_sq_wq_submit_work);
1558         list_add_tail(&req->list, &ctx->defer_list);
1559         spin_unlock_irq(&ctx->completion_lock);
1560         return -EIOCBQUEUED;
1561 }
1562
1563 static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
1564                            const struct sqe_submit *s, bool force_nonblock)
1565 {
1566         int ret, opcode;
1567
1568         if (unlikely(s->index >= ctx->sq_entries))
1569                 return -EINVAL;
1570         req->user_data = READ_ONCE(s->sqe->user_data);
1571
1572         opcode = READ_ONCE(s->sqe->opcode);
1573         switch (opcode) {
1574         case IORING_OP_NOP:
1575                 ret = io_nop(req, req->user_data);
1576                 break;
1577         case IORING_OP_READV:
1578                 if (unlikely(s->sqe->buf_index))
1579                         return -EINVAL;
1580                 ret = io_read(req, s, force_nonblock);
1581                 break;
1582         case IORING_OP_WRITEV:
1583                 if (unlikely(s->sqe->buf_index))
1584                         return -EINVAL;
1585                 ret = io_write(req, s, force_nonblock);
1586                 break;
1587         case IORING_OP_READ_FIXED:
1588                 ret = io_read(req, s, force_nonblock);
1589                 break;
1590         case IORING_OP_WRITE_FIXED:
1591                 ret = io_write(req, s, force_nonblock);
1592                 break;
1593         case IORING_OP_FSYNC:
1594                 ret = io_fsync(req, s->sqe, force_nonblock);
1595                 break;
1596         case IORING_OP_POLL_ADD:
1597                 ret = io_poll_add(req, s->sqe);
1598                 break;
1599         case IORING_OP_POLL_REMOVE:
1600                 ret = io_poll_remove(req, s->sqe);
1601                 break;
1602         case IORING_OP_SYNC_FILE_RANGE:
1603                 ret = io_sync_file_range(req, s->sqe, force_nonblock);
1604                 break;
1605         default:
1606                 ret = -EINVAL;
1607                 break;
1608         }
1609
1610         if (ret)
1611                 return ret;
1612
1613         if (ctx->flags & IORING_SETUP_IOPOLL) {
1614                 if (req->error == -EAGAIN)
1615                         return -EAGAIN;
1616
1617                 /* workqueue context doesn't hold uring_lock, grab it now */
1618                 if (s->needs_lock)
1619                         mutex_lock(&ctx->uring_lock);
1620                 io_iopoll_req_issued(req);
1621                 if (s->needs_lock)
1622                         mutex_unlock(&ctx->uring_lock);
1623         }
1624
1625         return 0;
1626 }
1627
1628 static struct async_list *io_async_list_from_sqe(struct io_ring_ctx *ctx,
1629                                                  const struct io_uring_sqe *sqe)
1630 {
1631         switch (sqe->opcode) {
1632         case IORING_OP_READV:
1633         case IORING_OP_READ_FIXED:
1634                 return &ctx->pending_async[READ];
1635         case IORING_OP_WRITEV:
1636         case IORING_OP_WRITE_FIXED:
1637                 return &ctx->pending_async[WRITE];
1638         default:
1639                 return NULL;
1640         }
1641 }
1642
1643 static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe)
1644 {
1645         u8 opcode = READ_ONCE(sqe->opcode);
1646
1647         return !(opcode == IORING_OP_READ_FIXED ||
1648                  opcode == IORING_OP_WRITE_FIXED);
1649 }
1650
1651 static void io_sq_wq_submit_work(struct work_struct *work)
1652 {
1653         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
1654         struct io_ring_ctx *ctx = req->ctx;
1655         struct mm_struct *cur_mm = NULL;
1656         struct async_list *async_list;
1657         LIST_HEAD(req_list);
1658         mm_segment_t old_fs;
1659         int ret;
1660
1661         async_list = io_async_list_from_sqe(ctx, req->submit.sqe);
1662 restart:
1663         do {
1664                 struct sqe_submit *s = &req->submit;
1665                 const struct io_uring_sqe *sqe = s->sqe;
1666
1667                 /* Ensure we clear previously set non-block flag */
1668                 req->rw.ki_flags &= ~IOCB_NOWAIT;
1669
1670                 ret = 0;
1671                 if (io_sqe_needs_user(sqe) && !cur_mm) {
1672                         if (!mmget_not_zero(ctx->sqo_mm)) {
1673                                 ret = -EFAULT;
1674                         } else {
1675                                 cur_mm = ctx->sqo_mm;
1676                                 use_mm(cur_mm);
1677                                 old_fs = get_fs();
1678                                 set_fs(USER_DS);
1679                         }
1680                 }
1681
1682                 if (!ret) {
1683                         s->has_user = cur_mm != NULL;
1684                         s->needs_lock = true;
1685                         do {
1686                                 ret = __io_submit_sqe(ctx, req, s, false);
1687                                 /*
1688                                  * We can get EAGAIN for polled IO even though
1689                                  * we're forcing a sync submission from here,
1690                                  * since we can't wait for request slots on the
1691                                  * block side.
1692                                  */
1693                                 if (ret != -EAGAIN)
1694                                         break;
1695                                 cond_resched();
1696                         } while (1);
1697                 }
1698
1699                 /* drop submission reference */
1700                 io_put_req(req);
1701
1702                 if (ret) {
1703                         io_cqring_add_event(ctx, sqe->user_data, ret, 0);
1704                         io_put_req(req);
1705                 }
1706
1707                 /* async context always use a copy of the sqe */
1708                 kfree(sqe);
1709
1710                 if (!async_list)
1711                         break;
1712                 if (!list_empty(&req_list)) {
1713                         req = list_first_entry(&req_list, struct io_kiocb,
1714                                                 list);
1715                         list_del(&req->list);
1716                         continue;
1717                 }
1718                 if (list_empty(&async_list->list))
1719                         break;
1720
1721                 req = NULL;
1722                 spin_lock(&async_list->lock);
1723                 if (list_empty(&async_list->list)) {
1724                         spin_unlock(&async_list->lock);
1725                         break;
1726                 }
1727                 list_splice_init(&async_list->list, &req_list);
1728                 spin_unlock(&async_list->lock);
1729
1730                 req = list_first_entry(&req_list, struct io_kiocb, list);
1731                 list_del(&req->list);
1732         } while (req);
1733
1734         /*
1735          * Rare case of racing with a submitter. If we find the count has
1736          * dropped to zero AND we have pending work items, then restart
1737          * the processing. This is a tiny race window.
1738          */
1739         if (async_list) {
1740                 ret = atomic_dec_return(&async_list->cnt);
1741                 while (!ret && !list_empty(&async_list->list)) {
1742                         spin_lock(&async_list->lock);
1743                         atomic_inc(&async_list->cnt);
1744                         list_splice_init(&async_list->list, &req_list);
1745                         spin_unlock(&async_list->lock);
1746
1747                         if (!list_empty(&req_list)) {
1748                                 req = list_first_entry(&req_list,
1749                                                         struct io_kiocb, list);
1750                                 list_del(&req->list);
1751                                 goto restart;
1752                         }
1753                         ret = atomic_dec_return(&async_list->cnt);
1754                 }
1755         }
1756
1757         if (cur_mm) {
1758                 set_fs(old_fs);
1759                 unuse_mm(cur_mm);
1760                 mmput(cur_mm);
1761         }
1762 }
1763
1764 /*
1765  * See if we can piggy back onto previously submitted work, that is still
1766  * running. We currently only allow this if the new request is sequential
1767  * to the previous one we punted.
1768  */
1769 static bool io_add_to_prev_work(struct async_list *list, struct io_kiocb *req)
1770 {
1771         bool ret = false;
1772
1773         if (!list)
1774                 return false;
1775         if (!(req->flags & REQ_F_SEQ_PREV))
1776                 return false;
1777         if (!atomic_read(&list->cnt))
1778                 return false;
1779
1780         ret = true;
1781         spin_lock(&list->lock);
1782         list_add_tail(&req->list, &list->list);
1783         if (!atomic_read(&list->cnt)) {
1784                 list_del_init(&req->list);
1785                 ret = false;
1786         }
1787         spin_unlock(&list->lock);
1788         return ret;
1789 }
1790
1791 static bool io_op_needs_file(const struct io_uring_sqe *sqe)
1792 {
1793         int op = READ_ONCE(sqe->opcode);
1794
1795         switch (op) {
1796         case IORING_OP_NOP:
1797         case IORING_OP_POLL_REMOVE:
1798                 return false;
1799         default:
1800                 return true;
1801         }
1802 }
1803
1804 static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s,
1805                            struct io_submit_state *state, struct io_kiocb *req)
1806 {
1807         unsigned flags;
1808         int fd;
1809
1810         flags = READ_ONCE(s->sqe->flags);
1811         fd = READ_ONCE(s->sqe->fd);
1812
1813         if (flags & IOSQE_IO_DRAIN) {
1814                 req->flags |= REQ_F_IO_DRAIN;
1815                 req->sequence = ctx->cached_sq_head - 1;
1816         }
1817
1818         if (!io_op_needs_file(s->sqe)) {
1819                 req->file = NULL;
1820                 return 0;
1821         }
1822
1823         if (flags & IOSQE_FIXED_FILE) {
1824                 if (unlikely(!ctx->user_files ||
1825                     (unsigned) fd >= ctx->nr_user_files))
1826                         return -EBADF;
1827                 req->file = ctx->user_files[fd];
1828                 req->flags |= REQ_F_FIXED_FILE;
1829         } else {
1830                 if (s->needs_fixed_file)
1831                         return -EBADF;
1832                 req->file = io_file_get(state, fd);
1833                 if (unlikely(!req->file))
1834                         return -EBADF;
1835         }
1836
1837         return 0;
1838 }
1839
1840 static int io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
1841                          struct io_submit_state *state)
1842 {
1843         struct io_kiocb *req;
1844         int ret;
1845
1846         /* enforce forwards compatibility on users */
1847         if (unlikely(s->sqe->flags & ~(IOSQE_FIXED_FILE | IOSQE_IO_DRAIN)))
1848                 return -EINVAL;
1849
1850         req = io_get_req(ctx, state);
1851         if (unlikely(!req))
1852                 return -EAGAIN;
1853
1854         ret = io_req_set_file(ctx, s, state, req);
1855         if (unlikely(ret))
1856                 goto out;
1857
1858         ret = io_req_defer(ctx, req, s->sqe);
1859         if (ret) {
1860                 if (ret == -EIOCBQUEUED)
1861                         ret = 0;
1862                 return ret;
1863         }
1864
1865         ret = __io_submit_sqe(ctx, req, s, true);
1866         if (ret == -EAGAIN && !(req->flags & REQ_F_NOWAIT)) {
1867                 struct io_uring_sqe *sqe_copy;
1868
1869                 sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL);
1870                 if (sqe_copy) {
1871                         struct async_list *list;
1872
1873                         memcpy(sqe_copy, s->sqe, sizeof(*sqe_copy));
1874                         s->sqe = sqe_copy;
1875
1876                         memcpy(&req->submit, s, sizeof(*s));
1877                         list = io_async_list_from_sqe(ctx, s->sqe);
1878                         if (!io_add_to_prev_work(list, req)) {
1879                                 if (list)
1880                                         atomic_inc(&list->cnt);
1881                                 INIT_WORK(&req->work, io_sq_wq_submit_work);
1882                                 queue_work(ctx->sqo_wq, &req->work);
1883                         }
1884
1885                         /*
1886                          * Queued up for async execution, worker will release
1887                          * submit reference when the iocb is actually
1888                          * submitted.
1889                          */
1890                         return 0;
1891                 }
1892         }
1893
1894 out:
1895         /* drop submission reference */
1896         io_put_req(req);
1897
1898         /* and drop final reference, if we failed */
1899         if (ret)
1900                 io_put_req(req);
1901
1902         return ret;
1903 }
1904
1905 /*
1906  * Batched submission is done, ensure local IO is flushed out.
1907  */
1908 static void io_submit_state_end(struct io_submit_state *state)
1909 {
1910         blk_finish_plug(&state->plug);
1911         io_file_put(state);
1912         if (state->free_reqs)
1913                 kmem_cache_free_bulk(req_cachep, state->free_reqs,
1914                                         &state->reqs[state->cur_req]);
1915 }
1916
1917 /*
1918  * Start submission side cache.
1919  */
1920 static void io_submit_state_start(struct io_submit_state *state,
1921                                   struct io_ring_ctx *ctx, unsigned max_ios)
1922 {
1923         blk_start_plug(&state->plug);
1924         state->free_reqs = 0;
1925         state->file = NULL;
1926         state->ios_left = max_ios;
1927 }
1928
1929 static void io_commit_sqring(struct io_ring_ctx *ctx)
1930 {
1931         struct io_sq_ring *ring = ctx->sq_ring;
1932
1933         if (ctx->cached_sq_head != READ_ONCE(ring->r.head)) {
1934                 /*
1935                  * Ensure any loads from the SQEs are done at this point,
1936                  * since once we write the new head, the application could
1937                  * write new data to them.
1938                  */
1939                 smp_store_release(&ring->r.head, ctx->cached_sq_head);
1940         }
1941 }
1942
1943 /*
1944  * Fetch an sqe, if one is available. Note that s->sqe will point to memory
1945  * that is mapped by userspace. This means that care needs to be taken to
1946  * ensure that reads are stable, as we cannot rely on userspace always
1947  * being a good citizen. If members of the sqe are validated and then later
1948  * used, it's important that those reads are done through READ_ONCE() to
1949  * prevent a re-load down the line.
1950  */
1951 static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s)
1952 {
1953         struct io_sq_ring *ring = ctx->sq_ring;
1954         unsigned head;
1955
1956         /*
1957          * The cached sq head (or cq tail) serves two purposes:
1958          *
1959          * 1) allows us to batch the cost of updating the user visible
1960          *    head updates.
1961          * 2) allows the kernel side to track the head on its own, even
1962          *    though the application is the one updating it.
1963          */
1964         head = ctx->cached_sq_head;
1965         /* make sure SQ entry isn't read before tail */
1966         if (head == smp_load_acquire(&ring->r.tail))
1967                 return false;
1968
1969         head = READ_ONCE(ring->array[head & ctx->sq_mask]);
1970         if (head < ctx->sq_entries) {
1971                 s->index = head;
1972                 s->sqe = &ctx->sq_sqes[head];
1973                 ctx->cached_sq_head++;
1974                 return true;
1975         }
1976
1977         /* drop invalid entries */
1978         ctx->cached_sq_head++;
1979         ring->dropped++;
1980         return false;
1981 }
1982
1983 static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
1984                           unsigned int nr, bool has_user, bool mm_fault)
1985 {
1986         struct io_submit_state state, *statep = NULL;
1987         int ret, i, submitted = 0;
1988
1989         if (nr > IO_PLUG_THRESHOLD) {
1990                 io_submit_state_start(&state, ctx, nr);
1991                 statep = &state;
1992         }
1993
1994         for (i = 0; i < nr; i++) {
1995                 if (unlikely(mm_fault)) {
1996                         ret = -EFAULT;
1997                 } else {
1998                         sqes[i].has_user = has_user;
1999                         sqes[i].needs_lock = true;
2000                         sqes[i].needs_fixed_file = true;
2001                         ret = io_submit_sqe(ctx, &sqes[i], statep);
2002                 }
2003                 if (!ret) {
2004                         submitted++;
2005                         continue;
2006                 }
2007
2008                 io_cqring_add_event(ctx, sqes[i].sqe->user_data, ret, 0);
2009         }
2010
2011         if (statep)
2012                 io_submit_state_end(&state);
2013
2014         return submitted;
2015 }
2016
2017 static int io_sq_thread(void *data)
2018 {
2019         struct sqe_submit sqes[IO_IOPOLL_BATCH];
2020         struct io_ring_ctx *ctx = data;
2021         struct mm_struct *cur_mm = NULL;
2022         mm_segment_t old_fs;
2023         DEFINE_WAIT(wait);
2024         unsigned inflight;
2025         unsigned long timeout;
2026
2027         old_fs = get_fs();
2028         set_fs(USER_DS);
2029
2030         timeout = inflight = 0;
2031         while (!kthread_should_stop() && !ctx->sqo_stop) {
2032                 bool all_fixed, mm_fault = false;
2033                 int i;
2034
2035                 if (inflight) {
2036                         unsigned nr_events = 0;
2037
2038                         if (ctx->flags & IORING_SETUP_IOPOLL) {
2039                                 /*
2040                                  * We disallow the app entering submit/complete
2041                                  * with polling, but we still need to lock the
2042                                  * ring to prevent racing with polled issue
2043                                  * that got punted to a workqueue.
2044                                  */
2045                                 mutex_lock(&ctx->uring_lock);
2046                                 io_iopoll_check(ctx, &nr_events, 0);
2047                                 mutex_unlock(&ctx->uring_lock);
2048                         } else {
2049                                 /*
2050                                  * Normal IO, just pretend everything completed.
2051                                  * We don't have to poll completions for that.
2052                                  */
2053                                 nr_events = inflight;
2054                         }
2055
2056                         inflight -= nr_events;
2057                         if (!inflight)
2058                                 timeout = jiffies + ctx->sq_thread_idle;
2059                 }
2060
2061                 if (!io_get_sqring(ctx, &sqes[0])) {
2062                         /*
2063                          * We're polling. If we're within the defined idle
2064                          * period, then let us spin without work before going
2065                          * to sleep.
2066                          */
2067                         if (inflight || !time_after(jiffies, timeout)) {
2068                                 cpu_relax();
2069                                 continue;
2070                         }
2071
2072                         /*
2073                          * Drop cur_mm before scheduling, we can't hold it for
2074                          * long periods (or over schedule()). Do this before
2075                          * adding ourselves to the waitqueue, as the unuse/drop
2076                          * may sleep.
2077                          */
2078                         if (cur_mm) {
2079                                 unuse_mm(cur_mm);
2080                                 mmput(cur_mm);
2081                                 cur_mm = NULL;
2082                         }
2083
2084                         prepare_to_wait(&ctx->sqo_wait, &wait,
2085                                                 TASK_INTERRUPTIBLE);
2086
2087                         /* Tell userspace we may need a wakeup call */
2088                         ctx->sq_ring->flags |= IORING_SQ_NEED_WAKEUP;
2089                         /* make sure to read SQ tail after writing flags */
2090                         smp_mb();
2091
2092                         if (!io_get_sqring(ctx, &sqes[0])) {
2093                                 if (kthread_should_stop()) {
2094                                         finish_wait(&ctx->sqo_wait, &wait);
2095                                         break;
2096                                 }
2097                                 if (signal_pending(current))
2098                                         flush_signals(current);
2099                                 schedule();
2100                                 finish_wait(&ctx->sqo_wait, &wait);
2101
2102                                 ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
2103                                 continue;
2104                         }
2105                         finish_wait(&ctx->sqo_wait, &wait);
2106
2107                         ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
2108                 }
2109
2110                 i = 0;
2111                 all_fixed = true;
2112                 do {
2113                         if (all_fixed && io_sqe_needs_user(sqes[i].sqe))
2114                                 all_fixed = false;
2115
2116                         i++;
2117                         if (i == ARRAY_SIZE(sqes))
2118                                 break;
2119                 } while (io_get_sqring(ctx, &sqes[i]));
2120
2121                 /* Unless all new commands are FIXED regions, grab mm */
2122                 if (!all_fixed && !cur_mm) {
2123                         mm_fault = !mmget_not_zero(ctx->sqo_mm);
2124                         if (!mm_fault) {
2125                                 use_mm(ctx->sqo_mm);
2126                                 cur_mm = ctx->sqo_mm;
2127                         }
2128                 }
2129
2130                 inflight += io_submit_sqes(ctx, sqes, i, cur_mm != NULL,
2131                                                 mm_fault);
2132
2133                 /* Commit SQ ring head once we've consumed all SQEs */
2134                 io_commit_sqring(ctx);
2135         }
2136
2137         set_fs(old_fs);
2138         if (cur_mm) {
2139                 unuse_mm(cur_mm);
2140                 mmput(cur_mm);
2141         }
2142
2143         if (kthread_should_park())
2144                 kthread_parkme();
2145
2146         return 0;
2147 }
2148
2149 static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
2150 {
2151         struct io_submit_state state, *statep = NULL;
2152         int i, submit = 0;
2153
2154         if (to_submit > IO_PLUG_THRESHOLD) {
2155                 io_submit_state_start(&state, ctx, to_submit);
2156                 statep = &state;
2157         }
2158
2159         for (i = 0; i < to_submit; i++) {
2160                 struct sqe_submit s;
2161                 int ret;
2162
2163                 if (!io_get_sqring(ctx, &s))
2164                         break;
2165
2166                 s.has_user = true;
2167                 s.needs_lock = false;
2168                 s.needs_fixed_file = false;
2169                 submit++;
2170
2171                 ret = io_submit_sqe(ctx, &s, statep);
2172                 if (ret)
2173                         io_cqring_add_event(ctx, s.sqe->user_data, ret, 0);
2174         }
2175         io_commit_sqring(ctx);
2176
2177         if (statep)
2178                 io_submit_state_end(statep);
2179
2180         return submit;
2181 }
2182
2183 static unsigned io_cqring_events(struct io_cq_ring *ring)
2184 {
2185         return READ_ONCE(ring->r.tail) - READ_ONCE(ring->r.head);
2186 }
2187
2188 /*
2189  * Wait until events become available, if we don't already have some. The
2190  * application must reap them itself, as they reside on the shared cq ring.
2191  */
2192 static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
2193                           const sigset_t __user *sig, size_t sigsz)
2194 {
2195         struct io_cq_ring *ring = ctx->cq_ring;
2196         sigset_t ksigmask, sigsaved;
2197         DEFINE_WAIT(wait);
2198         int ret;
2199
2200         /* See comment at the top of this file */
2201         smp_rmb();
2202         if (io_cqring_events(ring) >= min_events)
2203                 return 0;
2204
2205         if (sig) {
2206 #ifdef CONFIG_COMPAT
2207                 if (in_compat_syscall())
2208                         ret = set_compat_user_sigmask((const compat_sigset_t __user *)sig,
2209                                                       &ksigmask, &sigsaved, sigsz);
2210                 else
2211 #endif
2212                         ret = set_user_sigmask(sig, &ksigmask,
2213                                                &sigsaved, sigsz);
2214
2215                 if (ret)
2216                         return ret;
2217         }
2218
2219         do {
2220                 prepare_to_wait(&ctx->wait, &wait, TASK_INTERRUPTIBLE);
2221
2222                 ret = 0;
2223                 /* See comment at the top of this file */
2224                 smp_rmb();
2225                 if (io_cqring_events(ring) >= min_events)
2226                         break;
2227
2228                 schedule();
2229
2230                 ret = -EINTR;
2231                 if (signal_pending(current))
2232                         break;
2233         } while (1);
2234
2235         finish_wait(&ctx->wait, &wait);
2236
2237         if (sig)
2238                 restore_user_sigmask(sig, &sigsaved);
2239
2240         return READ_ONCE(ring->r.head) == READ_ONCE(ring->r.tail) ? ret : 0;
2241 }
2242
2243 static void __io_sqe_files_unregister(struct io_ring_ctx *ctx)
2244 {
2245 #if defined(CONFIG_UNIX)
2246         if (ctx->ring_sock) {
2247                 struct sock *sock = ctx->ring_sock->sk;
2248                 struct sk_buff *skb;
2249
2250                 while ((skb = skb_dequeue(&sock->sk_receive_queue)) != NULL)
2251                         kfree_skb(skb);
2252         }
2253 #else
2254         int i;
2255
2256         for (i = 0; i < ctx->nr_user_files; i++)
2257                 fput(ctx->user_files[i]);
2258 #endif
2259 }
2260
2261 static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
2262 {
2263         if (!ctx->user_files)
2264                 return -ENXIO;
2265
2266         __io_sqe_files_unregister(ctx);
2267         kfree(ctx->user_files);
2268         ctx->user_files = NULL;
2269         ctx->nr_user_files = 0;
2270         return 0;
2271 }
2272
2273 static void io_sq_thread_stop(struct io_ring_ctx *ctx)
2274 {
2275         if (ctx->sqo_thread) {
2276                 ctx->sqo_stop = 1;
2277                 mb();
2278                 kthread_park(ctx->sqo_thread);
2279                 kthread_stop(ctx->sqo_thread);
2280                 ctx->sqo_thread = NULL;
2281         }
2282 }
2283
2284 static void io_finish_async(struct io_ring_ctx *ctx)
2285 {
2286         io_sq_thread_stop(ctx);
2287
2288         if (ctx->sqo_wq) {
2289                 destroy_workqueue(ctx->sqo_wq);
2290                 ctx->sqo_wq = NULL;
2291         }
2292 }
2293
2294 #if defined(CONFIG_UNIX)
2295 static void io_destruct_skb(struct sk_buff *skb)
2296 {
2297         struct io_ring_ctx *ctx = skb->sk->sk_user_data;
2298
2299         io_finish_async(ctx);
2300         unix_destruct_scm(skb);
2301 }
2302
2303 /*
2304  * Ensure the UNIX gc is aware of our file set, so we are certain that
2305  * the io_uring can be safely unregistered on process exit, even if we have
2306  * loops in the file referencing.
2307  */
2308 static int __io_sqe_files_scm(struct io_ring_ctx *ctx, int nr, int offset)
2309 {
2310         struct sock *sk = ctx->ring_sock->sk;
2311         struct scm_fp_list *fpl;
2312         struct sk_buff *skb;
2313         int i;
2314
2315         if (!capable(CAP_SYS_RESOURCE) && !capable(CAP_SYS_ADMIN)) {
2316                 unsigned long inflight = ctx->user->unix_inflight + nr;
2317
2318                 if (inflight > task_rlimit(current, RLIMIT_NOFILE))
2319                         return -EMFILE;
2320         }
2321
2322         fpl = kzalloc(sizeof(*fpl), GFP_KERNEL);
2323         if (!fpl)
2324                 return -ENOMEM;
2325
2326         skb = alloc_skb(0, GFP_KERNEL);
2327         if (!skb) {
2328                 kfree(fpl);
2329                 return -ENOMEM;
2330         }
2331
2332         skb->sk = sk;
2333         skb->destructor = io_destruct_skb;
2334
2335         fpl->user = get_uid(ctx->user);
2336         for (i = 0; i < nr; i++) {
2337                 fpl->fp[i] = get_file(ctx->user_files[i + offset]);
2338                 unix_inflight(fpl->user, fpl->fp[i]);
2339         }
2340
2341         fpl->max = fpl->count = nr;
2342         UNIXCB(skb).fp = fpl;
2343         refcount_add(skb->truesize, &sk->sk_wmem_alloc);
2344         skb_queue_head(&sk->sk_receive_queue, skb);
2345
2346         for (i = 0; i < nr; i++)
2347                 fput(fpl->fp[i]);
2348
2349         return 0;
2350 }
2351
2352 /*
2353  * If UNIX sockets are enabled, fd passing can cause a reference cycle which
2354  * causes regular reference counting to break down. We rely on the UNIX
2355  * garbage collection to take care of this problem for us.
2356  */
2357 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
2358 {
2359         unsigned left, total;
2360         int ret = 0;
2361
2362         total = 0;
2363         left = ctx->nr_user_files;
2364         while (left) {
2365                 unsigned this_files = min_t(unsigned, left, SCM_MAX_FD);
2366
2367                 ret = __io_sqe_files_scm(ctx, this_files, total);
2368                 if (ret)
2369                         break;
2370                 left -= this_files;
2371                 total += this_files;
2372         }
2373
2374         if (!ret)
2375                 return 0;
2376
2377         while (total < ctx->nr_user_files) {
2378                 fput(ctx->user_files[total]);
2379                 total++;
2380         }
2381
2382         return ret;
2383 }
2384 #else
2385 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
2386 {
2387         return 0;
2388 }
2389 #endif
2390
2391 static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
2392                                  unsigned nr_args)
2393 {
2394         __s32 __user *fds = (__s32 __user *) arg;
2395         int fd, ret = 0;
2396         unsigned i;
2397
2398         if (ctx->user_files)
2399                 return -EBUSY;
2400         if (!nr_args)
2401                 return -EINVAL;
2402         if (nr_args > IORING_MAX_FIXED_FILES)
2403                 return -EMFILE;
2404
2405         ctx->user_files = kcalloc(nr_args, sizeof(struct file *), GFP_KERNEL);
2406         if (!ctx->user_files)
2407                 return -ENOMEM;
2408
2409         for (i = 0; i < nr_args; i++) {
2410                 ret = -EFAULT;
2411                 if (copy_from_user(&fd, &fds[i], sizeof(fd)))
2412                         break;
2413
2414                 ctx->user_files[i] = fget(fd);
2415
2416                 ret = -EBADF;
2417                 if (!ctx->user_files[i])
2418                         break;
2419                 /*
2420                  * Don't allow io_uring instances to be registered. If UNIX
2421                  * isn't enabled, then this causes a reference cycle and this
2422                  * instance can never get freed. If UNIX is enabled we'll
2423                  * handle it just fine, but there's still no point in allowing
2424                  * a ring fd as it doesn't support regular read/write anyway.
2425                  */
2426                 if (ctx->user_files[i]->f_op == &io_uring_fops) {
2427                         fput(ctx->user_files[i]);
2428                         break;
2429                 }
2430                 ctx->nr_user_files++;
2431                 ret = 0;
2432         }
2433
2434         if (ret) {
2435                 for (i = 0; i < ctx->nr_user_files; i++)
2436                         fput(ctx->user_files[i]);
2437
2438                 kfree(ctx->user_files);
2439                 ctx->user_files = NULL;
2440                 ctx->nr_user_files = 0;
2441                 return ret;
2442         }
2443
2444         ret = io_sqe_files_scm(ctx);
2445         if (ret)
2446                 io_sqe_files_unregister(ctx);
2447
2448         return ret;
2449 }
2450
2451 static int io_sq_offload_start(struct io_ring_ctx *ctx,
2452                                struct io_uring_params *p)
2453 {
2454         int ret;
2455
2456         init_waitqueue_head(&ctx->sqo_wait);
2457         mmgrab(current->mm);
2458         ctx->sqo_mm = current->mm;
2459
2460         if (ctx->flags & IORING_SETUP_SQPOLL) {
2461                 ret = -EPERM;
2462                 if (!capable(CAP_SYS_ADMIN))
2463                         goto err;
2464
2465                 ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
2466                 if (!ctx->sq_thread_idle)
2467                         ctx->sq_thread_idle = HZ;
2468
2469                 if (p->flags & IORING_SETUP_SQ_AFF) {
2470                         int cpu = array_index_nospec(p->sq_thread_cpu,
2471                                                         nr_cpu_ids);
2472
2473                         ret = -EINVAL;
2474                         if (!cpu_online(cpu))
2475                                 goto err;
2476
2477                         ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread,
2478                                                         ctx, cpu,
2479                                                         "io_uring-sq");
2480                 } else {
2481                         ctx->sqo_thread = kthread_create(io_sq_thread, ctx,
2482                                                         "io_uring-sq");
2483                 }
2484                 if (IS_ERR(ctx->sqo_thread)) {
2485                         ret = PTR_ERR(ctx->sqo_thread);
2486                         ctx->sqo_thread = NULL;
2487                         goto err;
2488                 }
2489                 wake_up_process(ctx->sqo_thread);
2490         } else if (p->flags & IORING_SETUP_SQ_AFF) {
2491                 /* Can't have SQ_AFF without SQPOLL */
2492                 ret = -EINVAL;
2493                 goto err;
2494         }
2495
2496         /* Do QD, or 2 * CPUS, whatever is smallest */
2497         ctx->sqo_wq = alloc_workqueue("io_ring-wq", WQ_UNBOUND | WQ_FREEZABLE,
2498                         min(ctx->sq_entries - 1, 2 * num_online_cpus()));
2499         if (!ctx->sqo_wq) {
2500                 ret = -ENOMEM;
2501                 goto err;
2502         }
2503
2504         return 0;
2505 err:
2506         io_sq_thread_stop(ctx);
2507         mmdrop(ctx->sqo_mm);
2508         ctx->sqo_mm = NULL;
2509         return ret;
2510 }
2511
2512 static void io_unaccount_mem(struct user_struct *user, unsigned long nr_pages)
2513 {
2514         atomic_long_sub(nr_pages, &user->locked_vm);
2515 }
2516
2517 static int io_account_mem(struct user_struct *user, unsigned long nr_pages)
2518 {
2519         unsigned long page_limit, cur_pages, new_pages;
2520
2521         /* Don't allow more pages than we can safely lock */
2522         page_limit = rlimit(RLIMIT_MEMLOCK) >> PAGE_SHIFT;
2523
2524         do {
2525                 cur_pages = atomic_long_read(&user->locked_vm);
2526                 new_pages = cur_pages + nr_pages;
2527                 if (new_pages > page_limit)
2528                         return -ENOMEM;
2529         } while (atomic_long_cmpxchg(&user->locked_vm, cur_pages,
2530                                         new_pages) != cur_pages);
2531
2532         return 0;
2533 }
2534
2535 static void io_mem_free(void *ptr)
2536 {
2537         struct page *page;
2538
2539         if (!ptr)
2540                 return;
2541
2542         page = virt_to_head_page(ptr);
2543         if (put_page_testzero(page))
2544                 free_compound_page(page);
2545 }
2546
2547 static void *io_mem_alloc(size_t size)
2548 {
2549         gfp_t gfp_flags = GFP_KERNEL | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP |
2550                                 __GFP_NORETRY;
2551
2552         return (void *) __get_free_pages(gfp_flags, get_order(size));
2553 }
2554
2555 static unsigned long ring_pages(unsigned sq_entries, unsigned cq_entries)
2556 {
2557         struct io_sq_ring *sq_ring;
2558         struct io_cq_ring *cq_ring;
2559         size_t bytes;
2560
2561         bytes = struct_size(sq_ring, array, sq_entries);
2562         bytes += array_size(sizeof(struct io_uring_sqe), sq_entries);
2563         bytes += struct_size(cq_ring, cqes, cq_entries);
2564
2565         return (bytes + PAGE_SIZE - 1) / PAGE_SIZE;
2566 }
2567
2568 static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx)
2569 {
2570         int i, j;
2571
2572         if (!ctx->user_bufs)
2573                 return -ENXIO;
2574
2575         for (i = 0; i < ctx->nr_user_bufs; i++) {
2576                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
2577
2578                 for (j = 0; j < imu->nr_bvecs; j++)
2579                         put_page(imu->bvec[j].bv_page);
2580
2581                 if (ctx->account_mem)
2582                         io_unaccount_mem(ctx->user, imu->nr_bvecs);
2583                 kvfree(imu->bvec);
2584                 imu->nr_bvecs = 0;
2585         }
2586
2587         kfree(ctx->user_bufs);
2588         ctx->user_bufs = NULL;
2589         ctx->nr_user_bufs = 0;
2590         return 0;
2591 }
2592
2593 static int io_copy_iov(struct io_ring_ctx *ctx, struct iovec *dst,
2594                        void __user *arg, unsigned index)
2595 {
2596         struct iovec __user *src;
2597
2598 #ifdef CONFIG_COMPAT
2599         if (ctx->compat) {
2600                 struct compat_iovec __user *ciovs;
2601                 struct compat_iovec ciov;
2602
2603                 ciovs = (struct compat_iovec __user *) arg;
2604                 if (copy_from_user(&ciov, &ciovs[index], sizeof(ciov)))
2605                         return -EFAULT;
2606
2607                 dst->iov_base = (void __user *) (unsigned long) ciov.iov_base;
2608                 dst->iov_len = ciov.iov_len;
2609                 return 0;
2610         }
2611 #endif
2612         src = (struct iovec __user *) arg;
2613         if (copy_from_user(dst, &src[index], sizeof(*dst)))
2614                 return -EFAULT;
2615         return 0;
2616 }
2617
2618 static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
2619                                   unsigned nr_args)
2620 {
2621         struct vm_area_struct **vmas = NULL;
2622         struct page **pages = NULL;
2623         int i, j, got_pages = 0;
2624         int ret = -EINVAL;
2625
2626         if (ctx->user_bufs)
2627                 return -EBUSY;
2628         if (!nr_args || nr_args > UIO_MAXIOV)
2629                 return -EINVAL;
2630
2631         ctx->user_bufs = kcalloc(nr_args, sizeof(struct io_mapped_ubuf),
2632                                         GFP_KERNEL);
2633         if (!ctx->user_bufs)
2634                 return -ENOMEM;
2635
2636         for (i = 0; i < nr_args; i++) {
2637                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
2638                 unsigned long off, start, end, ubuf;
2639                 int pret, nr_pages;
2640                 struct iovec iov;
2641                 size_t size;
2642
2643                 ret = io_copy_iov(ctx, &iov, arg, i);
2644                 if (ret)
2645                         break;
2646
2647                 /*
2648                  * Don't impose further limits on the size and buffer
2649                  * constraints here, we'll -EINVAL later when IO is
2650                  * submitted if they are wrong.
2651                  */
2652                 ret = -EFAULT;
2653                 if (!iov.iov_base || !iov.iov_len)
2654                         goto err;
2655
2656                 /* arbitrary limit, but we need something */
2657                 if (iov.iov_len > SZ_1G)
2658                         goto err;
2659
2660                 ubuf = (unsigned long) iov.iov_base;
2661                 end = (ubuf + iov.iov_len + PAGE_SIZE - 1) >> PAGE_SHIFT;
2662                 start = ubuf >> PAGE_SHIFT;
2663                 nr_pages = end - start;
2664
2665                 if (ctx->account_mem) {
2666                         ret = io_account_mem(ctx->user, nr_pages);
2667                         if (ret)
2668                                 goto err;
2669                 }
2670
2671                 ret = 0;
2672                 if (!pages || nr_pages > got_pages) {
2673                         kfree(vmas);
2674                         kfree(pages);
2675                         pages = kvmalloc_array(nr_pages, sizeof(struct page *),
2676                                                 GFP_KERNEL);
2677                         vmas = kvmalloc_array(nr_pages,
2678                                         sizeof(struct vm_area_struct *),
2679                                         GFP_KERNEL);
2680                         if (!pages || !vmas) {
2681                                 ret = -ENOMEM;
2682                                 if (ctx->account_mem)
2683                                         io_unaccount_mem(ctx->user, nr_pages);
2684                                 goto err;
2685                         }
2686                         got_pages = nr_pages;
2687                 }
2688
2689                 imu->bvec = kvmalloc_array(nr_pages, sizeof(struct bio_vec),
2690                                                 GFP_KERNEL);
2691                 ret = -ENOMEM;
2692                 if (!imu->bvec) {
2693                         if (ctx->account_mem)
2694                                 io_unaccount_mem(ctx->user, nr_pages);
2695                         goto err;
2696                 }
2697
2698                 ret = 0;
2699                 down_read(&current->mm->mmap_sem);
2700                 pret = get_user_pages_longterm(ubuf, nr_pages, FOLL_WRITE,
2701                                                 pages, vmas);
2702                 if (pret == nr_pages) {
2703                         /* don't support file backed memory */
2704                         for (j = 0; j < nr_pages; j++) {
2705                                 struct vm_area_struct *vma = vmas[j];
2706
2707                                 if (vma->vm_file &&
2708                                     !is_file_hugepages(vma->vm_file)) {
2709                                         ret = -EOPNOTSUPP;
2710                                         break;
2711                                 }
2712                         }
2713                 } else {
2714                         ret = pret < 0 ? pret : -EFAULT;
2715                 }
2716                 up_read(&current->mm->mmap_sem);
2717                 if (ret) {
2718                         /*
2719                          * if we did partial map, or found file backed vmas,
2720                          * release any pages we did get
2721                          */
2722                         if (pret > 0) {
2723                                 for (j = 0; j < pret; j++)
2724                                         put_page(pages[j]);
2725                         }
2726                         if (ctx->account_mem)
2727                                 io_unaccount_mem(ctx->user, nr_pages);
2728                         kvfree(imu->bvec);
2729                         goto err;
2730                 }
2731
2732                 off = ubuf & ~PAGE_MASK;
2733                 size = iov.iov_len;
2734                 for (j = 0; j < nr_pages; j++) {
2735                         size_t vec_len;
2736
2737                         vec_len = min_t(size_t, size, PAGE_SIZE - off);
2738                         imu->bvec[j].bv_page = pages[j];
2739                         imu->bvec[j].bv_len = vec_len;
2740                         imu->bvec[j].bv_offset = off;
2741                         off = 0;
2742                         size -= vec_len;
2743                 }
2744                 /* store original address for later verification */
2745                 imu->ubuf = ubuf;
2746                 imu->len = iov.iov_len;
2747                 imu->nr_bvecs = nr_pages;
2748
2749                 ctx->nr_user_bufs++;
2750         }
2751         kvfree(pages);
2752         kvfree(vmas);
2753         return 0;
2754 err:
2755         kvfree(pages);
2756         kvfree(vmas);
2757         io_sqe_buffer_unregister(ctx);
2758         return ret;
2759 }
2760
2761 static int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg)
2762 {
2763         __s32 __user *fds = arg;
2764         int fd;
2765
2766         if (ctx->cq_ev_fd)
2767                 return -EBUSY;
2768
2769         if (copy_from_user(&fd, fds, sizeof(*fds)))
2770                 return -EFAULT;
2771
2772         ctx->cq_ev_fd = eventfd_ctx_fdget(fd);
2773         if (IS_ERR(ctx->cq_ev_fd)) {
2774                 int ret = PTR_ERR(ctx->cq_ev_fd);
2775                 ctx->cq_ev_fd = NULL;
2776                 return ret;
2777         }
2778
2779         return 0;
2780 }
2781
2782 static int io_eventfd_unregister(struct io_ring_ctx *ctx)
2783 {
2784         if (ctx->cq_ev_fd) {
2785                 eventfd_ctx_put(ctx->cq_ev_fd);
2786                 ctx->cq_ev_fd = NULL;
2787                 return 0;
2788         }
2789
2790         return -ENXIO;
2791 }
2792
2793 static void io_ring_ctx_free(struct io_ring_ctx *ctx)
2794 {
2795         io_finish_async(ctx);
2796         if (ctx->sqo_mm)
2797                 mmdrop(ctx->sqo_mm);
2798
2799         io_iopoll_reap_events(ctx);
2800         io_sqe_buffer_unregister(ctx);
2801         io_sqe_files_unregister(ctx);
2802         io_eventfd_unregister(ctx);
2803
2804 #if defined(CONFIG_UNIX)
2805         if (ctx->ring_sock)
2806                 sock_release(ctx->ring_sock);
2807 #endif
2808
2809         io_mem_free(ctx->sq_ring);
2810         io_mem_free(ctx->sq_sqes);
2811         io_mem_free(ctx->cq_ring);
2812
2813         percpu_ref_exit(&ctx->refs);
2814         if (ctx->account_mem)
2815                 io_unaccount_mem(ctx->user,
2816                                 ring_pages(ctx->sq_entries, ctx->cq_entries));
2817         free_uid(ctx->user);
2818         kfree(ctx);
2819 }
2820
2821 static __poll_t io_uring_poll(struct file *file, poll_table *wait)
2822 {
2823         struct io_ring_ctx *ctx = file->private_data;
2824         __poll_t mask = 0;
2825
2826         poll_wait(file, &ctx->cq_wait, wait);
2827         /*
2828          * synchronizes with barrier from wq_has_sleeper call in
2829          * io_commit_cqring
2830          */
2831         smp_rmb();
2832         if (READ_ONCE(ctx->sq_ring->r.tail) - ctx->cached_sq_head !=
2833             ctx->sq_ring->ring_entries)
2834                 mask |= EPOLLOUT | EPOLLWRNORM;
2835         if (READ_ONCE(ctx->cq_ring->r.head) != ctx->cached_cq_tail)
2836                 mask |= EPOLLIN | EPOLLRDNORM;
2837
2838         return mask;
2839 }
2840
2841 static int io_uring_fasync(int fd, struct file *file, int on)
2842 {
2843         struct io_ring_ctx *ctx = file->private_data;
2844
2845         return fasync_helper(fd, file, on, &ctx->cq_fasync);
2846 }
2847
2848 static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
2849 {
2850         mutex_lock(&ctx->uring_lock);
2851         percpu_ref_kill(&ctx->refs);
2852         mutex_unlock(&ctx->uring_lock);
2853
2854         io_poll_remove_all(ctx);
2855         io_iopoll_reap_events(ctx);
2856         wait_for_completion(&ctx->ctx_done);
2857         io_ring_ctx_free(ctx);
2858 }
2859
2860 static int io_uring_release(struct inode *inode, struct file *file)
2861 {
2862         struct io_ring_ctx *ctx = file->private_data;
2863
2864         file->private_data = NULL;
2865         io_ring_ctx_wait_and_kill(ctx);
2866         return 0;
2867 }
2868
2869 static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
2870 {
2871         loff_t offset = (loff_t) vma->vm_pgoff << PAGE_SHIFT;
2872         unsigned long sz = vma->vm_end - vma->vm_start;
2873         struct io_ring_ctx *ctx = file->private_data;
2874         unsigned long pfn;
2875         struct page *page;
2876         void *ptr;
2877
2878         switch (offset) {
2879         case IORING_OFF_SQ_RING:
2880                 ptr = ctx->sq_ring;
2881                 break;
2882         case IORING_OFF_SQES:
2883                 ptr = ctx->sq_sqes;
2884                 break;
2885         case IORING_OFF_CQ_RING:
2886                 ptr = ctx->cq_ring;
2887                 break;
2888         default:
2889                 return -EINVAL;
2890         }
2891
2892         page = virt_to_head_page(ptr);
2893         if (sz > (PAGE_SIZE << compound_order(page)))
2894                 return -EINVAL;
2895
2896         pfn = virt_to_phys(ptr) >> PAGE_SHIFT;
2897         return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);
2898 }
2899
2900 SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
2901                 u32, min_complete, u32, flags, const sigset_t __user *, sig,
2902                 size_t, sigsz)
2903 {
2904         struct io_ring_ctx *ctx;
2905         long ret = -EBADF;
2906         int submitted = 0;
2907         struct fd f;
2908
2909         if (flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP))
2910                 return -EINVAL;
2911
2912         f = fdget(fd);
2913         if (!f.file)
2914                 return -EBADF;
2915
2916         ret = -EOPNOTSUPP;
2917         if (f.file->f_op != &io_uring_fops)
2918                 goto out_fput;
2919
2920         ret = -ENXIO;
2921         ctx = f.file->private_data;
2922         if (!percpu_ref_tryget(&ctx->refs))
2923                 goto out_fput;
2924
2925         /*
2926          * For SQ polling, the thread will do all submissions and completions.
2927          * Just return the requested submit count, and wake the thread if
2928          * we were asked to.
2929          */
2930         if (ctx->flags & IORING_SETUP_SQPOLL) {
2931                 if (flags & IORING_ENTER_SQ_WAKEUP)
2932                         wake_up(&ctx->sqo_wait);
2933                 submitted = to_submit;
2934                 goto out_ctx;
2935         }
2936
2937         ret = 0;
2938         if (to_submit) {
2939                 to_submit = min(to_submit, ctx->sq_entries);
2940
2941                 mutex_lock(&ctx->uring_lock);
2942                 submitted = io_ring_submit(ctx, to_submit);
2943                 mutex_unlock(&ctx->uring_lock);
2944         }
2945         if (flags & IORING_ENTER_GETEVENTS) {
2946                 unsigned nr_events = 0;
2947
2948                 min_complete = min(min_complete, ctx->cq_entries);
2949
2950                 if (ctx->flags & IORING_SETUP_IOPOLL) {
2951                         mutex_lock(&ctx->uring_lock);
2952                         ret = io_iopoll_check(ctx, &nr_events, min_complete);
2953                         mutex_unlock(&ctx->uring_lock);
2954                 } else {
2955                         ret = io_cqring_wait(ctx, min_complete, sig, sigsz);
2956                 }
2957         }
2958
2959 out_ctx:
2960         io_ring_drop_ctx_refs(ctx, 1);
2961 out_fput:
2962         fdput(f);
2963         return submitted ? submitted : ret;
2964 }
2965
2966 static const struct file_operations io_uring_fops = {
2967         .release        = io_uring_release,
2968         .mmap           = io_uring_mmap,
2969         .poll           = io_uring_poll,
2970         .fasync         = io_uring_fasync,
2971 };
2972
2973 static int io_allocate_scq_urings(struct io_ring_ctx *ctx,
2974                                   struct io_uring_params *p)
2975 {
2976         struct io_sq_ring *sq_ring;
2977         struct io_cq_ring *cq_ring;
2978         size_t size;
2979
2980         sq_ring = io_mem_alloc(struct_size(sq_ring, array, p->sq_entries));
2981         if (!sq_ring)
2982                 return -ENOMEM;
2983
2984         ctx->sq_ring = sq_ring;
2985         sq_ring->ring_mask = p->sq_entries - 1;
2986         sq_ring->ring_entries = p->sq_entries;
2987         ctx->sq_mask = sq_ring->ring_mask;
2988         ctx->sq_entries = sq_ring->ring_entries;
2989
2990         size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
2991         if (size == SIZE_MAX)
2992                 return -EOVERFLOW;
2993
2994         ctx->sq_sqes = io_mem_alloc(size);
2995         if (!ctx->sq_sqes)
2996                 return -ENOMEM;
2997
2998         cq_ring = io_mem_alloc(struct_size(cq_ring, cqes, p->cq_entries));
2999         if (!cq_ring)
3000                 return -ENOMEM;
3001
3002         ctx->cq_ring = cq_ring;
3003         cq_ring->ring_mask = p->cq_entries - 1;
3004         cq_ring->ring_entries = p->cq_entries;
3005         ctx->cq_mask = cq_ring->ring_mask;
3006         ctx->cq_entries = cq_ring->ring_entries;
3007         return 0;
3008 }
3009
3010 /*
3011  * Allocate an anonymous fd, this is what constitutes the application
3012  * visible backing of an io_uring instance. The application mmaps this
3013  * fd to gain access to the SQ/CQ ring details. If UNIX sockets are enabled,
3014  * we have to tie this fd to a socket for file garbage collection purposes.
3015  */
3016 static int io_uring_get_fd(struct io_ring_ctx *ctx)
3017 {
3018         struct file *file;
3019         int ret;
3020
3021 #if defined(CONFIG_UNIX)
3022         ret = sock_create_kern(&init_net, PF_UNIX, SOCK_RAW, IPPROTO_IP,
3023                                 &ctx->ring_sock);
3024         if (ret)
3025                 return ret;
3026 #endif
3027
3028         ret = get_unused_fd_flags(O_RDWR | O_CLOEXEC);
3029         if (ret < 0)
3030                 goto err;
3031
3032         file = anon_inode_getfile("[io_uring]", &io_uring_fops, ctx,
3033                                         O_RDWR | O_CLOEXEC);
3034         if (IS_ERR(file)) {
3035                 put_unused_fd(ret);
3036                 ret = PTR_ERR(file);
3037                 goto err;
3038         }
3039
3040 #if defined(CONFIG_UNIX)
3041         ctx->ring_sock->file = file;
3042         ctx->ring_sock->sk->sk_user_data = ctx;
3043 #endif
3044         fd_install(ret, file);
3045         return ret;
3046 err:
3047 #if defined(CONFIG_UNIX)
3048         sock_release(ctx->ring_sock);
3049         ctx->ring_sock = NULL;
3050 #endif
3051         return ret;
3052 }
3053
3054 static int io_uring_create(unsigned entries, struct io_uring_params *p)
3055 {
3056         struct user_struct *user = NULL;
3057         struct io_ring_ctx *ctx;
3058         bool account_mem;
3059         int ret;
3060
3061         if (!entries || entries > IORING_MAX_ENTRIES)
3062                 return -EINVAL;
3063
3064         /*
3065          * Use twice as many entries for the CQ ring. It's possible for the
3066          * application to drive a higher depth than the size of the SQ ring,
3067          * since the sqes are only used at submission time. This allows for
3068          * some flexibility in overcommitting a bit.
3069          */
3070         p->sq_entries = roundup_pow_of_two(entries);
3071         p->cq_entries = 2 * p->sq_entries;
3072
3073         user = get_uid(current_user());
3074         account_mem = !capable(CAP_IPC_LOCK);
3075
3076         if (account_mem) {
3077                 ret = io_account_mem(user,
3078                                 ring_pages(p->sq_entries, p->cq_entries));
3079                 if (ret) {
3080                         free_uid(user);
3081                         return ret;
3082                 }
3083         }
3084
3085         ctx = io_ring_ctx_alloc(p);
3086         if (!ctx) {
3087                 if (account_mem)
3088                         io_unaccount_mem(user, ring_pages(p->sq_entries,
3089                                                                 p->cq_entries));
3090                 free_uid(user);
3091                 return -ENOMEM;
3092         }
3093         ctx->compat = in_compat_syscall();
3094         ctx->account_mem = account_mem;
3095         ctx->user = user;
3096
3097         ret = io_allocate_scq_urings(ctx, p);
3098         if (ret)
3099                 goto err;
3100
3101         ret = io_sq_offload_start(ctx, p);
3102         if (ret)
3103                 goto err;
3104
3105         ret = io_uring_get_fd(ctx);
3106         if (ret < 0)
3107                 goto err;
3108
3109         memset(&p->sq_off, 0, sizeof(p->sq_off));
3110         p->sq_off.head = offsetof(struct io_sq_ring, r.head);
3111         p->sq_off.tail = offsetof(struct io_sq_ring, r.tail);
3112         p->sq_off.ring_mask = offsetof(struct io_sq_ring, ring_mask);
3113         p->sq_off.ring_entries = offsetof(struct io_sq_ring, ring_entries);
3114         p->sq_off.flags = offsetof(struct io_sq_ring, flags);
3115         p->sq_off.dropped = offsetof(struct io_sq_ring, dropped);
3116         p->sq_off.array = offsetof(struct io_sq_ring, array);
3117
3118         memset(&p->cq_off, 0, sizeof(p->cq_off));
3119         p->cq_off.head = offsetof(struct io_cq_ring, r.head);
3120         p->cq_off.tail = offsetof(struct io_cq_ring, r.tail);
3121         p->cq_off.ring_mask = offsetof(struct io_cq_ring, ring_mask);
3122         p->cq_off.ring_entries = offsetof(struct io_cq_ring, ring_entries);
3123         p->cq_off.overflow = offsetof(struct io_cq_ring, overflow);
3124         p->cq_off.cqes = offsetof(struct io_cq_ring, cqes);
3125         return ret;
3126 err:
3127         io_ring_ctx_wait_and_kill(ctx);
3128         return ret;
3129 }
3130
3131 /*
3132  * Sets up an aio uring context, and returns the fd. Applications asks for a
3133  * ring size, we return the actual sq/cq ring sizes (among other things) in the
3134  * params structure passed in.
3135  */
3136 static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
3137 {
3138         struct io_uring_params p;
3139         long ret;
3140         int i;
3141
3142         if (copy_from_user(&p, params, sizeof(p)))
3143                 return -EFAULT;
3144         for (i = 0; i < ARRAY_SIZE(p.resv); i++) {
3145                 if (p.resv[i])
3146                         return -EINVAL;
3147         }
3148
3149         if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
3150                         IORING_SETUP_SQ_AFF))
3151                 return -EINVAL;
3152
3153         ret = io_uring_create(entries, &p);
3154         if (ret < 0)
3155                 return ret;
3156
3157         if (copy_to_user(params, &p, sizeof(p)))
3158                 return -EFAULT;
3159
3160         return ret;
3161 }
3162
3163 SYSCALL_DEFINE2(io_uring_setup, u32, entries,
3164                 struct io_uring_params __user *, params)
3165 {
3166         return io_uring_setup(entries, params);
3167 }
3168
3169 static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
3170                                void __user *arg, unsigned nr_args)
3171         __releases(ctx->uring_lock)
3172         __acquires(ctx->uring_lock)
3173 {
3174         int ret;
3175
3176         /*
3177          * We're inside the ring mutex, if the ref is already dying, then
3178          * someone else killed the ctx or is already going through
3179          * io_uring_register().
3180          */
3181         if (percpu_ref_is_dying(&ctx->refs))
3182                 return -ENXIO;
3183
3184         percpu_ref_kill(&ctx->refs);
3185
3186         /*
3187          * Drop uring mutex before waiting for references to exit. If another
3188          * thread is currently inside io_uring_enter() it might need to grab
3189          * the uring_lock to make progress. If we hold it here across the drain
3190          * wait, then we can deadlock. It's safe to drop the mutex here, since
3191          * no new references will come in after we've killed the percpu ref.
3192          */
3193         mutex_unlock(&ctx->uring_lock);
3194         wait_for_completion(&ctx->ctx_done);
3195         mutex_lock(&ctx->uring_lock);
3196
3197         switch (opcode) {
3198         case IORING_REGISTER_BUFFERS:
3199                 ret = io_sqe_buffer_register(ctx, arg, nr_args);
3200                 break;
3201         case IORING_UNREGISTER_BUFFERS:
3202                 ret = -EINVAL;
3203                 if (arg || nr_args)
3204                         break;
3205                 ret = io_sqe_buffer_unregister(ctx);
3206                 break;
3207         case IORING_REGISTER_FILES:
3208                 ret = io_sqe_files_register(ctx, arg, nr_args);
3209                 break;
3210         case IORING_UNREGISTER_FILES:
3211                 ret = -EINVAL;
3212                 if (arg || nr_args)
3213                         break;
3214                 ret = io_sqe_files_unregister(ctx);
3215                 break;
3216         case IORING_REGISTER_EVENTFD:
3217                 ret = -EINVAL;
3218                 if (nr_args != 1)
3219                         break;
3220                 ret = io_eventfd_register(ctx, arg);
3221                 break;
3222         case IORING_UNREGISTER_EVENTFD:
3223                 ret = -EINVAL;
3224                 if (arg || nr_args)
3225                         break;
3226                 ret = io_eventfd_unregister(ctx);
3227                 break;
3228         default:
3229                 ret = -EINVAL;
3230                 break;
3231         }
3232
3233         /* bring the ctx back to life */
3234         reinit_completion(&ctx->ctx_done);
3235         percpu_ref_reinit(&ctx->refs);
3236         return ret;
3237 }
3238
3239 SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
3240                 void __user *, arg, unsigned int, nr_args)
3241 {
3242         struct io_ring_ctx *ctx;
3243         long ret = -EBADF;
3244         struct fd f;
3245
3246         f = fdget(fd);
3247         if (!f.file)
3248                 return -EBADF;
3249
3250         ret = -EOPNOTSUPP;
3251         if (f.file->f_op != &io_uring_fops)
3252                 goto out_fput;
3253
3254         ctx = f.file->private_data;
3255
3256         mutex_lock(&ctx->uring_lock);
3257         ret = __io_uring_register(ctx, opcode, arg, nr_args);
3258         mutex_unlock(&ctx->uring_lock);
3259 out_fput:
3260         fdput(f);
3261         return ret;
3262 }
3263
3264 static int __init io_uring_init(void)
3265 {
3266         req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC);
3267         return 0;
3268 };
3269 __initcall(io_uring_init);