ocfs2: Add new cluster lock type
authorMark Fasheh <mark.fasheh@oracle.com>
Fri, 8 Sep 2006 21:14:34 +0000 (14:14 -0700)
committerMark Fasheh <mark.fasheh@oracle.com>
Sun, 24 Sep 2006 20:50:42 +0000 (13:50 -0700)
Replace the dentry vote mechanism with a cluster lock which covers a set
of dentries. This allows us to force d_delete() only on nodes which actually
care about an unlink.

Every node that does a ->lookup() gets a read only lock on the dentry, until
an unlink during which the unlinking node, will request an exclusive lock,
forcing the other nodes who care about that dentry to d_delete() it. The
effect is that we retain a very lightweight ->d_revalidate(), and at the
same time get to make large improvements to the average case performance of
the ocfs2 unlink and rename operations.

This patch adds the cluster lock type which OCFS2 can attach to
dentries.  A small number of fs/ocfs2/dcache.c functions are stubbed
out so that this change can compile.

Signed-off-by: Mark Fasheh <mark.fasheh@oracle.com>
fs/ocfs2/dcache.c
fs/ocfs2/dcache.h
fs/ocfs2/dlmglue.c
fs/ocfs2/dlmglue.h
fs/ocfs2/ocfs2_lockid.h

index 1a01380e3878ed74af3bd393e942e53ea65bfd1b..aea45771894679fb5a936c912be05c065304308a 100644 (file)
@@ -87,6 +87,8 @@ bail:
        return ret;
 }
 
+DEFINE_SPINLOCK(dentry_attach_lock);
+
 struct dentry_operations ocfs2_dentry_ops = {
        .d_revalidate           = ocfs2_dentry_revalidate,
 };
index 90072771114b6af8fb74025d7fde04ab2725a559..f1423c2134eec46c6addbf187acc34c43f956dff 100644 (file)
 
 extern struct dentry_operations ocfs2_dentry_ops;
 
+struct ocfs2_dentry_lock {
+       unsigned int            dl_count;
+       u64                     dl_parent_blkno;
+
+       /*
+        * The ocfs2_dentry_lock keeps an inode reference until
+        * dl_lockres has been destroyed. This is usually done in
+        * ->d_iput() anyway, so there should be minimal impact.
+        */
+       struct inode            *dl_inode;
+       struct ocfs2_lock_res   dl_lockres;
+};
+
+static inline void ocfs2_dentry_lock_put(struct ocfs2_super *osb,
+                                        struct ocfs2_dentry_lock *dl)
+{
+}
+
+static inline struct dentry *ocfs2_find_local_alias(struct inode *inode,
+                                                   u64 parent_blkno,
+                                                   int skip_unhashed)
+{
+       return NULL;
+}
+
+extern spinlock_t dentry_attach_lock;
+
 #endif /* OCFS2_DCACHE_H */
index 20c6ca8ac7fd15261f631bd793b1d1fddbd604fd..764d15defd8848b32c7c852534a5c7b73d50da15 100644 (file)
@@ -46,6 +46,7 @@
 #include "ocfs2.h"
 
 #include "alloc.h"
+#include "dcache.h"
 #include "dlmglue.h"
 #include "extent_map.h"
 #include "heartbeat.h"
@@ -69,6 +70,9 @@ struct ocfs2_mask_waiter {
 static void ocfs2_inode_ast_func(void *opaque);
 static void ocfs2_inode_bast_func(void *opaque,
                                  int level);
+static void ocfs2_dentry_ast_func(void *opaque);
+static void ocfs2_dentry_bast_func(void *opaque,
+                                 int level);
 static void ocfs2_super_ast_func(void *opaque);
 static void ocfs2_super_bast_func(void *opaque,
                                  int level);
@@ -76,32 +80,57 @@ static void ocfs2_rename_ast_func(void *opaque);
 static void ocfs2_rename_bast_func(void *opaque,
                                   int level);
 
+/*
+ * Return value from ocfs2_convert_worker_t functions.
+ *
+ * These control the precise actions of ocfs2_generic_unblock_lock()
+ * and ocfs2_process_blocked_lock()
+ *
+ */
+enum ocfs2_unblock_action {
+       UNBLOCK_CONTINUE        = 0, /* Continue downconvert */
+       UNBLOCK_CONTINUE_POST   = 1, /* Continue downconvert, fire
+                                     * ->post_unlock callback */
+       UNBLOCK_STOP_POST       = 2, /* Do not downconvert, fire
+                                     * ->post_unlock() callback. */
+};
+
+struct ocfs2_unblock_ctl {
+       int requeue;
+       enum ocfs2_unblock_action unblock_action;
+};
+
 /* so far, all locks have gotten along with the same unlock ast */
 static void ocfs2_unlock_ast_func(void *opaque,
                                  enum dlm_status status);
-static int ocfs2_do_unblock_meta(struct inode *inode,
-                                int *requeue);
 static int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres,
-                             int *requeue);
+                             struct ocfs2_unblock_ctl *ctl);
 static int ocfs2_unblock_data(struct ocfs2_lock_res *lockres,
-                             int *requeue);
+                             struct ocfs2_unblock_ctl *ctl);
 static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres,
