ctdb-recoverd: Store recovery lock handle
[vlendec/samba-autobuild/.git] / ctdb / server / ctdb_recoverd.c
index c89649a3c559e9476424ec3c6c6ffce396e57550..46357b6ed690905ae2fe05f39b566397f473495f 100644 (file)
 #include "lib/util/dlinklist.h"
 #include "lib/util/debug.h"
 #include "lib/util/samba_util.h"
+#include "lib/util/sys_rw.h"
 #include "lib/util/util_process.h"
 
 #include "ctdb_private.h"
 #include "ctdb_client.h"
 
-#include "common/system.h"
-#include "common/cmdline.h"
+#include "common/system_socket.h"
 #include "common/common.h"
 #include "common/logging.h"
 
+#include "server/ctdb_config.h"
+
+#include "ctdb_cluster_mutex.h"
 
 /* List of SRVID requests that need to be processed */
 struct srvid_list {
@@ -83,6 +86,10 @@ static void srvid_requests_reply(struct ctdb_context *ctdb,
 {
        struct srvid_list *r;
 
+       if (*requests == NULL) {
+               return;
+       }
+
        for (r = (*requests)->requests; r != NULL; r = r->next) {
                srvid_request_reply(ctdb, r->request, result);
        }
@@ -232,6 +239,8 @@ struct ctdb_banning_state {
        struct timeval last_reported_time;
 };
 
+struct ctdb_recovery_lock_handle;
+
 /*
   private state of recovery daemon
  */
@@ -252,6 +261,8 @@ struct ctdb_recoverd {
        struct ctdb_iface_list_old *ifaces;
        uint32_t *force_rebalance_nodes;
        struct ctdb_node_capabilities *caps;
+       bool frozen_on_inactive;
+       struct ctdb_recovery_lock_handle *recovery_lock_handle;
 };
 
 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
@@ -337,87 +348,6 @@ static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
        ctdb_set_culprit_count(rec, culprit, 1);
 }
 
-
-/* this callback is called for every node that failed to execute the
-   recovered event
-*/
-static void recovered_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
-{
-       struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
-
-       DEBUG(DEBUG_ERR, (__location__ " Node %u failed the recovered event. Setting it as recovery fail culprit\n", node_pnn));
-
-       ctdb_set_culprit(rec, node_pnn);
-}
-
-/*
-  run the "recovered" eventscript on all nodes
- */
-static int run_recovered_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap, const char *caller)
-{
-       TALLOC_CTX *tmp_ctx;
-       uint32_t *nodes;
-       struct ctdb_context *ctdb = rec->ctdb;
-
-       tmp_ctx = talloc_new(ctdb);
-       CTDB_NO_MEMORY(ctdb, tmp_ctx);
-
-       nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
-       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
-                                       nodes, 0,
-                                       CONTROL_TIMEOUT(), false, tdb_null,
-                                       NULL, recovered_fail_callback,
-                                       rec) != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
-
-               talloc_free(tmp_ctx);
-               return -1;
-       }
-
-       talloc_free(tmp_ctx);
-       return 0;
-}
-
-/* this callback is called for every node that failed to execute the
-   start recovery event
-*/
-static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
-{
-       struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
-
-       DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
-
-       ctdb_set_culprit(rec, node_pnn);
-}
-
-/*
-  run the "startrecovery" eventscript on all nodes
- */
-static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap)
-{
-       TALLOC_CTX *tmp_ctx;
-       uint32_t *nodes;
-       struct ctdb_context *ctdb = rec->ctdb;
-
-       tmp_ctx = talloc_new(ctdb);
-       CTDB_NO_MEMORY(ctdb, tmp_ctx);
-
-       nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
-       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
-                                       nodes, 0,
-                                       CONTROL_TIMEOUT(), false, tdb_null,
-                                       NULL,
-                                       startrecovery_fail_callback,
-                                       rec) != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
-               talloc_free(tmp_ctx);
-               return -1;
-       }
-
-       talloc_free(tmp_ctx);
-       return 0;
-}
-
 /*
   Retrieve capabilities from all connected nodes
  */
@@ -459,29 +389,13 @@ static int update_capabilities(struct ctdb_recoverd *rec,
        return 0;
 }
 
-static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
-{
-       struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
-
-       DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
-       ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
-}
-
-static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
-{
-       struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
-
-       DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
-       ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
-}
-
 /*
   change recovery mode on all nodes
  */
 static int set_recovery_mode(struct ctdb_context *ctdb,
                             struct ctdb_recoverd *rec,
                             struct ctdb_node_map_old *nodemap,
-                            uint32_t rec_mode, bool freeze)
+                            uint32_t rec_mode)
 {
        TDB_DATA data;
        uint32_t *nodes;
@@ -506,65 +420,10 @@ static int set_recovery_mode(struct ctdb_context *ctdb,
                return -1;
        }
 
-       /* freeze all nodes */
-       if (freeze && rec_mode == CTDB_RECOVERY_ACTIVE) {
-               int i;
-
-               for (i=1; i<=NUM_DB_PRIORITIES; i++) {
-                       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
-                                               nodes, i,
-                                               CONTROL_TIMEOUT(),
-                                               false, tdb_null,
-                                               NULL,
-                                               set_recmode_fail_callback,
-                                               rec) != 0) {
-                               DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
-                               talloc_free(tmp_ctx);
-                               return -1;
-                       }
-               }
-       }
-
        talloc_free(tmp_ctx);
        return 0;
 }
 
-/* update all remote nodes to use the same db priority that we have
-   this can fail if the remove node has not yet been upgraded to 
-   support this function, so we always return success and never fail
-   a recovery if this call fails.
-*/
-static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
-       struct ctdb_node_map_old *nodemap, 
-       uint32_t pnn, struct ctdb_dbid_map_old *dbmap, TALLOC_CTX *mem_ctx)
-{
-       int db;
-
-       /* step through all local databases */
-       for (db=0; db<dbmap->num;db++) {
-               struct ctdb_db_priority db_prio;
-               int ret;
-
-               db_prio.db_id     = dbmap->dbs[db].db_id;
-               ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].db_id, &db_prio.priority);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].db_id));
-                       continue;
-               }
-
-               DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].db_id, db_prio.priority)); 
-
-               ret = ctdb_ctrl_set_db_priority(ctdb, CONTROL_TIMEOUT(),
-                                               CTDB_CURRENT_NODE, &db_prio);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n",
-                                        db_prio.db_id));
-               }
-       }
-
-       return 0;
-}                      
-
 /*
   ensure all other nodes have attached to any databases that we have
  */
@@ -617,7 +476,7 @@ static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctd
                        ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(),
                                                 nodemap->nodes[j].pnn,
                                                 mem_ctx, name,
-                                                dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
+                                                dbmap->dbs[db].flags, NULL);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
                                return -1;
@@ -679,8 +538,9 @@ static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb
                                          nodemap->nodes[j].pnn));
                                return -1;
                        }
-                       ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
-                                          remote_dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
+                       ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn,
+                                          mem_ctx, name,
+                                          remote_dbmap->dbs[db].flags, NULL);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
                                return -1;
@@ -696,244 +556,6 @@ static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb
        return 0;
 }
 
