ceph: add read/modify/write to ceph_sync_write
[sfrench/cifs-2.6.git] / fs / ceph / file.c
index 0abe36379fdf4d078408ab241265ec05f303ab92..45e00e42960bdcc8278d361249481f87c5d494e5 100644 (file)
@@ -1571,18 +1571,16 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
        struct inode *inode = file_inode(file);
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
-       struct ceph_vino vino;
+       struct ceph_osd_client *osdc = &fsc->client->osdc;
        struct ceph_osd_request *req;
        struct page **pages;
        u64 len;
        int num_pages;
        int written = 0;
-       int flags;
        int ret;
        bool check_caps = false;
        struct timespec64 mtime = current_time(inode);
        size_t count = iov_iter_count(from);
-       size_t off;
 
        if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
                return -EROFS;
@@ -1602,29 +1600,235 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
        if (ret < 0)
                dout("invalidate_inode_pages2_range returned %d\n", ret);
 
-       flags = /* CEPH_OSD_FLAG_ORDERSNAP | */ CEPH_OSD_FLAG_WRITE;
-
        while ((len = iov_iter_count(from)) > 0) {
                size_t left;
                int n;
+               u64 write_pos = pos;
+               u64 write_len = len;
+               u64 objnum, objoff;
+               u32 xlen;
+               u64 assert_ver = 0;
+               bool rmw;
+               bool first, last;
+               struct iov_iter saved_iter = *from;
+               size_t off;
+
+               ceph_fscrypt_adjust_off_and_len(inode, &write_pos, &write_len);
+
+               /* clamp the length to the end of first object */
+               ceph_calc_file_object_mapping(&ci->i_layout, write_pos,
+                                             write_len, &objnum, &objoff,
+                                             &xlen);
+               write_len = xlen;
+
+               /* adjust len downward if it goes beyond current object */
+               if (pos + len > write_pos + write_len)
+                       len = write_pos + write_len - pos;
 
-               vino = ceph_vino(inode);
-               req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
-                                           vino, pos, &len, 0, 1,
-                                           CEPH_OSD_OP_WRITE, flags, snapc,
-                                           ci->i_truncate_seq,
-                                           ci->i_truncate_size,
-                                           false);
-               if (IS_ERR(req)) {
-                       ret = PTR_ERR(req);
-                       break;
-               }
+               /*
+                * If we had to adjust the length or position to align with a
+                * crypto block, then we must do a read/modify/write cycle. We
+                * use a version assertion to redrive the thing if something
+                * changes in between.
+                */
+               first = pos != write_pos;
+               last = (pos + len) != (write_pos + write_len);
+               rmw = first || last;
 
