r21140: enable the pending lock notify code in the ctdb backend so that timed
authorAndrew Tridgell <tridge@samba.org>
Mon, 5 Feb 2007 03:23:45 +0000 (03:23 +0000)
committerGerald (Jerry) Carter <jerry@samba.org>
Wed, 10 Oct 2007 19:44:33 +0000 (14:44 -0500)
locks retry immediately when another blocking lock is removed.
(This used to be commit 169920a5341392e84d5b5fb5f39c2b9d5243bb3b)

source4/cluster/ctdb/brlock_ctdb.c

index e847a6f50ce927beb1e25e02cfe217ae4824aa2a..ffc5facbb4b7d53799c10b3902151a213f62c9fc 100644 (file)
@@ -31,8 +31,6 @@
 #include "ntvfs/common/brlock.h"
 #include "include/ctdb.h"
 
-#define ENABLE_NOTIFIES 0
-
 enum my_functions {FUNC_BRL_LOCK=1, FUNC_BRL_UNLOCK=2, 
                   FUNC_BRL_REMOVE_PENDING=3, FUNC_BRL_LOCKTEST=4,
                   FUNC_BRL_CLOSE=5};
@@ -296,26 +294,19 @@ static int brl_ctdb_lock_func(struct ctdb_call_info *call)
        struct lock_struct lock, *locks=NULL;
        NTSTATUS status = NT_STATUS_OK;
 
-#if ENABLE_NOTIFIES
        /* if this is a pending lock, then with the chainlock held we
           try to get the real lock. If we succeed then we don't need
           to make it pending. This prevents a possible race condition
           where the pending lock gets created after the lock that is
           preventing the real lock gets removed */
