ctdb-daemon: Don't check if lock_ctx->ctdb_db is NULL
[samba.git] / ctdb / server / ctdb_lock.c
index ee9c0c15fa5f165edb52ee42b3fbdd0d45fc8897..5f032ae568b7297df5470b92a0c742b527e23a74 100644 (file)
    You should have received a copy of the GNU General Public License
    along with this program; if not, see <http://www.gnu.org/licenses/>.
 */
-#include "includes.h"
-#include "include/ctdb_private.h"
-#include "include/ctdb_protocol.h"
-#include "tevent.h"
-#include "tdb.h"
-#include "db_wrap.h"
+#include "replace.h"
 #include "system/filesys.h"
+#include "system/network.h"
+
+#include <talloc.h>
+#include <tevent.h>
+
+#include "lib/tdb_wrap/tdb_wrap.h"
 #include "lib/util/dlinklist.h"
+#include "lib/util/debug.h"
+#include "lib/util/samba_util.h"
+#include "lib/util/sys_rw.h"
+
+#include "ctdb_private.h"
+
+#include "common/common.h"
+#include "common/logging.h"
 
 /*
  * Non-blocking Locking API
  * 2. Once the locks are obtained, signal parent process via fd.
  * 3. Invoke registered callback routine with locking status.
  * 4. If the child process cannot get locks within certain time,
- *    diagnose using /proc/locks and log warning message
+ *    execute an external script to debug.
  *
  * ctdb_lock_record()      - get a lock on a record
  * ctdb_lock_db()          - get a lock on a DB
- * ctdb_lock_alldb_prio()  - get a lock on all DBs with given priority
- * ctdb_lock_alldb()       - get a lock on all DBs
  *
  *  auto_mark              - whether to mark/unmark DBs in before/after callback
+ *                           = false is used for freezing databases for
+ *                           recovery since the recovery cannot start till
+ *                           databases are locked on all the nodes.
+ *                           = true is used for record locks.
  */
 
-/* FIXME: Add a tunable max_lock_processes_per_db */
-#define MAX_LOCK_PROCESSES_PER_DB              (100)
-
 enum lock_type {
        LOCK_RECORD,
        LOCK_DB,
-       LOCK_ALLDB_PRIO,
-       LOCK_ALLDB,
 };
 
 static const char * const lock_type_str[] = {
        "lock_record",
        "lock_db",
-       "lock_alldb_prio",
-       "lock_alldb",
 };
 
 struct lock_request;
@@ -89,62 +93,14 @@ struct lock_request {
 };
 
 
-/*
- * Support samba 3.6.x (and older) versions which do not set db priority.
- *
- * By default, all databases are set to priority 1. So only when priority
- * is set to 1, check for databases that need higher priority.
- */
-static bool later_db(struct ctdb_context *ctdb, const char *name)
-{
-       if (ctdb->tunable.samba3_hack == 0) {
-               return false;
-       }
-
-       if (strstr(name, "brlock") ||
-           strstr(name, "g_lock") ||
-           strstr(name, "notify_onelevel") ||
-           strstr(name, "serverid") ||
-           strstr(name, "xattr_tdb")) {
-               return true;
-       }
-
-       return false;
-}
-
-typedef int (*db_handler_t)(struct ctdb_db_context *ctdb_db,
-                           uint32_t priority,
-                           void *private_data);
-
-static int ctdb_db_iterator(struct ctdb_context *ctdb, uint32_t priority,
-                           db_handler_t handler, void *private_data)
+int ctdb_db_iterator(struct ctdb_context *ctdb, ctdb_db_handler_t handler,
+                    void *private_data)
 {
        struct ctdb_db_context *ctdb_db;
        int ret;
 
        for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
-               if (ctdb_db->priority != priority) {
-                       continue;
-               }
-               if (later_db(ctdb, ctdb_db->db_name)) {
-                       continue;
-               }
-               ret = handler(ctdb_db, priority, private_data);
-               if (ret != 0) {
-                       return -1;
-               }
-       }
-
-       /* If priority != 1, later_db check is not required and can return */
-       if (priority != 1) {
-               return 0;
-       }
-
-       for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
-               if (!later_db(ctdb, ctdb_db->db_name)) {
-                       continue;
-               }
-               ret = handler(ctdb_db, priority, private_data);
+               ret = handler(ctdb_db, private_data);
                if (ret != 0) {
                        return -1;
                }
@@ -153,17 +109,15 @@ static int ctdb_db_iterator(struct ctdb_context *ctdb, uint32_t priority,
        return 0;
 }
 
-
 /*
  * lock all databases - mark only
  */
