Merge tag 'for-5.4/io_uring-2019-09-27' of git://git.kernel.dk/linux-block
[sfrench/cifs-2.6.git] / fs / io_uring.c
index 0dadbdbead0fbfef8b0f2373756b3254832ada53..aa8ac557493cb5e20f1ac78a87bb6b8e6e6f6fe8 100644 (file)
@@ -200,6 +200,7 @@ struct io_ring_ctx {
                struct io_uring_sqe     *sq_sqes;
 
                struct list_head        defer_list;
+               struct list_head        timeout_list;
        } ____cacheline_aligned_in_smp;
 
        /* IO offload */
@@ -216,6 +217,7 @@ struct io_ring_ctx {
                struct wait_queue_head  cq_wait;
                struct fasync_struct    *cq_fasync;
                struct eventfd_ctx      *cq_ev_fd;
+               atomic_t                cq_timeouts;
        } ____cacheline_aligned_in_smp;
 
        struct io_rings *rings;
@@ -283,6 +285,11 @@ struct io_poll_iocb {
        struct wait_queue_entry         wait;
 };
 
+struct io_timeout {
+       struct file                     *file;
+       struct hrtimer                  timer;
+};
+
 /*
  * NOTE! Each of the iocb union members has the file pointer
  * as the first entry in their struct definition. So you can
@@ -294,6 +301,7 @@ struct io_kiocb {
                struct file             *file;
                struct kiocb            rw;
                struct io_poll_iocb     poll;
+               struct io_timeout       timeout;
        };
 
        struct sqe_submit       submit;
@@ -313,6 +321,7 @@ struct io_kiocb {
 #define REQ_F_LINK_DONE                128     /* linked sqes done */
 #define REQ_F_FAIL_LINK                256     /* fail rest of links */
 #define REQ_F_SHADOW_DRAIN     512     /* link-drain shadow req */
+#define REQ_F_TIMEOUT          1024    /* timeout request */
        u64                     user_data;
        u32                     result;
        u32                     sequence;
@@ -344,6 +353,8 @@ struct io_submit_state {
 };
 
 static void io_sq_wq_submit_work(struct work_struct *work);
+static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
+                                long res);
 static void __io_free_req(struct io_kiocb *req);
 
 static struct kmem_cache *req_cachep;
@@ -400,26 +411,30 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
        INIT_LIST_HEAD(&ctx->poll_list);
        INIT_LIST_HEAD(&ctx->cancel_list);
        INIT_LIST_HEAD(&ctx->defer_list);