-       if (lock_type >= PENDING_READ_LOCK) {
-               enum brl_type rw = (lock_type==PENDING_READ_LOCK? READ_LOCK : WRITE_LOCK);
-
-               /* here we need to force that the last_lock isn't overwritten */
-               lock = brlh->last_lock;
-               status = brl_ctdb_lock(brl, brlh, smbpid, start, size, rw, NULL);
-               brlh->last_lock = lock;
-
-               if (NT_STATUS_IS_OK(status)) {
-                       tdb_chainunlock(brl->w->tdb, kbuf);
-                       return NT_STATUS_OK;
+       if (req->lock_type >= PENDING_READ_LOCK) {
+               enum brl_type lock_type = req->lock_type;
+               req->lock_type = (req->lock_type==PENDING_READ_LOCK? READ_LOCK : WRITE_LOCK);
+               if (brl_ctdb_lock_func(call) == 0 && call->status == NT_STATUS_V(NT_STATUS_OK)) {
+                       return 0;
                }
+               req->lock_type = lock_type;
        }
-#endif
 
        dbuf = call->record_data;
 
@@ -383,6 +374,7 @@ static NTSTATUS brl_ctdb_lock(struct brl_context *brl,
        struct ctdb_lock_req req;
        struct ctdb_call call;
        int ret;
+       NTSTATUS status;
 
        call.call_id = FUNC_BRL_LOCK;
        call.key.dptr = brlh->key.data;
@@ -405,18 +397,35 @@ static NTSTATUS brl_ctdb_lock(struct brl_context *brl,
                return NT_STATUS_INTERNAL_DB_CORRUPTION;
        }
 
-       return NT_STATUS(call.status);
+       status = NT_STATUS(call.status);
+
+       if (NT_STATUS_EQUAL(status, NT_STATUS_LOCK_NOT_GRANTED)) {
+               struct lock_struct lock;
+               lock.context.smbpid = smbpid;
+               lock.context.server = brl->server;
+               lock.context.ctx = brl;
+               lock.ntvfs = brlh->ntvfs;
+               lock.start = start;
+               lock.size = size;
+               lock.lock_type = lock_type;
+               lock.notify_ptr = notify_ptr;
+               status = brl_ctdb_lock_failed(brlh, &lock);
+       }
+
+       return status;
 }
 
-#if ENABLE_NOTIFIES
 /*
-  we are removing a lock that might be holding up a pending lock. Scan for pending
-  locks that cover this range and if we find any then notify the server that it should
-  retry the lock
+  we are removing a lock that might be holding up a pending lock. Scan
+  for pending locks that cover this range and if we find any then
+  notify the server that it should retry the lock. In this backend, we
+  notify by sending the list of locks that need to be notified on back
+  in the reply_data of the ctdb call. The caller then does the
+  messaging for us. 
 */
-static void brl_ctdb_notify_unlock(struct brl_context *brl,
-                             struct lock_struct *locks, int count, 
-                             struct lock_struct *removed_lock)
+static int brl_ctdb_notify_unlock(struct ctdb_call_info *call,
+                                 struct lock_struct *locks, int count, 
+                                  struct lock_struct *removed_lock)
 {
        int i, last_notice;
 
@@ -429,36 +438,68 @@ static void brl_ctdb_notify_unlock(struct brl_context *brl,
        for (i=0;i<count;i++) {
                if (locks[i].lock_type >= PENDING_READ_LOCK &&
                    brl_ctdb_overlap(&locks[i], removed_lock)) {
+                       struct lock_struct *nlocks;
+                       int ncount;
+
                        if (last_notice != -1 && brl_ctdb_overlap(&locks[i], &locks[last_notice])) {
                                continue;
                        }
                        if (locks[i].lock_type == PENDING_WRITE_LOCK) {
                                last_notice = i;
                        }
-                       messaging_send_ptr(brl->messaging_ctx, locks[i].context.server, 
-                                          MSG_BRL_RETRY, locks[i].notify_ptr);
+                       if (call->reply_data == NULL) {
+                               call->reply_data = talloc_zero(call, TDB_DATA);
+                               if (call->reply_data == NULL) {
+                                       return CTDB_ERR_NOMEM;
+                               }
+                       }
+                       /* add to the list of pending locks to notify caller of */
+                       ncount = call->reply_data->dsize / sizeof(struct lock_struct);
+                       nlocks = talloc_realloc(call->reply_data, call->reply_data->dptr, 
+                                               struct lock_struct, ncount + 1);
+                       if (nlocks == NULL) {
+                               return CTDB_ERR_NOMEM;
+                       }
+                       call->reply_data->dptr = (uint8_t *)nlocks;
+                       nlocks[ncount] = locks[i];
+                       call->reply_data->dsize += sizeof(struct lock_struct);
                }
        }
+
+       return 0;
 }
-#endif
 
 /*
   send notifications for all pending locks - the file is being closed by this
   user
 */
-static void brl_ctdb_notify_all(struct brl_context *brl,
-                          struct lock_struct *locks, int count)
+static int brl_ctdb_notify_all(struct ctdb_call_info *call,
+                               struct lock_struct *locks, int count)
 {
        int i;
        for (i=0;i<count;i++) {
                if (locks->lock_type >= PENDING_READ_LOCK) {
-#if ENABLE_NOTIFIES
-                       brl_ctdb_notify_unlock(brl, locks, count, &locks[i]);
-#endif
+                       int ret = brl_ctdb_notify_unlock(call, locks, count, &locks[i]);
+                       if (ret != 0) return ret;
                }
        }
+       return 0;
 }
 
+/*
+  send off any messages needed to notify of pending locks that should now retry
+*/
+static void brl_ctdb_notify_send(struct brl_context *brl, TDB_DATA *reply_data)
+{
+       struct lock_struct *locks = (struct lock_struct *)reply_data->dptr;
+       int i, count = reply_data->dsize / sizeof(struct lock_struct);
+       for (i=0;i<count;i++) {
+               messaging_send_ptr(brl->messaging_ctx, locks[i].context.server, 
+                                  MSG_BRL_RETRY, locks[i].notify_ptr);
+       }
+}
+
+
 struct ctdb_unlock_req {
        uint16_t smbpid;
        uint64_t start;
@@ -515,9 +556,7 @@ static int brl_ctdb_unlock_func(struct ctdb_call_info *call)
 
 found:
        if (i < count) {
-#if ENABLE_NOTIFIES
                struct lock_struct removed_lock = *lock;
-#endif
 
                call->new_data = talloc(call, TDB_DATA);
                if (call->new_data == NULL) {
@@ -535,9 +574,8 @@ found:
                       (count-(i+1))*sizeof(*lock));
                
                if (count > 1) {
-#if ENABLE_NOTIFIES
-                       brl_ctdb_notify_unlock(req->brl, locks, count, &removed_lock);
-#endif
+                       int ret = brl_ctdb_notify_unlock(call, locks, count, &removed_lock);
+                       if (ret != 0) return ret;
                }
        }
 
@@ -584,6 +622,8 @@ static NTSTATUS brl_ctdb_unlock(struct brl_context *brl,
                return NT_STATUS_INTERNAL_DB_CORRUPTION;
        }
 
+       brl_ctdb_notify_send(brl, &call.reply_data);
+
        return NT_STATUS(call.status);
 }
 
@@ -808,6 +848,8 @@ static int brl_ctdb_close_func(struct ctdb_call_info *call)
                if (call->new_data == NULL) {
                        return CTDB_ERR_NOMEM;
                }
+
+               brl_ctdb_notify_all(call, locks, count);
                
                call->new_data->dptr = talloc_size(call, count*sizeof(struct lock_struct));
                if (call->new_data->dptr == NULL) {
@@ -850,6 +892,8 @@ static NTSTATUS brl_ctdb_close(struct brl_context *brl,
                return NT_STATUS_INTERNAL_DB_CORRUPTION;
        }
 
+       brl_ctdb_notify_send(brl, &call.reply_data);
+
        return NT_STATUS(call.status);
 }