-
-/*
-  pull the remote database contents from one node into the recdb
- */
-static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode,
-                                   struct tdb_wrap *recdb, uint32_t dbid)
-{
-       int ret;
-       TDB_DATA outdata;
-       struct ctdb_marshall_buffer *reply;
-       struct ctdb_rec_data_old *recdata;
-       int i;
-       TALLOC_CTX *tmp_ctx = talloc_new(recdb);
-
-       ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
-                              CONTROL_TIMEOUT(), &outdata);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
-               talloc_free(tmp_ctx);
-               return -1;
-       }
-
-       reply = (struct ctdb_marshall_buffer *)outdata.dptr;
-
-       if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
-               DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
-               talloc_free(tmp_ctx);
-               return -1;
-       }
-
-       recdata = (struct ctdb_rec_data_old *)&reply->data[0];
-
-       for (i=0;
-            i<reply->count;
-            recdata = (struct ctdb_rec_data_old *)(recdata->length + (uint8_t *)recdata), i++) {
-               TDB_DATA key, data;
-               struct ctdb_ltdb_header *hdr;
-               TDB_DATA existing;
-
-               key.dptr = &recdata->data[0];
-               key.dsize = recdata->keylen;
-               data.dptr = &recdata->data[key.dsize];
-               data.dsize = recdata->datalen;
-
-               hdr = (struct ctdb_ltdb_header *)data.dptr;
-
-               if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
-                       DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
-                       talloc_free(tmp_ctx);
-                       return -1;
-               }
-
-               /* fetch the existing record, if any */
-               existing = tdb_fetch(recdb->tdb, key);
-
-               if (existing.dptr != NULL) {
-                       struct ctdb_ltdb_header header;
-                       if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
-                               DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n",
-                                        (unsigned)existing.dsize, srcnode));
-                               free(existing.dptr);
-                               talloc_free(tmp_ctx);
-                               return -1;
-                       }
-                       header = *(struct ctdb_ltdb_header *)existing.dptr;
-                       free(existing.dptr);
-                       if (!(header.rsn < hdr->rsn ||
-                             (header.dmaster != ctdb_get_pnn(ctdb) &&
-                              header.rsn == hdr->rsn))) {
-                               continue;
-                       }
-               }
-
-               if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
-                       DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
-                       talloc_free(tmp_ctx);
-                       return -1;
-               }
-       }
-
-       talloc_free(tmp_ctx);
-
-       return 0;
-}
-
-
-struct pull_seqnum_cbdata {
-       int failed;
-       uint32_t pnn;
-       uint64_t seqnum;
-};
-
-static void pull_seqnum_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
-{
-       struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
-       uint64_t seqnum;
-
-       if (cb_data->failed != 0) {
-               DEBUG(DEBUG_ERR, ("Got seqnum from node %d but we have already failed the entire operation\n", node_pnn));
-               return;
-       }
-
-       if (res != 0) {
-               DEBUG(DEBUG_ERR, ("Error when pulling seqnum from node %d\n", node_pnn));
-               cb_data->failed = 1;
-               return;
-       }
-
-       if (outdata.dsize != sizeof(uint64_t)) {
-               DEBUG(DEBUG_ERR, ("Error when reading pull seqnum from node %d, got %d bytes but expected %d\n", node_pnn, (int)outdata.dsize, (int)sizeof(uint64_t)));
-               cb_data->failed = -1;
-               return;
-       }
-
-       seqnum = *((uint64_t *)outdata.dptr);
-
-       if (seqnum > cb_data->seqnum ||
-           (cb_data->pnn == -1 && seqnum == 0)) {
-               cb_data->seqnum = seqnum;
-               cb_data->pnn = node_pnn;
-       }
-}
-
-static void pull_seqnum_fail_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
-{
-       struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
-
-       DEBUG(DEBUG_ERR, ("Failed to pull db seqnum from node %d\n", node_pnn));
-       cb_data->failed = 1;
-}
-
-static int pull_highest_seqnum_pdb(struct ctdb_context *ctdb,
-                               struct ctdb_recoverd *rec, 
-                               struct ctdb_node_map_old *nodemap, 
-                               struct tdb_wrap *recdb, uint32_t dbid)
-{
-       TALLOC_CTX *tmp_ctx = talloc_new(NULL);
-       uint32_t *nodes;
-       TDB_DATA data;
-       uint32_t outdata[2];
-       struct pull_seqnum_cbdata *cb_data;
-
-       DEBUG(DEBUG_NOTICE, ("Scan for highest seqnum pdb for db:0x%08x\n", dbid));
-
-       outdata[0] = dbid;
-       outdata[1] = 0;
-
-       data.dsize = sizeof(outdata);
-       data.dptr  = (uint8_t *)&outdata[0];
-
-       cb_data = talloc(tmp_ctx, struct pull_seqnum_cbdata);
-       if (cb_data == NULL) {
-               DEBUG(DEBUG_ERR, ("Failed to allocate pull highest seqnum cb_data structure\n"));
-               talloc_free(tmp_ctx);
-               return -1;
-       }
-
-       cb_data->failed = 0;
-       cb_data->pnn    = -1;
-       cb_data->seqnum = 0;
-       
-       nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
-       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_DB_SEQNUM,
-                                       nodes, 0,
-                                       CONTROL_TIMEOUT(), false, data,
-                                       pull_seqnum_cb,
-                                       pull_seqnum_fail_cb,
-                                       cb_data) != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Failed to run async GET_DB_SEQNUM\n"));
-
-               talloc_free(tmp_ctx);
-               return -1;
-       }
-
-       if (cb_data->failed != 0) {
-               DEBUG(DEBUG_NOTICE, ("Failed to pull sequence numbers for DB 0x%08x\n", dbid));
-               talloc_free(tmp_ctx);
-               return -1;
-       }
-
-       if (cb_data->pnn == -1) {
-               DEBUG(DEBUG_NOTICE, ("Failed to find a node with highest sequence numbers for DB 0x%08x\n", dbid));
-               talloc_free(tmp_ctx);
-               return -1;
-       }
-
-       DEBUG(DEBUG_NOTICE, ("Pull persistent db:0x%08x from node %d with highest seqnum:%lld\n", dbid, cb_data->pnn, (long long)cb_data->seqnum)); 
-
-       if (pull_one_remote_database(ctdb, cb_data->pnn, recdb, dbid) != 0) {
-               DEBUG(DEBUG_ERR, ("Failed to pull higest seqnum database 0x%08x from node %d\n", dbid, cb_data->pnn));
-               talloc_free(tmp_ctx);
-               return -1;
-       }
-
-       talloc_free(tmp_ctx);
-       return 0;
-}
-
-
-/*
-  pull all the remote database contents into the recdb
- */
-static int pull_remote_database(struct ctdb_context *ctdb,
-                               struct ctdb_recoverd *rec, 
-                               struct ctdb_node_map_old *nodemap, 
-                               struct tdb_wrap *recdb, uint32_t dbid,
-                               bool persistent)
-{
-       int j;
-
-       if (persistent && ctdb->tunable.recover_pdb_by_seqnum != 0) {
-               int ret;
-               ret = pull_highest_seqnum_pdb(ctdb, rec, nodemap, recdb, dbid);
-               if (ret == 0) {
-                       return 0;
-               }
-       }
-
-       /* pull all records from all other nodes across onto this node
-          (this merges based on rsn)
-       */
-       for (j=0; j<nodemap->num; j++) {
-               /* don't merge from nodes that are unavailable */
-               if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
-                       continue;
-               }
-               if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
-                       DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
-                                nodemap->nodes[j].pnn));
-                       ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
-                       return -1;
-               }
-       }
-       
-       return 0;
-}
-
-
 /*
   update flags on all active nodes
  */
@@ -950,32 +572,6 @@ static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node
        return 0;
 }
 
-/*
-  ensure all nodes have the same vnnmap we do
- */
-static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, 
-                                     uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
-{
-       int j, ret;
-
-       /* push the new vnn map out to all the nodes */
-       for (j=0; j<nodemap->num; j++) {
-               /* don't push to nodes that are unavailable */
-               if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
-                       continue;
-               }
-
-               ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
-                       return -1;
-               }
-       }
-
-       return 0;
-}
-
-
 /*
   called when a vacuum fetch has completed - just free it and do the next one
  */
@@ -1062,7 +658,7 @@ static void vacuum_fetch_handler(uint64_t srvid, TDB_DATA data,
        TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
        const char *name;
        struct ctdb_dbid_map_old *dbmap=NULL;
-       bool persistent = false;
+       uint8_t db_flags = 0;
        struct ctdb_db_context *ctdb_db;
        struct ctdb_rec_data_old *r;
 
@@ -1081,7 +677,7 @@ static void vacuum_fetch_handler(uint64_t srvid, TDB_DATA data,
 
        for (i=0;i<dbmap->num;i++) {
                if (dbmap->dbs[i].db_id == recs->db_id) {
-                       persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
+                       db_flags = dbmap->dbs[i].flags;
                        break;
                }
        }
@@ -1097,7 +693,7 @@ static void vacuum_fetch_handler(uint64_t srvid, TDB_DATA data,
        }
 
        /* attach to it */
-       ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, persistent, 0);
+       ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, db_flags);
        if (ctdb_db == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
                goto done;
@@ -1234,7 +830,7 @@ static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map_ol
                                  nodemap->nodes[j].pnn));
                        ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
                        talloc_free(mem_ctx);
-                       return MONITOR_FAILED;
+                       return -1;
                }
                if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
                        /* We should tell our daemon about this so it
@@ -1260,7 +856,7 @@ static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map_ol
                talloc_free(remote_nodemap);
        }
        talloc_free(mem_ctx);
-       return MONITOR_OK;
+       return 0;
 }
 
 
@@ -1282,312 +878,110 @@ static uint32_t new_generation(void)
        return generation;
 }
 
+static bool ctdb_recovery_have_lock(struct ctdb_recoverd *rec)
+{
+       return (rec->recovery_lock_handle != NULL);
+}
 
-/*
-  create a temporary working database
- */
-static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
+struct ctdb_recovery_lock_handle {
+       bool done;
+       bool locked;
+       double latency;
+       struct ctdb_cluster_mutex_handle *h;
+};
+
+static void take_reclock_handler(char status,
+                                double latency,
+                                void *private_data)
 {
-       char *name;
-       struct tdb_wrap *recdb;
-       unsigned tdb_flags;
+       struct ctdb_recovery_lock_handle *s =
+               (struct ctdb_recovery_lock_handle *) private_data;
 
-       /* open up the temporary recovery database */
-       name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
-                              ctdb->db_directory_state,
-                              ctdb->pnn);
-       if (name == NULL) {
-               return NULL;
-       }
-       unlink(name);
+       switch (status) {
+       case '0':
+               s->latency = latency;
+               break;
 
-       tdb_flags = TDB_NOLOCK;
-       if (ctdb->valgrinding) {
-               tdb_flags |= TDB_NOMMAP;
-       }
-       tdb_flags |= (TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING);
+       case '1':
+               DEBUG(DEBUG_ERR,
+                     ("Unable to take recovery lock - contention\n"));
+               break;
 
-       recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
-                             tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
-       if (recdb == NULL) {
-               DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
+       default:
+               DEBUG(DEBUG_ERR, ("ERROR: when taking recovery lock\n"));
        }
 
-       talloc_free(name);
-
-       return recdb;
+       s->done = true;
+       s->locked = (status == '0') ;
 }
 
+static bool ctdb_recovery_lock(struct ctdb_recoverd *rec);
 
