ceph: re-send AIO write request when getting -EOLDSNAPC error
author: Yan, Zheng <zyan@redhat.com>
Thu, 24 Dec 2015 00:44:20 +0000 (08:44 +0800)
committer: Ilya Dryomov <idryomov@gmail.com>
Thu, 21 Jan 2016 18:36:08 +0000 (19:36 +0100)
When receiving -EOLDSNAPC from the OSD, we need to re-send the corresponding
write request. Due to a locking issue, we can't send a new request from inside
another OSD request's completion callback, so we use a worker to re-send the
request for AIO writes.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
fs/ceph/file.c

index 8e924b7dd4987e5070cc902d9b76456bd3dada0e..41c2267b4b7e61fb022a779e4fab079ec45f8e49 100644 (file)
@@ -554,9 +554,17 @@ struct ceph_aio_request {
        struct list_head osd_reqs;
        unsigned num_reqs;
        atomic_t pending_reqs;
+       struct timespec mtime;
        struct ceph_cap_flush *prealloc_cf;
 };
 
+struct ceph_aio_work { /* hands one OSD request to ceph_aio_retry_work() */
+       struct work_struct work;        /* queued on the client's wb_wq */
+       struct ceph_osd_request *req;   /* request that completed with -EOLDSNAPC */
+};
+
+static void ceph_aio_retry_work(struct work_struct *work);
+
 static void ceph_aio_complete(struct inode *inode,
                              struct ceph_aio_request *aio_req)
 {
@@ -614,10 +622,19 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req,
             inode, rc, osd_data->length);
 
        if (rc == -EOLDSNAPC) {
-               BUG_ON(1);
-       }
-
-       if (!aio_req->write) {
+               struct ceph_aio_work *aio_work;
+               BUG_ON(!aio_req->write);
+
+               aio_work = kmalloc(sizeof(*aio_work), GFP_NOFS);
+               if (aio_work) {
+                       INIT_WORK(&aio_work->work, ceph_aio_retry_work);
+                       aio_work->req = req;
+                       queue_work(ceph_inode_to_client(inode)->wb_wq,
+                                  &aio_work->work);
+                       return;
+               }
+               rc = -ENOMEM;
+       } else if (!aio_req->write) {
                if (rc == -ENOENT)
                        rc = 0;
                if (rc >= 0 && osd_data->length > rc) {
@@ -653,6 +670,69 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req,
        return;
 }
 
+/*
+ * Work handler that re-sends an AIO write whose OSD request completed
+ * with -EOLDSNAPC.
+ *
+ * Queued by ceph_aio_complete_req() so that the re-send happens outside
+ * the OSD request completion callback.  A replacement request is built
+ * against the inode's current snap context (the last entry on
+ * i_cap_snaps when a cap snap is pending, otherwise i_head_snapc),
+ * reusing the original request's object location, object id, first op
+ * and the mtime saved in the ceph_aio_request.  If the replacement
+ * cannot be allocated or started, the error is reported through
+ * ceph_aio_complete_req().
+ *
+ * @work: embedded in a struct ceph_aio_work, which is freed here.
+ */
+static void ceph_aio_retry_work(struct work_struct *work)
+{
+       struct ceph_aio_work *aio_work =
+               container_of(work, struct ceph_aio_work, work);
+       struct ceph_osd_request *orig_req = aio_work->req;
+       struct ceph_aio_request *aio_req = orig_req->r_priv;
+       struct inode *inode = orig_req->r_inode;
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       struct ceph_snap_context *snapc;
+       struct ceph_osd_request *req;
+       int ret;
+
+       /* Take our own reference on the snap context the retry must use. */
+       spin_lock(&ci->i_ceph_lock);
+       if (__ceph_have_pending_cap_snap(ci)) {
+               struct ceph_cap_snap *capsnap =
+                       list_last_entry(&ci->i_cap_snaps,
+                                       struct ceph_cap_snap,
+                                       ci_item);
+               snapc = ceph_get_snap_context(capsnap->context);
+       } else {
+               BUG_ON(!ci->i_head_snapc);
+               snapc = ceph_get_snap_context(ci->i_head_snapc);
+       }
+       spin_unlock(&ci->i_ceph_lock);
+
+       /* ceph_osdc_alloc_request() returns NULL on failure, not ERR_PTR. */
+       req = ceph_osdc_alloc_request(orig_req->r_osdc, snapc, 2,
+                       false, GFP_NOFS);
+       if (!req) {
+               ret = -ENOMEM;
+               /* No replacement: complete the original with the error. */
+               req = orig_req;
+               goto out;
+       }
+
+       req->r_flags =  CEPH_OSD_FLAG_ORDERSNAP |
+                       CEPH_OSD_FLAG_ONDISK |
+                       CEPH_OSD_FLAG_WRITE;
+       req->r_base_oloc = orig_req->r_base_oloc;
+       req->r_base_oid = orig_req->r_base_oid;
+
+       req->r_ops[0] = orig_req->r_ops[0];
+       osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);
+
+       ceph_osdc_build_request(req, req->r_ops[0].extent.offset,
+                               snapc, CEPH_NOSNAP, &aio_req->mtime);
+
+       /* Everything needed from the original has been copied over. */
+       ceph_osdc_put_request(orig_req);
+
+       req->r_callback = ceph_aio_complete_req;
+       req->r_inode = inode;
+       req->r_priv = aio_req;
+
+       ret = ceph_osdc_start_request(req->r_osdc, req, false);
+out:
+       if (ret < 0) {
+               BUG_ON(ret == -EOLDSNAPC);
+               req->r_result = ret;
+               ceph_aio_complete_req(req, NULL);
+       }
+
+       /* Drop our snapc reference on every path, including alloc failure. */
+       ceph_put_snap_context(snapc);
+       kfree(aio_work);
+}
+
 /*
  * Write commit request unsafe callback, called to tell us when a
  * request is unsafe (that is, in flight--has been handed to the
@@ -772,6 +852,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
                                aio_req->write = write;
                                INIT_LIST_HEAD(&aio_req->osd_reqs);
                                if (write) {
+                                       aio_req->mtime = mtime;
                                        swap(aio_req->prealloc_cf, *pcf);
                                }
                        }
@@ -867,6 +948,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
                                ret = ceph_osdc_start_request(req->r_osdc,
                                                              req, false);
                        if (ret < 0) {
+                               BUG_ON(ret == -EOLDSNAPC);
                                req->r_result = ret;
                                ceph_aio_complete_req(req, NULL);
                        }