io_uring: internally retry short reads
authorJens Axboe <axboe@kernel.dk>
Thu, 13 Aug 2020 17:51:40 +0000 (11:51 -0600)
committerJens Axboe <axboe@kernel.dk>
Thu, 13 Aug 2020 23:00:31 +0000 (16:00 -0700)
We've had a few application cases of not handling short reads properly,
and it is understandable as short reads aren't really expected if the
application isn't doing non-blocking IO.

Now that we retain the iov_iter over retries, we can implement internal
retry pretty trivially. This ensures that we don't return a short read,
even for buffered reads on page cache conflicts.

Cleanup the deep nesting and hard to read nature of io_read() as well,
it's much more straight forward now to read and understand. Added a
few comments explaining the logic as well.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
fs/io_uring.c

index 584f861786718d0f6812118e52dcc0cf11466caa..7dd6df15bc498f8eb943d03d23288afdf77ad114 100644 (file)
@@ -510,6 +510,7 @@ struct io_async_rw {
        struct iovec                    fast_iov[UIO_FASTIOV];
        const struct iovec              *free_iovec;
        struct iov_iter                 iter;
+       size_t                          bytes_done;
        struct wait_page_queue          wpq;
 };
 
@@ -916,7 +917,7 @@ static ssize_t io_import_iovec(int rw, struct io_kiocb *req,
                               bool needs_lock);
 static int io_setup_async_rw(struct io_kiocb *req, const struct iovec *iovec,
                             const struct iovec *fast_iov,
-                            struct iov_iter *iter);
+                            struct iov_iter *iter, bool force);
 
 static struct kmem_cache *req_cachep;
 
@@ -2298,7 +2299,7 @@ static bool io_resubmit_prep(struct io_kiocb *req, int error)
        ret = io_import_iovec(rw, req, &iovec, &iter, false);
        if (ret < 0)
                goto end_req;
-       ret = io_setup_async_rw(req, iovec, inline_vecs, &iter);
+       ret = io_setup_async_rw(req, iovec, inline_vecs, &iter, false);
        if (!ret)
                return true;
        kfree(iovec);