-/* 
-   a traverse function for pulling all relevant records from recdb
- */
-struct recdb_data {
-       struct ctdb_context *ctdb;
-       struct ctdb_marshall_buffer *recdata;
-       uint32_t len;
-       uint32_t allocated_len;
-       bool failed;
-       bool persistent;
-};
-
-static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
+static void lost_reclock_handler(void *private_data)
 {
-       struct recdb_data *params = (struct recdb_data *)p;
-       struct ctdb_rec_data_old *recdata;
-       struct ctdb_ltdb_header *hdr;
-
-       /*
-        * skip empty records - but NOT for persistent databases:
-        *
-        * The record-by-record mode of recovery deletes empty records.
-        * For persistent databases, this can lead to data corruption
-        * by deleting records that should be there:
-        *
-        * - Assume the cluster has been running for a while.
-        *
-        * - A record R in a persistent database has been created and
-        *   deleted a couple of times, the last operation being deletion,
-        *   leaving an empty record with a high RSN, say 10.
-        *
-        * - Now a node N is turned off.
-        *
-        * - This leaves the local database copy of D on N with the empty
-        *   copy of R and RSN 10. On all other nodes, the recovery has deleted
-        *   the copy of record R.
-        *
-        * - Now the record is created again while node N is turned off.
-        *   This creates R with RSN = 1 on all nodes except for N.
-        *
-        * - Now node N is turned on again. The following recovery will chose
-        *   the older empty copy of R due to RSN 10 > RSN 1.
-        *
-        * ==> Hence the record is gone after the recovery.
-        *
-        * On databases like Samba's registry, this can damage the higher-level
-        * data structures built from the various tdb-level records.
-        */
-       if (!params->persistent && data.dsize <= sizeof(struct ctdb_ltdb_header)) {
-               return 0;
-       }
-
-       /* update the dmaster field to point to us */
-       hdr = (struct ctdb_ltdb_header *)data.dptr;
-       if (!params->persistent) {
-               hdr->dmaster = params->ctdb->pnn;
-               hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
-       }
-
-       /* add the record to the blob ready to send to the nodes */
-       recdata = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
-       if (recdata == NULL) {
-               params->failed = true;
-               return -1;
-       }
-       if (params->len + recdata->length >= params->allocated_len) {
-               params->allocated_len = recdata->length + params->len + params->ctdb->tunable.pulldb_preallocation_size;
-               params->recdata = talloc_realloc_size(NULL, params->recdata, params->allocated_len);
-       }
-       if (params->recdata == NULL) {
-               DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u\n",
-                        recdata->length + params->len));
-               params->failed = true;
-               return -1;
-       }
-       params->recdata->count++;
-       memcpy(params->len+(uint8_t *)params->recdata, recdata, recdata->length);
-       params->len += recdata->length;
-       talloc_free(recdata);
-
-       return 0;
-}
-
-/*
-  push the recdb database out to all nodes
- */
-static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
-                              bool persistent,
-                              struct tdb_wrap *recdb, struct ctdb_node_map_old *nodemap)
-{
-       struct recdb_data params;
-       struct ctdb_marshall_buffer *recdata;
-       TDB_DATA outdata;
-       TALLOC_CTX *tmp_ctx;
-       uint32_t *nodes;
-
-       tmp_ctx = talloc_new(ctdb);
-       CTDB_NO_MEMORY(ctdb, tmp_ctx);
-
-       recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
-       CTDB_NO_MEMORY(ctdb, recdata);
-
-       recdata->db_id = dbid;
-
-       params.ctdb = ctdb;
-       params.recdata = recdata;
-       params.len = offsetof(struct ctdb_marshall_buffer, data);
-       params.allocated_len = params.len;
-       params.failed = false;
-       params.persistent = persistent;
-
-       if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
-               DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
-               talloc_free(params.recdata);
-               talloc_free(tmp_ctx);
-               return -1;
-       }
-
-       if (params.failed) {
-               DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
-               talloc_free(params.recdata);
-               talloc_free(tmp_ctx);
-               return -1;              
-       }
-
-       recdata = params.recdata;
-
-       outdata.dptr = (void *)recdata;
-       outdata.dsize = params.len;
+       struct ctdb_recoverd *rec = talloc_get_type_abort(
+               private_data, struct ctdb_recoverd);
 
-       nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
-       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
-                                       nodes, 0,
-                                       CONTROL_TIMEOUT(), false, outdata,
-                                       NULL, NULL,
-                                       NULL) != 0) {
-               DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
-               talloc_free(recdata);
-               talloc_free(tmp_ctx);
-               return -1;
+       DEBUG(DEBUG_ERR,
+             ("Recovery lock helper terminated unexpectedly - "
+              "trying to retake recovery lock\n"));
+       TALLOC_FREE(rec->recovery_lock_handle);
+       if (! ctdb_recovery_lock(rec)) {
+               DEBUG(DEBUG_ERR, ("Failed to take recovery lock\n"));
        }
-
-       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
-                 dbid, recdata->count));
-
-       talloc_free(recdata);
-       talloc_free(tmp_ctx);
-
-       return 0;
 }
 
-
-/*
-  go through a full recovery on one database 
- */
-static int recover_database(struct ctdb_recoverd *rec, 
-                           TALLOC_CTX *mem_ctx,
-                           uint32_t dbid,
-                           bool persistent,
-                           uint32_t pnn, 
-                           struct ctdb_node_map_old *nodemap,
-                           uint32_t transaction_id)
+static bool ctdb_recovery_lock(struct ctdb_recoverd *rec)
 {
-       struct tdb_wrap *recdb;
-       int ret;
        struct ctdb_context *ctdb = rec->ctdb;
-       TDB_DATA data;
-       struct ctdb_transdb w;
-       uint32_t *nodes;
-
-       recdb = create_recdb(ctdb, mem_ctx);
-       if (recdb == NULL) {
-               return -1;
-       }
+       struct ctdb_cluster_mutex_handle *h;
+       struct ctdb_recovery_lock_handle *s;
 
-       /* pull all remote databases onto the recdb */
-       ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
-               return -1;
+       s = talloc_zero(rec, struct ctdb_recovery_lock_handle);
+       if (s == NULL) {
+               DBG_ERR("Memory allocation error\n");
+               return false;
+       };
+
+       h = ctdb_cluster_mutex(s,
+                              ctdb,
+                              ctdb->recovery_lock,
+                              0,
+                              take_reclock_handler,
+                              s,
+                              lost_reclock_handler,
+                              rec);
+       if (h == NULL) {
+               talloc_free(s);
+               return false;
        }
 
-       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
-
-       /* wipe all the remote databases. This is safe as we are in a transaction */
-       w.db_id = dbid;
-       w.tid = transaction_id;
-
-       data.dptr = (void *)&w;
-       data.dsize = sizeof(w);
-
-       nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
-       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
-                                       nodes, 0,
-                                       CONTROL_TIMEOUT(), false, data,
-                                       NULL, NULL,
-                                       NULL) != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
-               talloc_free(recdb);
-               return -1;
-       }
-       
-       /* push out the correct database. This sets the dmaster and skips 
-          the empty records */
-       ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
-       if (ret != 0) {
-               talloc_free(recdb);
-               return -1;
+       while (! s->done) {
+               tevent_loop_once(ctdb->ev);
        }
 
-       /* all done with this database */
-       talloc_free(recdb);
-
-       return 0;
-}
-
-/* when we start a recovery, make sure all nodes use the same reclock file
-   setting
-*/
-static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
-{
-       struct ctdb_context *ctdb = rec->ctdb;
-       TALLOC_CTX *tmp_ctx = talloc_new(NULL);
-       TDB_DATA data;
-       uint32_t *nodes;
-
-       if (ctdb->recovery_lock_file == NULL) {
-               data.dptr  = NULL;
-               data.dsize = 0;
-       } else {
-               data.dsize = strlen(ctdb->recovery_lock_file) + 1;
-               data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
+       if (! s->locked) {
+               talloc_free(s);
+               return false;
        }
 
-       nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
-       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
-                                       nodes, 0,
-                                       CONTROL_TIMEOUT(),
-                                       false, data,
-                                       NULL, NULL,
-                                       rec) != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
-               talloc_free(tmp_ctx);
-               return -1;
-       }
+       rec->recovery_lock_handle = s;
+       s->h = h;
+       ctdb_ctrl_report_recd_lock_latency(ctdb,
+                                          CONTROL_TIMEOUT(),
+                                          s->latency);
 
-       talloc_free(tmp_ctx);
-       return 0;
+       return true;
 }
 
-
-/*
- * this callback is called for every node that failed to execute ctdb_takeover_run()
- * and set flag to re-run takeover run.
- */
-static void takeover_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
+static void ctdb_recovery_unlock(struct ctdb_recoverd *rec)
 {
-       DEBUG(DEBUG_ERR, ("Node %u failed the takeover run\n", node_pnn));
-
-       if (callback_data != NULL) {
-               struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
-
-               DEBUG(DEBUG_ERR, ("Setting node %u as recovery fail culprit\n", node_pnn));
-
-               ctdb_set_culprit(rec, node_pnn);
+       if (rec->recovery_lock_handle != NULL) {
+               DEBUG(DEBUG_NOTICE, ("Releasing recovery lock\n"));
+               TALLOC_FREE(rec->recovery_lock_handle);
        }
 }
 
-
 static void ban_misbehaving_nodes(struct ctdb_recoverd *rec, bool *self_ban)
 {
        struct ctdb_context *ctdb = rec->ctdb;
@@ -1617,111 +1011,19 @@ static void ban_misbehaving_nodes(struct ctdb_recoverd *rec, bool *self_ban)
        }
 }
 