-                             int *requeue);
+                                   struct ocfs2_unblock_ctl *ctl);
+static int ocfs2_unblock_dentry_lock(struct ocfs2_lock_res *lockres,
+                                    struct ocfs2_unblock_ctl *ctl);
 static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres,
-                                 int *requeue);
-typedef void (ocfs2_convert_worker_t)(struct ocfs2_lock_res *, int);
-static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb,
-                                     struct ocfs2_lock_res *lockres,
-                                     int *requeue,
-                                     ocfs2_convert_worker_t *worker);
+                                 struct ocfs2_unblock_ctl *ctl);
+
+static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
+                                    struct ocfs2_lock_res *lockres);
 
 struct ocfs2_lock_res_ops {
        void (*ast)(void *);
        void (*bast)(void *, int);
        void (*unlock_ast)(void *, enum dlm_status);
-       int  (*unblock)(struct ocfs2_lock_res *, int *);
+       int  (*unblock)(struct ocfs2_lock_res *, struct ocfs2_unblock_ctl *);
+       void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);
 };
 
+typedef int (ocfs2_convert_worker_t)(struct ocfs2_lock_res *, int);
+static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb,
+                                     struct ocfs2_lock_res *lockres,
+                                     struct ocfs2_unblock_ctl *ctl,
+                                     ocfs2_convert_worker_t *worker);
+
 static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
        .ast            = ocfs2_inode_ast_func,
        .bast           = ocfs2_inode_bast_func,
@@ -116,9 +145,6 @@ static struct ocfs2_lock_res_ops ocfs2_inode_meta_lops = {
        .unblock        = ocfs2_unblock_meta,
 };
 
-static void ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
-                                     int blocking);
-
 static struct ocfs2_lock_res_ops ocfs2_inode_data_lops = {
        .ast            = ocfs2_inode_ast_func,
        .bast           = ocfs2_inode_bast_func,
@@ -140,6 +166,14 @@ static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
        .unblock        = ocfs2_unblock_osb_lock,
 };
 
+static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
+       .ast            = ocfs2_dentry_ast_func,
+       .bast           = ocfs2_dentry_bast_func,
+       .unlock_ast     = ocfs2_unlock_ast_func,
+       .unblock        = ocfs2_unblock_dentry_lock,
+       .post_unlock    = ocfs2_dentry_post_unlock,
+};
+
 static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
 {
        return lockres->l_type == OCFS2_LOCK_TYPE_META ||
@@ -172,6 +206,13 @@ static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
        return (struct inode *) lockres->l_priv;
 }
 
+static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
+{
+       BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);
+
+       return (struct ocfs2_dentry_lock *)lockres->l_priv;
+}
+
 static int ocfs2_lock_create(struct ocfs2_super *osb,
                             struct ocfs2_lock_res *lockres,
                             int level,
@@ -204,22 +245,6 @@ static inline int ocfs2_can_downconvert_meta_lock(struct inode *inode,
                                                  struct ocfs2_lock_res *lockres,
                                                  int new_level);
 
-static char *ocfs2_lock_type_strings[] = {
-       [OCFS2_LOCK_TYPE_META] = "Meta",
-       [OCFS2_LOCK_TYPE_DATA] = "Data",
-       [OCFS2_LOCK_TYPE_SUPER] = "Super",
-       [OCFS2_LOCK_TYPE_RENAME] = "Rename",
-       /* Need to differntiate from [R]ename.. serializing writes is the
-        * important job it does, anyway. */
-       [OCFS2_LOCK_TYPE_RW] = "Write/Read",
-};
-
-static char *ocfs2_lock_type_string(enum ocfs2_lock_type type)
-{
-       mlog_bug_on_msg(type >= OCFS2_NUM_LOCK_TYPES, "%d\n", type);
-       return ocfs2_lock_type_strings[type];
-}
-
 static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
                                  u64 blkno,
                                  u32 generation,
@@ -265,13 +290,9 @@ static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
 static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
                                       struct ocfs2_lock_res *res,
                                       enum ocfs2_lock_type type,
-                                      u64 blkno,
-                                      u32 generation,
                                       struct ocfs2_lock_res_ops *ops,
                                       void *priv)
 {
-       ocfs2_build_lock_name(type, blkno, generation, res->l_name);
-
        res->l_type          = type;
        res->l_ops           = ops;
        res->l_priv          = priv;
@@ -319,9 +340,59 @@ void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
                        break;
        };
 
