Merge branch 'hwmon-for-linus' of git://jdelvare.pck.nerim.net/jdelvare-2.6
diff --git a/fs/ocfs2/dlm/dlmlock.c b/fs/ocfs2/dlm/dlmlock.c
index 671d4ff222cc083c15aa63ed33048c899da2d26b..e5ca3db197f63c647a37c201b70b259a746de2e8 100644
--- a/fs/ocfs2/dlm/dlmlock.c
+++ b/fs/ocfs2/dlm/dlmlock.c
@@ -53,7 +53,7 @@
 #define MLOG_MASK_PREFIX ML_DLM
 #include "cluster/masklog.h"
 
-static spinlock_t dlm_cookie_lock = SPIN_LOCK_UNLOCKED;
+static DEFINE_SPINLOCK(dlm_cookie_lock);
 static u64 dlm_next_cookie = 1;
 
 static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm,
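
SPIN_LOCK_UNLOCKED initializes a spinlock by plain struct assignment, which cannot provide the per-lock static state that lockdep and CONFIG_DEBUG_SPINLOCK need, so the kernel deprecated it in favor of DEFINE_SPINLOCK(). A minimal sketch of the replacement pattern, mirroring the cookie counter above (the lock and counter names here are illustrative, not from this file):

    #include <linux/spinlock.h>

    /* declares and statically initializes the lock in one step */
    static DEFINE_SPINLOCK(demo_lock);
    static u64 demo_counter = 1;

    static u64 demo_next(void)
    {
            u64 v;

            spin_lock(&demo_lock);
            v = demo_counter++;
            spin_unlock(&demo_lock);
            return v;
    }
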
@@ -141,13 +141,23 @@ static enum dlm_status dlmlock_master(struct dlm_ctxt *dlm,
                                          res->lockname.len)) {
                        kick_thread = 1;
                        call_ast = 1;
+               } else {
+                       mlog(0, "%s: returning DLM_NORMAL to "
+                            "node %u for reco lock\n", dlm->name,
+                            lock->ml.node);
                }
        } else {
                /* for NOQUEUE request, unless we get the
                 * lock right away, return DLM_NOTQUEUED */
-               if (flags & LKM_NOQUEUE)
+               if (flags & LKM_NOQUEUE) {
                        status = DLM_NOTQUEUED;
-               else {
+                       if (dlm_is_recovery_lock(res->lockname.name,
+                                                res->lockname.len)) {
+                               mlog(0, "%s: returning NOTQUEUED to "
+                                    "node %u for reco lock\n", dlm->name,
+                                    lock->ml.node);
+                       }
+               } else {
                        dlm_lock_get(lock);
                        list_add_tail(&lock->list, &res->blocked);
                        kick_thread = 1;
@@ -191,6 +201,7 @@ static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm,
                                      struct dlm_lock *lock, int flags)
 {
        enum dlm_status status = DLM_DENIED;
+       int lockres_changed = 1;
 
        mlog_entry("type=%d\n", lock->ml.type);
        mlog(0, "lockres %.*s, flags = 0x%x\n", res->lockname.len,
@@ -216,8 +227,25 @@ static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm,
        res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
        lock->lock_pending = 0;
        if (status != DLM_NORMAL) {
-               if (status != DLM_NOTQUEUED)
+               if (status == DLM_RECOVERING &&
+                   dlm_is_recovery_lock(res->lockname.name,
+                                        res->lockname.len)) {
+                       /* recovery lock was mastered by dead node.
+                        * we need to have calc_usage shoot down this
+                        * lockres and completely remaster it. */
+                       mlog(0, "%s: recovery lock was owned by "
+                            "dead node %u, remaster it now.\n",
+                            dlm->name, res->owner);
+               } else if (status != DLM_NOTQUEUED) {
+                       /*
+                        * DO NOT call calc_usage, as this would unhash
+                        * the remote lockres before we ever get to use
+                        * it.  treat as if we never made any change to
+                        * the lockres.
+                        */
+                       lockres_changed = 0;
                        dlm_error(status);
+               }
                dlm_revert_pending_lock(res, lock);
                dlm_lock_put(lock);
        } else if (dlm_is_recovery_lock(res->lockname.name, 
@@ -229,12 +257,12 @@ static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm,
                mlog(0, "%s: $RECOVERY lock for this node (%u) is "
                     "mastered by %u; got lock, manually granting (no ast)\n",
                     dlm->name, dlm->node_num, res->owner);
-               list_del_init(&lock->list);
-               list_add_tail(&lock->list, &res->granted);
+               list_move_tail(&lock->list, &res->granted);
        }
        spin_unlock(&res->spinlock);
 
-       dlm_lockres_calc_usage(dlm, res);
+       if (lockres_changed)
+               dlm_lockres_calc_usage(dlm, res);
 
        wake_up(&res->wq);
        return status;
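
list_move_tail() from <linux/list.h> collapses the delete-and-requeue pair into one call with the same end state; the explicit list_del_init() was redundant because list_add_tail() immediately relinks the entry anyway. A sketch of the equivalence:

    #include <linux/list.h>

    /* before: two steps */
    list_del_init(&lock->list);
    list_add_tail(&lock->list, &res->granted);

    /* after: one step, identical resulting list state */
    list_move_tail(&lock->list, &res->granted);

The new lockres_changed flag implements the comment above it: for failures other than DLM_NOTQUEUED (and the $RECOVERY remaster case), nothing on the lockres actually changed, so dlm_lockres_calc_usage() is skipped and the remote lockres is not unhashed before it can ever be used.
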
@@ -271,6 +299,14 @@ static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm,
        if (tmpret >= 0) {
                // successfully sent and received
                ret = status;  // this is already a dlm_status
+               if (ret == DLM_REJECTED) {
+                       mlog(ML_ERROR, "%s:%.*s: BUG.  this is a stale lockres "
+                            "no longer owned by %u.  that node is coming back "
+                            "up currently.\n", dlm->name, create.namelen,
+                            create.name, res->owner);
+                       dlm_print_one_lock_resource(res);
+                       BUG();
+               }
        } else {
                mlog_errno(tmpret);
                if (dlm_is_host_down(tmpret)) {
@@ -372,13 +408,13 @@ struct dlm_lock * dlm_new_lock(int type, u8 node, u64 cookie,
        struct dlm_lock *lock;
        int kernel_allocated = 0;
 
-       lock = kcalloc(1, sizeof(*lock), GFP_KERNEL);
+       lock = kzalloc(sizeof(*lock), GFP_NOFS);
        if (!lock)
                return NULL;
 
        if (!lksb) {
                /* zero memory only if kernel-allocated */
-               lksb = kcalloc(1, sizeof(*lksb), GFP_KERNEL);
+               lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
                if (!lksb) {
                        kfree(lock);
                        return NULL;
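
kzalloc(size, flags) is the idiomatic spelling of kcalloc(1, size, flags): a single zeroed allocation with no array multiply. The switch from GFP_KERNEL to GFP_NOFS matters more, since it keeps the allocator from recursing into filesystem reclaim while the dlm may already hold fs-side locks. A minimal sketch (the struct name is illustrative):

    #include <linux/slab.h>

    /* zeroed allocation; reclaim will not re-enter filesystem code */
    struct demo *d = kzalloc(sizeof(*d), GFP_NOFS);

    if (!d)
            return NULL;
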
@@ -419,11 +455,16 @@ int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data)
        if (!dlm_grab(dlm))
                return DLM_REJECTED;
 
-       mlog_bug_on_msg(!dlm_domain_fully_joined(dlm),
-                       "Domain %s not fully joined!\n", dlm->name);
-
        name = create->name;
        namelen = create->namelen;
+       status = DLM_REJECTED;
+       if (!dlm_domain_fully_joined(dlm)) {
+               mlog(ML_ERROR, "Domain %s not fully joined, but node %u is "
+                    "sending a create_lock message for lock %.*s!\n",
+                    dlm->name, create->node_idx, namelen, name);
+               dlm_error(status);
+               goto leave;
+       }
 
        status = DLM_IVBUFLEN;
        if (namelen > DLM_LOCKID_NAME_MAX) {
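
The handler change above turns what was a local BUG (mlog_bug_on_msg) into a survivable remote error: a create_lock message that arrives before the domain is fully joined is now logged and answered with DLM_REJECTED, leaving the sending node to retry or fail, rather than taking the receiving node down with BUG().
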
@@ -499,8 +540,8 @@ static inline void dlm_get_next_cookie(u8 node_num, u64 *cookie)
 
 enum dlm_status dlmlock(struct dlm_ctxt *dlm, int mode,
                        struct dlm_lockstatus *lksb, int flags,
-                       const char *name, dlm_astlockfunc_t *ast, void *data,
-                       dlm_bastlockfunc_t *bast)
+                       const char *name, int namelen, dlm_astlockfunc_t *ast,
+                       void *data, dlm_bastlockfunc_t *bast)
 {
        enum dlm_status status;
        struct dlm_lock_resource *res = NULL;
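
Threading namelen through the dlmlock() API (and, below, dlm_get_lock_resource()) lets callers pass lock names that need not be NUL-terminated and drops repeated strlen() calls from the locking path. A hypothetical call site under the new signature (the ast/bast callbacks and data pointer are placeholders):

    enum dlm_status st;

    st = dlmlock(dlm, LKM_EXMODE, &lksb, flags, name, namelen,
                 my_ast, my_data, my_bast);
    if (st != DLM_NORMAL)
            /* handle error */ ;
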
@@ -530,7 +571,7 @@ enum dlm_status dlmlock(struct dlm_ctxt *dlm, int mode,
        recovery = (flags & LKM_RECOVERY);
 
        if (recovery &&
-           (!dlm_is_recovery_lock(name, strlen(name)) || convert) ) {
+           (!dlm_is_recovery_lock(name, namelen) || convert) ) {
                dlm_error(status);
                goto error;
        }
@@ -602,7 +643,7 @@ retry_convert:
                }
 
                status = DLM_IVBUFLEN;
-               if (strlen(name) > DLM_LOCKID_NAME_MAX || strlen(name) < 1) {
+               if (namelen > DLM_LOCKID_NAME_MAX || namelen < 1) {
                        dlm_error(status);
                        goto error;
                }
@@ -618,7 +659,7 @@ retry_convert:
                        dlm_wait_for_recovery(dlm);
 
                /* find or create the lock resource */
-               res = dlm_get_lock_resource(dlm, name, flags);
+               res = dlm_get_lock_resource(dlm, name, namelen, flags);
                if (!res) {
                        status = DLM_IVLOCKID;
                        dlm_error(status);
@@ -659,18 +700,22 @@ retry_lock:
                        msleep(100);
                        /* no waiting for dlm_reco_thread */
                        if (recovery) {
-                               if (status == DLM_RECOVERING) {
-                                       mlog(0, "%s: got RECOVERING "
-                                            "for $REOCVERY lock, master "
-                                            "was %u\n", dlm->name, 
-                                            res->owner);
-                                       dlm_wait_for_node_death(dlm, res->owner, 
-                                                       DLM_NODE_DEATH_WAIT_MAX);
-                               }
+                               if (status != DLM_RECOVERING)
+                                       goto retry_lock;
+
+                               mlog(0, "%s: got RECOVERING "
+                                    "for $RECOVERY lock, master "
+                                    "was %u\n", dlm->name,
+                                    res->owner);
+                               /* wait to see the node go down, then
+                                * drop down and allow the lockres to
+                                * get cleaned up.  need to remaster. */
+                               dlm_wait_for_node_death(dlm, res->owner,
+                                               DLM_NODE_DEATH_WAIT_MAX);
                        } else {
                                dlm_wait_for_recovery(dlm);
+                               goto retry_lock;
                        }
-                       goto retry_lock;
                }
 
                if (status != DLM_NORMAL) {
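
The restructured branch changes the retry semantics for the $RECOVERY lock: previously every pass through this error path ended in the shared goto retry_lock, so a DLM_RECOVERING answer was retried even though the dead master's lockres first had to be torn down. Now only a non-DLM_RECOVERING status retries; on DLM_RECOVERING the caller waits for the master's death to be confirmed and then falls through, letting the lockres be purged and remastered (see the lockres_changed handling above). The non-recovery path still waits for recovery and retries as before.
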