-static bool do_takeover_run(struct ctdb_recoverd *rec,
-                           struct ctdb_node_map_old *nodemap,
-                           bool banning_credits_on_fail)
-{
-       uint32_t *nodes = NULL;
-       struct ctdb_disable_message dtr;
-       TDB_DATA data;
-       int i;
-       uint32_t *rebalance_nodes = rec->force_rebalance_nodes;
-       int ret;
-       bool ok;
-
-       DEBUG(DEBUG_NOTICE, ("Takeover run starting\n"));
-
-       if (ctdb_op_is_in_progress(rec->takeover_run)) {
-               DEBUG(DEBUG_ERR, (__location__
-                                 " takeover run already in progress \n"));
-               ok = false;
-               goto done;
-       }
-
-       if (!ctdb_op_begin(rec->takeover_run)) {
-               ok = false;
-               goto done;
-       }
-
-       /* Disable IP checks (takeover runs, really) on other nodes
-        * while doing this takeover run.  This will stop those other
-        * nodes from triggering takeover runs when think they should
-        * be hosting an IP but it isn't yet on an interface.  Don't
-        * wait for replies since a failure here might cause some
-        * noise in the logs but will not actually cause a problem.
-        */
-       ZERO_STRUCT(dtr);
-       dtr.srvid = 0; /* No reply */
-       dtr.pnn = -1;
-
-       data.dptr  = (uint8_t*)&dtr;
-       data.dsize = sizeof(dtr);
-
-       nodes = list_of_connected_nodes(rec->ctdb, nodemap, rec, false);
-
-       /* Disable for 60 seconds.  This can be a tunable later if
-        * necessary.
-        */
-       dtr.timeout = 60;
-       for (i = 0; i < talloc_array_length(nodes); i++) {
-               if (ctdb_client_send_message(rec->ctdb, nodes[i],
-                                            CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
-                                            data) != 0) {
-                       DEBUG(DEBUG_INFO,("Failed to disable takeover runs\n"));
-               }
-       }
-
-       ret = ctdb_takeover_run(rec->ctdb, nodemap,
-                               rec->force_rebalance_nodes,
-                               takeover_fail_callback,
-                               banning_credits_on_fail ? rec : NULL);
-
-       /* Reenable takeover runs and IP checks on other nodes */
-       dtr.timeout = 0;
-       for (i = 0; i < talloc_array_length(nodes); i++) {
-               if (ctdb_client_send_message(rec->ctdb, nodes[i],
-                                            CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
-                                            data) != 0) {
-                       DEBUG(DEBUG_INFO,("Failed to re-enable takeover runs\n"));
-               }
-       }
-
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, ("ctdb_takeover_run() failed\n"));
-               ok = false;
-               goto done;
-       }
-
-       ok = true;
-       /* Takeover run was successful so clear force rebalance targets */
-       if (rebalance_nodes == rec->force_rebalance_nodes) {
-               TALLOC_FREE(rec->force_rebalance_nodes);
-       } else {
-               DEBUG(DEBUG_WARNING,
-                     ("Rebalance target nodes changed during takeover run - not clearing\n"));
-       }
-done:
-       rec->need_takeover_run = !ok;
-       talloc_free(nodes);
-       ctdb_op_end(rec->takeover_run);
-
-       DEBUG(DEBUG_NOTICE, ("Takeover run %s\n", ok ? "completed successfully" : "unsuccessful"));
-       return ok;
-}
-
-struct recovery_helper_state {
+struct helper_state {
        int fd[2];
        pid_t pid;
        int result;
        bool done;
 };
 
-static void ctdb_recovery_handler(struct tevent_context *ev,
-                                 struct tevent_fd *fde,
-                                 uint16_t flags, void *private_data)
+static void helper_handler(struct tevent_context *ev,
+                          struct tevent_fd *fde,
+                          uint16_t flags, void *private_data)
 {
-       struct recovery_helper_state *state = talloc_get_type_abort(
-               private_data, struct recovery_helper_state);
+       struct helper_state *state = talloc_get_type_abort(
+               private_data, struct helper_state);
        int ret;
 
        ret = sys_read(state->fd[0], &state->result, sizeof(state->result));
@@ -1732,22 +1034,16 @@ static void ctdb_recovery_handler(struct tevent_context *ev,
        state->done = true;
 }
 
-
-static int db_recovery_parallel(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx)
+static int helper_run(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx,
+                     const char *prog, const char *arg, const char *type)
 {
-       static char prog[PATH_MAX+1] = "";
-       const char **args;
-       struct recovery_helper_state *state;
+       struct helper_state *state;
        struct tevent_fd *fde;
+       const char **args;
        int nargs, ret;
+       uint32_t recmaster = rec->recmaster;
 
-       if (!ctdb_set_helper("recovery_helper", prog, sizeof(prog),
-                            "CTDB_RECOVERY_HELPER", CTDB_HELPER_BINDIR,
-                            "ctdb_recovery_helper")) {
-               ctdb_die(rec->ctdb, "Unable to set recovery helper\n");
-       }
-
-       state = talloc_zero(mem_ctx, struct recovery_helper_state);
+       state = talloc_zero(mem_ctx, struct helper_state);
        if (state == NULL) {
                DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
                return -1;
@@ -1758,7 +1054,7 @@ static int db_recovery_parallel(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx)
        ret = pipe(state->fd);
        if (ret != 0) {
                DEBUG(DEBUG_ERR,
-                     ("Failed to create pipe for recovery helper\n"));
+                     ("Failed to create pipe for %s helper\n", type));
                goto fail;
        }
 
@@ -1772,19 +1068,22 @@ static int db_recovery_parallel(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx)
        }
 
        args[0] = talloc_asprintf(args, "%d", state->fd[1]);
+       if (args[0] == NULL) {
+               DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
+               goto fail;
+       }
        args[1] = rec->ctdb->daemon.name;
-       args[2] = talloc_asprintf(args, "%u", new_generation());
+       args[2] = arg;
        args[3] = NULL;
 
-       if (args[0] == NULL || args[2] == NULL) {
-               DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
-               goto fail;
+       if (args[2] == NULL) {
+               nargs = 3;
        }
 
-       if (!ctdb_vfork_with_logging(state, rec->ctdb, "recovery", prog, nargs,
-                                    args, NULL, NULL, &state->pid)) {
+       state->pid = ctdb_vfork_exec(state, rec->ctdb, prog, nargs, args);
+       if (state->pid == -1) {
                DEBUG(DEBUG_ERR,
-                     ("Failed to create child for recovery helper\n"));
+                     ("Failed to create child for %s helper\n", type));
                goto fail;
        }
 
@@ -1794,7 +1093,7 @@ static int db_recovery_parallel(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx)
        state->done = false;
 
        fde = tevent_add_fd(rec->ctdb->ev, rec->ctdb, state->fd[0],
-                           TEVENT_FD_READ, ctdb_recovery_handler, state);
+                           TEVENT_FD_READ, helper_handler, state);
        if (fde == NULL) {
                goto fail;
        }
@@ -1802,6 +1101,14 @@ static int db_recovery_parallel(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx)
 
        while (!state->done) {
                tevent_loop_once(rec->ctdb->ev);
+
+               /* If recmaster changes, we have lost election */
+               if (recmaster != rec->recmaster) {
+                       D_ERR("Recmaster changed to %u, aborting %s\n",
+                             rec->recmaster, type);
+                       state->result = 1;
+                       break;
+               }
        }
 
        close(state->fd[0]);
@@ -1829,159 +1136,153 @@ fail:
        return -1;
 }
 
-static int db_recovery_serial(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx,
-                             uint32_t pnn, struct ctdb_node_map_old *nodemap,
-                             struct ctdb_vnn_map *vnnmap,
-                             struct ctdb_dbid_map_old *dbmap)
+
+static int ctdb_takeover(struct ctdb_recoverd *rec,
+                        uint32_t *force_rebalance_nodes)
 {
-       struct ctdb_context *ctdb = rec->ctdb;
-       uint32_t generation;
-       TDB_DATA data;
-       uint32_t *nodes;
-       int ret, i, j;
+       static char prog[PATH_MAX+1] = "";
+       char *arg;
+       int i, ret;
 
-       /* set recovery mode to active on all nodes */
-       ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE, true);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
-               return -1;
+       if (!ctdb_set_helper("takeover_helper", prog, sizeof(prog),
+                            "CTDB_TAKEOVER_HELPER", CTDB_HELPER_BINDIR,
+                            "ctdb_takeover_helper")) {
+               ctdb_die(rec->ctdb, "Unable to set takeover helper\n");
        }
 
-       /* execute the "startrecovery" event script on all nodes */
-       ret = run_startrecovery_eventscript(rec, nodemap);
-       if (ret!=0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
-               return -1;
+       arg = NULL;
+       for (i = 0; i < talloc_array_length(force_rebalance_nodes); i++) {
+               uint32_t pnn = force_rebalance_nodes[i];
+               if (arg == NULL) {
+                       arg = talloc_asprintf(rec, "%u", pnn);
+               } else {
+                       arg = talloc_asprintf_append(arg, ",%u", pnn);
+               }
+               if (arg == NULL) {
+                       DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
+                       return -1;
+               }
+       }
+
+       if (ctdb_config.failover_disabled) {
+               ret = setenv("CTDB_DISABLE_IP_FAILOVER", "1", 1);
+               if (ret != 0) {
+                       D_ERR("Failed to set CTDB_DISABLE_IP_FAILOVER variable\n");
+                       return -1;
+               }
        }
 
-       /* pick a new generation number */
-       generation = new_generation();
+       return helper_run(rec, rec, prog, arg, "takeover");
+}
 
-       /* change the vnnmap on this node to use the new generation 
-          number but not on any other nodes.
-          this guarantees that if we abort the recovery prematurely
-          for some reason (a node stops responding?)
-          that we can just return immediately and we will reenter
-          recovery shortly again.
-          I.e. we deliberately leave the cluster with an inconsistent
-          generation id to allow us to abort recovery at any stage and
-          just restart it from scratch.
-        */
-       vnnmap->generation = generation;
-       ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
-               return -1;
+static bool do_takeover_run(struct ctdb_recoverd *rec,
+                           struct ctdb_node_map_old *nodemap)
+{
+       uint32_t *nodes = NULL;
+       struct ctdb_disable_message dtr;
+       TDB_DATA data;
+       int i;
+       uint32_t *rebalance_nodes = rec->force_rebalance_nodes;
+       int ret;
+       bool ok;
+
+       DEBUG(DEBUG_NOTICE, ("Takeover run starting\n"));
+
+       if (ctdb_op_is_in_progress(rec->takeover_run)) {
+               DEBUG(DEBUG_ERR, (__location__
+                                 " takeover run already in progress \n"));
+               ok = false;
+               goto done;
+       }
+
+       if (!ctdb_op_begin(rec->takeover_run)) {
+               ok = false;
+               goto done;
        }
 
-       /* Database generations are updated when the transaction is commited to
-        * the databases.  So make sure to use the final generation as the
-        * transaction id
+       /* Disable IP checks (takeover runs, really) on other nodes
+        * while doing this takeover run.  This will stop those other
+        * nodes from triggering takeover runs when think they should
+        * be hosting an IP but it isn't yet on an interface.  Don't
+        * wait for replies since a failure here might cause some
+        * noise in the logs but will not actually cause a problem.
         */
-       generation = new_generation();
+       ZERO_STRUCT(dtr);
+       dtr.srvid = 0; /* No reply */
+       dtr.pnn = -1;
 
-       data.dptr = (void *)&generation;
-       data.dsize = sizeof(uint32_t);
+       data.dptr  = (uint8_t*)&dtr;
+       data.dsize = sizeof(dtr);
 
-       nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
-       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
-                                       nodes, 0,
-                                       CONTROL_TIMEOUT(), false, data,
-                                       NULL,
-                                       transaction_start_fail_callback,
-                                       rec) != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
-               if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
-                                       nodes, 0,
-                                       CONTROL_TIMEOUT(), false, tdb_null,
-                                       NULL,
-                                       NULL,
-                                       NULL) != 0) {
-                       DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
+       nodes = list_of_connected_nodes(rec->ctdb, nodemap, rec, false);
+
+       /* Disable for 60 seconds.  This can be a tunable later if
+        * necessary.
+        */
+       dtr.timeout = 60;
+       for (i = 0; i < talloc_array_length(nodes); i++) {
+               if (ctdb_client_send_message(rec->ctdb, nodes[i],
+                                            CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
+                                            data) != 0) {
+                       DEBUG(DEBUG_INFO,("Failed to disable takeover runs\n"));
                }
-               return -1;
        }
 
-       DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
+       ret = ctdb_takeover(rec, rec->force_rebalance_nodes);
 
-       for (i=0;i<dbmap->num;i++) {
-               ret = recover_database(rec, mem_ctx,
-                                      dbmap->dbs[i].db_id,
-                                      dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT,
-                                      pnn, nodemap, generation);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].db_id));
-                       return -1;
+       /* Reenable takeover runs and IP checks on other nodes */
+       dtr.timeout = 0;
+       for (i = 0; i < talloc_array_length(nodes); i++) {
+               if (ctdb_client_send_message(rec->ctdb, nodes[i],
+                                            CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
+                                            data) != 0) {
+                       DEBUG(DEBUG_INFO,("Failed to re-enable takeover runs\n"));
                }
        }
 