-       ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type,
-                                  OCFS2_I(inode)->ip_blkno,
-                                  inode->i_generation, ops, inode);
+       ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
+                             inode->i_generation, res->l_name);
+       ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
+}
+
+static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
+{
+       __be64 inode_blkno_be;
+
+       memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
+              sizeof(__be64));
+
+       return be64_to_cpu(inode_blkno_be);
+}
+
+void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
+                               u64 parent, struct inode *inode)
+{
+       int len;
+       u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
+       __be64 inode_blkno_be = cpu_to_be64(inode_blkno);
+       struct ocfs2_lock_res *lockres = &dl->dl_lockres;
+
+       ocfs2_lock_res_init_once(lockres);
+
+       /*
+        * Unfortunately, the standard lock naming scheme won't work
+        * here because we have two 16 byte values to use. Instead,
+        * we'll stuff the inode number as a binary value. We still
+        * want error prints to show something without garbling the
+        * display, so drop a null byte in there before the inode
+        * number. A future version of OCFS2 will likely use all
+        * binary lock names. The stringified names have been a
+        * tremendous aid in debugging, but now that the debugfs
+        * interface exists, we can mangle things there if need be.
+        *
+        * NOTE: We also drop the standard "pad" value (the total lock
+        * name size stays the same though - the last part is all
+        * zeros due to the memset in ocfs2_lock_res_init_once()
+        */
+       len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
+                      "%c%016llx",
+                      ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
+                      (long long)parent);
+
+       BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));
+
+       memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
+              sizeof(__be64));
+
+       ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
+                                  OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
+                                  dl);
 }
 
 static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
@@ -330,8 +401,9 @@ static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
        /* Superblock lockres doesn't come from a slab so we call init
         * once on it manually.  */
        ocfs2_lock_res_init_once(res);
+       ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
+                             0, res->l_name);
        ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
-                                  OCFS2_SUPER_BLOCK_BLKNO, 0,
                                   &ocfs2_super_lops, osb);
 }
 
@@ -341,7 +413,8 @@ static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
        /* Rename lockres doesn't come from a slab so we call init
         * once on it manually.  */
        ocfs2_lock_res_init_once(res);
-       ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME, 0, 0,
+       ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
+       ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
                                   &ocfs2_rename_lops, osb);
 }
 
@@ -627,9 +700,10 @@ static void ocfs2_generic_bast_func(struct ocfs2_super *osb,
                ocfs2_schedule_blocked_lock(osb, lockres);
        spin_unlock_irqrestore(&lockres->l_lock, flags);
 
+       wake_up(&lockres->l_event);
+
        ocfs2_kick_vote_thread(osb);
 
-       wake_up(&lockres->l_event);
        mlog_exit_void();
 }
 
@@ -690,9 +764,9 @@ static void ocfs2_generic_ast_func(struct ocfs2_lock_res *lockres,
        /* set it to something invalid so if we get called again we
         * can catch it. */
        lockres->l_action = OCFS2_AST_INVALID;
-       spin_unlock_irqrestore(&lockres->l_lock, flags);
 
        wake_up(&lockres->l_event);
+       spin_unlock_irqrestore(&lockres->l_lock, flags);
 }
 
 static void ocfs2_super_ast_func(void *opaque)
@@ -757,6 +831,27 @@ static void ocfs2_rename_bast_func(void *opaque,
        mlog_exit_void();
 }
 