-               num_pages = calc_pages_for(pos, len);
+               dout("sync_write ino %llx %lld~%llu adjusted %lld~%llu -- %srmw\n",
+                    ci->i_vino.ino, pos, len, write_pos, write_len,
+                    rmw ? "" : "no ");
+
+               /*
+                * The data is emplaced into the page as it would be if it were
+                * in an array of pagecache pages.
+                */
+               num_pages = calc_pages_for(write_pos, write_len);
                pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
                if (IS_ERR(pages)) {
                        ret = PTR_ERR(pages);
-                       goto out;
+                       break;
+               }
+
+               /* Do we need to preload the pages? */
+               if (rmw) {
+                       u64 first_pos = write_pos;
+                       u64 last_pos = (write_pos + write_len) - CEPH_FSCRYPT_BLOCK_SIZE;
+                       u64 read_len = CEPH_FSCRYPT_BLOCK_SIZE;
+                       struct ceph_osd_req_op *op;
+
+                       /* We should only need to do this for encrypted inodes */
+                       WARN_ON_ONCE(!IS_ENCRYPTED(inode));
+
+                       /* No need to do two reads if first and last blocks are same */
+                       if (first && last_pos == first_pos)
+                               last = false;
+
+                       /*
+                        * Allocate a read request for one or two extents,
+                        * depending on how the request was aligned.
+                        */
+                       req = ceph_osdc_new_request(osdc, &ci->i_layout,
+                                       ci->i_vino, first ? first_pos : last_pos,
+                                       &read_len, 0, (first && last) ? 2 : 1,
+                                       CEPH_OSD_OP_SPARSE_READ, CEPH_OSD_FLAG_READ,
+                                       NULL, ci->i_truncate_seq,
+                                       ci->i_truncate_size, false);
+                       if (IS_ERR(req)) {
+                               ceph_release_page_vector(pages, num_pages);
+                               ret = PTR_ERR(req);
+                               break;
+                       }
+
+                       /* Something is misaligned! */
+                       if (read_len != CEPH_FSCRYPT_BLOCK_SIZE) {
+                               ceph_osdc_put_request(req);
+                               ceph_release_page_vector(pages, num_pages);
+                               ret = -EIO;
+                               break;
+                       }
+
+                       /* Add extent for first block? */
+                       op = &req->r_ops[0];
+
+                       if (first) {
+                               osd_req_op_extent_osd_data_pages(req, 0, pages,
+                                                        CEPH_FSCRYPT_BLOCK_SIZE,
+                                                        offset_in_page(first_pos),
+                                                        false, false);
+                               /* We only expect a single extent here */
+                               ret = __ceph_alloc_sparse_ext_map(op, 1);
+                               if (ret) {
+                                       ceph_osdc_put_request(req);
+                                       ceph_release_page_vector(pages, num_pages);
+                                       break;
+                               }
+                       }
+
+                       /* Add extent for last block */
+                       if (last) {
+                               /* Init the other extent if first extent has been used */
+                               if (first) {
+                                       op = &req->r_ops[1];
+                                       osd_req_op_extent_init(req, 1,
+                                                       CEPH_OSD_OP_SPARSE_READ,
+                                                       last_pos, CEPH_FSCRYPT_BLOCK_SIZE,
+                                                       ci->i_truncate_size,
+                                                       ci->i_truncate_seq);
+                               }
+
+                               ret = __ceph_alloc_sparse_ext_map(op, 1);
+                               if (ret) {
+                                       ceph_osdc_put_request(req);
+                                       ceph_release_page_vector(pages, num_pages);
+                                       break;
+                               }
+
+                               osd_req_op_extent_osd_data_pages(req, first ? 1 : 0,
+                                                       &pages[num_pages - 1],
+                                                       CEPH_FSCRYPT_BLOCK_SIZE,
+                                                       offset_in_page(last_pos),
+                                                       false, false);
+                       }
+
+                       ceph_osdc_start_request(osdc, req);
+                       ret = ceph_osdc_wait_request(osdc, req);
+
+                       /* FIXME: length field is wrong if there are 2 extents */
+                       ceph_update_read_metrics(&fsc->mdsc->metric,
+                                                req->r_start_latency,
+                                                req->r_end_latency,
+                                                read_len, ret);
+
+                       /* Ok if object is not already present */
+                       if (ret == -ENOENT) {
+                               /*
+                                * If there is no object, then we can't assert
+                                * on its version. Set it to 0, and we'll use an
+                                * exclusive create instead.
+                                */
+                               ceph_osdc_put_request(req);
+                               ret = 0;
+
+                               /*
+                                * zero out the soon-to-be uncopied parts of the
+                                * first and last pages.
+                                */
+                               if (first)
+                                       zero_user_segment(pages[0], 0,
+                                                         offset_in_page(first_pos));
+                               if (last)
+                                       zero_user_segment(pages[num_pages - 1],
+                                                         offset_in_page(last_pos),
+                                                         PAGE_SIZE);
+                       } else {
+                               if (ret < 0) {
+                                       ceph_osdc_put_request(req);
+                                       ceph_release_page_vector(pages, num_pages);
+                                       break;
+                               }
+
+                               op = &req->r_ops[0];
+                               if (op->extent.sparse_ext_cnt == 0) {
+                                       if (first)
+                                               zero_user_segment(pages[0], 0,
+                                                                 offset_in_page(first_pos));
+                                       else
+                                               zero_user_segment(pages[num_pages - 1],
+                                                                 offset_in_page(last_pos),
+                                                                 PAGE_SIZE);
+                               } else if (op->extent.sparse_ext_cnt != 1 ||
+                                          ceph_sparse_ext_map_end(op) !=
+                                               CEPH_FSCRYPT_BLOCK_SIZE) {
+                                       ret = -EIO;
+                                       ceph_osdc_put_request(req);
+                                       ceph_release_page_vector(pages, num_pages);
+                                       break;
+                               }
+
+                               if (first && last) {
+                                       op = &req->r_ops[1];
+                                       if (op->extent.sparse_ext_cnt == 0) {
+                                               zero_user_segment(pages[num_pages - 1],
+                                                                 offset_in_page(last_pos),
+                                                                 PAGE_SIZE);
+                                       } else if (op->extent.sparse_ext_cnt != 1 ||
+                                                  ceph_sparse_ext_map_end(op) !=
+                                                       CEPH_FSCRYPT_BLOCK_SIZE) {
+                                               ret = -EIO;
+                                               ceph_osdc_put_request(req);
+                                               ceph_release_page_vector(pages, num_pages);
+                                               break;
+                                       }
+                               }
+
+                               /* Grab assert version. It must be non-zero. */
+                               assert_ver = req->r_version;
+                               WARN_ON_ONCE(ret > 0 && assert_ver == 0);
+
+                               ceph_osdc_put_request(req);
+                               if (first) {
+                                       ret = ceph_fscrypt_decrypt_block_inplace(inode,
+                                                       pages[0], CEPH_FSCRYPT_BLOCK_SIZE,
+                                                       offset_in_page(first_pos),
+                                                       first_pos >> CEPH_FSCRYPT_BLOCK_SHIFT);
+                                       if (ret < 0) {
+                                               ceph_release_page_vector(pages, num_pages);
+                                               break;
+                                       }
+                               }
+                               if (last) {
+                                       ret = ceph_fscrypt_decrypt_block_inplace(inode,
+                                                       pages[num_pages - 1],
+                                                       CEPH_FSCRYPT_BLOCK_SIZE,
+                                                       offset_in_page(last_pos),
+                                                       last_pos >> CEPH_FSCRYPT_BLOCK_SHIFT);
+                                       if (ret < 0) {
+                                               ceph_release_page_vector(pages, num_pages);
+                                               break;
+                                       }
+                               }
+                       }
                }
 
                left = len;