-       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
-
-       /* commit all the changes */
-       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
-                                       nodes, 0,
-                                       CONTROL_TIMEOUT(), false, data,
-                                       NULL, NULL,
-                                       NULL) != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
-               return -1;
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, ("ctdb_takeover_run() failed\n"));
+               ok = false;
+               goto done;
        }
 
-       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
-
-       /* build a new vnn map with all the currently active and
-          unbanned nodes */
-       vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
-       CTDB_NO_MEMORY(ctdb, vnnmap);
-       vnnmap->generation = generation;
-       vnnmap->size = 0;
-       vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
-       CTDB_NO_MEMORY(ctdb, vnnmap->map);
-       for (i=j=0;i<nodemap->num;i++) {
-               if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
-                       continue;
-               }
-               if (!ctdb_node_has_capabilities(rec->caps,
-                                               ctdb->nodes[i]->pnn,
-                                               CTDB_CAP_LMASTER)) {
-                       /* this node can not be an lmaster */
-                       DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
-                       continue;
-               }
+       ok = true;
+       /* Takeover run was successful so clear force rebalance targets */
+       if (rebalance_nodes == rec->force_rebalance_nodes) {
+               TALLOC_FREE(rec->force_rebalance_nodes);
+       } else {
+               DEBUG(DEBUG_WARNING,
+                     ("Rebalance target nodes changed during takeover run - not clearing\n"));
+       }
+done:
+       rec->need_takeover_run = !ok;
+       talloc_free(nodes);
+       ctdb_op_end(rec->takeover_run);
 
-               vnnmap->size++;
-               vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
-               CTDB_NO_MEMORY(ctdb, vnnmap->map);
-               vnnmap->map[j++] = nodemap->nodes[i].pnn;
+       DEBUG(DEBUG_NOTICE, ("Takeover run %s\n", ok ? "completed successfully" : "unsuccessful"));
+       return ok;
+}
 
-       }
-       if (vnnmap->size == 0) {
-               DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
-               vnnmap->size++;
-               vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
-               CTDB_NO_MEMORY(ctdb, vnnmap->map);
-               vnnmap->map[0] = pnn;
-       }
+static int db_recovery_parallel(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx)
+{
+       static char prog[PATH_MAX+1] = "";
+       const char *arg;
 
-       /* update to the new vnnmap on all nodes */
-       ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
-               return -1;
+       if (!ctdb_set_helper("recovery_helper", prog, sizeof(prog),
+                            "CTDB_RECOVERY_HELPER", CTDB_HELPER_BINDIR,
+                            "ctdb_recovery_helper")) {
+               ctdb_die(rec->ctdb, "Unable to set recovery helper\n");
        }
 
-       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
-
-       /* disable recovery mode */
-       ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL, false);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
+       arg = talloc_asprintf(mem_ctx, "%u", new_generation());
+       if (arg == NULL) {
+               DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
                return -1;
        }
 
-       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
+       setenv("CTDB_DBDIR_STATE", rec->ctdb->db_directory_state, 1);
 
-       return 0;
+       return helper_run(rec, mem_ctx, prog, arg, "recovery");
 }
 
 /*
@@ -1994,9 +1295,7 @@ static int do_recovery(struct ctdb_recoverd *rec,
        struct ctdb_context *ctdb = rec->ctdb;
        int i, ret;
        struct ctdb_dbid_map_old *dbmap;
-       struct timeval start_time;
        bool self_ban;
-       bool par_recovery;
 
        DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
 
@@ -2029,35 +1328,47 @@ static int do_recovery(struct ctdb_recoverd *rec,
                goto fail;
        }
 
-        if (ctdb->recovery_lock_file != NULL) {
-               if (ctdb_recovery_have_lock(ctdb)) {
-                       DEBUG(DEBUG_NOTICE, ("Already holding recovery lock\n"));
+       if (ctdb->recovery_lock != NULL) {
+               if (ctdb_recovery_have_lock(rec)) {
+                       D_NOTICE("Already holding recovery lock\n");
                } else {
-                       start_time = timeval_current();
-                       DEBUG(DEBUG_NOTICE, ("Attempting to take recovery lock (%s)\n",
-                                            ctdb->recovery_lock_file));
-                       if (!ctdb_recovery_lock(ctdb)) {
-                               if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
-                                       /* If ctdb is trying first recovery, it's
-                                        * possible that current node does not know
-                                        * yet who the recmaster is.
+                       bool ok;
+
+                       D_NOTICE("Attempting to take recovery lock (%s)\n",
+                                ctdb->recovery_lock);
+
+                       ok = ctdb_recovery_lock(rec);
+                       if (! ok) {
+                               D_ERR("Unable to take recovery lock\n");
+
+                               if (pnn != rec->recmaster) {
+                                       D_NOTICE("Recovery master changed to %u,"
+                                                " aborting recovery\n",
+                                                rec->recmaster);
+                                       rec->need_recovery = false;
+                                       goto fail;
+                               }
+
+                               if (ctdb->runstate ==
+                                   CTDB_RUNSTATE_FIRST_RECOVERY) {
+                                       /*
+                                        * First recovery?  Perhaps
+                                        * current node does not yet
+                                        * know who the recmaster is.
                                         */
-                                       DEBUG(DEBUG_ERR, ("Unable to get recovery lock"
-                                                         " - retrying recovery\n"));
+                                       D_ERR("Retrying recovery\n");
                                        goto fail;
                                }
 
-                               DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
-                                                "and ban ourself for %u seconds\n",
-                                                ctdb->tunable.recovery_ban_period));
-                               ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
+                               D_ERR("Abort recovery, "
+                                     "ban this node for %u seconds\n",
+                                     ctdb->tunable.recovery_ban_period);
+                               ctdb_ban_node(rec,
+                                             pnn,
+                                             ctdb->tunable.recovery_ban_period);
                                goto fail;
                        }
-                       ctdb_ctrl_report_recd_lock_latency(ctdb,
-                                                          CONTROL_TIMEOUT(),
-                                                          timeval_elapsed(&start_time));
-                       DEBUG(DEBUG_NOTICE,
-                             ("Recovery lock taken successfully by recovery daemon\n"));
+                       D_NOTICE("Recovery lock taken successfully\n");
                }
        }
 