+static void ocfs2_dentry_ast_func(void *opaque)
+{
+       struct ocfs2_lock_res *lockres = opaque;
+
+       BUG_ON(!lockres);
+
+       ocfs2_generic_ast_func(lockres, 1);
+}
+
+static void ocfs2_dentry_bast_func(void *opaque, int level)
+{
+       struct ocfs2_lock_res *lockres = opaque;
+       struct ocfs2_dentry_lock *dl = lockres->l_priv;
+       struct ocfs2_super *osb = OCFS2_SB(dl->dl_inode->i_sb);
+
+       mlog(0, "Dentry bast: level: %d, name: %s\n", level,
+            lockres->l_name);
+
+       ocfs2_generic_bast_func(osb, lockres, level);
+}
+
 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
                                                int convert)
 {
@@ -1076,10 +1171,11 @@ static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
        mlog_exit_void();
 }
 
-static int ocfs2_create_new_inode_lock(struct inode *inode,
-                                      struct ocfs2_lock_res *lockres)
+int ocfs2_create_new_lock(struct ocfs2_super *osb,
+                         struct ocfs2_lock_res *lockres,
+                         int ex)
 {
-       struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
+       int level =  ex ? LKM_EXMODE : LKM_PRMODE;
        unsigned long flags;
 
        spin_lock_irqsave(&lockres->l_lock, flags);
@@ -1087,7 +1183,7 @@ static int ocfs2_create_new_inode_lock(struct inode *inode,
        lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
        spin_unlock_irqrestore(&lockres->l_lock, flags);
 
-       return ocfs2_lock_create(osb, lockres, LKM_EXMODE, LKM_LOCAL);
+       return ocfs2_lock_create(osb, lockres, level, LKM_LOCAL);
 }
 
 /* Grants us an EX lock on the data and metadata resources, skipping
@@ -1099,6 +1195,7 @@ static int ocfs2_create_new_inode_lock(struct inode *inode,
 int ocfs2_create_new_inode_locks(struct inode *inode)
 {
        int ret;
+       struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
 
        BUG_ON(!inode);
        BUG_ON(!ocfs2_inode_is_new(inode));
@@ -1115,22 +1212,19 @@ int ocfs2_create_new_inode_locks(struct inode *inode)
         * on a resource which has an invalid one -- we'll set it
         * valid when we release the EX. */
 
-       ret = ocfs2_create_new_inode_lock(inode,
-                                         &OCFS2_I(inode)->ip_rw_lockres);
+       ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1);
        if (ret) {
                mlog_errno(ret);
                goto bail;
        }
 
-       ret = ocfs2_create_new_inode_lock(inode,
-                                         &OCFS2_I(inode)->ip_meta_lockres);
+       ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_meta_lockres, 1);
        if (ret) {
                mlog_errno(ret);
                goto bail;
        }
 
-       ret = ocfs2_create_new_inode_lock(inode,
-                                         &OCFS2_I(inode)->ip_data_lockres);
+       ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_data_lockres, 1);
        if (ret) {
                mlog_errno(ret);
                goto bail;
@@ -1809,6 +1903,34 @@ void ocfs2_rename_unlock(struct ocfs2_super *osb)
        ocfs2_cluster_unlock(osb, lockres, LKM_EXMODE);
 }
 
+int ocfs2_dentry_lock(struct dentry *dentry, int ex)
+{
+       int ret;
+       int level = ex ? LKM_EXMODE : LKM_PRMODE;
+       struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
+       struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
+
+       BUG_ON(!dl);
+
+       if (ocfs2_is_hard_readonly(osb))
+               return -EROFS;
+
+       ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
+       if (ret < 0)
+               mlog_errno(ret);
+
+       return ret;
+}
+
+void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
+{
+       int level = ex ? LKM_EXMODE : LKM_PRMODE;
+       struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
+       struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
+
+       ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
+}
+
 /* Reference counting of the dlm debug structure. We want this because
  * open references on the debug inodes can live on after a mount, so
  * we can't rely on the ocfs2_super to always exist. */
@@ -1939,9 +2061,16 @@ static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
        if (!lockres)
                return -EINVAL;
 