@@ -1632,35 +1836,91 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
                for (n = 0; n < num_pages; n++) {
                        size_t plen = min_t(size_t, left, PAGE_SIZE - off);
 
+                       /* copy the data */
                        ret = copy_page_from_iter(pages[n], off, plen, from);
-                       off = 0;
                        if (ret != plen) {
                                ret = -EFAULT;
                                break;
                        }
+                       off = 0;
                        left -= ret;
                }
-
                if (ret < 0) {
+                       dout("sync_write write failed with %d\n", ret);
                        ceph_release_page_vector(pages, num_pages);
-                       goto out;
+                       break;
                }
 
-               req->r_inode = inode;
+               if (IS_ENCRYPTED(inode)) {
+                       ret = ceph_fscrypt_encrypt_pages(inode, pages,
+                                                        write_pos, write_len,
+                                                        GFP_KERNEL);
+                       if (ret < 0) {
+                               dout("encryption failed with %d\n", ret);
+                               ceph_release_page_vector(pages, num_pages);
+                               break;
+                       }
+               }
 
-               osd_req_op_extent_osd_data_pages(req, 0, pages, len,
-                                                offset_in_page(pos),
-                                                false, true);
+               req = ceph_osdc_new_request(osdc, &ci->i_layout,
+                                           ci->i_vino, write_pos, &write_len,
+                                           rmw ? 1 : 0, rmw ? 2 : 1,
+                                           CEPH_OSD_OP_WRITE,
+                                           CEPH_OSD_FLAG_WRITE,
+                                           snapc, ci->i_truncate_seq,
+                                           ci->i_truncate_size, false);
+               if (IS_ERR(req)) {
+                       ret = PTR_ERR(req);
+                       ceph_release_page_vector(pages, num_pages);
+                       break;
+               }
 
+               dout("sync_write write op %lld~%llu\n", write_pos, write_len);
+               osd_req_op_extent_osd_data_pages(req, rmw ? 1 : 0, pages, write_len,
+                                                offset_in_page(write_pos), false,
+                                                true);
+               req->r_inode = inode;
                req->r_mtime = mtime;
-               ceph_osdc_start_request(&fsc->client->osdc, req);
-               ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
+
+               /* Set up the assertion */
+               if (rmw) {
+                       /*
+                        * Set up the assertion. If we don't have a version
+                        * number, then the object doesn't exist yet. Use an
+                        * exclusive create instead of a version assertion in
+                        * that case.
+                        */
+                       if (assert_ver) {
+                               osd_req_op_init(req, 0, CEPH_OSD_OP_ASSERT_VER, 0);
+                               req->r_ops[0].assert_ver.ver = assert_ver;
+                       } else {
+                               osd_req_op_init(req, 0, CEPH_OSD_OP_CREATE,
+                                               CEPH_OSD_OP_FLAG_EXCL);
+                       }
+               }
+
+               ceph_osdc_start_request(osdc, req);
+               ret = ceph_osdc_wait_request(osdc, req);
 
                ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
                                          req->r_end_latency, len, ret);
-out:
                ceph_osdc_put_request(req);
                if (ret != 0) {
+                       dout("sync_write osd write returned %d\n", ret);
+                       /* Version changed! Must re-do the rmw cycle */
+                       if ((assert_ver && (ret == -ERANGE || ret == -EOVERFLOW)) ||
+                           (!assert_ver && ret == -EEXIST)) {
+                               /* We should only ever see this on a rmw */
+                               WARN_ON_ONCE(!rmw);
+
+                               /* The version should never go backward */
+                               WARN_ON_ONCE(ret == -EOVERFLOW);
+
+                               *from = saved_iter;
+
+                               /* FIXME: limit number of times we loop? */
+                               continue;
+                       }
                        ceph_set_error_write(ci);
                        break;
                }
@@ -1668,6 +1928,7 @@ out:
                ceph_clear_error_write(ci);
                pos += len;
                written += len;
+               dout("sync_write written %d\n", written);
                if (pos > i_size_read(inode)) {
                        check_caps = ceph_inode_set_size(inode, pos);
                        if (check_caps)
@@ -1681,6 +1942,7 @@ out:
                ret = written;
                iocb->ki_pos = pos;
        }
+       dout("sync_write returning %d\n", ret);
        return ret;
 }