@@ -2088,18 +1399,6 @@ static int do_recovery(struct ctdb_recoverd *rec,
        }
        DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
 
-       /* update the database priority for all remote databases */
-       ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
-       }
-       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
-
-
-       /* update all other nodes to use the same setting for reclock files
-          as the local recovery master.
-       */
-       sync_recovery_lock_file_across_cluster(rec);
 
        /* Retrieve capabilities from all connected nodes */
        ret = update_capabilities(rec, nodemap);
@@ -2129,41 +1428,12 @@ static int do_recovery(struct ctdb_recoverd *rec,
 
        DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
 
-       /* Check if all participating nodes have parallel recovery capability */
-       par_recovery = true;
-       for (i=0; i<nodemap->num; i++) {
-               if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
-                       continue;
-               }
-
-               if (!(rec->caps[i].capabilities &
-                     CTDB_CAP_PARALLEL_RECOVERY)) {
-                       par_recovery = false;
-                       break;
-               }
-       }
-
-       if (par_recovery) {
-               ret = db_recovery_parallel(rec, mem_ctx);
-       } else {
-               ret = db_recovery_serial(rec, mem_ctx, pnn, nodemap, vnnmap,
-                                        dbmap);
-       }
-
+       ret = db_recovery_parallel(rec, mem_ctx);
        if (ret != 0) {
                goto fail;
        }
 
-       do_takeover_run(rec, nodemap, false);
-
-       /* execute the "recovered" event script on all nodes */
-       ret = run_recovered_eventscript(rec, nodemap, "do_recovery");
-       if (ret!=0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
-               goto fail;
-       }
-
-       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
+       do_takeover_run(rec, nodemap);
 
        /* send a message to all clients telling them that the cluster 
           has been reconfigured */
@@ -2203,7 +1473,7 @@ static int do_recovery(struct ctdb_recoverd *rec,
           We now wait for rerecovery_timeout before we allow
           another recovery to take place.
        */
-       DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be supressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
+       DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be suppressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
        ctdb_op_disable(rec->recovery, ctdb->ev,
                        ctdb->tunable.rerecovery_timeout);
        return 0;
@@ -2423,24 +1693,6 @@ static void reload_nodes_handler(uint64_t srvid, TDB_DATA data,
 }
 
 
-static void ctdb_rebalance_timeout(struct tevent_context *ev,
-                                  struct tevent_timer *te,
-                                  struct timeval t, void *p)
-{
-       struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
-
-       if (rec->force_rebalance_nodes == NULL) {
-               DEBUG(DEBUG_ERR,
-                     ("Rebalance timeout occurred - no nodes to rebalance\n"));
-               return;
-       }
-
-       DEBUG(DEBUG_NOTICE,
-             ("Rebalance timeout occurred - trigger takeover run\n"));
-       rec->need_takeover_run = true;
-}
-
-
 static void recd_node_rebalance_handler(uint64_t srvid, TDB_DATA data,
                                        void *private_data)
 {
@@ -2450,7 +1702,6 @@ static void recd_node_rebalance_handler(uint64_t srvid, TDB_DATA data,
        uint32_t pnn;
        uint32_t *t;
        int len;
-       uint32_t deferred_rebalance;
 
        if (rec->recmaster != ctdb_get_pnn(ctdb)) {
                return;
@@ -2489,45 +1740,10 @@ static void recd_node_rebalance_handler(uint64_t srvid, TDB_DATA data,
        talloc_free(rec->force_rebalance_nodes);
 
        rec->force_rebalance_nodes = t;
-
-       /* If configured, setup a deferred takeover run to make sure
-        * that certain nodes get IPs rebalanced to them.  This will
-        * be cancelled if a successful takeover run happens before
-        * the timeout.  Assign tunable value to variable for
-        * readability.
-        */
-       deferred_rebalance = ctdb->tunable.deferred_rebalance_on_node_add;
-       if (deferred_rebalance != 0) {
-               tevent_add_timer(ctdb->ev, rec->force_rebalance_nodes,
-                                timeval_current_ofs(deferred_rebalance, 0),
-                                ctdb_rebalance_timeout, rec);
-       }
 }
 
 
 
-static void recd_update_ip_handler(uint64_t srvid, TDB_DATA data,
-                                  void *private_data)
-{
-       struct ctdb_recoverd *rec = talloc_get_type(
-               private_data, struct ctdb_recoverd);
-       struct ctdb_public_ip *ip;
-
-       if (rec->recmaster != rec->ctdb->pnn) {
-               DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
-               return;
-       }
-
-       if (data.dsize != sizeof(struct ctdb_public_ip)) {
-               DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
-               return;
-       }
-
-       ip = (struct ctdb_public_ip *)data.dptr;
-
-       update_ip_assignment_tree(rec->ctdb, ip);
-}
-
 static void srvid_disable_and_reply(struct ctdb_context *ctdb,
                                    TDB_DATA data,
                                    struct ctdb_op_state *op_state)
@@ -2636,8 +1852,6 @@ static void process_ipreallocate_requests(struct ctdb_context *ctdb,
        int32_t ret;
        struct srvid_requests *current;
 
-       DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
-
        /* Only process requests that are currently pending.  More
         * might come in while the takeover run is in progress and
         * they will need to be processed later since they might
@@ -2646,7 +1860,7 @@ static void process_ipreallocate_requests(struct ctdb_context *ctdb,
        current = rec->reallocate_requests;
        rec->reallocate_requests = NULL;
 
-       if (do_takeover_run(rec, rec->nodemap, false)) {
+       if (do_takeover_run(rec, rec->nodemap)) {
                ret = ctdb_get_pnn(ctdb);
        } else {
                ret = -1;
@@ -2658,6 +1872,30 @@ static void process_ipreallocate_requests(struct ctdb_context *ctdb,
        srvid_requests_reply(ctdb, &current, result);
 }
 
+/*
+ * handler for assigning banning credits
+ */
+static void banning_handler(uint64_t srvid, TDB_DATA data, void *private_data)
+{
+       struct ctdb_recoverd *rec = talloc_get_type(
+               private_data, struct ctdb_recoverd);
+       uint32_t ban_pnn;
+
+       /* Ignore if we are not recmaster */
+       if (rec->ctdb->pnn != rec->recmaster) {
+               return;
+       }
+
+       if (data.dsize != sizeof(uint32_t)) {
+               DEBUG(DEBUG_ERR, (__location__ "invalid data size %zu\n",
+                                 data.dsize));
+               return;
+       }
+
+       ban_pnn = *(uint32_t *)data.dptr;
+
+       ctdb_set_culprit_count(rec, ban_pnn, rec->nodemap->num);
+}
 
 /*
   handler for recovery master elections
@@ -2702,12 +1940,10 @@ static void election_handler(uint64_t srvid, TDB_DATA data, void *private_data)
        TALLOC_FREE(rec->send_election_te);
 
        /* Release the recovery lock file */
-       if (ctdb_recovery_have_lock(ctdb)) {
-               ctdb_recovery_unlock(ctdb);
+       if (ctdb_recovery_have_lock(rec)) {
+               ctdb_recovery_unlock(rec);
        }
 
-       clear_ip_assignment_tree(ctdb);
-
        /* ok, let that guy become recmaster then */
        ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(),
                                     CTDB_CURRENT_NODE, em->pnn);
@@ -2733,7 +1969,7 @@ static void force_election(struct ctdb_recoverd *rec, uint32_t pnn,
        DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
 
        /* set all nodes to recovery mode to stop all internode traffic */
-       ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE, false);
+       ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
                return;
@@ -2772,7 +2008,6 @@ static void monitor_handler(uint64_t srvid, TDB_DATA data, void *private_data)
        struct ctdb_node_map_old *nodemap=NULL;
        TALLOC_CTX *tmp_ctx;
        int i;
-       int disabled_flag_changed;
 
        if (data.dsize != sizeof(*c)) {
                DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
@@ -2804,28 +2039,8 @@ static void monitor_handler(uint64_t srvid, TDB_DATA data, void *private_data)
                DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
        }
 
-       disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
-
        nodemap->nodes[i].flags = c->new_flags;
 
-       ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(),
-                                  CTDB_CURRENT_NODE, &ctdb->recovery_mode);
-
-       if (ret == 0 &&
-           rec->recmaster == ctdb->pnn &&
-           ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
-               /* Only do the takeover run if the perm disabled or unhealthy
-                  flags changed since these will cause an ip failover but not
-                  a recovery.
-                  If the node became disconnected or banned this will also
-                  lead to an ip address failover but that is handled 
-                  during recovery
-               */
-               if (disabled_flag_changed) {
-                       rec->need_takeover_run = true;
-               }
-       }
-
        talloc_free(tmp_ctx);
 }
 
@@ -3122,86 +2337,110 @@ static bool interfaces_have_changed(struct ctdb_context *ctdb,
        return ret;
 }
 
-/* called to check that the local allocation of public ip addresses is ok.
-*/
-static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map_old *nodemap)
+/* Check that the local allocation of public IP addresses is correct
+ * and do some house-keeping */
+static int verify_local_ip_allocation(struct ctdb_context *ctdb,
+                                     struct ctdb_recoverd *rec,
+                                     uint32_t pnn,
+                                     struct ctdb_node_map_old *nodemap)
 {
        TALLOC_CTX *mem_ctx = talloc_new(NULL);
        int ret, j;
        bool need_takeover_run = false;
+       struct ctdb_public_ip_list_old *ips = NULL;
+
+       /* If we are not the recmaster then do some housekeeping */
+       if (rec->recmaster != pnn) {
+               /* Ignore any IP reallocate requests - only recmaster
+                * processes them
+                */
+               TALLOC_FREE(rec->reallocate_requests);
+               /* Clear any nodes that should be force rebalanced in
+                * the next takeover run.  If the recovery master role
+                * has moved then we don't want to process these some
+                * time in the future.
+                */
+               TALLOC_FREE(rec->force_rebalance_nodes);
+       }
+
+       /* Return early if disabled... */
+       if (ctdb_config.failover_disabled ||
+           ctdb_op_is_disabled(rec->takeover_run)) {
+               return  0;
+       }
 
        if (interfaces_have_changed(ctdb, rec)) {
-               DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
-                                    "local node %u - force takeover run\n",
-                                    pnn));
                need_takeover_run = true;
        }
 
-       /* verify that we have the ip addresses we should have
-          and we don't have ones we shouldnt have.
-          if we find an inconsistency we set recmode to
-          active on the local node and wait for the recmaster
-          to do a full blown recovery.
-          also if the pnn is -1 and we are healthy and can host the ip
-          we also request a ip reallocation.
-       */
-       if (ctdb->tunable.disable_ip_failover == 0) {
-               struct ctdb_public_ip_list_old *ips = NULL;
+       /* If there are unhosted IPs but this node can host them then
+        * trigger an IP reallocation */
 
-               /* read the *available* IPs from the local node */
-               ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE, &ips);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR, ("Unable to get available public IPs from local node %u\n", pnn));
-                       talloc_free(mem_ctx);
-                       return -1;
-               }
+       /* Read *available* IPs from local node */
+       ret = ctdb_ctrl_get_public_ips_flags(
+               ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx,
+               CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE, &ips);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, ("Unable to retrieve available public IPs\n"));
+               talloc_free(mem_ctx);
+               return -1;
+       }
 