-       seq_printf(m, "0x%x\t"
-                  "%.*s\t"
-                  "%d\t"
+       seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION);
+
+       if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY)
+               seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1,
+                          lockres->l_name,
+                          (unsigned int)ocfs2_get_dentry_lock_ino(lockres));
+       else
+               seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name);
+
+       seq_printf(m, "%d\t"
                   "0x%lx\t"
                   "0x%x\t"
                   "0x%x\t"
@@ -1949,8 +2078,6 @@ static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
                   "%u\t"
                   "%d\t"
                   "%d\t",
-                  OCFS2_DLM_DEBUG_STR_VERSION,
-                  OCFS2_LOCK_ID_MAX_LEN, lockres->l_name,
                   lockres->l_level,
                   lockres->l_flags,
                   lockres->l_action,
@@ -2311,25 +2438,21 @@ void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres)
        spin_unlock_irqrestore(&lockres->l_lock, flags);
 }
 
-static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
+void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
+                              struct ocfs2_lock_res *lockres)
 {
-       int status;
-
-       mlog_entry_void();
-
-       ocfs2_mark_lockres_freeing(&osb->osb_super_lockres);
-
-       status = ocfs2_drop_lock(osb, &osb->osb_super_lockres, NULL);
-       if (status < 0)
-               mlog_errno(status);
-
-       ocfs2_mark_lockres_freeing(&osb->osb_rename_lockres);
+       int ret;
 
-       status = ocfs2_drop_lock(osb, &osb->osb_rename_lockres, NULL);
-       if (status < 0)
-               mlog_errno(status);
+       ocfs2_mark_lockres_freeing(lockres);
+       ret = ocfs2_drop_lock(osb, lockres, NULL);
+       if (ret)
+               mlog_errno(ret);
+}
 
-       mlog_exit(status);
+static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
+{
+       ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
+       ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
 }
 
 static void ocfs2_meta_pre_drop(struct ocfs2_lock_res *lockres, void *data)
@@ -2599,7 +2722,7 @@ leave:
 
 static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb,
                                      struct ocfs2_lock_res *lockres,
-                                     int *requeue,
+                                     struct ocfs2_unblock_ctl *ctl,
                                      ocfs2_convert_worker_t *worker)
 {
        unsigned long flags;
@@ -2615,7 +2738,7 @@ static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb,
 
 recheck:
        if (lockres->l_flags & OCFS2_LOCK_BUSY) {
-               *requeue = 1;
+               ctl->requeue = 1;
                ret = ocfs2_prepare_cancel_convert(osb, lockres);
                spin_unlock_irqrestore(&lockres->l_lock, flags);
                if (ret) {
@@ -2631,7 +2754,7 @@ recheck:
        if ((lockres->l_blocking == LKM_EXMODE)
            && (lockres->l_ex_holders || lockres->l_ro_holders)) {
                spin_unlock_irqrestore(&lockres->l_lock, flags);
-               *requeue = 1;
+               ctl->requeue = 1;
                ret = 0;
                goto leave;
        }
@@ -2641,7 +2764,7 @@ recheck:
        if (lockres->l_blocking == LKM_PRMODE &&
            lockres->l_ex_holders) {
                spin_unlock_irqrestore(&lockres->l_lock, flags);
-               *requeue = 1;
+               ctl->requeue = 1;
                ret = 0;
                goto leave;
        }
@@ -2659,7 +2782,10 @@ recheck:
        blocking = lockres->l_blocking;
        spin_unlock_irqrestore(&lockres->l_lock, flags);
 
-       worker(lockres, blocking);
+       ctl->unblock_action = worker(lockres, blocking);
+
+       if (ctl->unblock_action == UNBLOCK_STOP_POST)
+               goto leave;
 
        spin_lock_irqsave(&lockres->l_lock, flags);
        if (blocking != lockres->l_blocking) {
@@ -2669,7 +2795,7 @@ recheck:
        }
 
 downconvert:
-       *requeue = 0;
+       ctl->requeue = 0;
        new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
 
        ocfs2_prepare_downconvert(lockres, new_level);
@@ -2680,14 +2806,12 @@ leave:
        return ret;
 }
 
-static void ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
-                                     int blocking)
+static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
+                                    int blocking)
 {
        struct inode *inode;
        struct address_space *mapping;
 
-       mlog_entry_void();
-
                inode = ocfs2_lock_res_inode(lockres);
        mapping = inode->i_mapping;
 
@@ -2708,11 +2832,11 @@ static void ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
                filemap_fdatawait(mapping);
        }
 