-static int db_lock_mark_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
+static int db_lock_mark_handler(struct ctdb_db_context *ctdb_db,
                                void *private_data)
 {
        int tdb_transaction_write_lock_mark(struct tdb_context *);
 
-       DEBUG(DEBUG_INFO, ("marking locked database %s, priority:%u\n",
-                          ctdb_db->db_name, priority));
+       DEBUG(DEBUG_INFO, ("marking locked database %s\n", ctdb_db->db_name));
 
        if (tdb_transaction_write_lock_mark(ctdb_db->ltdb->tdb) != 0) {
                DEBUG(DEBUG_ERR, ("Failed to mark (transaction lock) database %s\n",
@@ -180,47 +134,26 @@ static int db_lock_mark_handler(struct ctdb_db_context *ctdb_db, uint32_t priori
        return 0;
 }
 
-int ctdb_lockall_mark_prio(struct ctdb_context *ctdb, uint32_t priority)
+int ctdb_lockdb_mark(struct ctdb_db_context *ctdb_db)
 {
-       /*
-        * This function is only used by the main dameon during recovery.
-        * At this stage, the databases have already been locked, by a
-        * dedicated child process. The freeze_mode variable is used to track
-        * whether the actual locks are held by the child process or not.
-        */
-
-       if (ctdb->freeze_mode[priority] != CTDB_FREEZE_FROZEN) {
-               DEBUG(DEBUG_ERR, ("Attempt to mark all databases locked when not frozen\n"));
+       if (!ctdb_db_frozen(ctdb_db)) { /* refuse (-1) unless DB is frozen */
+               DEBUG(DEBUG_ERR,
+                     ("Attempt to mark database locked when not frozen\n"));
                return -1;
        }
 
-       return ctdb_db_iterator(ctdb, priority, db_lock_mark_handler, NULL);
-}
-
-static int ctdb_lockall_mark(struct ctdb_context *ctdb)
-{
-       uint32_t priority;
-
-       for (priority=1; priority<=NUM_DB_PRIORITIES; priority++) {
-               if (ctdb_db_iterator(ctdb, priority, db_lock_mark_handler, NULL) != 0) {
-                       return -1;
-               }
-       }
-
-       return 0;
+       return db_lock_mark_handler(ctdb_db, NULL); /* mark this one DB only */
 }
 
-
 /*
  * lock all databases - unmark only
  */
-static int db_lock_unmark_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
+static int db_lock_unmark_handler(struct ctdb_db_context *ctdb_db,
                                  void *private_data)
 {
        int tdb_transaction_write_lock_unmark(struct tdb_context *);
 
-       DEBUG(DEBUG_INFO, ("unmarking locked database %s, priority:%u\n",
-                          ctdb_db->db_name, priority));
+       DEBUG(DEBUG_INFO, ("unmarking locked database %s\n", ctdb_db->db_name));
 
        if (tdb_transaction_write_lock_unmark(ctdb_db->ltdb->tdb) != 0) {
                DEBUG(DEBUG_ERR, ("Failed to unmark (transaction lock) database %s\n",
@@ -237,37 +170,17 @@ static int db_lock_unmark_handler(struct ctdb_db_context *ctdb_db, uint32_t prio
        return 0;
 }
 
-int ctdb_lockall_unmark_prio(struct ctdb_context *ctdb, uint32_t priority)
+int ctdb_lockdb_unmark(struct ctdb_db_context *ctdb_db)
 {
-       /*
-        * This function is only used by the main daemon during recovery.
-        * At this stage, the databases have already been locked, by a
-        * dedicated child process. The freeze_mode variable is used to track
-        * whether the actual locks are held by the child process or not.
-        */
-
-       if (ctdb->freeze_mode[priority] != CTDB_FREEZE_FROZEN) {
-               DEBUG(DEBUG_ERR, ("Attempt to unmark all databases locked when not frozen\n"));
+       if (!ctdb_db_frozen(ctdb_db)) { /* refuse (-1) unless DB is frozen */
+               DEBUG(DEBUG_ERR,
+                     ("Attempt to unmark database locked when not frozen\n"));
                return -1;
        }
 
-       return ctdb_db_iterator(ctdb, priority, db_lock_unmark_handler, NULL);
+       return db_lock_unmark_handler(ctdb_db, NULL); /* unmark this one DB only */
 }
 
-static int ctdb_lockall_unmark(struct ctdb_context *ctdb)
-{
-       uint32_t priority;
-
-       for (priority=NUM_DB_PRIORITIES; priority>0; priority--) {
-               if (ctdb_db_iterator(ctdb, priority, db_lock_unmark_handler, NULL) != 0) {
-                       return -1;
-               }
-       }
-
-       return 0;
-}
-
-
 static void ctdb_lock_schedule(struct ctdb_context *ctdb);
 
 /*
@@ -275,23 +188,27 @@ static void ctdb_lock_schedule(struct ctdb_context *ctdb);
  */
 static int ctdb_lock_context_destructor(struct lock_context *lock_ctx)
 {
+       if (lock_ctx->request) {
+               lock_ctx->request->lctx = NULL;
+       }
        if (lock_ctx->child > 0) {
-               ctdb_kill(lock_ctx->ctdb, lock_ctx->child, SIGKILL);
-               DLIST_REMOVE(lock_ctx->ctdb->lock_current, lock_ctx);
-               if (lock_ctx->ctdb_db) {
-                       lock_ctx->ctdb_db->lock_num_current--;
+               ctdb_kill(lock_ctx->ctdb, lock_ctx->child, SIGTERM);
+               if (lock_ctx->type == LOCK_RECORD) {
+                       DLIST_REMOVE(lock_ctx->ctdb_db->lock_current, lock_ctx);
+               } else {
+                       DLIST_REMOVE(lock_ctx->ctdb->lock_current, lock_ctx);
                }
+               lock_ctx->ctdb_db->lock_num_current--;
                CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_current);
-               if (lock_ctx->type == LOCK_RECORD || lock_ctx->type == LOCK_DB) {
-                       CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_current);
-               }
+               CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_current);
        } else {
-               DLIST_REMOVE(lock_ctx->ctdb->lock_pending, lock_ctx);
-               lock_ctx->ctdb->lock_num_pending--;
-               CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_pending);
-               if (lock_ctx->type == LOCK_RECORD || lock_ctx->type == LOCK_DB) {
-                       CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
+               if (lock_ctx->type == LOCK_RECORD) {
+                       DLIST_REMOVE(lock_ctx->ctdb_db->lock_pending, lock_ctx);
+               } else {
+                       DLIST_REMOVE(lock_ctx->ctdb->lock_pending, lock_ctx);
                }
+               CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_pending);
+               CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
        }
 
        ctdb_lock_schedule(lock_ctx->ctdb);
@@ -305,20 +222,16 @@ static int ctdb_lock_context_destructor(struct lock_context *lock_ctx)
  */
 static int ctdb_lock_request_destructor(struct lock_request *lock_request)
 {
-       lock_request->lctx->request = NULL;
-       return 0;
-}
+       if (lock_request->lctx == NULL) { /* no lock context attached */
+               return 0;
+       }
 
-void ctdb_lock_free_request_context(struct lock_request *lock_req)
-{
-       struct lock_context *lock_ctx;
+       lock_request->lctx->request = NULL; /* detach before freeing context */
+       TALLOC_FREE(lock_request->lctx); /* freeing request frees its context */
 
-       lock_ctx = lock_req->lctx;
-       talloc_free(lock_req);
-       talloc_free(lock_ctx);
+       return 0;
 }
 
-
 /*
  * Process all the callbacks waiting for lock
  *
@@ -327,53 +240,50 @@ void ctdb_lock_free_request_context(struct lock_request *lock_req)
 static void process_callbacks(struct lock_context *lock_ctx, bool locked)
 {
        struct lock_request *request;
+       bool auto_mark = lock_ctx->auto_mark; /* lock_ctx may die in callback */
 
-       if (lock_ctx->auto_mark && locked) {
+       if (auto_mark && locked) {
                switch (lock_ctx->type) {
                case LOCK_RECORD:
                        tdb_chainlock_mark(lock_ctx->ctdb_db->ltdb->tdb, lock_ctx->key);
                        break;
 
                case LOCK_DB:
-                       tdb_lockall_mark(lock_ctx->ctdb_db->ltdb->tdb);
-                       break;
-
-               case LOCK_ALLDB_PRIO:
-                       ctdb_lockall_mark_prio(lock_ctx->ctdb, lock_ctx->priority);
-                       break;
-
-               case LOCK_ALLDB:
-                       ctdb_lockall_mark(lock_ctx->ctdb);
+                       (void)ctdb_lockdb_mark(lock_ctx->ctdb_db);
                        break;
                }
        }
 
        request = lock_ctx->request;
-       if (lock_ctx->auto_mark) {
-               /* Reset the destructor, so request is not removed from the list */
-               talloc_set_destructor(request, NULL);
+       if (auto_mark) {
+               /* Since request may be freed in the callback, unset the lock
+                * context, so request destructor will not free lock context.
+                */
+               request->lctx = NULL;
        }
+
+       /* Since request may be freed in the callback, unset the request */
+       lock_ctx->request = NULL;
+
        request->callback(request->private_data, locked);
 
-       if (lock_ctx->auto_mark && locked) {
+       if (!auto_mark) {
+               return;
+       }
+       /* Callback is done: drop the marks taken above */
+       if (locked) {
                switch (lock_ctx->type) {
                case LOCK_RECORD:
                        tdb_chainlock_unmark(lock_ctx->ctdb_db->ltdb->tdb, lock_ctx->key);
                        break;
 
                case LOCK_DB:
-                       tdb_lockall_unmark(lock_ctx->ctdb_db->ltdb->tdb);
-                       break;
-
-               case LOCK_ALLDB_PRIO:
-                       ctdb_lockall_unmark_prio(lock_ctx->ctdb, lock_ctx->priority);
-                       break;
-
-               case LOCK_ALLDB:
-                       ctdb_lockall_unmark(lock_ctx->ctdb);
+                       ctdb_lockdb_unmark(lock_ctx->ctdb_db); /* result ignored */
                        break;
                }
        }
+       /* auto_mark: the lock context is released here, not by the request */
+       talloc_free(lock_ctx);
 }
 
 
@@ -419,7 +329,6 @@ static void ctdb_lock_handler(struct tevent_context *ev,
                            void *private_data)
 {
        struct lock_context *lock_ctx;
-       TALLOC_CTX *tmp_ctx = NULL;
        char c;
        bool locked;
        double t;
@@ -428,57 +337,161 @@ static void ctdb_lock_handler(struct tevent_context *ev,
        lock_ctx = talloc_get_type_abort(private_data, struct lock_context);
 
        /* cancel the timeout event */
-       if (lock_ctx->ttimer) {
-               TALLOC_FREE(lock_ctx->ttimer);
-       }
+       TALLOC_FREE(lock_ctx->ttimer);
 
        t = timeval_elapsed(&lock_ctx->start_time);
        id = lock_bucket_id(t);
 
-       if (lock_ctx->auto_mark) {
-               tmp_ctx = talloc_new(ev);
-               talloc_steal(tmp_ctx, lock_ctx);
-       }
-
        /* Read the status from the child process */
-       if (read(lock_ctx->fd[0], &c, 1) != 1) {
+       if (sys_read(lock_ctx->fd[0], &c, 1) != 1) {
                locked = false;
        } else {
                locked = (c == 0 ? true : false);
        }
 
        /* Update statistics */
-       CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_pending);
        CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_calls);
-       if (lock_ctx->ctdb_db) {
-               CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
-               CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_calls);
-       }
+       CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_calls);
 
        if (locked) {
-               if (lock_ctx->ctdb_db) {
-                       CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.buckets[id]);
-                       CTDB_UPDATE_LATENCY(lock_ctx->ctdb, lock_ctx->ctdb_db,
-                                           lock_type_str[lock_ctx->type], locks.latency,
-                                           lock_ctx->start_time);
-
-                       CTDB_UPDATE_DB_LATENCY(lock_ctx->ctdb_db, lock_type_str[lock_ctx->type], locks.latency, t);
-                       CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.buckets[id]);
-               }
+               CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.buckets[id]);
+               CTDB_UPDATE_LATENCY(lock_ctx->ctdb, lock_ctx->ctdb_db,
+                                   lock_type_str[lock_ctx->type], locks.latency,
+                                   lock_ctx->start_time);
+
+               CTDB_UPDATE_DB_LATENCY(lock_ctx->ctdb_db, lock_type_str[lock_ctx->type], locks.latency, t);
+               CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.buckets[id]);
        } else {
                CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_failed);
-               if (lock_ctx->ctdb_db) {
-                       CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_failed);
-               }
+               CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_failed);
        }
 
        process_callbacks(lock_ctx, locked);
+}
 
-       if (lock_ctx->auto_mark) {
-               talloc_free(tmp_ctx);
+struct lock_log_entry {
+       struct db_hash_context *lock_log; /* hash this entry is stored in */
+       TDB_DATA key; /* private copy of the record key */
+       unsigned long log_sec; /* elapsed seconds at the last log */
+       struct tevent_timer *timer; /* fires lock_log_cleanup() */
+};
+/* db_hash_fetch() parser: hand back the entry pointer stored as the value */
+static int lock_log_fetch_parser(uint8_t *keybuf, size_t keylen,
+                                uint8_t *databuf, size_t datalen,
+                                void *private_data)
+{
+       struct lock_log_entry **entry =
+               (struct lock_log_entry **)private_data;
+       /* The value must be exactly one pointer wide, else EINVAL */
+       if (datalen != sizeof(struct lock_log_entry *)) {
+               return EINVAL;
        }
+       /* Value is the entry pointer, type-checked by talloc_get_type_abort() */
+       *entry = talloc_get_type_abort(*(void **)databuf,
+                                      struct lock_log_entry);
+       return 0;
 }
 
+static void lock_log_cleanup(struct tevent_context *ev,
+                            struct tevent_timer *ttimer,
+                            struct timeval current_time,
+                            void *private_data)
+{
+       struct lock_log_entry *entry = talloc_get_type_abort(
+               private_data, struct lock_log_entry);
+       int ret;
+       /* Timer callback (armed in lock_log_skip): expire this log entry */
+       entry->timer = NULL;
+       /* Remove the key from the hash first; free only if that worked */
+       ret = db_hash_delete(entry->lock_log, entry->key.dptr,
+                            entry->key.dsize);
+       if (ret != 0) {
+               return; /* delete failed: entry is left allocated */
+       }
+       talloc_free(entry);
+}
+/* Per-key log throttle for record locks; true = caller should not log */
+static bool lock_log_skip(struct tevent_context *ev,
+                         struct db_hash_context *lock_log,
+                         TDB_DATA key, unsigned long elapsed_sec)
+{
+       struct lock_log_entry *entry = NULL;
+       int ret;
+       /* Look up the log entry for this key, if any */
+       ret = db_hash_fetch(lock_log, key.dptr, key.dsize,
+                           lock_log_fetch_parser, &entry);
+       if (ret == ENOENT) {
+               /* No entry for this key yet: create and register one */
+               entry = talloc_zero(lock_log, struct lock_log_entry);
+               if (entry == NULL) {
+                       goto fail;
+               }
+
+               entry->lock_log = lock_log;
+
+               entry->key.dptr = talloc_memdup(entry, key.dptr, key.dsize);
+               if (entry->key.dptr == NULL) {
+                       talloc_free(entry);
+                       goto fail;
+               }
+               entry->key.dsize = key.dsize;
+               /* Record the elapsed time; arm cleanup timer (offset 30, 0) */
+               entry->log_sec = elapsed_sec;
+               entry->timer = tevent_add_timer(ev, entry,
+                                               timeval_current_ofs(30, 0),
+                                               lock_log_cleanup, entry);
+               if (entry->timer == NULL) {
+                       talloc_free(entry);
+                       goto fail;
+               }
+               /* The hash stores the pointer to the entry, not a copy of it */
+               ret = db_hash_add(lock_log, key.dptr, key.dsize,
+                                 (uint8_t *)&entry,
+                                 sizeof(struct lock_log_entry *));
+               if (ret != 0) {
+                       talloc_free(entry);
+                       goto fail;
+               }
+
+               return false;
+
+       } else if (ret == EINVAL) {
+               /* NOTE(review): presumably the parser's EINVAL; drop record */
+               ret = db_hash_delete(lock_log, key.dptr, key.dsize);
+               if (ret != 0) {
+                       goto fail;
+               }
+
+               return false;
+
+       } else if (ret == 0) {
+               /* Known key: skip unless elapsed time exceeds the last logged */
+               if (elapsed_sec <= entry->log_sec) {
+                       return true;
+               }
+
+               entry->log_sec = elapsed_sec;
+               /* Replace the cleanup timer with a fresh one */
+               TALLOC_FREE(entry->timer);
+               entry->timer = tevent_add_timer(ev, entry,
+                                               timeval_current_ofs(30, 0),
+                                               lock_log_cleanup, entry);
+               if (entry->timer == NULL) {
+                       ret = db_hash_delete(lock_log, key.dptr, key.dsize);
+                       if (ret != 0) {
+                               goto fail; /* entry not freed; timer is NULL */
+                       }
+                       talloc_free(entry);
+               }
+
+               return false;
+       }
+
+       /* Any other fetch result, and every failure above, means "do not skip" */
+fail:
+       return false;
+
+}
 
 /*
  * Callback routine when required locks are not obtained within timeout
@@ -489,36 +502,55 @@ static void ctdb_lock_timeout_handler(struct tevent_context *ev,
                                    struct timeval current_time,
                                    void *private_data)
 {
-       static const char * debug_locks = NULL;
+       static char debug_locks[PATH_MAX+1] = "";
        struct lock_context *lock_ctx;
        struct ctdb_context *ctdb;
        pid_t pid;
+       double elapsed_time;
+       bool skip;
+       char *keystr;
 
        lock_ctx = talloc_get_type_abort(private_data, struct lock_context);
        ctdb = lock_ctx->ctdb;
 
-       if (lock_ctx->type == LOCK_RECORD || lock_ctx->type == LOCK_DB) {
-               DEBUG(DEBUG_WARNING,
-                     ("Unable to get %s lock on database %s for %.0lf seconds\n",
-                      (lock_ctx->type == LOCK_RECORD ? "RECORD" : "DB"),
-                      lock_ctx->ctdb_db->db_name,
-                      timeval_elapsed(&lock_ctx->start_time)));
-       } else {
+       elapsed_time = timeval_elapsed(&lock_ctx->start_time);
+
+       /* For database locks, always log */
+       if (lock_ctx->type == LOCK_DB) {
                DEBUG(DEBUG_WARNING,
-                     ("Unable to get ALLDB locks for %.0lf seconds\n",
-                      timeval_elapsed(&lock_ctx->start_time)));
+                     ("Unable to get DB lock on database %s for "
+                      "%.0lf seconds\n",
+                      lock_ctx->ctdb_db->db_name, elapsed_time));
+               goto lock_debug;
        }
 
-       /* Fire a child process to find the blocking process. */
-       if (debug_locks == NULL) {
-               debug_locks = getenv("CTDB_DEBUG_LOCKS");
-               if (debug_locks == NULL) {
-                       debug_locks = talloc_asprintf(ctdb,
-                                                     "%s/debug_locks.sh",
-                                                     getenv("CTDB_BASE"));
-               }
+       /* For record locks, check if we have already logged */
+       skip = lock_log_skip(ev, lock_ctx->ctdb_db->lock_log,
+                            lock_ctx->key, (unsigned long)elapsed_time);
+       if (skip) {
+               goto skip_lock_debug;
        }
-       if (debug_locks != NULL) {
+
+       keystr = hex_encode_talloc(lock_ctx, lock_ctx->key.dptr,
+                                  lock_ctx->key.dsize);
+       DEBUG(DEBUG_WARNING,
+             ("Unable to get RECORD lock on database %s for %.0lf seconds"
+              " (key %s)\n",
+              lock_ctx->ctdb_db->db_name, elapsed_time,
+              keystr ? keystr : ""));
+       TALLOC_FREE(keystr);
+
+       /* If a node stopped/banned, don't spam the logs */
+       if (ctdb->nodes[ctdb->pnn]->flags & NODE_FLAGS_INACTIVE) {
+               goto skip_lock_debug;
+       }
+
+lock_debug:
+
+       if (ctdb_set_helper("lock debugging helper",
+                           debug_locks, sizeof(debug_locks),
+                           "CTDB_DEBUG_LOCKS",
+                           getenv("CTDB_BASE"), "debug_locks.sh")) {
                pid = vfork();
                if (pid == 0) {
                        execl(debug_locks, debug_locks, NULL);
@@ -528,9 +560,11 @@ static void ctdb_lock_timeout_handler(struct tevent_context *ev,
        } else {
                DEBUG(DEBUG_WARNING,
                      (__location__
-                      " Unable to setup lock debugging - no memory?\n"));
+                      " Unable to setup lock debugging\n"));
        }
 
+skip_lock_debug:
+
        /* reset the timeout timer */
        // talloc_free(lock_ctx->ttimer);
        lock_ctx->ttimer = tevent_add_timer(ctdb->ev,
@@ -540,40 +574,12 @@ static void ctdb_lock_timeout_handler(struct tevent_context *ev,
                                            (void *)lock_ctx);
 }
 
-
-static int db_count_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
-                           void *private_data)
-{
-       int *count = (int *)private_data;
-
-       (*count)++;
-
-       return 0;
-}
-
-struct db_namelist {
-       char **names;
-       int n;
-};
-
-static int db_name_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
-                          void *private_data)
+static bool lock_helper_args(TALLOC_CTX *mem_ctx,
+                            struct lock_context *lock_ctx, int fd,
+                            int *argc, const char ***argv)
 {
-       struct db_namelist *list = (struct db_namelist *)private_data;
-
-       list->names[list->n] = talloc_strdup(list->names, ctdb_db->db_path);
-       list->n++;
-
-       return 0;
-}
-
-static char **lock_helper_args(TALLOC_CTX *mem_ctx, struct lock_context *lock_ctx, int fd)
-{
-       struct ctdb_context *ctdb = lock_ctx->ctdb;
-       char **args = NULL;
-       int nargs, i;
-       int priority;
-       struct db_namelist list;
+       const char **args = NULL;
+       int nargs = 0, i;
 
        switch (lock_ctx->type) {
        case LOCK_RECORD:
@@ -583,36 +589,25 @@ static char **lock_helper_args(TALLOC_CTX *mem_ctx, struct lock_context *lock_ct
        case LOCK_DB:
                nargs = 5;
                break;
-
-       case LOCK_ALLDB_PRIO:
-               nargs = 4;
-               ctdb_db_iterator(ctdb, lock_ctx->priority, db_count_handler, &nargs);
-               break;
-
-       case LOCK_ALLDB:
-               nargs = 4;
-               for (priority=1; priority<NUM_DB_PRIORITIES; priority++) {
-                       ctdb_db_iterator(ctdb, priority, db_count_handler, &nargs);
-               }
-               break;
        }
 
        /* Add extra argument for null termination */
        nargs++;
 
-       args = talloc_array(mem_ctx, char *, nargs);
+       args = talloc_array(mem_ctx, const char *, nargs);
        if (args == NULL) {
-               return NULL;
+               return false;
        }
 
-       args[0] = talloc_strdup(args, "ctdb_lock_helper");
-       args[1] = talloc_asprintf(args, "%d", getpid());
-       args[2] = talloc_asprintf(args, "%d", fd);
+       args[0] = talloc_asprintf(args, "%d", getpid());
+       args[1] = talloc_asprintf(args, "%d", fd);
 
        switch (lock_ctx->type) {
        case LOCK_RECORD:
-               args[3] = talloc_strdup(args, "RECORD");
-               args[4] = talloc_strdup(args, lock_ctx->ctdb_db->db_path);
+               args[2] = talloc_strdup(args, "RECORD");
+               args[3] = talloc_strdup(args, lock_ctx->ctdb_db->db_path);
+               args[4] = talloc_asprintf(args, "0x%x",
+                               tdb_get_flags(lock_ctx->ctdb_db->ltdb->tdb));
                if (lock_ctx->key.dsize == 0) {
                        args[5] = talloc_strdup(args, "NULL");
                } else {
@@ -621,24 +616,10 @@ static char **lock_helper_args(TALLOC_CTX *mem_ctx, struct lock_context *lock_ct
                break;
 
        case LOCK_DB:
-               args[3] = talloc_strdup(args, "DB");
-               args[4] = talloc_strdup(args, lock_ctx->ctdb_db->db_path);
-               break;
-
-       case LOCK_ALLDB_PRIO:
-               args[3] = talloc_strdup(args, "DB");
-               list.names = args;
-               list.n = 4;
-               ctdb_db_iterator(ctdb, lock_ctx->priority, db_name_handler, &list);
-               break;
-
-       case LOCK_ALLDB:
-               args[3] = talloc_strdup(args, "DB");
-               list.names = args;
-               list.n = 4;
-               for (priority=1; priority<NUM_DB_PRIORITIES; priority++) {
-                       ctdb_db_iterator(ctdb, priority, db_name_handler, &list);
-               }
+               args[2] = talloc_strdup(args, "DB");
+               args[3] = talloc_strdup(args, lock_ctx->ctdb_db->db_path);
+               args[4] = talloc_asprintf(args, "0x%x",
+                               tdb_get_flags(lock_ctx->ctdb_db->ltdb->tdb));
                break;
        }
 
@@ -648,119 +629,93 @@ static char **lock_helper_args(TALLOC_CTX *mem_ctx, struct lock_context *lock_ct
        for (i=0; i<nargs-1; i++) {
                if (args[i] == NULL) {
                        talloc_free(args);
-                       return NULL;
+                       return false;
                }
        }
 
-       return args;
+       *argc = nargs;
+       *argv = args;
+       return true;
 }
 
-
 /*
- * Find the lock context of a given type
+ * Find a lock request that can be scheduled
  */
-static struct lock_context *find_lock_context(struct lock_context *lock_list,
-                                             struct ctdb_db_context *ctdb_db,
-                                             TDB_DATA key,
-                                             uint32_t priority,
-                                             enum lock_type type,
-                                             uint32_t key_hash)
+static struct lock_context *ctdb_find_lock_context(struct ctdb_context *ctdb)
 {
-       struct lock_context *lock_ctx;
+       struct lock_context *lock_ctx, *next_ctx;
+       struct ctdb_db_context *ctdb_db;
+
+       /* First check if there are database lock requests */
+
+       for (lock_ctx = ctdb->lock_pending; lock_ctx != NULL;
+            lock_ctx = next_ctx) {
+
+               if (lock_ctx->request != NULL) {
+                       /* Found a lock context with a request */
+                       return lock_ctx;
+               }
 
-       /* Search active locks */
-       for (lock_ctx=lock_list; lock_ctx; lock_ctx=lock_ctx->next) {
-               if (lock_ctx->type != type) {
+               next_ctx = lock_ctx->next;
+
+               DEBUG(DEBUG_INFO, ("Removing lock context without lock "
+                                  "request\n"));
+               DLIST_REMOVE(ctdb->lock_pending, lock_ctx);
+               CTDB_DECREMENT_STAT(ctdb, locks.num_pending);
+               CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
+               talloc_free(lock_ctx);
+       }
+
+       /* Next check database queues */
+       for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
+               if (ctdb_db->lock_num_current ==
+                   ctdb->tunable.lock_processes_per_db) {
                        continue;
                }
 
-               switch (lock_ctx->type) {
-               case LOCK_RECORD:
-                       if (ctdb_db == lock_ctx->ctdb_db &&
-                           key_hash == lock_ctx->key_hash) {
-                               goto done;
-                       }
-                       break;
+               for (lock_ctx = ctdb_db->lock_pending; lock_ctx != NULL;
+                    lock_ctx = next_ctx) {
 
-               case LOCK_DB:
-                       if (ctdb_db == lock_ctx->ctdb_db) {
-                               goto done;
-                       }
-                       break;
+                       next_ctx = lock_ctx->next;
 
-               case LOCK_ALLDB_PRIO:
-                       if (priority == lock_ctx->priority) {
-                               goto done;
+                       if (lock_ctx->request != NULL) {
+                               return lock_ctx;
                        }
-                       break;
 
-               case LOCK_ALLDB:
-                       goto done;
-                       break;
+                       DEBUG(DEBUG_INFO, ("Removing lock context without "
+                                          "lock request\n"));
+                       DLIST_REMOVE(ctdb_db->lock_pending, lock_ctx);
+                       CTDB_DECREMENT_STAT(ctdb, locks.num_pending);
+                       CTDB_DECREMENT_DB_STAT(ctdb_db, locks.num_pending);
+                       talloc_free(lock_ctx);
                }
        }
 
-       /* Did not find the lock context we are searching for */
-       lock_ctx = NULL;
-
-done:
-       return lock_ctx;
-
+       return NULL;
 }
 
-
 /*
  * Schedule a new lock child process
  * Set up callback handler and timeout handler
  */
 static void ctdb_lock_schedule(struct ctdb_context *ctdb)
 {
-       struct lock_context *lock_ctx, *next_ctx;
-       int ret;
+       struct lock_context *lock_ctx;
+       int ret, argc;
        TALLOC_CTX *tmp_ctx;
-       const char *helper = BINDIR "/ctdb_lock_helper";
-       static const char *prog = NULL;
-       char **args;
-
-       if (prog == NULL) {
-               const char *t;
+       static char prog[PATH_MAX+1] = "";
+       const char **args;
 
-               t = getenv("CTDB_LOCK_HELPER");
-               if (t != NULL) {
-                       prog = talloc_strdup(ctdb, t);
-               } else {
-                       prog = talloc_strdup(ctdb, helper);
-               }
-               CTDB_NO_MEMORY_VOID(ctdb, prog);
-       }
-
-       if (ctdb->lock_pending == NULL) {
-               return;
+       if (!ctdb_set_helper("lock helper",
+                            prog, sizeof(prog),
+                            "CTDB_LOCK_HELPER",
+                            CTDB_HELPER_BINDIR, "ctdb_lock_helper")) {
+               ctdb_die(ctdb, __location__
+                        " Unable to set lock helper\n");
        }
 
        /* Find a lock context with requests */
-       lock_ctx = ctdb->lock_pending;
-       while (lock_ctx != NULL) {
-               next_ctx = lock_ctx->next;
-               if (! lock_ctx->request) {
-                       DEBUG(DEBUG_INFO, ("Removing lock context without lock request\n"));
-                       DLIST_REMOVE(ctdb->lock_pending, lock_ctx);
-                       ctdb->lock_num_pending--;
-                       CTDB_DECREMENT_STAT(ctdb, locks.num_pending);
-                       if (lock_ctx->ctdb_db) {
-                               CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
-                       }
-                       talloc_free(lock_ctx);
-               } else {
-                       if (lock_ctx->ctdb_db == NULL ||
-                           lock_ctx->ctdb_db->lock_num_current < MAX_LOCK_PROCESSES_PER_DB) {
-                               /* Found a lock context with lock requests */
-                               break;
-                       }
-               }
-               lock_ctx = next_ctx;
-       }
-
+       lock_ctx = ctdb_find_lock_context(ctdb);
        if (lock_ctx == NULL) {
                return;
        }
@@ -783,9 +738,17 @@ static void ctdb_lock_schedule(struct ctdb_context *ctdb)
                return;
        }
 
+       if (! ctdb->do_setsched) {
+               ret = setenv("CTDB_NOSETSCHED", "1", 1);
+               if (ret != 0) {
+                       DEBUG(DEBUG_WARNING,
+                             ("Failed to set CTDB_NOSETSCHED variable\n"));
+               }
+       }
+
        /* Create arguments for lock helper */
-       args = lock_helper_args(tmp_ctx, lock_ctx, lock_ctx->fd[1]);
-       if (args == NULL) {
+       if (!lock_helper_args(tmp_ctx, lock_ctx, lock_ctx->fd[1],
+                             &argc, &args)) {
                DEBUG(DEBUG_ERR, ("Failed to create lock helper args\n"));
                close(lock_ctx->fd[0]);
                close(lock_ctx->fd[1]);
@@ -793,9 +756,9 @@ static void ctdb_lock_schedule(struct ctdb_context *ctdb)
                return;
        }
 
-       lock_ctx->child = vfork();
-
-       if (lock_ctx->child == (pid_t)-1) {
+       lock_ctx->child = ctdb_vfork_exec(lock_ctx, ctdb, prog, argc,
+                                         (const char **)args);
+       if (lock_ctx->child == -1) {
                DEBUG(DEBUG_ERR, ("Failed to create a child in ctdb_lock_schedule\n"));
                close(lock_ctx->fd[0]);
                close(lock_ctx->fd[1]);
@@ -803,23 +766,9 @@ static void ctdb_lock_schedule(struct ctdb_context *ctdb)
                return;
        }
 
-
-       /* Child process */
-       if (lock_ctx->child == 0) {
-               ret = execv(prog, args);
-               if (ret < 0) {
-                       DEBUG(DEBUG_ERR, ("Failed to execute helper %s (%d, %s)\n",
-                                         prog, errno, strerror(errno)));
-               }
-               _exit(1);
-       }
-
        /* Parent process */
-       ctdb_track_child(ctdb, lock_ctx->child);
        close(lock_ctx->fd[1]);
 
-       talloc_set_destructor(lock_ctx, ctdb_lock_context_destructor);
-
        talloc_free(tmp_ctx);
 
        /* Set up timeout handler */
@@ -829,9 +778,8 @@ static void ctdb_lock_schedule(struct ctdb_context *ctdb)
                                            ctdb_lock_timeout_handler,
                                            (void *)lock_ctx);
        if (lock_ctx->ttimer == NULL) {
-               ctdb_kill(ctdb, lock_ctx->child, SIGKILL);
+               ctdb_kill(ctdb, lock_ctx->child, SIGTERM);
                lock_ctx->child = -1;
-               talloc_set_destructor(lock_ctx, NULL);
                close(lock_ctx->fd[0]);
                return;
        }
@@ -840,35 +788,39 @@ static void ctdb_lock_schedule(struct ctdb_context *ctdb)
        lock_ctx->tfd = tevent_add_fd(ctdb->ev,
                                      lock_ctx,
                                      lock_ctx->fd[0],
-                                     EVENT_FD_READ,
+                                     TEVENT_FD_READ,
                                      ctdb_lock_handler,
                                      (void *)lock_ctx);
        if (lock_ctx->tfd == NULL) {
                TALLOC_FREE(lock_ctx->ttimer);
-               ctdb_kill(ctdb, lock_ctx->child, SIGKILL);
+               ctdb_kill(ctdb, lock_ctx->child, SIGTERM);
                lock_ctx->child = -1;
-               talloc_set_destructor(lock_ctx, NULL);
                close(lock_ctx->fd[0]);
                return;
        }
        tevent_fd_set_auto_close(lock_ctx->tfd);
 
        /* Move the context from pending to current */
-       DLIST_REMOVE(ctdb->lock_pending, lock_ctx);
-       ctdb->lock_num_pending--;
-       DLIST_ADD_END(ctdb->lock_current, lock_ctx, NULL);
-       if (lock_ctx->ctdb_db) {
-               lock_ctx->ctdb_db->lock_num_current++;
-               CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_current);
-               CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_current);
+       if (lock_ctx->type == LOCK_RECORD) {
+               DLIST_REMOVE(lock_ctx->ctdb_db->lock_pending, lock_ctx);
+               DLIST_ADD_END(lock_ctx->ctdb_db->lock_current, lock_ctx);
+       } else {
+               DLIST_REMOVE(ctdb->lock_pending, lock_ctx);
+               DLIST_ADD_END(ctdb->lock_current, lock_ctx);
        }
+       CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_pending);
+       CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_current);
+       lock_ctx->ctdb_db->lock_num_current++;
+       CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
+       CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_current);
 }
 
 
 /*
  * Lock record / db depending on type
  */
-static struct lock_request *ctdb_lock_internal(struct ctdb_context *ctdb,
+static struct lock_request *ctdb_lock_internal(TALLOC_CTX *mem_ctx,
+                                              struct ctdb_context *ctdb,
                                               struct ctdb_db_context *ctdb_db,
                                               TDB_DATA key,
                                               uint32_t priority,
@@ -891,6 +843,11 @@ static struct lock_request *ctdb_lock_internal(struct ctdb_context *ctdb,
                return NULL;
        }
 
+       if ((request = talloc_zero(mem_ctx, struct lock_request)) == NULL) {
+               talloc_free(lock_ctx);
+               return NULL;
+       }
+
        lock_ctx->type = type;
        lock_ctx->ctdb = ctdb;
        lock_ctx->ctdb_db = ctdb_db;
@@ -900,6 +857,7 @@ static struct lock_request *ctdb_lock_internal(struct ctdb_context *ctdb,
                if (lock_ctx->key.dptr == NULL) {
                        DEBUG(DEBUG_ERR, (__location__ "Memory allocation error\n"));
                        talloc_free(lock_ctx);
+                       talloc_free(request);
                        return NULL;
                }
                lock_ctx->key_hash = ctdb_hash(&key);
@@ -909,10 +867,17 @@ static struct lock_request *ctdb_lock_internal(struct ctdb_context *ctdb,
        lock_ctx->priority = priority;
        lock_ctx->auto_mark = auto_mark;
 
+       lock_ctx->request = request;
        lock_ctx->child = -1;
 
-       DLIST_ADD_END(ctdb->lock_pending, lock_ctx, NULL);
-       ctdb->lock_num_pending++;
+       /* Non-record locks are required by recovery: queue them on the global
+        * list, which the scheduler scans before any per-database queue.
+        */
+       if (lock_ctx->type == LOCK_RECORD) {
+               DLIST_ADD_END(ctdb_db->lock_pending, lock_ctx);
+       } else {
+               DLIST_ADD_END(ctdb->lock_pending, lock_ctx);
+       }
        CTDB_INCREMENT_STAT(ctdb, locks.num_pending);
        if (ctdb_db) {
                CTDB_INCREMENT_DB_STAT(ctdb_db, locks.num_pending);
@@ -921,17 +886,12 @@ static struct lock_request *ctdb_lock_internal(struct ctdb_context *ctdb,
        /* Start the timer when we activate the context */
        lock_ctx->start_time = timeval_current();
 
-       if ((request = talloc_zero(lock_ctx, struct lock_request)) == NULL) {
-               talloc_free(lock_ctx);
-               return NULL;
-       }
-
        request->lctx = lock_ctx;
        request->callback = callback;
        request->private_data = private_data;
 
        talloc_set_destructor(request, ctdb_lock_request_destructor);
-       lock_ctx->request = request;
+       talloc_set_destructor(lock_ctx, ctdb_lock_context_destructor);
 
        ctdb_lock_schedule(ctdb);
 
@@ -942,13 +902,15 @@ static struct lock_request *ctdb_lock_internal(struct ctdb_context *ctdb,
 /*
  * obtain a lock on a record in a database
  */
-struct lock_request *ctdb_lock_record(struct ctdb_db_context *ctdb_db,
+struct lock_request *ctdb_lock_record(TALLOC_CTX *mem_ctx,
+                                     struct ctdb_db_context *ctdb_db,
                                      TDB_DATA key,
                                      bool auto_mark,
                                      void (*callback)(void *, bool),
                                      void *private_data)
 {
-       return ctdb_lock_internal(ctdb_db->ctdb,
+       return ctdb_lock_internal(mem_ctx,
+                                 ctdb_db->ctdb,
                                  ctdb_db,
                                  key,
                                  0,
@@ -962,12 +924,14 @@ struct lock_request *ctdb_lock_record(struct ctdb_db_context *ctdb_db,
 /*
  * obtain a lock on a database
  */
-struct lock_request *ctdb_lock_db(struct ctdb_db_context *ctdb_db,
+struct lock_request *ctdb_lock_db(TALLOC_CTX *mem_ctx,
+                                 struct ctdb_db_context *ctdb_db,
                                  bool auto_mark,
                                  void (*callback)(void *, bool),
                                  void *private_data)
 {
-       return ctdb_lock_internal(ctdb_db->ctdb,
+       return ctdb_lock_internal(mem_ctx,
+                                 ctdb_db->ctdb,
                                  ctdb_db,
                                  tdb_null,
                                  0,
@@ -976,48 +940,3 @@ struct lock_request *ctdb_lock_db(struct ctdb_db_context *ctdb_db,
                                  LOCK_DB,
                                  auto_mark);
 }
-
-
-/*
- * obtain locks on all databases of specified priority
- */
-struct lock_request *ctdb_lock_alldb_prio(struct ctdb_context *ctdb,
-                                         uint32_t priority,
-                                         bool auto_mark,
-                                         void (*callback)(void *, bool),
-                                         void *private_data)
-{
-       if (priority < 1 || priority > NUM_DB_PRIORITIES) {
-               DEBUG(DEBUG_ERR, ("Invalid db priority: %u\n", priority));
-               return NULL;
-       }
-
-       return ctdb_lock_internal(ctdb,
-                                 NULL,
-                                 tdb_null,
-                                 priority,
-                                 callback,
-                                 private_data,
-                                 LOCK_ALLDB_PRIO,
-                                 auto_mark);
-}
-
-
-/*
- * obtain locks on all databases
- */
-struct lock_request *ctdb_lock_alldb(struct ctdb_context *ctdb,
-                                    bool auto_mark,
-                                    void (*callback)(void *, bool),
-                                    void *private_data)
-{
-       return ctdb_lock_internal(ctdb,
-                                 NULL,
-                                 tdb_null,
-                                 0,
-                                 callback,
-                                 private_data,
-                                 LOCK_ALLDB,
-                                 auto_mark);
-}
-