-               for (j=0; j<ips->num; j++) {
-                       if (ips->ips[j].pnn == -1 &&
-                           nodemap->nodes[pnn].flags == 0) {
-                               DEBUG(DEBUG_CRIT,("Public IP '%s' is not assigned and we could serve it\n",
-                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
-                               need_takeover_run = true;
-                       }
+       for (j=0; j<ips->num; j++) {
+               if (ips->ips[j].pnn == -1 &&
+                   nodemap->nodes[pnn].flags == 0) {
+                       DEBUG(DEBUG_WARNING,
+                             ("Unassigned IP %s can be served by this node\n",
+                              ctdb_addr_to_str(&ips->ips[j].addr)));
+                       need_takeover_run = true;
                }
+       }
 
-               talloc_free(ips);
+       talloc_free(ips);
 
-               /* read the *known* IPs from the local node */
-               ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, 0, &ips);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR, ("Unable to get known public IPs from local node %u\n", pnn));
-                       talloc_free(mem_ctx);
-                       return -1;
-               }
+       if (!ctdb->do_checkpublicip) {
+               goto done;
+       }
 
-               for (j=0; j<ips->num; j++) {
-                       if (ips->ips[j].pnn == pnn) {
-                               if (ctdb->do_checkpublicip && !ctdb_sys_have_ip(&ips->ips[j].addr)) {
-                                       DEBUG(DEBUG_CRIT,("Public IP '%s' is assigned to us but not on an interface\n",
-                                               ctdb_addr_to_str(&ips->ips[j].addr)));
-                                       need_takeover_run = true;
-                               }
-                       } else {
-                               if (ctdb->do_checkpublicip &&
-                                   ctdb_sys_have_ip(&ips->ips[j].addr)) {
+       /* Validate the IP addresses that this node has on network
+        * interfaces.  If there is an inconsistency between reality
+        * and the state expected by CTDB then try to fix it by
+        * triggering an IP reallocation or releasing extraneous IP
+        * addresses. */
 
-                                       DEBUG(DEBUG_CRIT,("We are still serving a public IP '%s' that we should not be serving. Removing it\n", 
-                                               ctdb_addr_to_str(&ips->ips[j].addr)));
+       /* Read *known* IPs from local node */
+       ret = ctdb_ctrl_get_public_ips_flags(
+               ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, 0, &ips);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, ("Unable to retrieve known public IPs\n"));
+               talloc_free(mem_ctx);
+               return -1;
+       }
 
-                                       if (ctdb_ctrl_release_ip(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ips->ips[j]) != 0) {
-                                               DEBUG(DEBUG_ERR,("Failed to release local IP address\n"));
-                                       }
-                               }
+       for (j=0; j<ips->num; j++) {
+               if (ips->ips[j].pnn == pnn) {
+                       if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
+                               DEBUG(DEBUG_ERR,
+                                     ("Assigned IP %s not on an interface\n",
+                                      ctdb_addr_to_str(&ips->ips[j].addr)));
+                               need_takeover_run = true;
+                       }
+               } else {
+                       if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
+                               DEBUG(DEBUG_ERR,
+                                     ("IP %s incorrectly on an interface\n",
+                                      ctdb_addr_to_str(&ips->ips[j].addr)));
+                               need_takeover_run = true;
                        }
                }
        }
 
+done:
        if (need_takeover_run) {
                struct ctdb_srvid_message rd;
                TDB_DATA data;
 
-               DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
+               DEBUG(DEBUG_NOTICE,("Trigger takeoverrun\n"));
 
                ZERO_STRUCT(rd);
                rd.pnn = ctdb->pnn;
@@ -3211,7 +2450,8 @@ static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_rec
 
                ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
                if (ret != 0) {
-                       DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
+                       DEBUG(DEBUG_ERR,
+                             ("Failed to send takeover run request\n"));
                }
        }
        talloc_free(mem_ctx);
@@ -3253,55 +2493,8 @@ static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
        return 0;
 }
 
-static int update_recovery_lock_file(struct ctdb_context *ctdb)
-{
-       TALLOC_CTX *tmp_ctx = talloc_new(NULL);
-       const char *reclockfile;
-
-       if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
-               DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
-               talloc_free(tmp_ctx);
-               return -1;      
-       }
-
-       if (reclockfile == NULL) {
-               if (ctdb->recovery_lock_file != NULL) {
-                       DEBUG(DEBUG_NOTICE,("Recovery lock file disabled\n"));
-                       talloc_free(ctdb->recovery_lock_file);
-                       ctdb->recovery_lock_file = NULL;
-                       ctdb_recovery_unlock(ctdb);
-               }
-               talloc_free(tmp_ctx);
-               return 0;
-       }
-
-       if (ctdb->recovery_lock_file == NULL) {
-               DEBUG(DEBUG_NOTICE,
-                     ("Recovery lock file enabled (%s)\n", reclockfile));
-               ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
-               ctdb_recovery_unlock(ctdb);
-               talloc_free(tmp_ctx);
-               return 0;
-       }
-
-
-       if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
-               talloc_free(tmp_ctx);
-               return 0;
-       }
-
-       DEBUG(DEBUG_NOTICE,
-             ("Recovery lock file changed (now %s)\n", reclockfile));
-       talloc_free(ctdb->recovery_lock_file);
-       ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
-       ctdb_recovery_unlock(ctdb);
-
-       talloc_free(tmp_ctx);
-       return 0;
-}
-
-static enum monitor_result validate_recovery_master(struct ctdb_recoverd *rec,
-                                                   TALLOC_CTX *mem_ctx)
+static bool validate_recovery_master(struct ctdb_recoverd *rec,
+                                    TALLOC_CTX *mem_ctx)
 {
        struct ctdb_context *ctdb = rec->ctdb;
        uint32_t pnn = ctdb_get_pnn(ctdb);
@@ -3315,7 +2508,8 @@ static enum monitor_result validate_recovery_master(struct ctdb_recoverd *rec,
        if (rec->recmaster == CTDB_UNKNOWN_PNN) {
                DEBUG(DEBUG_NOTICE,
                      ("Initial recovery master set - forcing election\n"));
-               return MONITOR_ELECTION_NEEDED;
+               force_election(rec, pnn, nodemap);
+               return false;
        }
 
        /*
@@ -3332,7 +2526,8 @@ static enum monitor_result validate_recovery_master(struct ctdb_recoverd *rec,
                      (" Current recmaster node %u does not have CAP_RECMASTER,"
                       " but we (node %u) have - force an election\n",
                       rec->recmaster, pnn));
-               return MONITOR_ELECTION_NEEDED;
+               force_election(rec, pnn, nodemap);
+               return false;
        }
 
        /* Verify that the master node has not been deleted.  This
@@ -3345,7 +2540,8 @@ static enum monitor_result validate_recovery_master(struct ctdb_recoverd *rec,
                DEBUG(DEBUG_ERR,
                      ("Recmaster node %u has been deleted. Force election\n",
                       rec->recmaster));
-               return MONITOR_ELECTION_NEEDED;
+               force_election(rec, pnn, nodemap);
+               return false;
        }
 
        /* if recovery master is disconnected/deleted we must elect a new recmaster */
@@ -3354,7 +2550,8 @@ static enum monitor_result validate_recovery_master(struct ctdb_recoverd *rec,
                DEBUG(DEBUG_NOTICE,
                      ("Recmaster node %u is disconnected/deleted. Force election\n",
                       rec->recmaster));
-               return MONITOR_ELECTION_NEEDED;
+               force_election(rec, pnn, nodemap);
+               return false;
        }
 
        /* get nodemap from the recovery master to check if it is inactive */
@@ -3365,7 +2562,8 @@ static enum monitor_result validate_recovery_master(struct ctdb_recoverd *rec,
                      (__location__
                       " Unable to get nodemap from recovery master %u\n",
                          rec->recmaster));
-               return MONITOR_FAILED;
+               /* No election, just error */
+               return false;
        }
 
 
@@ -3381,10 +2579,11 @@ static enum monitor_result validate_recovery_master(struct ctdb_recoverd *rec,
                 */
                nodemap->nodes[rec->recmaster].flags =
                        recmaster_nodemap->nodes[rec->recmaster].flags;
-               return MONITOR_ELECTION_NEEDED;
+               force_election(rec, pnn, nodemap);
+               return false;
        }
 
-       return MONITOR_OK;
+       return true;
 }
 
 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