-       mlog_exit_void();
+       return UNBLOCK_CONTINUE;
 }
 
 int ocfs2_unblock_data(struct ocfs2_lock_res *lockres,
-                      int *requeue)
+                      struct ocfs2_unblock_ctl *ctl)
 {
        int status;
        struct inode *inode;
@@ -2726,22 +2850,20 @@ int ocfs2_unblock_data(struct ocfs2_lock_res *lockres,
        mlog(0, "unblock inode %llu\n",
             (unsigned long long)OCFS2_I(inode)->ip_blkno);
 
-       status = ocfs2_generic_unblock_lock(osb,
-                                           lockres,
-                                           requeue,
+       status = ocfs2_generic_unblock_lock(osb, lockres, ctl,
                                            ocfs2_data_convert_worker);
        if (status < 0)
                mlog_errno(status);
 
        mlog(0, "inode %llu, requeue = %d\n",
-            (unsigned long long)OCFS2_I(inode)->ip_blkno, *requeue);
+            (unsigned long long)OCFS2_I(inode)->ip_blkno, ctl->requeue);
 
        mlog_exit(status);
        return status;
 }
 
 static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres,
-                                   int *requeue)
+                                   struct ocfs2_unblock_ctl *ctl)
 {
        int status;
        struct inode *inode;
@@ -2753,9 +2875,7 @@ static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres,
        inode  = ocfs2_lock_res_inode(lockres);
 
        status = ocfs2_generic_unblock_lock(OCFS2_SB(inode->i_sb),
-                                           lockres,
-                                           requeue,
-                                           NULL);
+                                           lockres, ctl, NULL);
        if (status < 0)
                mlog_errno(status);
 
@@ -2763,9 +2883,8 @@ static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres,
        return status;
 }
 
-
-int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres,
-                      int *requeue)
+static int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres,
+                             struct ocfs2_unblock_ctl *ctl)
 {
        int status;
        struct inode *inode;
@@ -2777,21 +2896,165 @@ int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres,
        mlog(0, "unblock inode %llu\n",
             (unsigned long long)OCFS2_I(inode)->ip_blkno);
 
-       status = ocfs2_do_unblock_meta(inode, requeue);
+       status = ocfs2_do_unblock_meta(inode, &ctl->requeue);
        if (status < 0)
                mlog_errno(status);
 
        mlog(0, "inode %llu, requeue = %d\n",
-            (unsigned long long)OCFS2_I(inode)->ip_blkno, *requeue);
+            (unsigned long long)OCFS2_I(inode)->ip_blkno, ctl->requeue);
 
        mlog_exit(status);
        return status;
 }
 
