Merge tag 'ceph-for-4.16-rc1' of git://github.com/ceph/ceph-client
author Linus Torvalds <torvalds@linux-foundation.org>
Thu, 8 Feb 2018 19:38:59 +0000 (11:38 -0800)
committer Linus Torvalds <torvalds@linux-foundation.org>
Thu, 8 Feb 2018 19:38:59 +0000 (11:38 -0800)
Pull ceph updates from Ilya Dryomov:
 "Things have been very quiet on the rbd side, as work continues on the
  big ticket items slated for the next merge window.

  On the CephFS side we have a large number of cap handling
  improvements, a fix for our long-standing abuse of ->journal_info in
  ceph_readpages() and yet another dentry pointer management patch"

* tag 'ceph-for-4.16-rc1' of git://github.com/ceph/ceph-client:
  ceph: improving efficiency of syncfs
  libceph: check kstrndup() return value
  ceph: try to allocate enough memory for reserved caps
  ceph: fix race of queuing delayed caps
  ceph: delete unreachable code in ceph_check_caps()
  ceph: limit rate of cap import/export error messages
  ceph: fix incorrect snaprealm when adding caps
  ceph: fix un-balanced fsc->writeback_count update
  ceph: track read contexts in ceph_file_info
  ceph: avoid dereferencing invalid pointer during cached readdir
  ceph: use atomic_t for ceph_inode_info::i_shared_gen
  ceph: cleanup traceless reply handling for rename
  ceph: voluntarily drop Fx cap for readdir request
  ceph: properly drop caps for setattr request
  ceph: voluntarily drop Lx cap for link/rename requests
  ceph: voluntarily drop Ax cap for requests that create new inode
  rbd: whitelist RBD_FEATURE_OPERATIONS feature bit
  rbd: don't NULL out ->obj_request in rbd_img_obj_parent_read_full()
  rbd: use kmem_cache_zalloc() in rbd_img_request_create()
  rbd: obj_request->completion is unused
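
The ->journal_info fix mentioned in the pull message replaces the old trick of
stashing the file pointer in current->journal_info with a small per-file list
of read/write contexts (see the fs/ceph/super.h and fs/ceph/addr.c hunks
below).  A minimal sketch of the intended calling pattern: the
CEPH_DEFINE_RW_CONTEXT()/ceph_add_rw_context()/ceph_find_rw_context() helpers
are the real ones from the super.h hunk, while the two functions wrapping them
here are purely illustrative.

	/* Caller that already holds FILE_CACHE/LAZYIO caps (e.g. ceph_read_iter
	 * or ceph_filemap_fault) publishes an on-stack context before calling
	 * into the generic page cache code. */
	static void caller_with_caps(struct ceph_file_info *fi, int got)
	{
		CEPH_DEFINE_RW_CONTEXT(rw_ctx, got);

		ceph_add_rw_context(fi, &rw_ctx);
		/* ... generic_file_read_iter()/filemap_fault(), which may end
		 * up in ceph_readpages() on this same thread ... */
		ceph_del_rw_context(fi, &rw_ctx);
	}

	/* The readpages side looks the context up by current task instead of
	 * peeking at current->journal_info. */
	static void callee_readahead(struct ceph_file_info *fi)
	{
		struct ceph_rw_context *rw_ctx = ceph_find_rw_context(fi);

		if (!rw_ctx) {
			/* fadvise/madvise/readahead path: caps are not held
			 * by the caller, so take FILE_CACHE here */
		}
	}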

drivers/block/rbd.c
fs/ceph/addr.c
fs/ceph/caps.c
fs/ceph/dir.c
fs/ceph/file.c
fs/ceph/inode.c
fs/ceph/mds_client.c
fs/ceph/mds_client.h
fs/ceph/snap.c
fs/ceph/super.h
net/ceph/ceph_common.c

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index cc93522a6d419dc77902bab085306c2b316f6c16..8e40da0937667a2e621814f02910adc158809487 100644
@@ -124,11 +124,13 @@ static int atomic_dec_return_safe(atomic_t *v)
 #define RBD_FEATURE_STRIPINGV2         (1ULL<<1)
 #define RBD_FEATURE_EXCLUSIVE_LOCK     (1ULL<<2)
 #define RBD_FEATURE_DATA_POOL          (1ULL<<7)
+#define RBD_FEATURE_OPERATIONS         (1ULL<<8)
 
 #define RBD_FEATURES_ALL       (RBD_FEATURE_LAYERING |         \
                                 RBD_FEATURE_STRIPINGV2 |       \
                                 RBD_FEATURE_EXCLUSIVE_LOCK |   \
-                                RBD_FEATURE_DATA_POOL)
+                                RBD_FEATURE_DATA_POOL |        \
+                                RBD_FEATURE_OPERATIONS)
 
 /* Features supported by this (client software) implementation. */
 
@@ -281,7 +283,6 @@ struct rbd_obj_request {
        int                     result;
 
        rbd_obj_callback_t      callback;
-       struct completion       completion;
 
        struct kref             kref;
 };
@@ -1734,10 +1735,7 @@ static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
 {
        dout("%s: obj %p cb %p\n", __func__, obj_request,
                obj_request->callback);
-       if (obj_request->callback)
-               obj_request->callback(obj_request);
-       else
-               complete_all(&obj_request->completion);
+       obj_request->callback(obj_request);
 }
 
 static void rbd_obj_request_error(struct rbd_obj_request *obj_request, int err)
@@ -2013,7 +2011,6 @@ rbd_obj_request_create(enum obj_request_type type)
        obj_request->which = BAD_WHICH;
        obj_request->type = type;
        INIT_LIST_HEAD(&obj_request->links);
-       init_completion(&obj_request->completion);
        kref_init(&obj_request->kref);
 
        dout("%s %p\n", __func__, obj_request);
@@ -2129,15 +2126,13 @@ static struct rbd_img_request *rbd_img_request_create(
 {
        struct rbd_img_request *img_request;
 
-       img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
+       img_request = kmem_cache_zalloc(rbd_img_request_cache, GFP_NOIO);
        if (!img_request)
                return NULL;
 
-       img_request->rq = NULL;
        img_request->rbd_dev = rbd_dev;
        img_request->offset = offset;
        img_request->length = length;
-       img_request->flags = 0;
        if (op_type == OBJ_OP_DISCARD) {
                img_request_discard_set(img_request);
                img_request->snapc = snapc;
@@ -2149,11 +2144,8 @@ static struct rbd_img_request *rbd_img_request_create(
        }
        if (rbd_dev_parent_get(rbd_dev))
                img_request_layered_set(img_request);
+
        spin_lock_init(&img_request->completion_lock);
-       img_request->next_completion = 0;
-       img_request->callback = NULL;
-       img_request->result = 0;
-       img_request->obj_request_count = 0;
        INIT_LIST_HEAD(&img_request->obj_requests);
        kref_init(&img_request->kref);
 
@@ -2692,8 +2684,6 @@ static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
 
        parent_request->copyup_pages = NULL;
        parent_request->copyup_page_count = 0;
-       parent_request->obj_request = NULL;
-       rbd_obj_request_put(obj_request);
 out_err:
        if (pages)
                ceph_release_page_vector(pages, page_count);
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index dbf07051aacd26b3daafd1e87a0e1d7e36fe553c..b4336b42ce3bb1fabec247c01f51d86c2c28490f 100644
@@ -299,7 +299,8 @@ unlock:
  * start an async read(ahead) operation.  return nr_pages we submitted
  * a read for on success, or negative error code.
  */
-static int start_read(struct inode *inode, struct list_head *page_list, int max)
+static int start_read(struct inode *inode, struct ceph_rw_context *rw_ctx,
+                     struct list_head *page_list, int max)
 {
        struct ceph_osd_client *osdc =
                &ceph_inode_to_client(inode)->client->osdc;
@@ -316,7 +317,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
        int got = 0;
        int ret = 0;
 
-       if (!current->journal_info) {
+       if (!rw_ctx) {
                /* caller of readpages does not hold buffer and read caps
                 * (fadvise, madvise and readahead cases) */
                int want = CEPH_CAP_FILE_CACHE;
@@ -437,6 +438,8 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
 {
        struct inode *inode = file_inode(file);
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+       struct ceph_file_info *ci = file->private_data;
+       struct ceph_rw_context *rw_ctx;
        int rc = 0;
        int max = 0;
 
@@ -449,11 +452,12 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
        if (rc == 0)
                goto out;
 
+       rw_ctx = ceph_find_rw_context(ci);
        max = fsc->mount_options->rsize >> PAGE_SHIFT;
-       dout("readpages %p file %p nr_pages %d max %d\n",
-            inode, file, nr_pages, max);
+       dout("readpages %p file %p ctx %p nr_pages %d max %d\n",
+            inode, file, rw_ctx, nr_pages, max);
        while (!list_empty(page_list)) {
-               rc = start_read(inode, page_list, max);
+               rc = start_read(inode, rw_ctx, page_list, max);
                if (rc < 0)
                        goto out;
        }
@@ -574,7 +578,6 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
        struct ceph_fs_client *fsc;
        struct ceph_snap_context *snapc, *oldest;
        loff_t page_off = page_offset(page);
-       long writeback_stat;
        int err, len = PAGE_SIZE;
        struct ceph_writeback_ctl ceph_wbc;
 
@@ -615,8 +618,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
        dout("writepage %p page %p index %lu on %llu~%u snapc %p seq %lld\n",
             inode, page, page->index, page_off, len, snapc, snapc->seq);
 
-       writeback_stat = atomic_long_inc_return(&fsc->writeback_count);
-       if (writeback_stat >
+       if (atomic_long_inc_return(&fsc->writeback_count) >
            CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb))
                set_bdi_congested(inode_to_bdi(inode), BLK_RW_ASYNC);
 
@@ -651,6 +653,11 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
        end_page_writeback(page);
        ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
        ceph_put_snap_context(snapc);  /* page's reference */
+
+       if (atomic_long_dec_return(&fsc->writeback_count) <
+           CONGESTION_OFF_THRESH(fsc->mount_options->congestion_kb))
+               clear_bdi_congested(inode_to_bdi(inode), BLK_RW_ASYNC);
+
        return err;
 }
 
@@ -1450,9 +1457,10 @@ static int ceph_filemap_fault(struct vm_fault *vmf)
 
        if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) ||
            ci->i_inline_version == CEPH_INLINE_NONE) {
-               current->journal_info = vma->vm_file;
+               CEPH_DEFINE_RW_CONTEXT(rw_ctx, got);
+               ceph_add_rw_context(fi, &rw_ctx);
                ret = filemap_fault(vmf);
-               current->journal_info = NULL;
+               ceph_del_rw_context(fi, &rw_ctx);
        } else
                ret = -EAGAIN;
 
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index a14b2c974c9eacea27943fe7a995e21ce4e1bb3a..6582c4507e6c9d1fdf2876c13c424f0c5830495c 100644
@@ -154,13 +154,19 @@ void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta)
        spin_unlock(&mdsc->caps_list_lock);
 }
 
-void ceph_reserve_caps(struct ceph_mds_client *mdsc,
+/*
+ * Called under mdsc->mutex.
+ */
+int ceph_reserve_caps(struct ceph_mds_client *mdsc,
                      struct ceph_cap_reservation *ctx, int need)
 {
-       int i;
+       int i, j;
        struct ceph_cap *cap;
        int have;
        int alloc = 0;
+       int max_caps;
+       bool trimmed = false;
+       struct ceph_mds_session *s;
        LIST_HEAD(newcaps);
 
        dout("reserve caps ctx=%p need=%d\n", ctx, need);
@@ -179,16 +185,37 @@ void ceph_reserve_caps(struct ceph_mds_client *mdsc,
        spin_unlock(&mdsc->caps_list_lock);
 
        for (i = have; i < need; i++) {
+retry:
                cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
-               if (!cap)
-                       break;
+               if (!cap) {
+                       if (!trimmed) {
+                               for (j = 0; j < mdsc->max_sessions; j++) {
+                                       s = __ceph_lookup_mds_session(mdsc, j);
+                                       if (!s)
+                                               continue;
+                                       mutex_unlock(&mdsc->mutex);
+
+                                       mutex_lock(&s->s_mutex);
+                                       max_caps = s->s_nr_caps - (need - i);
+                                       ceph_trim_caps(mdsc, s, max_caps);
+                                       mutex_unlock(&s->s_mutex);
+
+                                       ceph_put_mds_session(s);
+                                       mutex_lock(&mdsc->mutex);
+                               }
+                               trimmed = true;
+                               goto retry;
+                       } else {
+                               pr_warn("reserve caps ctx=%p ENOMEM "
+                                       "need=%d got=%d\n",
+                                       ctx, need, have + alloc);
+                               goto out_nomem;
+                       }
+               }
                list_add(&cap->caps_item, &newcaps);
                alloc++;
        }
-       /* we didn't manage to reserve as much as we needed */
-       if (have + alloc != need)
-               pr_warn("reserve caps ctx=%p ENOMEM need=%d got=%d\n",
-                       ctx, need, have + alloc);
+       BUG_ON(have + alloc != need);
 
        spin_lock(&mdsc->caps_list_lock);
        mdsc->caps_total_count += alloc;
@@ -204,6 +231,24 @@ void ceph_reserve_caps(struct ceph_mds_client *mdsc,
        dout("reserve caps ctx=%p %d = %d used + %d resv + %d avail\n",
             ctx, mdsc->caps_total_count, mdsc->caps_use_count,
             mdsc->caps_reserve_count, mdsc->caps_avail_count);
+       return 0;
+
+out_nomem:
+       while (!list_empty(&newcaps)) {
+               cap = list_first_entry(&newcaps,
+                               struct ceph_cap, caps_item);
+               list_del(&cap->caps_item);
+               kmem_cache_free(ceph_cap_cachep, cap);
+       }
+
+       spin_lock(&mdsc->caps_list_lock);
+       mdsc->caps_avail_count += have;
+       mdsc->caps_reserve_count -= have;
+       BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
+                                        mdsc->caps_reserve_count +
+                                        mdsc->caps_avail_count);
+       spin_unlock(&mdsc->caps_list_lock);
+       return -ENOMEM;
 }
 
 int ceph_unreserve_caps(struct ceph_mds_client *mdsc,
@@ -498,7 +543,7 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
         */
        if ((issued & CEPH_CAP_FILE_SHARED) != (had & CEPH_CAP_FILE_SHARED)) {
                if (issued & CEPH_CAP_FILE_SHARED)
-                       ci->i_shared_gen++;
+                       atomic_inc(&ci->i_shared_gen);
                if (S_ISDIR(ci->vfs_inode.i_mode)) {
                        dout(" marking %p NOT complete\n", &ci->vfs_inode);
                        __ceph_dir_clear_complete(ci);
@@ -577,18 +622,30 @@ void ceph_add_cap(struct inode *inode,
                }
        }
 
-       if (!ci->i_snap_realm) {
+       if (!ci->i_snap_realm ||
+           ((flags & CEPH_CAP_FLAG_AUTH) &&
+            realmino != (u64)-1 && ci->i_snap_realm->ino != realmino)) {
                /*
                 * add this inode to the appropriate snap realm
                 */
                struct ceph_snap_realm *realm = ceph_lookup_snap_realm(mdsc,
                                                               realmino);
                if (realm) {
+                       struct ceph_snap_realm *oldrealm = ci->i_snap_realm;
+                       if (oldrealm) {
+                               spin_lock(&oldrealm->inodes_with_caps_lock);
+                               list_del_init(&ci->i_snap_realm_item);
+                               spin_unlock(&oldrealm->inodes_with_caps_lock);
+                       }
+
                        spin_lock(&realm->inodes_with_caps_lock);
                        ci->i_snap_realm = realm;
                        list_add(&ci->i_snap_realm_item,
                                 &realm->inodes_with_caps);
                        spin_unlock(&realm->inodes_with_caps_lock);
+
+                       if (oldrealm)
+                               ceph_put_snap_realm(mdsc, oldrealm);
                } else {
                        pr_err("ceph_add_cap: couldn't find snap realm %llx\n",
                               realmino);
@@ -890,6 +947,11 @@ int __ceph_caps_mds_wanted(struct ceph_inode_info *ci, bool check)
 /*
  * called under i_ceph_lock
  */
+static int __ceph_is_single_caps(struct ceph_inode_info *ci)
+{
+       return rb_first(&ci->i_caps) == rb_last(&ci->i_caps);
+}
+
 static int __ceph_is_any_caps(struct ceph_inode_info *ci)
 {
        return !RB_EMPTY_ROOT(&ci->i_caps);
@@ -1703,21 +1765,24 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
        int mds = -1;   /* keep track of how far we've gone through i_caps list
                           to avoid an infinite loop on retry */
        struct rb_node *p;
-       int delayed = 0, sent = 0, num;
-       bool is_delayed = flags & CHECK_CAPS_NODELAY;
+       int delayed = 0, sent = 0;
+       bool no_delay = flags & CHECK_CAPS_NODELAY;
        bool queue_invalidate = false;
-       bool force_requeue = false;
        bool tried_invalidate = false;
 
        /* if we are unmounting, flush any unused caps immediately. */
        if (mdsc->stopping)
-               is_delayed = true;
+               no_delay = true;
 
        spin_lock(&ci->i_ceph_lock);
 
        if (ci->i_ceph_flags & CEPH_I_FLUSH)
                flags |= CHECK_CAPS_FLUSH;
 
+       if (!(flags & CHECK_CAPS_AUTHONLY) ||
+           (ci->i_auth_cap && __ceph_is_single_caps(ci)))
+               __cap_delay_cancel(mdsc, ci);
+
        goto retry_locked;
 retry:
        spin_lock(&ci->i_ceph_lock);
@@ -1772,7 +1837,7 @@ retry_locked:
         * have cached pages, but don't want them, then try to invalidate.
         * If we fail, it's because pages are locked.... try again later.
         */
-       if ((!is_delayed || mdsc->stopping) &&
+       if ((!no_delay || mdsc->stopping) &&
            !S_ISDIR(inode->i_mode) &&          /* ignore readdir cache */
            !(ci->i_wb_ref || ci->i_wrbuffer_ref) &&   /* no dirty pages... */
            inode->i_data.nrpages &&            /* have cached pages */
@@ -1781,27 +1846,16 @@ retry_locked:
            !tried_invalidate) {
                dout("check_caps trying to invalidate on %p\n", inode);
                if (try_nonblocking_invalidate(inode) < 0) {
-                       if (revoking & (CEPH_CAP_FILE_CACHE|
-                                       CEPH_CAP_FILE_LAZYIO)) {
-                               dout("check_caps queuing invalidate\n");
-                               queue_invalidate = true;
-                               ci->i_rdcache_revoking = ci->i_rdcache_gen;
-                       } else {
-                               dout("check_caps failed to invalidate pages\n");
-                               /* we failed to invalidate pages.  check these
-                                  caps again later. */
-                               force_requeue = true;
-                               __cap_set_timeouts(mdsc, ci);
-                       }
+                       dout("check_caps queuing invalidate\n");
+                       queue_invalidate = true;
+                       ci->i_rdcache_revoking = ci->i_rdcache_gen;
                }
                tried_invalidate = true;
                goto retry_locked;
        }
 
-       num = 0;
        for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
                cap = rb_entry(p, struct ceph_cap, ci_node);
-               num++;
 
                /* avoid looping forever */
                if (mds >= cap->mds ||
@@ -1864,7 +1918,7 @@ retry_locked:
                    cap->mds_wanted == want)
                        continue;     /* nope, all good */
 
-               if (is_delayed)
+               if (no_delay)
                        goto ack;
 
                /* delay? */
@@ -1955,15 +2009,8 @@ ack:
                goto retry; /* retake i_ceph_lock and restart our cap scan. */
        }
 
-       /*
-        * Reschedule delayed caps release if we delayed anything,
-        * otherwise cancel.
-        */
-       if (delayed && is_delayed)
-               force_requeue = true;   /* __send_cap delayed release; requeue */
-       if (!delayed && !is_delayed)
-               __cap_delay_cancel(mdsc, ci);
-       else if (!is_delayed || force_requeue)
+       /* Reschedule delayed caps release if we delayed anything */
+       if (delayed)
                __cap_delay_requeue(mdsc, ci);
 
        spin_unlock(&ci->i_ceph_lock);
@@ -2160,7 +2207,7 @@ int ceph_write_inode(struct inode *inode, struct writeback_control *wbc)
        u64 flush_tid;
        int err = 0;
        int dirty;
-       int wait = wbc->sync_mode == WB_SYNC_ALL;
+       int wait = (wbc->sync_mode == WB_SYNC_ALL && !wbc->for_sync);
 
        dout("write_inode %p wait=%d\n", inode, wait);
        if (wait) {
@@ -3426,7 +3473,14 @@ retry:
         */
 
        issued = cap->issued;
-       WARN_ON(issued != cap->implemented);
+       if (issued != cap->implemented)
+               pr_err_ratelimited("handle_cap_export: issued != implemented: "
+                               "ino (%llx.%llx) mds%d seq %d mseq %d "
+                               "issued %s implemented %s\n",
+                               ceph_vinop(inode), mds, cap->seq, cap->mseq,
+                               ceph_cap_string(issued),
+                               ceph_cap_string(cap->implemented));
+
 
        tcap = __get_cap_for_mds(ci, target);
        if (tcap) {
@@ -3572,12 +3626,13 @@ retry:
                if ((ph->flags & CEPH_CAP_FLAG_AUTH) &&
                    (ocap->seq != le32_to_cpu(ph->seq) ||
                     ocap->mseq != le32_to_cpu(ph->mseq))) {
-                       pr_err("handle_cap_import: mismatched seq/mseq: "
-                              "ino (%llx.%llx) mds%d seq %d mseq %d "
-                              "importer mds%d has peer seq %d mseq %d\n",
-                              ceph_vinop(inode), peer, ocap->seq,
-                              ocap->mseq, mds, le32_to_cpu(ph->seq),
-                              le32_to_cpu(ph->mseq));
+                       pr_err_ratelimited("handle_cap_import: "
+                                       "mismatched seq/mseq: ino (%llx.%llx) "
+                                       "mds%d seq %d mseq %d importer mds%d "
+                                       "has peer seq %d mseq %d\n",
+                                       ceph_vinop(inode), peer, ocap->seq,
+                                       ocap->mseq, mds, le32_to_cpu(ph->seq),
+                                       le32_to_cpu(ph->mseq));
                }
                __ceph_remove_cap(ocap, (ph->flags & CEPH_CAP_FLAG_RELEASE));
        }
@@ -3939,11 +3994,20 @@ int ceph_encode_inode_release(void **p, struct inode *inode,
 
        cap = __get_cap_for_mds(ci, mds);
        if (cap && __cap_is_valid(cap)) {
-               if (force ||
-                   ((cap->issued & drop) &&
-                    (cap->issued & unless) == 0)) {
-                       if ((cap->issued & drop) &&
-                           (cap->issued & unless) == 0) {
+               unless &= cap->issued;
+               if (unless) {
+                       if (unless & CEPH_CAP_AUTH_EXCL)
+                               drop &= ~CEPH_CAP_AUTH_SHARED;
+                       if (unless & CEPH_CAP_LINK_EXCL)
+                               drop &= ~CEPH_CAP_LINK_SHARED;
+                       if (unless & CEPH_CAP_XATTR_EXCL)
+                               drop &= ~CEPH_CAP_XATTR_SHARED;
+                       if (unless & CEPH_CAP_FILE_EXCL)
+                               drop &= ~CEPH_CAP_FILE_SHARED;
+               }
+
+               if (force || (cap->issued & drop)) {
+                       if (cap->issued & drop) {
                                int wanted = __ceph_caps_wanted(ci);
                                if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0)
                                        wanted |= cap->mds_wanted;
@@ -3975,7 +4039,7 @@ int ceph_encode_inode_release(void **p, struct inode *inode,
                        *p += sizeof(*rel);
                        ret = 1;
                } else {
-                       dout("encode_inode_release %p cap %p %s\n",
+                       dout("encode_inode_release %p cap %p %s (noop)\n",
                             inode, cap, ceph_cap_string(cap->issued));
                }
        }
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 8a5266699b67657b0ea9053a03fe0ed255abb293..0c4346806e17a6f9a70e35fdafab13440c426d65 100644
@@ -173,7 +173,7 @@ __dcache_find_get_entry(struct dentry *parent, u64 idx,
  * the MDS if/when the directory is modified).
  */
 static int __dcache_readdir(struct file *file,  struct dir_context *ctx,
-                           u32 shared_gen)
+                           int shared_gen)
 {
        struct ceph_file_info *fi = file->private_data;
        struct dentry *parent = file->f_path.dentry;
@@ -184,7 +184,7 @@ static int __dcache_readdir(struct file *file,  struct dir_context *ctx,
        u64 idx = 0;
        int err = 0;
 
-       dout("__dcache_readdir %p v%u at %llx\n", dir, shared_gen, ctx->pos);
+       dout("__dcache_readdir %p v%u at %llx\n", dir, (unsigned)shared_gen, ctx->pos);
 
        /* search start position */
        if (ctx->pos > 2) {
@@ -231,11 +231,17 @@ static int __dcache_readdir(struct file *file,  struct dir_context *ctx,
                        goto out;
                }
 
-               di = ceph_dentry(dentry);
                spin_lock(&dentry->d_lock);
-               if (di->lease_shared_gen == shared_gen &&
-                   d_really_is_positive(dentry) &&
-                   fpos_cmp(ctx->pos, di->offset) <= 0) {
+               di = ceph_dentry(dentry);
+               if (d_unhashed(dentry) ||
+                   d_really_is_negative(dentry) ||
+                   di->lease_shared_gen != shared_gen) {
+                       spin_unlock(&dentry->d_lock);
+                       dput(dentry);
+                       err = -EAGAIN;
+                       goto out;
+               }
+               if (fpos_cmp(ctx->pos, di->offset) <= 0) {
                        emit_dentry = true;
                }
                spin_unlock(&dentry->d_lock);
@@ -333,7 +339,7 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
            ceph_snap(inode) != CEPH_SNAPDIR &&
            __ceph_dir_is_complete_ordered(ci) &&
            __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) {
-               u32 shared_gen = ci->i_shared_gen;
+               int shared_gen = atomic_read(&ci->i_shared_gen);
                spin_unlock(&ci->i_ceph_lock);
                err = __dcache_readdir(file, ctx, shared_gen);
                if (err != -EAGAIN)
@@ -381,6 +387,7 @@ more:
                if (op == CEPH_MDS_OP_READDIR) {
                        req->r_direct_hash = ceph_frag_value(frag);
                        __set_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);
+                       req->r_inode_drop = CEPH_CAP_FILE_EXCL;
                }
                if (fi->last_name) {
                        req->r_path2 = kstrdup(fi->last_name, GFP_KERNEL);
@@ -750,7 +757,7 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
                        spin_unlock(&ci->i_ceph_lock);
                        dout(" dir %p complete, -ENOENT\n", dir);
                        d_add(dentry, NULL);
-                       di->lease_shared_gen = ci->i_shared_gen;
+                       di->lease_shared_gen = atomic_read(&ci->i_shared_gen);
                        return NULL;
                }
                spin_unlock(&ci->i_ceph_lock);
@@ -835,7 +842,7 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry,
        set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
        req->r_args.mknod.mode = cpu_to_le32(mode);
        req->r_args.mknod.rdev = cpu_to_le32(rdev);
-       req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
+       req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL;
        req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
        if (acls.pagelist) {
                req->r_pagelist = acls.pagelist;
@@ -887,7 +894,7 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry,
        set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
        req->r_dentry = dget(dentry);
        req->r_num_caps = 2;
-       req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
+       req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL;
        req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
        err = ceph_mdsc_do_request(mdsc, dir, req);
        if (!err && !req->r_reply_info.head->is_dentry)
@@ -936,7 +943,7 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode)
        req->r_parent = dir;
        set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
        req->r_args.mkdir.mode = cpu_to_le32(mode);
-       req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
+       req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL;
        req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
        if (acls.pagelist) {
                req->r_pagelist = acls.pagelist;
@@ -983,7 +990,7 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir,
        req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
        req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
        /* release LINK_SHARED on source inode (mds will lock it) */
-       req->r_old_inode_drop = CEPH_CAP_LINK_SHARED;
+       req->r_old_inode_drop = CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL;
        err = ceph_mdsc_do_request(mdsc, dir, req);
        if (err) {
                d_drop(dentry);
@@ -1096,7 +1103,7 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
        req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
        req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
        /* release LINK_RDCACHE on source inode (mds will lock it) */
-       req->r_old_inode_drop = CEPH_CAP_LINK_SHARED;
+       req->r_old_inode_drop = CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL;
        if (d_really_is_positive(new_dentry))
                req->r_inode_drop = drop_caps_for_unlink(d_inode(new_dentry));
        err = ceph_mdsc_do_request(mdsc, old_dir, req);
@@ -1106,16 +1113,7 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
                 * do_request, above).  If there is no trace, we need
                 * to do it here.
                 */
-
-               /* d_move screws up sibling dentries' offsets */
-               ceph_dir_clear_complete(old_dir);
-               ceph_dir_clear_complete(new_dir);
-
                d_move(old_dentry, new_dentry);
-
-               /* ensure target dentry is invalidated, despite
-                  rehashing bug in vfs_rename_dir */
-               ceph_invalidate_dentry_lease(new_dentry);
        }
        ceph_mdsc_put_request(req);
        return err;
@@ -1199,12 +1197,12 @@ static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry)
        int valid = 0;
 
        spin_lock(&ci->i_ceph_lock);
-       if (ci->i_shared_gen == di->lease_shared_gen)
+       if (atomic_read(&ci->i_shared_gen) == di->lease_shared_gen)
                valid = __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1);
        spin_unlock(&ci->i_ceph_lock);
        dout("dir_lease_is_valid dir %p v%u dentry %p v%u = %d\n",
-            dir, (unsigned)ci->i_shared_gen, dentry,
-            (unsigned)di->lease_shared_gen, valid);
+            dir, (unsigned)atomic_read(&ci->i_shared_gen),
+            dentry, (unsigned)di->lease_shared_gen, valid);
        return valid;
 }
 
@@ -1332,24 +1330,37 @@ static void ceph_d_release(struct dentry *dentry)
  */
 static void ceph_d_prune(struct dentry *dentry)
 {
-       dout("ceph_d_prune %p\n", dentry);
+       struct ceph_inode_info *dir_ci;
+       struct ceph_dentry_info *di;
+
+       dout("ceph_d_prune %pd %p\n", dentry, dentry);
 
        /* do we have a valid parent? */
        if (IS_ROOT(dentry))
                return;
 
-       /* if we are not hashed, we don't affect dir's completeness */
-       if (d_unhashed(dentry))
+       /* we hold d_lock, so d_parent is stable */
+       dir_ci = ceph_inode(d_inode(dentry->d_parent));
+       if (dir_ci->i_vino.snap == CEPH_SNAPDIR)
                return;
 
-       if (ceph_snap(d_inode(dentry->d_parent)) == CEPH_SNAPDIR)
+       /* who calls d_delete() should also disable dcache readdir */
+       if (d_really_is_negative(dentry))
                return;
 
-       /*
-        * we hold d_lock, so d_parent is stable, and d_fsdata is never
-        * cleared until d_release
-        */
-       ceph_dir_clear_complete(d_inode(dentry->d_parent));
+       /* d_fsdata does not get cleared until d_release */
+       if (!d_unhashed(dentry)) {
+               __ceph_dir_clear_complete(dir_ci);
+               return;
+       }
+
+       /* Disable dcache readdir just in case that someone called d_drop()
+        * or d_invalidate(), but MDS didn't revoke CEPH_CAP_FILE_SHARED
+        * properly (dcache readdir is still enabled) */
+       di = ceph_dentry(dentry);
+       if (di->offset > 0 &&
+           di->lease_shared_gen == atomic_read(&dir_ci->i_shared_gen))
+               __ceph_dir_clear_ordered(dir_ci);
 }
 
 /*
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 5c17125f45c786ab65aeeaa4782faedd895525a9..6639926eed4e8461d0c50b6a686df236e1c96338 100644
@@ -181,6 +181,10 @@ static int ceph_init_file(struct inode *inode, struct file *file, int fmode)
                        return -ENOMEM;
                }
                cf->fmode = fmode;
+
+               spin_lock_init(&cf->rw_contexts_lock);
+               INIT_LIST_HEAD(&cf->rw_contexts);
+
                cf->next_offset = 2;
                cf->readdir_cache_idx = -1;
                file->private_data = cf;
@@ -396,7 +400,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
        req->r_dentry = dget(dentry);
        req->r_num_caps = 2;
        if (flags & O_CREAT) {
-               req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
+               req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL;
                req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
                if (acls.pagelist) {
                        req->r_pagelist = acls.pagelist;
@@ -464,6 +468,7 @@ int ceph_release(struct inode *inode, struct file *file)
                ceph_mdsc_put_request(cf->last_readdir);
        kfree(cf->last_name);
        kfree(cf->dir_info);
+       WARN_ON(!list_empty(&cf->rw_contexts));
        kmem_cache_free(ceph_file_cachep, cf);
 
        /* wake up anyone waiting for caps on this inode */
@@ -1199,12 +1204,13 @@ again:
                        retry_op = READ_INLINE;
                }
        } else {
+               CEPH_DEFINE_RW_CONTEXT(rw_ctx, got);
                dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n",
                     inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
                     ceph_cap_string(got));
-               current->journal_info = filp;
+               ceph_add_rw_context(fi, &rw_ctx);
                ret = generic_file_read_iter(iocb, to);
-               current->journal_info = NULL;
+               ceph_del_rw_context(fi, &rw_ctx);
        }
        dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n",
             inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret);
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index ab81652198c48e1e90a5545cb06089a6fa30da1a..c6ec5aa461002796325e84796199dd0ed2dd0a67 100644
@@ -494,7 +494,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
        ci->i_wrbuffer_ref = 0;
        ci->i_wrbuffer_ref_head = 0;
        atomic_set(&ci->i_filelock_ref, 0);
-       ci->i_shared_gen = 0;
+       atomic_set(&ci->i_shared_gen, 0);
        ci->i_rdcache_gen = 0;
        ci->i_rdcache_revoking = 0;
 
@@ -1041,7 +1041,7 @@ static void update_dentry_lease(struct dentry *dentry,
        if (ceph_snap(dir) != CEPH_NOSNAP)
                goto out_unlock;
 
-       di->lease_shared_gen = ceph_inode(dir)->i_shared_gen;
+       di->lease_shared_gen = atomic_read(&ceph_inode(dir)->i_shared_gen);
 
        if (duration == 0)
                goto out_unlock;
@@ -1080,6 +1080,27 @@ static struct dentry *splice_dentry(struct dentry *dn, struct inode *in)
 
        BUG_ON(d_inode(dn));
 
+       if (S_ISDIR(in->i_mode)) {
+               /* If inode is directory, d_splice_alias() below will remove
+                * 'realdn' from its origin parent. We need to ensure that
+                * origin parent's readdir cache will not reference 'realdn'
+                */
+               realdn = d_find_any_alias(in);
+               if (realdn) {
+                       struct ceph_dentry_info *di = ceph_dentry(realdn);
+                       spin_lock(&realdn->d_lock);
+
+                       realdn->d_op->d_prune(realdn);
+
+                       di->time = jiffies;
+                       di->lease_shared_gen = 0;
+                       di->offset = 0;
+
+                       spin_unlock(&realdn->d_lock);
+                       dput(realdn);
+               }
+       }
+
        /* dn must be unhashed */
        if (!d_unhashed(dn))
                d_drop(dn);
@@ -1295,8 +1316,8 @@ retry_lookup:
                if (!rinfo->head->is_target) {
                        dout("fill_trace null dentry\n");
                        if (d_really_is_positive(dn)) {
-                               ceph_dir_clear_ordered(dir);
                                dout("d_delete %p\n", dn);
+                               ceph_dir_clear_ordered(dir);
                                d_delete(dn);
                        } else if (have_lease) {
                                if (d_unhashed(dn))
@@ -1323,7 +1344,6 @@ retry_lookup:
                        dout(" %p links to %p %llx.%llx, not %llx.%llx\n",
                             dn, d_inode(dn), ceph_vinop(d_inode(dn)),
                             ceph_vinop(in));
-                       ceph_dir_clear_ordered(dir);
                        d_invalidate(dn);
                        have_lease = false;
                }
@@ -1573,9 +1593,19 @@ retry_lookup:
                } else if (d_really_is_positive(dn) &&
                           (ceph_ino(d_inode(dn)) != tvino.ino ||
                            ceph_snap(d_inode(dn)) != tvino.snap)) {
+                       struct ceph_dentry_info *di = ceph_dentry(dn);
                        dout(" dn %p points to wrong inode %p\n",
                             dn, d_inode(dn));
-                       __ceph_dir_clear_ordered(ci);
+
+                       spin_lock(&dn->d_lock);
+                       if (di->offset > 0 &&
+                           di->lease_shared_gen ==
+                           atomic_read(&ci->i_shared_gen)) {
+                               __ceph_dir_clear_ordered(ci);
+                               di->offset = 0;
+                       }
+                       spin_unlock(&dn->d_lock);
+
                        d_delete(dn);
                        dput(dn);
                        goto retry_lookup;
@@ -1600,9 +1630,7 @@ retry_lookup:
                                 &req->r_caps_reservation);
                if (ret < 0) {
                        pr_err("fill_inode badness on %p\n", in);
-                       if (d_really_is_positive(dn))
-                               __ceph_dir_clear_ordered(ci);
-                       else
+                       if (d_really_is_negative(dn))
                                iput(in);
                        d_drop(dn);
                        err = ret;
@@ -2000,8 +2028,8 @@ int __ceph_setattr(struct inode *inode, struct iattr *attr)
                        ceph_encode_timespec(&req->r_args.setattr.atime,
                                             &attr->ia_atime);
                        mask |= CEPH_SETATTR_ATIME;
-                       release |= CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_RD |
-                               CEPH_CAP_FILE_WR;
+                       release |= CEPH_CAP_FILE_SHARED |
+                                  CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR;
                }
        }
        if (ia_valid & ATTR_MTIME) {
@@ -2022,8 +2050,8 @@ int __ceph_setattr(struct inode *inode, struct iattr *attr)
                        ceph_encode_timespec(&req->r_args.setattr.mtime,
                                             &attr->ia_mtime);
                        mask |= CEPH_SETATTR_MTIME;
-                       release |= CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_RD |
-                               CEPH_CAP_FILE_WR;
+                       release |= CEPH_CAP_FILE_SHARED |
+                                  CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR;
                }
        }
        if (ia_valid & ATTR_SIZE) {
@@ -2041,8 +2069,8 @@ int __ceph_setattr(struct inode *inode, struct iattr *attr)
                        req->r_args.setattr.old_size =
                                cpu_to_le64(inode->i_size);
                        mask |= CEPH_SETATTR_SIZE;
-                       release |= CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_RD |
-                               CEPH_CAP_FILE_WR;
+                       release |= CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL |
+                                  CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR;
                }
        }
 
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 1b468250e94752e6eedf63283cba95a236fb0380..2e8f90f96540291ab412703133d2a4278edbac71 100644
@@ -604,10 +604,20 @@ static void __register_request(struct ceph_mds_client *mdsc,
                               struct ceph_mds_request *req,
                               struct inode *dir)
 {
+       int ret = 0;
+
        req->r_tid = ++mdsc->last_tid;
-       if (req->r_num_caps)
-               ceph_reserve_caps(mdsc, &req->r_caps_reservation,
-                                 req->r_num_caps);
+       if (req->r_num_caps) {
+               ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
+                                       req->r_num_caps);
+               if (ret < 0) {
+                       pr_err("__register_request %p "
+                              "failed to reserve caps: %d\n", req, ret);
+                       /* set req->r_err to fail early from __do_request */
+                       req->r_err = ret;
+                       return;
+               }
+       }
        dout("__register_request %p tid %lld\n", req, req->r_tid);
        ceph_mdsc_get_request(req);
        insert_request(&mdsc->request_tree, req);
@@ -1545,9 +1555,9 @@ out:
 /*
  * Trim session cap count down to some max number.
  */
-static int trim_caps(struct ceph_mds_client *mdsc,
-                    struct ceph_mds_session *session,
-                    int max_caps)
+int ceph_trim_caps(struct ceph_mds_client *mdsc,
+                  struct ceph_mds_session *session,
+                  int max_caps)
 {
        int trim_caps = session->s_nr_caps - max_caps;
 
@@ -2438,11 +2448,14 @@ out:
  */
 void ceph_invalidate_dir_request(struct ceph_mds_request *req)
 {
-       struct inode *inode = req->r_parent;
+       struct inode *dir = req->r_parent;
+       struct inode *old_dir = req->r_old_dentry_dir;
 
-       dout("invalidate_dir_request %p (complete, lease(s))\n", inode);
+       dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir);
 
-       ceph_dir_clear_complete(inode);
+       ceph_dir_clear_complete(dir);
+       if (old_dir)
+               ceph_dir_clear_complete(old_dir);
        if (req->r_dentry)
                ceph_invalidate_dentry_lease(req->r_dentry);
        if (req->r_old_dentry)
@@ -2773,7 +2786,7 @@ static void handle_session(struct ceph_mds_session *session,
                break;
 
        case CEPH_SESSION_RECALL_STATE:
-               trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
+               ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
                break;
 
        case CEPH_SESSION_FLUSHMSG:
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 837ac4b087a0babb0f202ae66e79688d8f57822c..71e3b783ee6fae5a64ae0a4a3eb1471c9fd9ed44 100644
@@ -444,4 +444,7 @@ ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target);
 extern void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
                                          struct ceph_mds_session *session);
 
+extern int ceph_trim_caps(struct ceph_mds_client *mdsc,
+                         struct ceph_mds_session *session,
+                         int max_caps);
 #endif
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index 8a2ca41e4b97ca50cad0016102684a50a3ebed59..07cf95e6413d775d01c383f3d3d7e0e8e4879910 100644
@@ -922,13 +922,17 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
                        /*
                         * Move the inode to the new realm
                         */
-                       spin_lock(&realm->inodes_with_caps_lock);
+                       oldrealm = ci->i_snap_realm;
+                       spin_lock(&oldrealm->inodes_with_caps_lock);
                        list_del_init(&ci->i_snap_realm_item);
+                       spin_unlock(&oldrealm->inodes_with_caps_lock);
+
+                       spin_lock(&realm->inodes_with_caps_lock);
                        list_add(&ci->i_snap_realm_item,
                                 &realm->inodes_with_caps);
-                       oldrealm = ci->i_snap_realm;
                        ci->i_snap_realm = realm;
                        spin_unlock(&realm->inodes_with_caps_lock);
+
                        spin_unlock(&ci->i_ceph_lock);
 
                        ceph_get_snap_realm(mdsc, realm);
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 2beeec07fa76ce199e7d461831b9b93a705419ee..21b2e5b004eb72ba10057df37c6907766aed5993 100644
@@ -256,7 +256,8 @@ struct ceph_inode_xattr {
  */
 struct ceph_dentry_info {
        struct ceph_mds_session *lease_session;
-       u32 lease_gen, lease_shared_gen;
+       int lease_shared_gen;
+       u32 lease_gen;
        u32 lease_seq;
        unsigned long lease_renew_after, lease_renew_from;
        struct list_head lru;
@@ -353,7 +354,7 @@ struct ceph_inode_info {
        int i_rd_ref, i_rdcache_ref, i_wr_ref, i_wb_ref;
        int i_wrbuffer_ref, i_wrbuffer_ref_head;
        atomic_t i_filelock_ref;
-       u32 i_shared_gen;       /* increment each time we get FILE_SHARED */
+       atomic_t i_shared_gen;       /* increment each time we get FILE_SHARED */
        u32 i_rdcache_gen;      /* incremented each time we get FILE_CACHE. */
        u32 i_rdcache_revoking; /* RDCACHE gen to async invalidate, if any */
 
@@ -648,7 +649,7 @@ extern int __ceph_caps_mds_wanted(struct ceph_inode_info *ci, bool check);
 extern void ceph_caps_init(struct ceph_mds_client *mdsc);
 extern void ceph_caps_finalize(struct ceph_mds_client *mdsc);
 extern void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta);
-extern void ceph_reserve_caps(struct ceph_mds_client *mdsc,
+extern int ceph_reserve_caps(struct ceph_mds_client *mdsc,
                             struct ceph_cap_reservation *ctx, int need);
 extern int ceph_unreserve_caps(struct ceph_mds_client *mdsc,
                               struct ceph_cap_reservation *ctx);
@@ -668,6 +669,9 @@ struct ceph_file_info {
        short fmode;     /* initialized on open */
        short flags;     /* CEPH_F_* */
 
+       spinlock_t rw_contexts_lock;
+       struct list_head rw_contexts;
+
        /* readdir: position within the dir */
        u32 frag;
        struct ceph_mds_request *last_readdir;
@@ -684,6 +688,49 @@ struct ceph_file_info {
        int dir_info_len;
 };
 
+struct ceph_rw_context {
+       struct list_head list;
+       struct task_struct *thread;
+       int caps;
+};
+
+#define CEPH_DEFINE_RW_CONTEXT(_name, _caps)   \
+       struct ceph_rw_context _name = {        \
+               .thread = current,              \
+               .caps = _caps,                  \
+       }
+
+static inline void ceph_add_rw_context(struct ceph_file_info *cf,
+                                      struct ceph_rw_context *ctx)
+{
+       spin_lock(&cf->rw_contexts_lock);
+       list_add(&ctx->list, &cf->rw_contexts);
+       spin_unlock(&cf->rw_contexts_lock);
+}
+
+static inline void ceph_del_rw_context(struct ceph_file_info *cf,
+                                      struct ceph_rw_context *ctx)
+{
+       spin_lock(&cf->rw_contexts_lock);
+       list_del(&ctx->list);
+       spin_unlock(&cf->rw_contexts_lock);
+}
+
+static inline struct ceph_rw_context*
+ceph_find_rw_context(struct ceph_file_info *cf)
+{
+       struct ceph_rw_context *ctx, *found = NULL;
+       spin_lock(&cf->rw_contexts_lock);
+       list_for_each_entry(ctx, &cf->rw_contexts, list) {
+               if (ctx->thread == current) {
+                       found = ctx;
+                       break;
+               }
+       }
+       spin_unlock(&cf->rw_contexts_lock);
+       return found;
+}
+
 struct ceph_readdir_cache_control {
        struct page  *page;
        struct dentry **dentries;
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index 5c036d2f401e25b42ece6d7cc6c4fc30c00dea43..1e492ef2a33d945699a327831640db04c1f158fa 100644
@@ -421,6 +421,10 @@ ceph_parse_options(char *options, const char *dev_name,
                        opt->name = kstrndup(argstr[0].from,
                                              argstr[0].to-argstr[0].from,
                                              GFP_KERNEL);
+                       if (!opt->name) {
+                               err = -ENOMEM;
+                               goto out;
+                       }
                        break;
                case Opt_secret:
                        opt->key = kzalloc(sizeof(*opt->key), GFP_KERNEL);