Merge tag 'for-6.8/io_uring-2024-01-18' of git://git.kernel.dk/linux
[sfrench/cifs-2.6.git] / io_uring / io_uring.c
index 09b6d860deba3d96e086d5f16bf654b312e5be5e..cd9a137ad6cefbb907a177fd8f0c9753ac0c70dd 100644 (file)
@@ -137,6 +137,14 @@ struct io_defer_entry {
 #define IO_DISARM_MASK (REQ_F_ARM_LTIMEOUT | REQ_F_LINK_TIMEOUT | REQ_F_FAIL)
 #define IO_REQ_LINK_FLAGS (REQ_F_LINK | REQ_F_HARDLINK)
 
+/*
+ * No waiters. It's larger than any valid value of the tw counter
+ * so that tests against ->cq_wait_nr would fail and skip wake_up().
+ */
+#define IO_CQ_WAKE_INIT                (-1U)
+/* Forced wake up if there is a waiter regardless of ->cq_wait_nr */
+#define IO_CQ_WAKE_FORCE       (IO_CQ_WAKE_INIT >> 1)
+
 static bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
                                         struct task_struct *task,
                                         bool cancel_all);
@@ -303,6 +311,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
                goto err;
 
        ctx->flags = p->flags;
+       atomic_set(&ctx->cq_wait_nr, IO_CQ_WAKE_INIT);
        init_waitqueue_head(&ctx->sqo_sq_wait);
        INIT_LIST_HEAD(&ctx->sqd_list);
        INIT_LIST_HEAD(&ctx->cq_overflow_list);
@@ -1304,16 +1313,23 @@ static inline void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
 {
        struct io_ring_ctx *ctx = req->ctx;
        unsigned nr_wait, nr_tw, nr_tw_prev;
-       struct llist_node *first;
+       struct llist_node *head;
+
+       /* See comment above IO_CQ_WAKE_INIT */
+       BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);
 
+       /*
+        * We don't know how many reuqests is there in the link and whether
+        * they can even be queued lazily, fall back to non-lazy.
+        */
        if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
                flags &= ~IOU_F_TWQ_LAZY_WAKE;
 
-       first = READ_ONCE(ctx->work_llist.first);
+       head = READ_ONCE(ctx->work_llist.first);
        do {
                nr_tw_prev = 0;
-               if (first) {
-                       struct io_kiocb *first_req = container_of(first,
+               if (head) {
+                       struct io_kiocb *first_req = container_of(head,
                                                        struct io_kiocb,
                                                        io_task_work.node);
                        /*
@@ -1322,17 +1338,29 @@ static inline void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
                         */
                        nr_tw_prev = READ_ONCE(first_req->nr_tw);
                }
+
+               /*
+                * Theoretically, it can overflow, but that's fine as one of
+                * previous adds should've tried to wake the task.
+                */
                nr_tw = nr_tw_prev + 1;
-               /* Large enough to fail the nr_wait comparison below */
                if (!(flags & IOU_F_TWQ_LAZY_WAKE))
-                       nr_tw = -1U;
+                       nr_tw = IO_CQ_WAKE_FORCE;
 
                req->nr_tw = nr_tw;
-               req->io_task_work.node.next = first;
-       } while (!try_cmpxchg(&ctx->work_llist.first, &first,
+               req->io_task_work.node.next = head;
+       } while (!try_cmpxchg(&ctx->work_llist.first, &head,
                              &req->io_task_work.node));
 
-       if (!first) {
+       /*
+        * cmpxchg implies a full barrier, which pairs with the barrier
+        * in set_current_state() on the io_cqring_wait() side. It's used
+        * to ensure that either we see updated ->cq_wait_nr, or waiters
+        * going to sleep will observe the work added to the list, which
+        * is similar to the wait/wawke task state sync.
+        */
+
+       if (!head) {
                if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
                        atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
                if (ctx->has_evfd)
@@ -1340,14 +1368,12 @@ static inline void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
        }
 
        nr_wait = atomic_read(&ctx->cq_wait_nr);
-       /* no one is waiting */
-       if (!nr_wait)
+       /* not enough or no one is waiting */
+       if (nr_tw < nr_wait)
                return;
-       /* either not enough or the previous add has already woken it up */
-       if (nr_wait > nr_tw || nr_tw_prev >= nr_wait)
+       /* the previous add has already woken it up */
+       if (nr_tw_prev >= nr_wait)
                return;
-       /* pairs with set_current_state() in io_cqring_wait() */
-       smp_mb__after_atomic();
        wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
 }
 
@@ -2000,9 +2026,10 @@ inline struct file *io_file_get_fixed(struct io_kiocb *req, int fd,
                goto out;
        fd = array_index_nospec(fd, ctx->nr_user_files);
        slot = io_fixed_file_slot(&ctx->file_table, fd);
-       file = io_slot_file(slot);
+       if (!req->rsrc_node)
+               __io_req_set_rsrc_node(req, ctx);
        req->flags |= io_slot_flags(slot);
-       io_req_set_rsrc_node(req, ctx, 0);
+       file = io_slot_file(slot);
 out:
        io_ring_submit_unlock(ctx, issue_flags);
        return file;
@@ -2613,7 +2640,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
 
                ret = io_cqring_wait_schedule(ctx, &iowq);
                __set_current_state(TASK_RUNNING);
-               atomic_set(&ctx->cq_wait_nr, 0);
+               atomic_set(&ctx->cq_wait_nr, IO_CQ_WAKE_INIT);
 
                /*
                 * Run task_work after scheduling and before io_should_wake().
@@ -3787,7 +3814,8 @@ static int io_uring_install_fd(struct file *file)
  */
 static struct file *io_uring_get_file(struct io_ring_ctx *ctx)
 {
-       return anon_inode_getfile_secure("[io_uring]", &io_uring_fops, ctx,
+       /* Create a new inode so that the LSM can block the creation.  */
+       return anon_inode_create_getfile("[io_uring]", &io_uring_fops, ctx,
                                         O_RDWR | O_CLOEXEC, NULL);
 }