+/*
+ * Does the final reference drop on our dentry lock. Right now this
+ * happens in the vote thread, but we could choose to simplify the
+ * dlmglue API and push these off to the ocfs2_wq in the future.
+ */
+static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
+                                    struct ocfs2_lock_res *lockres)
+{
+       struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
+       ocfs2_dentry_lock_put(osb, dl);
+}
+
+/*
+ * d_delete() matching dentries before the lock downconvert.
+ *
+ * At this point, any process waiting to destroy the
+ * dentry_lock due to last ref count is stopped by the
+ * OCFS2_LOCK_QUEUED flag.
+ *
+ * We have two potential problems
+ *
+ * 1) If we do the last reference drop on our dentry_lock (via dput)
+ *    we'll wind up in ocfs2_release_dentry_lock(), waiting on
+ *    the downconvert to finish. Instead we take an elevated
+ *    reference and push the drop until after we've completed our
+ *    unblock processing.
+ *
+ * 2) There might be another process with a final reference,
+ *    waiting on us to finish processing. If this is the case, we
+ *    detect it and exit out - there's no more dentries anyway.
+ */
+static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
+                                      int blocking)
+{
+       struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
+       struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
+       struct dentry *dentry;
+       unsigned long flags;
+       int extra_ref = 0;
+
+       /*
+        * This node is blocking another node from getting a read
+        * lock. This happens when we've renamed within a
+        * directory. We've forced the other nodes to d_delete(), but
+        * we never actually dropped our lock because it's still
+        * valid. The downconvert code will retain a PR for this node,
+        * so there's no further work to do.
+        */
+       if (blocking == LKM_PRMODE)
+               return UNBLOCK_CONTINUE;
+
+       /*
+        * Mark this inode as potentially orphaned. The code in
+        * ocfs2_delete_inode() will figure out whether it actually
+        * needs to be freed or not.
+        */
+       spin_lock(&oi->ip_lock);
+       oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
+       spin_unlock(&oi->ip_lock);
+
+       /*
+        * Yuck. We need to make sure however that the check of
+        * OCFS2_LOCK_FREEING and the extra reference are atomic with
+        * respect to a reference decrement or the setting of that
+        * flag.
+        */
+       spin_lock_irqsave(&lockres->l_lock, flags);
+       spin_lock(&dentry_attach_lock);
+       if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
+           && dl->dl_count) {
+               dl->dl_count++;
+               extra_ref = 1;
+       }
+       spin_unlock(&dentry_attach_lock);
+       spin_unlock_irqrestore(&lockres->l_lock, flags);
+
+       mlog(0, "extra_ref = %d\n", extra_ref);
+
+       /*
+        * We have a process waiting on us in ocfs2_dentry_iput(),
+        * which means we can't have any more outstanding
+        * aliases. There's no need to do any more work.
+        */
+       if (!extra_ref)
+               return UNBLOCK_CONTINUE;
+
+       spin_lock(&dentry_attach_lock);
+       while (1) {
+               dentry = ocfs2_find_local_alias(dl->dl_inode,
+                                               dl->dl_parent_blkno, 1);
+               if (!dentry)
+                       break;
+               spin_unlock(&dentry_attach_lock);
+
+               mlog(0, "d_delete(%.*s);\n", dentry->d_name.len,
+                    dentry->d_name.name);
+
+               /*
+                * The following dcache calls may do an
+                * iput(). Normally we don't want that from the
+                * downconverting thread, but in this case it's ok
+                * because the requesting node already has an
+                * exclusive lock on the inode, so it can't be queued
+                * for a downconvert.
+                */
+               d_delete(dentry);
+               dput(dentry);
+
+               spin_lock(&dentry_attach_lock);
+       }
+       spin_unlock(&dentry_attach_lock);
+
+       /*
+        * If we are the last holder of this dentry lock, there is no
+        * reason to downconvert so skip straight to the unlock.
+        */
+       if (dl->dl_count == 1)
+               return UNBLOCK_STOP_POST;
+
+       return UNBLOCK_CONTINUE_POST;
+}
+
+static int ocfs2_unblock_dentry_lock(struct ocfs2_lock_res *lockres,
+                                    struct ocfs2_unblock_ctl *ctl)
+{
+       int ret;
+       struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
+       struct ocfs2_super *osb = OCFS2_SB(dl->dl_inode->i_sb);
+
+       mlog(0, "unblock dentry lock: %llu\n",
+            (unsigned long long)OCFS2_I(dl->dl_inode)->ip_blkno);
+
+       ret = ocfs2_generic_unblock_lock(osb,
+                                        lockres,
+                                        ctl,
+                                        ocfs2_dentry_convert_worker);
+       if (ret < 0)
+               mlog_errno(ret);
+
+       mlog(0, "requeue = %d, post = %d\n", ctl->requeue, ctl->unblock_action);
+
+       return ret;
+}
+
 /* Generic unblock function for any lockres whose private data is an
  * ocfs2_super pointer. */
 static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres,