@@ -2588,6 +2589,14 @@ static void kiocb_done(struct kiocb *kiocb, ssize_t ret,
 {
        struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
 
+       /* add previously done IO, if any */
+       if (req->io && req->io->rw.bytes_done > 0) {
+               if (ret < 0)
+                       ret = req->io->rw.bytes_done;
+               else
+                       ret += req->io->rw.bytes_done;
+       }
+
        if (req->flags & REQ_F_CUR_POS)
                req->file->f_pos = kiocb->ki_pos;
        if (ret >= 0 && kiocb->ki_complete == io_complete_rw)
@@ -2935,6 +2944,7 @@ static void io_req_map_rw(struct io_kiocb *req, const struct iovec *iovec,
 
        memcpy(&rw->iter, iter, sizeof(*iter));
        rw->free_iovec = NULL;
+       rw->bytes_done = 0;
        /* can only be fixed buffers, no need to do anything */
        if (iter->type == ITER_BVEC)
                return;
@@ -2971,9 +2981,9 @@ static int io_alloc_async_ctx(struct io_kiocb *req)
 
 static int io_setup_async_rw(struct io_kiocb *req, const struct iovec *iovec,
                             const struct iovec *fast_iov,
-                            struct iov_iter *iter)
+                            struct iov_iter *iter, bool force)
 {
-       if (!io_op_defs[req->opcode].async_ctx)
+       if (!force && !io_op_defs[req->opcode].async_ctx)
                return 0;
        if (!req->io) {
                if (__io_alloc_async_ctx(req))
@@ -3097,8 +3107,7 @@ static inline int kiocb_wait_page_queue_init(struct kiocb *kiocb,
  * succeed, or in rare cases where it fails, we then fall back to using the
  * async worker threads for a blocking retry.
  */
-static bool io_rw_should_retry(struct io_kiocb *req, struct iovec *iovec,
-                              struct iovec *fast_iov, struct iov_iter *iter)
+static bool io_rw_should_retry(struct io_kiocb *req)
 {
        struct kiocb *kiocb = &req->rw.kiocb;
        int ret;
@@ -3107,8 +3116,8 @@ static bool io_rw_should_retry(struct io_kiocb *req, struct iovec *iovec,
        if (req->flags & REQ_F_NOWAIT)
                return false;
 
-       /* already tried, or we're doing O_DIRECT */
-       if (kiocb->ki_flags & (IOCB_DIRECT | IOCB_WAITQ))
+       /* Only for buffered IO */
+       if (kiocb->ki_flags & IOCB_DIRECT)
                return false;
        /*
         * just use poll if we can, and don't attempt if the fs doesn't
@@ -3117,16 +3126,6 @@ static bool io_rw_should_retry(struct io_kiocb *req, struct iovec *iovec,
        if (file_can_poll(req->file) || !(req->file->f_mode & FMODE_BUF_RASYNC))
                return false;
 
-       /*
-        * If request type doesn't require req->io to defer in general,
-        * we need to allocate it here
-        */
-       if (!req->io) {
-               if (__io_alloc_async_ctx(req))
-                       return false;
-               io_req_map_rw(req, iovec, fast_iov, iter);
-       }
-
        ret = kiocb_wait_page_queue_init(kiocb, &req->io->rw.wpq,
                                                io_async_buf_func, req);
        if (!ret) {
@@ -3153,8 +3152,8 @@ static int io_read(struct io_kiocb *req, bool force_nonblock,
        struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
        struct kiocb *kiocb = &req->rw.kiocb;
        struct iov_iter __iter, *iter = &__iter;
+       ssize_t io_size, ret, ret2;
        size_t iov_count;
-       ssize_t io_size, ret, ret2 = 0;
 
        if (req->io)
                iter = &req->io->rw.iter;
@@ -3164,6 +3163,7 @@ static int io_read(struct io_kiocb *req, bool force_nonblock,
                return ret;
        io_size = ret;
        req->result = io_size;
+       ret = 0;
 
        /* Ensure we clear previously set non-block flag */
        if (!force_nonblock)
@@ -3178,31 +3178,62 @@ static int io_read(struct io_kiocb *req, bool force_nonblock,
        if (unlikely(ret))
                goto out_free;
 
-       ret2 = io_iter_do_read(req, iter);
+       ret = io_iter_do_read(req, iter);
 
-       /* Catch -EAGAIN return for forced non-blocking submission */
-       if (!force_nonblock || (ret2 != -EAGAIN && ret2 != -EIO)) {
-               kiocb_done(kiocb, ret2, cs);
-       } else {
-copy_iov:
-               ret = io_setup_async_rw(req, iovec, inline_vecs, iter);
+       if (!ret) {
+               goto done;
+       } else if (ret == -EIOCBQUEUED) {
+               ret = 0;
+               goto out_free;
+       } else if (ret == -EAGAIN) {
+               ret = io_setup_async_rw(req, iovec, inline_vecs, iter, false);
                if (ret)
                        goto out_free;
-               /* it's copied and will be cleaned with ->io */
-               iovec = NULL;
-               /* if we can retry, do so with the callbacks armed */
-               if (io_rw_should_retry(req, iovec, inline_vecs, iter)) {
-                       ret2 = io_iter_do_read(req, iter);
-                       if (ret2 == -EIOCBQUEUED) {
-                               goto out_free;
-                       } else if (ret2 != -EAGAIN) {
-                               kiocb_done(kiocb, ret2, cs);
-                               goto out_free;
-                       }
-               }
+               return -EAGAIN;
+       } else if (ret < 0) {
+               goto out_free;
+       }
+
+       /* read it all, or we did blocking attempt. no retry. */
+       if (!iov_iter_count(iter) || !force_nonblock)
+               goto done;
+
+       io_size -= ret;
+copy_iov:
+       ret2 = io_setup_async_rw(req, iovec, inline_vecs, iter, true);
+       if (ret2) {
+               ret = ret2;
+               goto out_free;
+       }
+       /* it's copied and will be cleaned with ->io */
+       iovec = NULL;
+       /* now use our persistent iterator, if we aren't already */
+       iter = &req->io->rw.iter;
+retry:
+       req->io->rw.bytes_done += ret;
+       /* if we can retry, do so with the callbacks armed */
+       if (!io_rw_should_retry(req)) {
                kiocb->ki_flags &= ~IOCB_WAITQ;
                return -EAGAIN;
        }
+
+       /*
+        * Now retry read with the IOCB_WAITQ parts set in the iocb. If we
+        * get -EIOCBQUEUED, then we'll get a notification when the desired
+        * page gets unlocked. We can also get a partial read here, and if we
+        * do, then just retry at the new offset.
+        */
+       ret = io_iter_do_read(req, iter);
+       if (ret == -EIOCBQUEUED) {
+               ret = 0;
+               goto out_free;
+       } else if (ret > 0 && ret < io_size) {
+               /* we got some bytes, but not all. retry. */
+               goto retry;
+       }
+done:
+       kiocb_done(kiocb, ret, cs);
+       ret = 0;
 out_free:
        if (iovec)
                kfree(iovec);
@@ -3295,7 +3326,7 @@ static int io_write(struct io_kiocb *req, bool force_nonblock,
                kiocb_done(kiocb, ret2, cs);
        } else {
 copy_iov:
-               ret = io_setup_async_rw(req, iovec, inline_vecs, iter);
+               ret = io_setup_async_rw(req, iovec, inline_vecs, iter, false);
                if (!ret)
                        return -EAGAIN;
        }