+       INIT_LIST_HEAD(&ctx->timeout_list);
        return ctx;
 }
 
 static inline bool io_sequence_defer(struct io_ring_ctx *ctx,
                                     struct io_kiocb *req)
 {
-       if ((req->flags & (REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) != REQ_F_IO_DRAIN)
+       /* timeout requests always honor sequence */
+       if (!(req->flags & REQ_F_TIMEOUT) &&
+           (req->flags & (REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) != REQ_F_IO_DRAIN)
                return false;
 
        return req->sequence != ctx->cached_cq_tail + ctx->rings->sq_dropped;
 }
 
-static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx)
+static struct io_kiocb *__io_get_deferred_req(struct io_ring_ctx *ctx,
+                                             struct list_head *list)
 {
        struct io_kiocb *req;
 
-       if (list_empty(&ctx->defer_list))
+       if (list_empty(list))
                return NULL;
 
-       req = list_first_entry(&ctx->defer_list, struct io_kiocb, list);
+       req = list_first_entry(list, struct io_kiocb, list);
        if (!io_sequence_defer(ctx, req)) {
                list_del_init(&req->list);
                return req;
@@ -428,6 +443,16 @@ static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx)
        return NULL;
 }
 
+static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx)
+{
+       return __io_get_deferred_req(ctx, &ctx->defer_list);
+}
+
+static struct io_kiocb *io_get_timeout_req(struct io_ring_ctx *ctx)
+{
+       return __io_get_deferred_req(ctx, &ctx->timeout_list);
+}
+
 static void __io_commit_cqring(struct io_ring_ctx *ctx)
 {
        struct io_rings *rings = ctx->rings;
@@ -446,25 +471,50 @@ static void __io_commit_cqring(struct io_ring_ctx *ctx)
 static inline void io_queue_async_work(struct io_ring_ctx *ctx,
                                       struct io_kiocb *req)
 {
-       int rw;
+       int rw = 0;
 
-       switch (req->submit.sqe->opcode) {
-       case IORING_OP_WRITEV:
-       case IORING_OP_WRITE_FIXED:
-               rw = !(req->rw.ki_flags & IOCB_DIRECT);
-               break;
-       default:
-               rw = 0;
-               break;
+       if (req->submit.sqe) {
+               switch (req->submit.sqe->opcode) {
+               case IORING_OP_WRITEV:
+               case IORING_OP_WRITE_FIXED:
+                       rw = !(req->rw.ki_flags & IOCB_DIRECT);
+                       break;
+               }
        }
 
        queue_work(ctx->sqo_wq[rw], &req->work);
 }
 
+static void io_kill_timeout(struct io_kiocb *req)
+{
+       int ret;
+
+       ret = hrtimer_try_to_cancel(&req->timeout.timer);
+       if (ret != -1) {
+               atomic_inc(&req->ctx->cq_timeouts);
+               list_del(&req->list);
+               io_cqring_fill_event(req->ctx, req->user_data, 0);
+               __io_free_req(req);
+       }
+}
+
+static void io_kill_timeouts(struct io_ring_ctx *ctx)
+{
+       struct io_kiocb *req, *tmp;
+
+       spin_lock_irq(&ctx->completion_lock);
+       list_for_each_entry_safe(req, tmp, &ctx->timeout_list, list)
+               io_kill_timeout(req);
+       spin_unlock_irq(&ctx->completion_lock);
+}
+
 static void io_commit_cqring(struct io_ring_ctx *ctx)
 {
        struct io_kiocb *req;
 
+       while ((req = io_get_timeout_req(ctx)) != NULL)
+               io_kill_timeout(req);
+
        __io_commit_cqring(ctx);
 
        while ((req = io_get_deferred_req(ctx)) != NULL) {
@@ -1248,6 +1298,51 @@ static void io_async_list_note(int rw, struct io_kiocb *req, size_t len)
        }
 }
 
+/*
+ * For files that don't have ->read_iter() and ->write_iter(), handle them
+ * by looping over ->read() or ->write() manually.
+ */
+static ssize_t loop_rw_iter(int rw, struct file *file, struct kiocb *kiocb,
+                          struct iov_iter *iter)
+{
+       ssize_t ret = 0;
+
+       /*
+        * Don't support polled IO through this interface, and we can't
+        * support non-blocking either. For the latter, this just causes
+        * the kiocb to be handled from an async context.
+        */
+       if (kiocb->ki_flags & IOCB_HIPRI)
+               return -EOPNOTSUPP;
+       if (kiocb->ki_flags & IOCB_NOWAIT)
+               return -EAGAIN;
+
+       while (iov_iter_count(iter)) {
+               struct iovec iovec = iov_iter_iovec(iter);
+               ssize_t nr;
+
+               if (rw == READ) {
+                       nr = file->f_op->read(file, iovec.iov_base,
+                                             iovec.iov_len, &kiocb->ki_pos);
+               } else {
+                       nr = file->f_op->write(file, iovec.iov_base,
+                                              iovec.iov_len, &kiocb->ki_pos);
+               }
+
+               if (nr < 0) {
+                       if (!ret)
+                               ret = nr;
+                       break;
+               }
+               ret += nr;
+               if (nr != iovec.iov_len)
+                       break;
+               iov_iter_advance(iter, nr);
+       }
+
+       return ret;
+}
+
 static int io_read(struct io_kiocb *req, const struct sqe_submit *s,
                   bool force_nonblock)
 {
@@ -1265,8 +1360,6 @@ static int io_read(struct io_kiocb *req, const struct sqe_submit *s,
 
        if (unlikely(!(file->f_mode & FMODE_READ)))
                return -EBADF;
-       if (unlikely(!file->f_op->read_iter))
-               return -EINVAL;
 
        ret = io_import_iovec(req->ctx, READ, s, &iovec, &iter);
        if (ret < 0)
@@ -1281,7 +1374,11 @@ static int io_read(struct io_kiocb *req, const struct sqe_submit *s,
        if (!ret) {
                ssize_t ret2;
 
-               ret2 = call_read_iter(file, kiocb, &iter);
+               if (file->f_op->read_iter)
+                       ret2 = call_read_iter(file, kiocb, &iter);
+               else
+                       ret2 = loop_rw_iter(READ, file, kiocb, &iter);
+
                /*
                 * In case of a short read, punt to async. This can happen
                 * if we have data partially cached. Alternatively we can
@@ -1326,8 +1423,6 @@ static int io_write(struct io_kiocb *req, const struct sqe_submit *s,
        file = kiocb->ki_filp;
        if (unlikely(!(file->f_mode & FMODE_WRITE)))
                return -EBADF;
-       if (unlikely(!file->f_op->write_iter))
-               return -EINVAL;
 
        ret = io_import_iovec(req->ctx, WRITE, s, &iovec, &iter);
        if (ret < 0)
@@ -1365,7 +1460,10 @@ static int io_write(struct io_kiocb *req, const struct sqe_submit *s,
                }
                kiocb->ki_flags |= IOCB_WRITE;
 
-               ret2 = call_write_iter(file, kiocb, &iter);
+               if (file->f_op->write_iter)
+                       ret2 = call_write_iter(file, kiocb, &iter);
+               else
+                       ret2 = loop_rw_iter(WRITE, file, kiocb, &iter);
                if (!force_nonblock || ret2 != -EAGAIN) {
                        io_rw_done(kiocb, ret2);
                } else {
@@ -1714,6 +1812,7 @@ static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe)
        if (!poll->file)
                return -EBADF;
 
+       req->submit.sqe = NULL;
        INIT_WORK(&req->work, io_poll_complete_work);
        events = READ_ONCE(sqe->poll_events);
        poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP;
@@ -1765,6 +1864,81 @@ static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe)
        return ipt.error;
 }
 
+static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer)
+{
+       struct io_ring_ctx *ctx;
+       struct io_kiocb *req;
+       unsigned long flags;
+
+       req = container_of(timer, struct io_kiocb, timeout.timer);
+       ctx = req->ctx;
+       atomic_inc(&ctx->cq_timeouts);
+
+       spin_lock_irqsave(&ctx->completion_lock, flags);
+       list_del(&req->list);
+
+       io_cqring_fill_event(ctx, req->user_data, -ETIME);
+       io_commit_cqring(ctx);
+       spin_unlock_irqrestore(&ctx->completion_lock, flags);
+
+       io_cqring_ev_posted(ctx);
+
+       io_put_req(req);
+       return HRTIMER_NORESTART;
+}
+
+static int io_timeout(struct io_kiocb *req, const struct io_uring_sqe *sqe)
+{
+       unsigned count, req_dist, tail_index;
+       struct io_ring_ctx *ctx = req->ctx;
+       struct list_head *entry;
+       struct timespec ts;
+
+       if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
+               return -EINVAL;
+       if (sqe->flags || sqe->ioprio || sqe->buf_index || sqe->timeout_flags ||
+           sqe->len != 1)
+               return -EINVAL;
+       if (copy_from_user(&ts, (void __user *) (unsigned long) sqe->addr,
+           sizeof(ts)))
+               return -EFAULT;
+
+       /*
+        * sqe->off holds how many events that need to occur for this
+        * timeout event to be satisfied.
+        */
+       count = READ_ONCE(sqe->off);
+       if (!count)
+               count = 1;
+
+       req->sequence = ctx->cached_sq_head + count - 1;
+       req->flags |= REQ_F_TIMEOUT;
+
+       /*
+        * Insertion sort, ensuring the first entry in the list is always
+        * the one we need first.
+        */
+       tail_index = ctx->cached_cq_tail - ctx->rings->sq_dropped;
+       req_dist = req->sequence - tail_index;
+       spin_lock_irq(&ctx->completion_lock);
+       list_for_each_prev(entry, &ctx->timeout_list) {
+               struct io_kiocb *nxt = list_entry(entry, struct io_kiocb, list);
+               unsigned dist;
+
+               dist = nxt->sequence - tail_index;
+               if (req_dist >= dist)
+                       break;
+       }
+       list_add(&req->list, entry);
+       spin_unlock_irq(&ctx->completion_lock);
+
+       hrtimer_init(&req->timeout.timer, CLOCK_MONOTONIC, HRTIMER_MODE_REL);
+       req->timeout.timer.function = io_timeout_fn;
+       hrtimer_start(&req->timeout.timer, timespec_to_ktime(ts),
+                       HRTIMER_MODE_REL);
+       return 0;
+}
+
 static int io_req_defer(struct io_ring_ctx *ctx, struct io_kiocb *req,
                        const struct io_uring_sqe *sqe)
 {
@@ -1842,6 +2016,9 @@ static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
        case IORING_OP_RECVMSG:
                ret = io_recvmsg(req, s->sqe, force_nonblock);
                break;
+       case IORING_OP_TIMEOUT:
+               ret = io_timeout(req, s->sqe);
+               break;
        default:
                ret = -EINVAL;
                break;
@@ -2098,13 +2275,11 @@ static int __io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
        if (ret == -EAGAIN && !(req->flags & REQ_F_NOWAIT)) {
                struct io_uring_sqe *sqe_copy;
 
-               sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL);
+               sqe_copy = kmemdup(s->sqe, sizeof(*sqe_copy), GFP_KERNEL);
                if (sqe_copy) {
                        struct async_list *list;
 
-                       memcpy(sqe_copy, s->sqe, sizeof(*sqe_copy));
                        s->sqe = sqe_copy;
-
                        memcpy(&req->submit, s, sizeof(*s));
                        list = io_async_list_from_sqe(ctx, s->sqe);
                        if (!io_add_to_prev_work(list, req)) {
@@ -2359,18 +2534,22 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
                        io_queue_link_head(ctx, link, &link->submit, shadow_req,
                                                true);
                        link = NULL;
+                       shadow_req = NULL;
                }
                prev_was_link = (sqes[i].sqe->flags & IOSQE_IO_LINK) != 0;
 
                if (link && (sqes[i].sqe->flags & IOSQE_IO_DRAIN)) {
                        if (!shadow_req) {
                                shadow_req = io_get_req(ctx, NULL);
+                               if (unlikely(!shadow_req))
+                                       goto out;
                                shadow_req->flags |= (REQ_F_IO_DRAIN | REQ_F_SHADOW_DRAIN);
                                refcount_dec(&shadow_req->refs);
                        }
                        shadow_req->sequence = sqes[i].sequence;
                }
 
+out:
                if (unlikely(mm_fault)) {
                        io_cqring_add_event(ctx, sqes[i].sqe->user_data,
                                                -EFAULT);
@@ -2436,7 +2615,7 @@ static int io_sq_thread(void *data)
                         * to sleep.
                         */
                        if (inflight || !time_after(jiffies, timeout)) {
-                               cpu_relax();
+                               cond_resched();
                                continue;
                        }
 
@@ -2545,18 +2724,22 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit,
                        io_queue_link_head(ctx, link, &link->submit, shadow_req,
                                                force_nonblock);
                        link = NULL;
+                       shadow_req = NULL;
                }
                prev_was_link = (s.sqe->flags & IOSQE_IO_LINK) != 0;
 
                if (link && (s.sqe->flags & IOSQE_IO_DRAIN)) {
                        if (!shadow_req) {
                                shadow_req = io_get_req(ctx, NULL);
+                               if (unlikely(!shadow_req))
+                                       goto out;
                                shadow_req->flags |= (REQ_F_IO_DRAIN | REQ_F_SHADOW_DRAIN);
                                refcount_dec(&shadow_req->refs);
                        }
                        shadow_req->sequence = s.sequence;
                }
 
+out:
                s.has_user = true;
                s.needs_lock = false;
                s.needs_fixed_file = false;
@@ -2585,6 +2768,38 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit,
        return submit;
 }
 
+struct io_wait_queue {
+       struct wait_queue_entry wq;
+       struct io_ring_ctx *ctx;
+       unsigned to_wait;
+       unsigned nr_timeouts;
+};
+
+static inline bool io_should_wake(struct io_wait_queue *iowq)
+{
+       struct io_ring_ctx *ctx = iowq->ctx;
+
+       /*
+        * Wake up if we have enough events, or if a timeout occured since we
+        * started waiting. For timeouts, we always want to return to userspace,
+        * regardless of event count.
+        */
+       return io_cqring_events(ctx->rings) >= iowq->to_wait ||
+                       atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;
+}
+
+static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
+                           int wake_flags, void *key)
+{
+       struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue,
+                                                       wq);
+
+       if (!io_should_wake(iowq))
+               return -1;
+
+       return autoremove_wake_function(curr, mode, wake_flags, key);
+}
+
 /*
  * Wait until events become available, if we don't already have some. The
  * application must reap them itself, as they reside on the shared cq ring.
@@ -2592,6 +2807,15 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit,
 static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
                          const sigset_t __user *sig, size_t sigsz)
 {
+       struct io_wait_queue iowq = {
+               .wq = {
+                       .private        = current,
+                       .func           = io_wake_function,
+                       .entry          = LIST_HEAD_INIT(iowq.wq.entry),
+               },
+               .ctx            = ctx,
+               .to_wait        = min_events,
+       };
        struct io_rings *rings = ctx->rings;
        int ret;
 
@@ -2611,7 +2835,21 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
                        return ret;
        }
 
-       ret = wait_event_interruptible(ctx->wait, io_cqring_events(rings) >= min_events);
+       ret = 0;
+       iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
+       do {
+               prepare_to_wait_exclusive(&ctx->wait, &iowq.wq,
+                                               TASK_INTERRUPTIBLE);
+               if (io_should_wake(&iowq))
+                       break;
+               schedule();
+               if (signal_pending(current)) {
+                       ret = -ERESTARTSYS;
+                       break;
+               }
+       } while (1);
+       finish_wait(&ctx->wait, &iowq.wq);
+
        restore_saved_sigmask_unless(ret == -ERESTARTSYS);
        if (ret == -ERESTARTSYS)
                ret = -EINTR;
@@ -3263,7 +3501,7 @@ static __poll_t io_uring_poll(struct file *file, poll_table *wait)
        if (READ_ONCE(ctx->rings->sq.tail) - ctx->cached_sq_head !=
            ctx->rings->sq_ring_entries)
                mask |= EPOLLOUT | EPOLLWRNORM;
-       if (READ_ONCE(ctx->rings->sq.head) != ctx->cached_cq_tail)
+       if (READ_ONCE(ctx->rings->cq.head) != ctx->cached_cq_tail)
                mask |= EPOLLIN | EPOLLRDNORM;
 
        return mask;
@@ -3282,6 +3520,7 @@ static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
        percpu_ref_kill(&ctx->refs);
        mutex_unlock(&ctx->uring_lock);
 
+       io_kill_timeouts(ctx);
        io_poll_remove_all(ctx);
        io_iopoll_reap_events(ctx);
        wait_for_completion(&ctx->ctx_done);
@@ -3319,7 +3558,7 @@ static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
        }
 
        page = virt_to_head_page(ptr);
-       if (sz > (PAGE_SIZE << compound_order(page)))
+       if (sz > page_size(page))
                return -EINVAL;
 
        pfn = virt_to_phys(ptr) >> PAGE_SHIFT;