-                                 int *requeue)
+                                 struct ocfs2_unblock_ctl *ctl)
 {
        int status;
        struct ocfs2_super *osb;
@@ -2804,7 +3067,7 @@ static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres,
 
        status = ocfs2_generic_unblock_lock(osb,
                                            lockres,
-                                           requeue,
+                                           ctl,
                                            NULL);
        if (status < 0)
                mlog_errno(status);
@@ -2817,7 +3080,7 @@ void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
                                struct ocfs2_lock_res *lockres)
 {
        int status;
-       int requeue = 0;
+       struct ocfs2_unblock_ctl ctl = {0, 0,};
        unsigned long flags;
 
        /* Our reference to the lockres in this function can be
@@ -2842,21 +3105,25 @@ void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
                goto unqueue;
        spin_unlock_irqrestore(&lockres->l_lock, flags);
 
-       status = lockres->l_ops->unblock(lockres, &requeue);
+       status = lockres->l_ops->unblock(lockres, &ctl);
        if (status < 0)
                mlog_errno(status);
 
        spin_lock_irqsave(&lockres->l_lock, flags);
 unqueue:
-       if (lockres->l_flags & OCFS2_LOCK_FREEING || !requeue) {
+       if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) {
                lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
        } else
                ocfs2_schedule_blocked_lock(osb, lockres);
 
        mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name,
-            requeue ? "yes" : "no");
+            ctl.requeue ? "yes" : "no");
        spin_unlock_irqrestore(&lockres->l_lock, flags);
 
+       if (ctl.unblock_action != UNBLOCK_CONTINUE
+           && lockres->l_ops->post_unlock)
+               lockres->l_ops->post_unlock(osb, lockres);
+
        mlog_exit_void();
 }
 
index 243ae862ece53ae338fb9ab660adf5ad4ee3cdc5..340251567e9943e417ebaf4aeb50d6672b0ad1a6 100644 (file)
@@ -27,6 +27,8 @@
 #ifndef DLMGLUE_H
 #define DLMGLUE_H
 
+#include "dcache.h"
+
 #define OCFS2_LVB_VERSION 3
 
 struct ocfs2_meta_lvb {
@@ -58,8 +60,12 @@ void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res);
 void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
                               enum ocfs2_lock_type type,
                               struct inode *inode);
+void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
+                               u64 parent, struct inode *inode);
 void ocfs2_lock_res_free(struct ocfs2_lock_res *res);
 int ocfs2_create_new_inode_locks(struct inode *inode);
+int ocfs2_create_new_lock(struct ocfs2_super *osb,
+                         struct ocfs2_lock_res *lockres, int ex);
 int ocfs2_drop_inode_locks(struct inode *inode);
 int ocfs2_data_lock_full(struct inode *inode,
                         int write,
@@ -93,7 +99,12 @@ void ocfs2_super_unlock(struct ocfs2_super *osb,
                        int ex);
 int ocfs2_rename_lock(struct ocfs2_super *osb);
 void ocfs2_rename_unlock(struct ocfs2_super *osb);
+int ocfs2_dentry_lock(struct dentry *dentry, int ex);
+void ocfs2_dentry_unlock(struct dentry *dentry, int ex);
+
 void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres);
+void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
+                              struct ocfs2_lock_res *lockres);
 
 /* for the vote thread */
 void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
index 7dd9e1e705b0cd3a7f06bbb3bce5579eca98860e..4d5d5655c185085c9e259b6669bcd35cd29d3068 100644 (file)
 #define OCFS2_LOCK_ID_MAX_LEN  32
 #define OCFS2_LOCK_ID_PAD "000000"
 
+#define OCFS2_DENTRY_LOCK_INO_START 18
+
 enum ocfs2_lock_type {
        OCFS2_LOCK_TYPE_META = 0,
        OCFS2_LOCK_TYPE_DATA,
        OCFS2_LOCK_TYPE_SUPER,
        OCFS2_LOCK_TYPE_RENAME,
        OCFS2_LOCK_TYPE_RW,
+       OCFS2_LOCK_TYPE_DENTRY,
        OCFS2_NUM_LOCK_TYPES
 };
 
@@ -63,6 +66,9 @@ static inline char ocfs2_lock_type_char(enum ocfs2_lock_type type)
                case OCFS2_LOCK_TYPE_RW:
                        c = 'W';
                        break;
+               case OCFS2_LOCK_TYPE_DENTRY:
+                       c = 'N';
+                       break;
                default:
                        c = '\0';
        }
@@ -70,4 +76,23 @@ static inline char ocfs2_lock_type_char(enum ocfs2_lock_type type)
        return c;
 }
 
+static char *ocfs2_lock_type_strings[] = {
+       [OCFS2_LOCK_TYPE_META] = "Meta",
+       [OCFS2_LOCK_TYPE_DATA] = "Data",
+       [OCFS2_LOCK_TYPE_SUPER] = "Super",
+       [OCFS2_LOCK_TYPE_RENAME] = "Rename",
+       /* Need to differntiate from [R]ename.. serializing writes is the
+        * important job it does, anyway. */
+       [OCFS2_LOCK_TYPE_RW] = "Write/Read",
+       [OCFS2_LOCK_TYPE_DENTRY] = "Dentry",
+};
+
+static inline const char *ocfs2_lock_type_string(enum ocfs2_lock_type type)
+{
+#ifdef __KERNEL__
+       mlog_bug_on_msg(type >= OCFS2_NUM_LOCK_TYPES, "%d\n", type);
+#endif
+       return ocfs2_lock_type_strings[type];
+}
+
 #endif  /* OCFS2_LOCKID_H */