@@ -3438,12 +2637,6 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
                return;
        }
 
-       /* get the current recovery lock file from the server */
-       if (update_recovery_lock_file(ctdb) != 0) {
-               DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
-               return;
-       }
-
        pnn = ctdb_get_pnn(ctdb);
 
        /* get nodemap */
@@ -3464,6 +2657,13 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
                return;
        }
 
+       ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(),
+                                  CTDB_CURRENT_NODE, &ctdb->recovery_mode);
+       if (ret != 0) {
+               D_ERR("Failed to read recmode from local node\n");
+               return;
+       }
+
        /* if the local daemon is STOPPED or BANNED, we verify that the databases are
           also frozen and that the recmode is set to active.
        */
@@ -3476,10 +2676,6 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
                 */
                rec->priority_time = timeval_current();
 
-               ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
-               }
                if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
                        DEBUG(DEBUG_ERR,("Node is stopped or banned but recovery mode is not active. Activate recovery mode and lock databases\n"));
 
@@ -3489,11 +2685,18 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
 
                                return;
                        }
-                       ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
+               }
+               if (! rec->frozen_on_inactive) {
+                       ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(),
+                                              CTDB_CURRENT_NODE);
                        if (ret != 0) {
-                               DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node in STOPPED or BANNED state\n"));
+                               DEBUG(DEBUG_ERR,
+                                     (__location__ " Failed to freeze node "
+                                      "in STOPPED or BANNED state\n"));
                                return;
                        }
+
+                       rec->frozen_on_inactive = true;
                }
 
                /* If this node is stopped or banned then it is not the recovery
@@ -3503,19 +2706,7 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
                return;
        }
 
-       /* If we are not the recmaster then do some housekeeping */
-       if (rec->recmaster != pnn) {
-               /* Ignore any IP reallocate requests - only recmaster
-                * processes them
-                */
-               TALLOC_FREE(rec->reallocate_requests);
-               /* Clear any nodes that should be force rebalanced in
-                * the next takeover run.  If the recovery master role
-                * has moved then we don't want to process these some
-                * time in the future.
-                */
-               TALLOC_FREE(rec->force_rebalance_nodes);
-       }
+       rec->frozen_on_inactive = false;
 
        /* Retrieve capabilities from all connected nodes */
        ret = update_capabilities(rec, nodemap);
@@ -3524,30 +2715,16 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
                return;
        }
 
-       switch (validate_recovery_master(rec, mem_ctx)) {
-       case MONITOR_RECOVERY_NEEDED:
-               /* can not happen */
-               return;
-       case MONITOR_ELECTION_NEEDED:
-               force_election(rec, pnn, nodemap);
-               return;
-       case MONITOR_OK:
-               break;
-       case MONITOR_FAILED:
+       if (! validate_recovery_master(rec, mem_ctx)) {
                return;
        }
 
-       /* verify that we have all ip addresses we should have and we dont
-        * have addresses we shouldnt have.
-        */ 
-       if (ctdb->tunable.disable_ip_failover == 0 &&
-           !ctdb_op_is_disabled(rec->takeover_run)) {
-               if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
-                       DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
-               }
+       if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
+               /* Check if an IP takeover run is needed and trigger one if
+                * necessary */
+               verify_local_ip_allocation(ctdb, rec, pnn, nodemap);
        }
 
-
        /* if we are not the recmaster then we do not need to check
           if recovery is needed
         */
@@ -3558,12 +2735,7 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
 
        /* ensure our local copies of flags are right */
        ret = update_local_flags(rec, nodemap);
-       if (ret == MONITOR_ELECTION_NEEDED) {
-               DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
-               force_election(rec, pnn, nodemap);
-               return;
-       }
-       if (ret != MONITOR_OK) {
+       if (ret != 0) {
                DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
                return;
        }
@@ -3618,9 +2790,9 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
        }
 
 
-        if (ctdb->recovery_lock_file != NULL) {
+        if (ctdb->recovery_lock != NULL) {
                /* We must already hold the recovery lock */
-               if (!ctdb_recovery_have_lock(ctdb)) {
+               if (!ctdb_recovery_have_lock(rec)) {
                        DEBUG(DEBUG_ERR,("Failed recovery lock sanity check.  Force a recovery\n"));
                        ctdb_set_culprit(rec, ctdb->pnn);
                        do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
@@ -3629,18 +2801,12 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
        }
 
 
-       /* if there are takeovers requested, perform it and notify the waiters */
-       if (!ctdb_op_is_disabled(rec->takeover_run) &&
-           rec->reallocate_requests) {
-               process_ipreallocate_requests(ctdb, rec);
-       }
-
        /* If recoveries are disabled then there is no use doing any
         * nodemap or flags checks.  Recoveries might be disabled due
         * to "reloadnodes", so doing these checks might cause an
         * unnecessary recovery.  */
        if (ctdb_op_is_disabled(rec->recovery)) {
-               return;
+               goto takeover_run_checks;
        }
 
        /* get the nodemap for all active remote nodes
@@ -3849,22 +3015,39 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
                }
        }
 
-       /* we might need to change who has what IP assigned */
-       if (rec->need_takeover_run) {
-               /* If takeover run fails, then the offending nodes are
-                * assigned ban culprit counts. And we re-try takeover.
-                * If takeover run fails repeatedly, the node would get
-                * banned.
-                */
-               do_takeover_run(rec, nodemap, true);
+       /* FIXME: Add remote public IP checking to ensure that nodes
+        * have the IP addresses that are allocated to them. */
+
+takeover_run_checks:
+
+       /* If there are IP takeover runs requested or the previous one
+        * failed then perform one and notify the waiters */
+       if (!ctdb_op_is_disabled(rec->takeover_run) &&
+           (rec->reallocate_requests || rec->need_takeover_run)) {
+               process_ipreallocate_requests(ctdb, rec);
        }
 }
 
+static void recd_sig_term_handler(struct tevent_context *ev,
+                                 struct tevent_signal *se, int signum,
+                                 int count, void *dont_care,
+                                 void *private_data)
+{
+       struct ctdb_recoverd *rec = talloc_get_type_abort(
+               private_data, struct ctdb_recoverd);
+
+       DEBUG(DEBUG_ERR, ("Received SIGTERM, exiting\n"));
+       ctdb_recovery_unlock(rec);
+       exit(0);
+}
+
+
 /*
   the main monitoring loop
  */
 static void monitor_cluster(struct ctdb_context *ctdb)
 {
+       struct tevent_signal *se;
        struct ctdb_recoverd *rec;
 
        DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
@@ -3874,6 +3057,7 @@ static void monitor_cluster(struct ctdb_context *ctdb)
 
        rec->ctdb = ctdb;
        rec->recmaster = CTDB_UNKNOWN_PNN;
+       rec->recovery_lock_handle = NULL;
 
        rec->takeover_run = ctdb_op_init(rec, "takeover runs");
        CTDB_NO_MEMORY_FATAL(ctdb, rec->takeover_run);
@@ -3882,10 +3066,22 @@ static void monitor_cluster(struct ctdb_context *ctdb)
        CTDB_NO_MEMORY_FATAL(ctdb, rec->recovery);
 
        rec->priority_time = timeval_current();
+       rec->frozen_on_inactive = false;
+
+       se = tevent_add_signal(ctdb->ev, ctdb, SIGTERM, 0,
+                              recd_sig_term_handler, rec);
+       if (se == NULL) {
+               DEBUG(DEBUG_ERR, ("Failed to install SIGTERM handler\n"));
+               exit(1);
+       }
 
        /* register a message port for sending memory dumps */
        ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
 
+       /* when a node is assigned banning credits */
+       ctdb_client_set_message_handler(ctdb, CTDB_SRVID_BANNING,
+                                       banning_handler, rec);
+
        /* register a message port for recovery elections */
        ctdb_client_set_message_handler(ctdb, CTDB_SRVID_ELECTION, election_handler, rec);
 
@@ -3907,9 +3103,6 @@ static void monitor_cluster(struct ctdb_context *ctdb)
        /* register a message port for disabling the ip check for a short while */
        ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
 
-       /* register a message port for updating the recovery daemons node assignment for an ip */
-       ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
-
        /* register a message port for forcing a rebalance of a node next
           reallocation */
        ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);
@@ -4018,6 +3211,7 @@ int ctdb_start_recoverd(struct ctdb_context *ctdb)
        int fd[2];
        struct tevent_signal *se;
        struct tevent_fd *fde;
+       int ret;
 
        if (pipe(fd) != 0) {
                return -1;
@@ -4044,8 +3238,13 @@ int ctdb_start_recoverd(struct ctdb_context *ctdb)
 
        srandom(getpid() ^ time(NULL));
 
-       prctl_set_comment("ctdb_recovered");
-       if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
+       ret = logging_init(ctdb, NULL, NULL, "ctdb-recoverd");
+       if (ret != 0) {
+               return -1;
+       }
+
+       prctl_set_comment("ctdb_recoverd");
+       if (switch_from_server_to_client(ctdb) != 0) {
                DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
                exit(1);
        }