ctdb-recoverd: Store recovery lock handle
[vlendec/samba-autobuild/.git] / ctdb / server / ctdb_recoverd.c
index 538d4301adbf65cf39fa6cd72dfb2fe14128faa7..46357b6ed690905ae2fe05f39b566397f473495f 100644 (file)
 #include "lib/util/dlinklist.h"
 #include "lib/util/debug.h"
 #include "lib/util/samba_util.h"
+#include "lib/util/sys_rw.h"
 #include "lib/util/util_process.h"
 
 #include "ctdb_private.h"
 #include "ctdb_client.h"
 
-#include "common/system.h"
-#include "common/cmdline.h"
+#include "common/system_socket.h"
 #include "common/common.h"
 #include "common/logging.h"
 
+#include "server/ctdb_config.h"
+
 #include "ctdb_cluster_mutex.h"
 
 /* List of SRVID requests that need to be processed */
@@ -237,6 +239,8 @@ struct ctdb_banning_state {
        struct timeval last_reported_time;
 };
 
+struct ctdb_recovery_lock_handle;
+
 /*
   private state of recovery daemon
  */
@@ -258,6 +262,7 @@ struct ctdb_recoverd {
        uint32_t *force_rebalance_nodes;
        struct ctdb_node_capabilities *caps;
        bool frozen_on_inactive;
+       struct ctdb_recovery_lock_handle *recovery_lock_handle;
 };
 
 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
@@ -343,87 +348,6 @@ static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
        ctdb_set_culprit_count(rec, culprit, 1);
 }
 
-
-/* this callback is called for every node that failed to execute the
-   recovered event
-*/
-static void recovered_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
-{
-       struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
-
-       DEBUG(DEBUG_ERR, (__location__ " Node %u failed the recovered event. Setting it as recovery fail culprit\n", node_pnn));
-
-       ctdb_set_culprit(rec, node_pnn);
-}
-
-/*
-  run the "recovered" eventscript on all nodes
- */
-static int run_recovered_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap, const char *caller)
-{
-       TALLOC_CTX *tmp_ctx;
-       uint32_t *nodes;
-       struct ctdb_context *ctdb = rec->ctdb;
-
-       tmp_ctx = talloc_new(ctdb);
-       CTDB_NO_MEMORY(ctdb, tmp_ctx);
-
-       nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
-       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
-                                       nodes, 0,
-                                       CONTROL_TIMEOUT(), false, tdb_null,
-                                       NULL, recovered_fail_callback,
-                                       rec) != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
-
-               talloc_free(tmp_ctx);
-               return -1;
-       }
-
-       talloc_free(tmp_ctx);
-       return 0;
-}
-
-/* this callback is called for every node that failed to execute the
-   start recovery event
-*/
-static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
-{
-       struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
-
-       DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
-
-       ctdb_set_culprit(rec, node_pnn);
-}
-
-/*
-  run the "startrecovery" eventscript on all nodes
- */
-static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap)
-{
-       TALLOC_CTX *tmp_ctx;
-       uint32_t *nodes;
-       struct ctdb_context *ctdb = rec->ctdb;
-
-       tmp_ctx = talloc_new(ctdb);
-       CTDB_NO_MEMORY(ctdb, tmp_ctx);
-
-       nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
-       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
-                                       nodes, 0,
-                                       CONTROL_TIMEOUT(), false, tdb_null,
-                                       NULL,
-                                       startrecovery_fail_callback,
-                                       rec) != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
-               talloc_free(tmp_ctx);
-               return -1;
-       }
-
-       talloc_free(tmp_ctx);
-       return 0;
-}
-
 /*
   Retrieve capabilities from all connected nodes
  */
@@ -465,29 +389,13 @@ static int update_capabilities(struct ctdb_recoverd *rec,
        return 0;
 }
 
-static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
-{
-       struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
-
-       DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
-       ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
-}
-
-static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
-{
-       struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
-
-       DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
-       ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
-}
-
 /*
   change recovery mode on all nodes
  */
 static int set_recovery_mode(struct ctdb_context *ctdb,
                             struct ctdb_recoverd *rec,
                             struct ctdb_node_map_old *nodemap,
-                            uint32_t rec_mode, bool freeze)
+                            uint32_t rec_mode)
 {
        TDB_DATA data;
        uint32_t *nodes;
@@ -512,65 +420,10 @@ static int set_recovery_mode(struct ctdb_context *ctdb,
                return -1;
        }
 
-       /* freeze all nodes */
-       if (freeze && rec_mode == CTDB_RECOVERY_ACTIVE) {
-               int i;
-
-               for (i=1; i<=NUM_DB_PRIORITIES; i++) {
-                       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
-                                               nodes, i,
-                                               CONTROL_TIMEOUT(),
-                                               false, tdb_null,
-                                               NULL,
-                                               set_recmode_fail_callback,
-                                               rec) != 0) {
-                               DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
-                               talloc_free(tmp_ctx);
-                               return -1;
-                       }
-               }
-       }
-
        talloc_free(tmp_ctx);
        return 0;
 }
 
-/* update all remote nodes to use the same db priority that we have
-   this can fail if the remove node has not yet been upgraded to 
-   support this function, so we always return success and never fail
-   a recovery if this call fails.
-*/
-static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
-       struct ctdb_node_map_old *nodemap, 
-       uint32_t pnn, struct ctdb_dbid_map_old *dbmap, TALLOC_CTX *mem_ctx)
-{
-       int db;
-
-       /* step through all local databases */
-       for (db=0; db<dbmap->num;db++) {
-               struct ctdb_db_priority db_prio;
-               int ret;
-
-               db_prio.db_id     = dbmap->dbs[db].db_id;
-               ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].db_id, &db_prio.priority);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].db_id));
-                       continue;
-               }
-
-               DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].db_id, db_prio.priority)); 
-
-               ret = ctdb_ctrl_set_db_priority(ctdb, CONTROL_TIMEOUT(),
-                                               CTDB_CURRENT_NODE, &db_prio);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n",
-                                        db_prio.db_id));
-               }
-       }
-
-       return 0;
-}                      
-
 /*
   ensure all other nodes have attached to any databases that we have
  */
@@ -623,7 +476,7 @@ static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctd
                        ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(),
                                                 nodemap->nodes[j].pnn,
                                                 mem_ctx, name,
-                                                dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
+                                                dbmap->dbs[db].flags, NULL);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
                                return -1;
@@ -685,8 +538,9 @@ static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb
                                          nodemap->nodes[j].pnn));
                                return -1;
                        }
-                       ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
-                                          remote_dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
+                       ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn,
+                                          mem_ctx, name,
+                                          remote_dbmap->dbs[db].flags, NULL);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
                                return -1;
@@ -702,244 +556,6 @@ static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb
        return 0;
 }
 
-
-/*
-  pull the remote database contents from one node into the recdb
- */
-static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode,
-                                   struct tdb_wrap *recdb, uint32_t dbid)
-{
-       int ret;
-       TDB_DATA outdata;
-       struct ctdb_marshall_buffer *reply;
-       struct ctdb_rec_data_old *recdata;
-       int i;
-       TALLOC_CTX *tmp_ctx = talloc_new(recdb);
-
-       ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
-                              CONTROL_TIMEOUT(), &outdata);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
-               talloc_free(tmp_ctx);
-               return -1;
-       }
-
-       reply = (struct ctdb_marshall_buffer *)outdata.dptr;
-
-       if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
-               DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
-               talloc_free(tmp_ctx);
-               return -1;
-       }
-
-       recdata = (struct ctdb_rec_data_old *)&reply->data[0];
-
-       for (i=0;
-            i<reply->count;
-            recdata = (struct ctdb_rec_data_old *)(recdata->length + (uint8_t *)recdata), i++) {
-               TDB_DATA key, data;
-               struct ctdb_ltdb_header *hdr;
-               TDB_DATA existing;
-
-               key.dptr = &recdata->data[0];
-               key.dsize = recdata->keylen;
-               data.dptr = &recdata->data[key.dsize];
-               data.dsize = recdata->datalen;
-
-               hdr = (struct ctdb_ltdb_header *)data.dptr;
-
-               if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
-                       DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
-                       talloc_free(tmp_ctx);
-                       return -1;
-               }
-
-               /* fetch the existing record, if any */
-               existing = tdb_fetch(recdb->tdb, key);
-
-               if (existing.dptr != NULL) {
-                       struct ctdb_ltdb_header header;
-                       if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
-                               DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n",
-                                        (unsigned)existing.dsize, srcnode));
-                               free(existing.dptr);
-                               talloc_free(tmp_ctx);
-                               return -1;
-                       }
-                       header = *(struct ctdb_ltdb_header *)existing.dptr;
-                       free(existing.dptr);
-                       if (!(header.rsn < hdr->rsn ||
-                             (header.dmaster != ctdb_get_pnn(ctdb) &&
-                              header.rsn == hdr->rsn))) {
-                               continue;
-                       }
-               }
-
-               if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
-                       DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
-                       talloc_free(tmp_ctx);
-                       return -1;
-               }
-       }
-
-       talloc_free(tmp_ctx);
-
-       return 0;
-}
-
-
-struct pull_seqnum_cbdata {
-       int failed;
-       uint32_t pnn;
-       uint64_t seqnum;
-};
-
-static void pull_seqnum_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
-{
-       struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
-       uint64_t seqnum;
-
-       if (cb_data->failed != 0) {
-               DEBUG(DEBUG_ERR, ("Got seqnum from node %d but we have already failed the entire operation\n", node_pnn));
-               return;
-       }
-
-       if (res != 0) {
-               DEBUG(DEBUG_ERR, ("Error when pulling seqnum from node %d\n", node_pnn));
-               cb_data->failed = 1;
-               return;
-       }
-
-       if (outdata.dsize != sizeof(uint64_t)) {
-               DEBUG(DEBUG_ERR, ("Error when reading pull seqnum from node %d, got %d bytes but expected %d\n", node_pnn, (int)outdata.dsize, (int)sizeof(uint64_t)));
-               cb_data->failed = -1;
-               return;
-       }
-
-       seqnum = *((uint64_t *)outdata.dptr);
-
-       if (seqnum > cb_data->seqnum ||
-           (cb_data->pnn == -1 && seqnum == 0)) {
-               cb_data->seqnum = seqnum;
-               cb_data->pnn = node_pnn;
-       }
-}
-
-static void pull_seqnum_fail_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
-{
-       struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
-
-       DEBUG(DEBUG_ERR, ("Failed to pull db seqnum from node %d\n", node_pnn));
-       cb_data->failed = 1;
-}
-
-static int pull_highest_seqnum_pdb(struct ctdb_context *ctdb,
-                               struct ctdb_recoverd *rec, 
-                               struct ctdb_node_map_old *nodemap, 
-                               struct tdb_wrap *recdb, uint32_t dbid)
-{
-       TALLOC_CTX *tmp_ctx = talloc_new(NULL);
-       uint32_t *nodes;
-       TDB_DATA data;
-       uint32_t outdata[2];
-       struct pull_seqnum_cbdata *cb_data;
-
-       DEBUG(DEBUG_NOTICE, ("Scan for highest seqnum pdb for db:0x%08x\n", dbid));
-
-       outdata[0] = dbid;
-       outdata[1] = 0;
-
-       data.dsize = sizeof(outdata);
-       data.dptr  = (uint8_t *)&outdata[0];
-
-       cb_data = talloc(tmp_ctx, struct pull_seqnum_cbdata);
-       if (cb_data == NULL) {
-               DEBUG(DEBUG_ERR, ("Failed to allocate pull highest seqnum cb_data structure\n"));
-               talloc_free(tmp_ctx);
-               return -1;
-       }
-
-       cb_data->failed = 0;
-       cb_data->pnn    = -1;
-       cb_data->seqnum = 0;
-       
-       nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
-       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_DB_SEQNUM,
-                                       nodes, 0,
-                                       CONTROL_TIMEOUT(), false, data,
-                                       pull_seqnum_cb,
-                                       pull_seqnum_fail_cb,
-                                       cb_data) != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Failed to run async GET_DB_SEQNUM\n"));
-
-               talloc_free(tmp_ctx);
-               return -1;
-       }
-
-       if (cb_data->failed != 0) {
-               DEBUG(DEBUG_NOTICE, ("Failed to pull sequence numbers for DB 0x%08x\n", dbid));
-               talloc_free(tmp_ctx);
-               return -1;
-       }
-
-       if (cb_data->pnn == -1) {
-               DEBUG(DEBUG_NOTICE, ("Failed to find a node with highest sequence numbers for DB 0x%08x\n", dbid));
-               talloc_free(tmp_ctx);
-               return -1;
-       }
-
-       DEBUG(DEBUG_NOTICE, ("Pull persistent db:0x%08x from node %d with highest seqnum:%lld\n", dbid, cb_data->pnn, (long long)cb_data->seqnum)); 
-
-       if (pull_one_remote_database(ctdb, cb_data->pnn, recdb, dbid) != 0) {
-               DEBUG(DEBUG_ERR, ("Failed to pull higest seqnum database 0x%08x from node %d\n", dbid, cb_data->pnn));
-               talloc_free(tmp_ctx);
-               return -1;
-       }
-
-       talloc_free(tmp_ctx);
-       return 0;
-}
-
-
-/*
-  pull all the remote database contents into the recdb
- */
-static int pull_remote_database(struct ctdb_context *ctdb,
-                               struct ctdb_recoverd *rec, 
-                               struct ctdb_node_map_old *nodemap, 
-                               struct tdb_wrap *recdb, uint32_t dbid,
-                               bool persistent)
-{
-       int j;
-
-       if (persistent && ctdb->tunable.recover_pdb_by_seqnum != 0) {
-               int ret;
-               ret = pull_highest_seqnum_pdb(ctdb, rec, nodemap, recdb, dbid);
-               if (ret == 0) {
-                       return 0;
-               }
-       }
-
-       /* pull all records from all other nodes across onto this node
-          (this merges based on rsn)
-       */
-       for (j=0; j<nodemap->num; j++) {
-               /* don't merge from nodes that are unavailable */
-               if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
-                       continue;
-               }
-               if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
-                       DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
-                                nodemap->nodes[j].pnn));
-                       ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
-                       return -1;
-               }
-       }
-       
-       return 0;
-}
-
-
 /*
   update flags on all active nodes
  */
@@ -956,32 +572,6 @@ static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node
        return 0;
 }
 
-/*
-  ensure all nodes have the same vnnmap we do
- */
-static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, 
-                                     uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
-{
-       int j, ret;
-
-       /* push the new vnn map out to all the nodes */
-       for (j=0; j<nodemap->num; j++) {
-               /* don't push to nodes that are unavailable */
-               if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
-                       continue;
-               }
-
-               ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
-                       return -1;
-               }
-       }
-
-       return 0;
-}
-
-
 /*
   called when a vacuum fetch has completed - just free it and do the next one
  */
@@ -1068,7 +658,7 @@ static void vacuum_fetch_handler(uint64_t srvid, TDB_DATA data,
        TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
        const char *name;
        struct ctdb_dbid_map_old *dbmap=NULL;
-       bool persistent = false;
+       uint8_t db_flags = 0;
        struct ctdb_db_context *ctdb_db;
        struct ctdb_rec_data_old *r;
 
@@ -1087,7 +677,7 @@ static void vacuum_fetch_handler(uint64_t srvid, TDB_DATA data,
 
        for (i=0;i<dbmap->num;i++) {
                if (dbmap->dbs[i].db_id == recs->db_id) {
-                       persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
+                       db_flags = dbmap->dbs[i].flags;
                        break;
                }
        }
@@ -1103,7 +693,7 @@ static void vacuum_fetch_handler(uint64_t srvid, TDB_DATA data,
        }
 
        /* attach to it */
-       ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, persistent, 0);
+       ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, db_flags);
        if (ctdb_db == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
                goto done;
@@ -1288,330 +878,107 @@ static uint32_t new_generation(void)
        return generation;
 }
 
-
-/*
-  create a temporary working database
- */
-static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
-{
-       char *name;
-       struct tdb_wrap *recdb;
-       unsigned tdb_flags;
-
-       /* open up the temporary recovery database */
-       name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
-                              ctdb->db_directory_state,
-                              ctdb->pnn);
-       if (name == NULL) {
-               return NULL;
-       }
-       unlink(name);
-
-       tdb_flags = TDB_NOLOCK;
-       if (ctdb->valgrinding) {
-               tdb_flags |= TDB_NOMMAP;
-       }
-       tdb_flags |= (TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING);
-
-       recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
-                             tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
-       if (recdb == NULL) {
-               DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
-       }
-
-       talloc_free(name);
-
-       return recdb;
-}
-
-
-/* 
-   a traverse function for pulling all relevant records from recdb
- */
-struct recdb_data {
-       struct ctdb_context *ctdb;
-       struct ctdb_marshall_buffer *recdata;
-       uint32_t len;
-       uint32_t allocated_len;
-       bool failed;
-       bool persistent;
-};
-
-static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
-{
-       struct recdb_data *params = (struct recdb_data *)p;
-       struct ctdb_rec_data_old *recdata;
-       struct ctdb_ltdb_header *hdr;
-
-       /*
-        * skip empty records - but NOT for persistent databases:
-        *
-        * The record-by-record mode of recovery deletes empty records.
-        * For persistent databases, this can lead to data corruption
-        * by deleting records that should be there:
-        *
-        * - Assume the cluster has been running for a while.
-        *
-        * - A record R in a persistent database has been created and
-        *   deleted a couple of times, the last operation being deletion,
-        *   leaving an empty record with a high RSN, say 10.
-        *
-        * - Now a node N is turned off.
-        *
-        * - This leaves the local database copy of D on N with the empty
-        *   copy of R and RSN 10. On all other nodes, the recovery has deleted
-        *   the copy of record R.
-        *
-        * - Now the record is created again while node N is turned off.
-        *   This creates R with RSN = 1 on all nodes except for N.
-        *
-        * - Now node N is turned on again. The following recovery will chose
-        *   the older empty copy of R due to RSN 10 > RSN 1.
-        *
-        * ==> Hence the record is gone after the recovery.
-        *
-        * On databases like Samba's registry, this can damage the higher-level
-        * data structures built from the various tdb-level records.
-        */
-       if (!params->persistent && data.dsize <= sizeof(struct ctdb_ltdb_header)) {
-               return 0;
-       }
-
-       /* update the dmaster field to point to us */
-       hdr = (struct ctdb_ltdb_header *)data.dptr;
-       if (!params->persistent) {
-               hdr->dmaster = params->ctdb->pnn;
-               hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
-       }
-
-       /* add the record to the blob ready to send to the nodes */
-       recdata = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
-       if (recdata == NULL) {
-               params->failed = true;
-               return -1;
-       }
-       if (params->len + recdata->length >= params->allocated_len) {
-               params->allocated_len = recdata->length + params->len + params->ctdb->tunable.pulldb_preallocation_size;
-               params->recdata = talloc_realloc_size(NULL, params->recdata, params->allocated_len);
-       }
-       if (params->recdata == NULL) {
-               DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u\n",
-                        recdata->length + params->len));
-               params->failed = true;
-               return -1;
-       }
-       params->recdata->count++;
-       memcpy(params->len+(uint8_t *)params->recdata, recdata, recdata->length);
-       params->len += recdata->length;
-       talloc_free(recdata);
-
-       return 0;
-}
-
-/*
-  push the recdb database out to all nodes
- */
-static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
-                              bool persistent,
-                              struct tdb_wrap *recdb, struct ctdb_node_map_old *nodemap)
+static bool ctdb_recovery_have_lock(struct ctdb_recoverd *rec)
 {
-       struct recdb_data params;
-       struct ctdb_marshall_buffer *recdata;
-       TDB_DATA outdata;
-       TALLOC_CTX *tmp_ctx;
-       uint32_t *nodes;
-
-       tmp_ctx = talloc_new(ctdb);
-       CTDB_NO_MEMORY(ctdb, tmp_ctx);
-
-       recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
-       CTDB_NO_MEMORY(ctdb, recdata);
-
-       recdata->db_id = dbid;
-
-       params.ctdb = ctdb;
-       params.recdata = recdata;
-       params.len = offsetof(struct ctdb_marshall_buffer, data);
-       params.allocated_len = params.len;
-       params.failed = false;
-       params.persistent = persistent;
-
-       if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
-               DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
-               talloc_free(params.recdata);
-               talloc_free(tmp_ctx);
-               return -1;
-       }
-
-       if (params.failed) {
-               DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
-               talloc_free(params.recdata);
-               talloc_free(tmp_ctx);
-               return -1;              
-       }
-
-       recdata = params.recdata;
-
-       outdata.dptr = (void *)recdata;
-       outdata.dsize = params.len;
-
-       nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
-       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
-                                       nodes, 0,
-                                       CONTROL_TIMEOUT(), false, outdata,
-                                       NULL, NULL,
-                                       NULL) != 0) {
-               DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
-               talloc_free(recdata);
-               talloc_free(tmp_ctx);
-               return -1;
-       }
-
-       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
-                 dbid, recdata->count));
-
-       talloc_free(recdata);
-       talloc_free(tmp_ctx);
-
-       return 0;
+       return (rec->recovery_lock_handle != NULL);
 }
 
-
-/*
-  go through a full recovery on one database 
- */
-static int recover_database(struct ctdb_recoverd *rec, 
-                           TALLOC_CTX *mem_ctx,
-                           uint32_t dbid,
-                           bool persistent,
-                           uint32_t pnn, 
-                           struct ctdb_node_map_old *nodemap,
-                           uint32_t transaction_id)
-{
-       struct tdb_wrap *recdb;
-       int ret;
-       struct ctdb_context *ctdb = rec->ctdb;
-       TDB_DATA data;
-       struct ctdb_transdb w;
-       uint32_t *nodes;
-
-       recdb = create_recdb(ctdb, mem_ctx);
-       if (recdb == NULL) {
-               return -1;
-       }
-
-       /* pull all remote databases onto the recdb */
-       ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
-               return -1;
-       }
-
-       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
-
-       /* wipe all the remote databases. This is safe as we are in a transaction */
-       w.db_id = dbid;
-       w.tid = transaction_id;
-
-       data.dptr = (void *)&w;
-       data.dsize = sizeof(w);
-
-       nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
-       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
-                                       nodes, 0,
-                                       CONTROL_TIMEOUT(), false, data,
-                                       NULL, NULL,
-                                       NULL) != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
-               talloc_free(recdb);
-               return -1;
-       }
-       
-       /* push out the correct database. This sets the dmaster and skips 
-          the empty records */
-       ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
-       if (ret != 0) {
-               talloc_free(recdb);
-               return -1;
-       }
-
-       /* all done with this database */
-       talloc_free(recdb);
-
-       return 0;
-}
-
-static bool ctdb_recovery_have_lock(struct ctdb_context *ctdb)
-{
-       return (ctdb->recovery_lock_handle != NULL);
-}
-
-struct hold_reclock_state {
+struct ctdb_recovery_lock_handle {
        bool done;
        bool locked;
+       double latency;
+       struct ctdb_cluster_mutex_handle *h;
 };
 
-static void hold_reclock_handler(struct ctdb_context *ctdb,
-                                char status,
+static void take_reclock_handler(char status,
                                 double latency,
-                                struct ctdb_cluster_mutex_handle *h,
                                 void *private_data)
 {
-       struct hold_reclock_state *s =
-               (struct hold_reclock_state *) private_data;
+       struct ctdb_recovery_lock_handle *s =
+               (struct ctdb_recovery_lock_handle *) private_data;
 
        switch (status) {
        case '0':
-               ctdb->recovery_lock_handle = h;
-               ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(),
-                                                  latency);
+               s->latency = latency;
                break;
 
        case '1':
                DEBUG(DEBUG_ERR,
                      ("Unable to take recovery lock - contention\n"));
-               talloc_free(h);
                break;
 
        default:
                DEBUG(DEBUG_ERR, ("ERROR: when taking recovery lock\n"));
-               talloc_free(h);
        }
 
        s->done = true;
        s->locked = (status == '0') ;
 }
 
-static bool ctdb_recovery_lock(struct ctdb_context *ctdb)
+static bool ctdb_recovery_lock(struct ctdb_recoverd *rec);
+
+static void lost_reclock_handler(void *private_data)
+{
+       struct ctdb_recoverd *rec = talloc_get_type_abort(
+               private_data, struct ctdb_recoverd);
+
+       DEBUG(DEBUG_ERR,
+             ("Recovery lock helper terminated unexpectedly - "
+              "trying to retake recovery lock\n"));
+       TALLOC_FREE(rec->recovery_lock_handle);
+       if (! ctdb_recovery_lock(rec)) {
+               DEBUG(DEBUG_ERR, ("Failed to take recovery lock\n"));
+       }
+}
+
+static bool ctdb_recovery_lock(struct ctdb_recoverd *rec)
 {
+       struct ctdb_context *ctdb = rec->ctdb;
        struct ctdb_cluster_mutex_handle *h;
-       struct hold_reclock_state s = {
-               .done = false,
-               .locked = false,
+       struct ctdb_recovery_lock_handle *s;
+
+       s = talloc_zero(rec, struct ctdb_recovery_lock_handle);
+       if (s == NULL) {
+               DBG_ERR("Memory allocation error\n");
+               return false;
        };
 
-       h = ctdb_cluster_mutex(ctdb, ctdb->recovery_lock, 0);
+       h = ctdb_cluster_mutex(s,
+                              ctdb,
+                              ctdb->recovery_lock,
+                              0,
+                              take_reclock_handler,
+                              s,
+                              lost_reclock_handler,
+                              rec);
        if (h == NULL) {
+               talloc_free(s);
                return false;
        }
 
-       ctdb_cluster_mutex_set_handler(h, hold_reclock_handler, &s);
-
-       while (!s.done) {
+       while (! s->done) {
                tevent_loop_once(ctdb->ev);
        }
 
-       /* Ensure no attempts to access to s after function return */
-       ctdb_cluster_mutex_set_handler(h, hold_reclock_handler, NULL);
+       if (! s->locked) {
+               talloc_free(s);
+               return false;
+       }
+
+       rec->recovery_lock_handle = s;
+       s->h = h;
+       ctdb_ctrl_report_recd_lock_latency(ctdb,
+                                          CONTROL_TIMEOUT(),
+                                          s->latency);
 
-       return s.locked;
+       return true;
 }
 
-static void ctdb_recovery_unlock(struct ctdb_context *ctdb)
+static void ctdb_recovery_unlock(struct ctdb_recoverd *rec)
 {
-       if (ctdb->recovery_lock_handle != NULL) {
+       if (rec->recovery_lock_handle != NULL) {
                DEBUG(DEBUG_NOTICE, ("Releasing recovery lock\n"));
-               TALLOC_FREE(ctdb->recovery_lock_handle);
+               TALLOC_FREE(rec->recovery_lock_handle);
        }
 }
 
@@ -1627,125 +994,36 @@ static void ban_misbehaving_nodes(struct ctdb_recoverd *rec, bool *self_ban)
                        continue;
                }
                ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
-               if (ban_state->count < 2*ctdb->num_nodes) {
-                       continue;
-               }
-
-               DEBUG(DEBUG_NOTICE,("Node %u reached %u banning credits - banning it for %u seconds\n",
-                       ctdb->nodes[i]->pnn, ban_state->count,
-                       ctdb->tunable.recovery_ban_period));
-               ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
-               ban_state->count = 0;
-
-               /* Banning ourself? */
-               if (ctdb->nodes[i]->pnn == rec->ctdb->pnn) {
-                       *self_ban = true;
-               }
-       }
-}
-
-static bool do_takeover_run(struct ctdb_recoverd *rec,
-                           struct ctdb_node_map_old *nodemap)
-{
-       uint32_t *nodes = NULL;
-       struct ctdb_disable_message dtr;
-       TDB_DATA data;
-       int i;
-       uint32_t *rebalance_nodes = rec->force_rebalance_nodes;
-       int ret;
-       bool ok;
-
-       DEBUG(DEBUG_NOTICE, ("Takeover run starting\n"));
-
-       if (ctdb_op_is_in_progress(rec->takeover_run)) {
-               DEBUG(DEBUG_ERR, (__location__
-                                 " takeover run already in progress \n"));
-               ok = false;
-               goto done;
-       }
-
-       if (!ctdb_op_begin(rec->takeover_run)) {
-               ok = false;
-               goto done;
-       }
-
-       /* Disable IP checks (takeover runs, really) on other nodes
-        * while doing this takeover run.  This will stop those other
-        * nodes from triggering takeover runs when think they should
-        * be hosting an IP but it isn't yet on an interface.  Don't
-        * wait for replies since a failure here might cause some
-        * noise in the logs but will not actually cause a problem.
-        */
-       ZERO_STRUCT(dtr);
-       dtr.srvid = 0; /* No reply */
-       dtr.pnn = -1;
-
-       data.dptr  = (uint8_t*)&dtr;
-       data.dsize = sizeof(dtr);
-
-       nodes = list_of_connected_nodes(rec->ctdb, nodemap, rec, false);
-
-       /* Disable for 60 seconds.  This can be a tunable later if
-        * necessary.
-        */
-       dtr.timeout = 60;
-       for (i = 0; i < talloc_array_length(nodes); i++) {
-               if (ctdb_client_send_message(rec->ctdb, nodes[i],
-                                            CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
-                                            data) != 0) {
-                       DEBUG(DEBUG_INFO,("Failed to disable takeover runs\n"));
-               }
-       }
-
-       ret = ctdb_takeover_run(rec->ctdb, nodemap,
-                               rec->force_rebalance_nodes);
-
-       /* Reenable takeover runs and IP checks on other nodes */
-       dtr.timeout = 0;
-       for (i = 0; i < talloc_array_length(nodes); i++) {
-               if (ctdb_client_send_message(rec->ctdb, nodes[i],
-                                            CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
-                                            data) != 0) {
-                       DEBUG(DEBUG_INFO,("Failed to re-enable takeover runs\n"));
+               if (ban_state->count < 2*ctdb->num_nodes) {
+                       continue;
                }
-       }
 
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, ("ctdb_takeover_run() failed\n"));
-               ok = false;
-               goto done;
-       }
+               DEBUG(DEBUG_NOTICE,("Node %u reached %u banning credits - banning it for %u seconds\n",
+                       ctdb->nodes[i]->pnn, ban_state->count,
+                       ctdb->tunable.recovery_ban_period));
+               ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
+               ban_state->count = 0;
 
-       ok = true;
-       /* Takeover run was successful so clear force rebalance targets */
-       if (rebalance_nodes == rec->force_rebalance_nodes) {
-               TALLOC_FREE(rec->force_rebalance_nodes);
-       } else {
-               DEBUG(DEBUG_WARNING,
-                     ("Rebalance target nodes changed during takeover run - not clearing\n"));
+               /* Banning ourself? */
+               if (ctdb->nodes[i]->pnn == rec->ctdb->pnn) {
+                       *self_ban = true;
+               }
        }
-done:
-       rec->need_takeover_run = !ok;
-       talloc_free(nodes);
-       ctdb_op_end(rec->takeover_run);
-
-       DEBUG(DEBUG_NOTICE, ("Takeover run %s\n", ok ? "completed successfully" : "unsuccessful"));
-       return ok;
 }
 
-struct recovery_helper_state {
+struct helper_state {
        int fd[2];
        pid_t pid;
        int result;
        bool done;
 };
 
-static void ctdb_recovery_handler(struct tevent_context *ev,
-                                 struct tevent_fd *fde,
-                                 uint16_t flags, void *private_data)
+static void helper_handler(struct tevent_context *ev,
+                          struct tevent_fd *fde,
+                          uint16_t flags, void *private_data)
 {
-       struct recovery_helper_state *state = talloc_get_type_abort(
-               private_data, struct recovery_helper_state);
+       struct helper_state *state = talloc_get_type_abort(
+               private_data, struct helper_state);
        int ret;
 
        ret = sys_read(state->fd[0], &state->result, sizeof(state->result));
@@ -1756,22 +1034,16 @@ static void ctdb_recovery_handler(struct tevent_context *ev,
        state->done = true;
 }
 
-
-static int db_recovery_parallel(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx)
+static int helper_run(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx,
+                     const char *prog, const char *arg, const char *type)
 {
-       static char prog[PATH_MAX+1] = "";
-       const char **args;
-       struct recovery_helper_state *state;
+       struct helper_state *state;
        struct tevent_fd *fde;
+       const char **args;
        int nargs, ret;
+       uint32_t recmaster = rec->recmaster;
 
-       if (!ctdb_set_helper("recovery_helper", prog, sizeof(prog),
-                            "CTDB_RECOVERY_HELPER", CTDB_HELPER_BINDIR,
-                            "ctdb_recovery_helper")) {
-               ctdb_die(rec->ctdb, "Unable to set recovery helper\n");
-       }
-
-       state = talloc_zero(mem_ctx, struct recovery_helper_state);
+       state = talloc_zero(mem_ctx, struct helper_state);
        if (state == NULL) {
                DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
                return -1;
@@ -1782,7 +1054,7 @@ static int db_recovery_parallel(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx)
        ret = pipe(state->fd);
        if (ret != 0) {
                DEBUG(DEBUG_ERR,
-                     ("Failed to create pipe for recovery helper\n"));
+                     ("Failed to create pipe for %s helper\n", type));
                goto fail;
        }
 
@@ -1796,21 +1068,22 @@ static int db_recovery_parallel(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx)
        }
 
        args[0] = talloc_asprintf(args, "%d", state->fd[1]);
-       args[1] = rec->ctdb->daemon.name;
-       args[2] = talloc_asprintf(args, "%u", new_generation());
-       args[3] = NULL;
-
-       if (args[0] == NULL || args[2] == NULL) {
+       if (args[0] == NULL) {
                DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
                goto fail;
        }
+       args[1] = rec->ctdb->daemon.name;
+       args[2] = arg;
+       args[3] = NULL;
 
-       setenv("CTDB_DBDIR_STATE", rec->ctdb->db_directory_state, 1);
+       if (args[2] == NULL) {
+               nargs = 3;
+       }
 
-       if (!ctdb_vfork_with_logging(state, rec->ctdb, "recovery", prog, nargs,
-                                    args, NULL, NULL, &state->pid)) {
+       state->pid = ctdb_vfork_exec(state, rec->ctdb, prog, nargs, args);
+       if (state->pid == -1) {
                DEBUG(DEBUG_ERR,
-                     ("Failed to create child for recovery helper\n"));
+                     ("Failed to create child for %s helper\n", type));
                goto fail;
        }
 
@@ -1820,7 +1093,7 @@ static int db_recovery_parallel(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx)
        state->done = false;
 
        fde = tevent_add_fd(rec->ctdb->ev, rec->ctdb, state->fd[0],
-                           TEVENT_FD_READ, ctdb_recovery_handler, state);
+                           TEVENT_FD_READ, helper_handler, state);
        if (fde == NULL) {
                goto fail;
        }
@@ -1828,6 +1101,14 @@ static int db_recovery_parallel(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx)
 
        while (!state->done) {
                tevent_loop_once(rec->ctdb->ev);
+
+               /* If recmaster changes, we have lost election */
+               if (recmaster != rec->recmaster) {
+                       D_ERR("Recmaster changed to %u, aborting %s\n",
+                             rec->recmaster, type);
+                       state->result = 1;
+                       break;
+               }
        }
 
        close(state->fd[0]);
@@ -1855,159 +1136,153 @@ fail:
        return -1;
 }
 
-static int db_recovery_serial(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx,
-                             uint32_t pnn, struct ctdb_node_map_old *nodemap,
-                             struct ctdb_vnn_map *vnnmap,
-                             struct ctdb_dbid_map_old *dbmap)
+
+static int ctdb_takeover(struct ctdb_recoverd *rec,
+                        uint32_t *force_rebalance_nodes)
 {
-       struct ctdb_context *ctdb = rec->ctdb;
-       uint32_t generation;
-       TDB_DATA data;
-       uint32_t *nodes;
-       int ret, i, j;
+       static char prog[PATH_MAX+1] = "";
+       char *arg;
+       int i, ret;
 
-       /* set recovery mode to active on all nodes */
-       ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE, true);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
-               return -1;
+       if (!ctdb_set_helper("takeover_helper", prog, sizeof(prog),
+                            "CTDB_TAKEOVER_HELPER", CTDB_HELPER_BINDIR,
+                            "ctdb_takeover_helper")) {
+               ctdb_die(rec->ctdb, "Unable to set takeover helper\n");
        }
 
-       /* execute the "startrecovery" event script on all nodes */
-       ret = run_startrecovery_eventscript(rec, nodemap);
-       if (ret!=0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
-               return -1;
+       arg = NULL;
+       for (i = 0; i < talloc_array_length(force_rebalance_nodes); i++) {
+               uint32_t pnn = force_rebalance_nodes[i];
+               if (arg == NULL) {
+                       arg = talloc_asprintf(rec, "%u", pnn);
+               } else {
+                       arg = talloc_asprintf_append(arg, ",%u", pnn);
+               }
+               if (arg == NULL) {
+                       DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
+                       return -1;
+               }
        }
 
-       /* pick a new generation number */
-       generation = new_generation();
-
-       /* change the vnnmap on this node to use the new generation 
-          number but not on any other nodes.
-          this guarantees that if we abort the recovery prematurely
-          for some reason (a node stops responding?)
-          that we can just return immediately and we will reenter
-          recovery shortly again.
-          I.e. we deliberately leave the cluster with an inconsistent
-          generation id to allow us to abort recovery at any stage and
-          just restart it from scratch.
-        */
-       vnnmap->generation = generation;
-       ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
-               return -1;
+       if (ctdb_config.failover_disabled) {
+               ret = setenv("CTDB_DISABLE_IP_FAILOVER", "1", 1);
+               if (ret != 0) {
+                       D_ERR("Failed to set CTDB_DISABLE_IP_FAILOVER variable\n");
+                       return -1;
+               }
        }
 
-       /* Database generations are updated when the transaction is commited to
-        * the databases.  So make sure to use the final generation as the
-        * transaction id
-        */
-       generation = new_generation();
+       return helper_run(rec, rec, prog, arg, "takeover");
+}
 
-       data.dptr = (void *)&generation;
-       data.dsize = sizeof(uint32_t);
+static bool do_takeover_run(struct ctdb_recoverd *rec,
+                           struct ctdb_node_map_old *nodemap)
+{
+       uint32_t *nodes = NULL;
+       struct ctdb_disable_message dtr;
+       TDB_DATA data;
+       int i;
+       uint32_t *rebalance_nodes = rec->force_rebalance_nodes;
+       int ret;
+       bool ok;
 
-       nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
-       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
-                                       nodes, 0,
-                                       CONTROL_TIMEOUT(), false, data,
-                                       NULL,
-                                       transaction_start_fail_callback,
-                                       rec) != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
-               if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
-                                       nodes, 0,
-                                       CONTROL_TIMEOUT(), false, tdb_null,
-                                       NULL,
-                                       NULL,
-                                       NULL) != 0) {
-                       DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
-               }
-               return -1;
-       }
+       DEBUG(DEBUG_NOTICE, ("Takeover run starting\n"));
 
-       DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
+       if (ctdb_op_is_in_progress(rec->takeover_run)) {
+               DEBUG(DEBUG_ERR, (__location__
+                                 " takeover run already in progress \n"));
+               ok = false;
+               goto done;
+       }
 
-       for (i=0;i<dbmap->num;i++) {
-               ret = recover_database(rec, mem_ctx,
-                                      dbmap->dbs[i].db_id,
-                                      dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT,
-                                      pnn, nodemap, generation);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].db_id));
-                       return -1;
-               }
+       if (!ctdb_op_begin(rec->takeover_run)) {
+               ok = false;
+               goto done;
        }
 
-       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
+       /* Disable IP checks (takeover runs, really) on other nodes
+        * while doing this takeover run.  This will stop those other
+        * nodes from triggering takeover runs when they think they should
+        * be hosting an IP but it isn't yet on an interface.  Don't
+        * wait for replies since a failure here might cause some
+        * noise in the logs but will not actually cause a problem.
+        */
+       ZERO_STRUCT(dtr);
+       dtr.srvid = 0; /* No reply */
+       dtr.pnn = -1;
 
-       /* commit all the changes */
-       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
-                                       nodes, 0,
-                                       CONTROL_TIMEOUT(), false, data,
-                                       NULL, NULL,
-                                       NULL) != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
-               return -1;
-       }
+       data.dptr  = (uint8_t*)&dtr;
+       data.dsize = sizeof(dtr);
 
-       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
+       nodes = list_of_connected_nodes(rec->ctdb, nodemap, rec, false);
 
-       /* build a new vnn map with all the currently active and
-          unbanned nodes */
-       vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
-       CTDB_NO_MEMORY(ctdb, vnnmap);
-       vnnmap->generation = generation;
-       vnnmap->size = 0;
-       vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
-       CTDB_NO_MEMORY(ctdb, vnnmap->map);
-       for (i=j=0;i<nodemap->num;i++) {
-               if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
-                       continue;
-               }
-               if (!ctdb_node_has_capabilities(rec->caps,
-                                               ctdb->nodes[i]->pnn,
-                                               CTDB_CAP_LMASTER)) {
-                       /* this node can not be an lmaster */
-                       DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
-                       continue;
+       /* Disable for 60 seconds.  This can be a tunable later if
+        * necessary.
+        */
+       dtr.timeout = 60;
+       for (i = 0; i < talloc_array_length(nodes); i++) {
+               if (ctdb_client_send_message(rec->ctdb, nodes[i],
+                                            CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
+                                            data) != 0) {
+                       DEBUG(DEBUG_INFO,("Failed to disable takeover runs\n"));
                }
+       }
 
-               vnnmap->size++;
-               vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
-               CTDB_NO_MEMORY(ctdb, vnnmap->map);
-               vnnmap->map[j++] = nodemap->nodes[i].pnn;
+       ret = ctdb_takeover(rec, rec->force_rebalance_nodes);
 
-       }
-       if (vnnmap->size == 0) {
-               DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
-               vnnmap->size++;
-               vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
-               CTDB_NO_MEMORY(ctdb, vnnmap->map);
-               vnnmap->map[0] = pnn;
+       /* Reenable takeover runs and IP checks on other nodes */
+       dtr.timeout = 0;
+       for (i = 0; i < talloc_array_length(nodes); i++) {
+               if (ctdb_client_send_message(rec->ctdb, nodes[i],
+                                            CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
+                                            data) != 0) {
+                       DEBUG(DEBUG_INFO,("Failed to re-enable takeover runs\n"));
+               }
        }
 
-       /* update to the new vnnmap on all nodes */
-       ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
        if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
-               return -1;
+               DEBUG(DEBUG_ERR, ("ctdb_takeover_run() failed\n"));
+               ok = false;
+               goto done;
+       }
+
+       ok = true;
+       /* Takeover run was successful so clear force rebalance targets */
+       if (rebalance_nodes == rec->force_rebalance_nodes) {
+               TALLOC_FREE(rec->force_rebalance_nodes);
+       } else {
+               DEBUG(DEBUG_WARNING,
+                     ("Rebalance target nodes changed during takeover run - not clearing\n"));
        }
+done:
+       rec->need_takeover_run = !ok;
+       talloc_free(nodes);
+       ctdb_op_end(rec->takeover_run);
 
-       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
+       DEBUG(DEBUG_NOTICE, ("Takeover run %s\n", ok ? "completed successfully" : "unsuccessful"));
+       return ok;
+}
 
-       /* disable recovery mode */
-       ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL, false);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
+static int db_recovery_parallel(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx)
+{
+       static char prog[PATH_MAX+1] = "";
+       const char *arg;
+
+       if (!ctdb_set_helper("recovery_helper", prog, sizeof(prog),
+                            "CTDB_RECOVERY_HELPER", CTDB_HELPER_BINDIR,
+                            "ctdb_recovery_helper")) {
+               ctdb_die(rec->ctdb, "Unable to set recovery helper\n");
+       }
+
+       arg = talloc_asprintf(mem_ctx, "%u", new_generation());
+       if (arg == NULL) {
+               DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
                return -1;
        }
 
-       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
+       setenv("CTDB_DBDIR_STATE", rec->ctdb->db_directory_state, 1);
 
-       return 0;
+       return helper_run(rec, mem_ctx, prog, arg, "recovery");
 }
 
 /*
@@ -2021,7 +1296,6 @@ static int do_recovery(struct ctdb_recoverd *rec,
        int i, ret;
        struct ctdb_dbid_map_old *dbmap;
        bool self_ban;
-       bool par_recovery;
 
        DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
 
@@ -2054,31 +1328,47 @@ static int do_recovery(struct ctdb_recoverd *rec,
                goto fail;
        }
 
-        if (ctdb->recovery_lock != NULL) {
-               if (ctdb_recovery_have_lock(ctdb)) {
-                       DEBUG(DEBUG_NOTICE, ("Already holding recovery lock\n"));
+       if (ctdb->recovery_lock != NULL) {
+               if (ctdb_recovery_have_lock(rec)) {
+                       D_NOTICE("Already holding recovery lock\n");
                } else {
-                       DEBUG(DEBUG_NOTICE, ("Attempting to take recovery lock (%s)\n",
-                                            ctdb->recovery_lock));
-                       if (!ctdb_recovery_lock(ctdb)) {
-                               if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
-                                       /* If ctdb is trying first recovery, it's
-                                        * possible that current node does not know
-                                        * yet who the recmaster is.
+                       bool ok;
+
+                       D_NOTICE("Attempting to take recovery lock (%s)\n",
+                                ctdb->recovery_lock);
+
+                       ok = ctdb_recovery_lock(rec);
+                       if (! ok) {
+                               D_ERR("Unable to take recovery lock\n");
+
+                               if (pnn != rec->recmaster) {
+                                       D_NOTICE("Recovery master changed to %u,"
+                                                " aborting recovery\n",
+                                                rec->recmaster);
+                                       rec->need_recovery = false;
+                                       goto fail;
+                               }
+
+                               if (ctdb->runstate ==
+                                   CTDB_RUNSTATE_FIRST_RECOVERY) {
+                                       /*
+                                        * First recovery?  Perhaps
+                                        * current node does not yet
+                                        * know who the recmaster is.
                                         */
-                                       DEBUG(DEBUG_ERR, ("Unable to get recovery lock"
-                                                         " - retrying recovery\n"));
+                                       D_ERR("Retrying recovery\n");
                                        goto fail;
                                }
 
-                               DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
-                                                "and ban ourself for %u seconds\n",
-                                                ctdb->tunable.recovery_ban_period));
-                               ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
+                               D_ERR("Abort recovery, "
+                                     "ban this node for %u seconds\n",
+                                     ctdb->tunable.recovery_ban_period);
+                               ctdb_ban_node(rec,
+                                             pnn,
+                                             ctdb->tunable.recovery_ban_period);
                                goto fail;
                        }
-                       DEBUG(DEBUG_NOTICE,
-                             ("Recovery lock taken successfully by recovery daemon\n"));
+                       D_NOTICE("Recovery lock taken successfully\n");
                }
        }
 
@@ -2109,13 +1399,6 @@ static int do_recovery(struct ctdb_recoverd *rec,
        }
        DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
 
-       /* update the database priority for all remote databases */
-       ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
-       }
-       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
-
 
        /* Retrieve capabilities from all connected nodes */
        ret = update_capabilities(rec, nodemap);
@@ -2145,42 +1428,13 @@ static int do_recovery(struct ctdb_recoverd *rec,
 
        DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
 
-       /* Check if all participating nodes have parallel recovery capability */
-       par_recovery = true;
-       for (i=0; i<nodemap->num; i++) {
-               if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
-                       continue;
-               }
-
-               if (!(rec->caps[i].capabilities &
-                     CTDB_CAP_PARALLEL_RECOVERY)) {
-                       par_recovery = false;
-                       break;
-               }
-       }
-
-       if (par_recovery) {
-               ret = db_recovery_parallel(rec, mem_ctx);
-       } else {
-               ret = db_recovery_serial(rec, mem_ctx, pnn, nodemap, vnnmap,
-                                        dbmap);
-       }
-
+       ret = db_recovery_parallel(rec, mem_ctx);
        if (ret != 0) {
                goto fail;
        }
 
        do_takeover_run(rec, nodemap);
 
-       /* execute the "recovered" event script on all nodes */
-       ret = run_recovered_eventscript(rec, nodemap, "do_recovery");
-       if (ret!=0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
-               goto fail;
-       }
-
-       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
-
        /* send a message to all clients telling them that the cluster 
           has been reconfigured */
        ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
@@ -2219,7 +1473,7 @@ static int do_recovery(struct ctdb_recoverd *rec,
           We now wait for rerecovery_timeout before we allow
           another recovery to take place.
        */
-       DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be supressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
+       DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be suppressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
        ctdb_op_disable(rec->recovery, ctdb->ev,
                        ctdb->tunable.rerecovery_timeout);
        return 0;
@@ -2490,28 +1744,6 @@ static void recd_node_rebalance_handler(uint64_t srvid, TDB_DATA data,
 
 
 
-static void recd_update_ip_handler(uint64_t srvid, TDB_DATA data,
-                                  void *private_data)
-{
-       struct ctdb_recoverd *rec = talloc_get_type(
-               private_data, struct ctdb_recoverd);
-       struct ctdb_public_ip *ip;
-
-       if (rec->recmaster != rec->ctdb->pnn) {
-               DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
-               return;
-       }
-
-       if (data.dsize != sizeof(struct ctdb_public_ip)) {
-               DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
-               return;
-       }
-
-       ip = (struct ctdb_public_ip *)data.dptr;
-
-       update_ip_assignment_tree(rec->ctdb, ip);
-}
-
 static void srvid_disable_and_reply(struct ctdb_context *ctdb,
                                    TDB_DATA data,
                                    struct ctdb_op_state *op_state)
@@ -2708,12 +1940,10 @@ static void election_handler(uint64_t srvid, TDB_DATA data, void *private_data)
        TALLOC_FREE(rec->send_election_te);
 
        /* Release the recovery lock file */
-       if (ctdb_recovery_have_lock(ctdb)) {
-               ctdb_recovery_unlock(ctdb);
+       if (ctdb_recovery_have_lock(rec)) {
+               ctdb_recovery_unlock(rec);
        }
 
-       clear_ip_assignment_tree(ctdb);
-
        /* ok, let that guy become recmaster then */
        ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(),
                                     CTDB_CURRENT_NODE, em->pnn);
@@ -2739,7 +1969,7 @@ static void force_election(struct ctdb_recoverd *rec, uint32_t pnn,
        DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
 
        /* set all nodes to recovery mode to stop all internode traffic */
-       ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE, false);
+       ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
                return;
@@ -3134,7 +2364,7 @@ static int verify_local_ip_allocation(struct ctdb_context *ctdb,
        }
 
        /* Return early if disabled... */
-       if (ctdb->tunable.disable_ip_failover != 0 ||
+       if (ctdb_config.failover_disabled ||
            ctdb_op_is_disabled(rec->takeover_run)) {
                return  0;
        }
@@ -3198,16 +2428,9 @@ static int verify_local_ip_allocation(struct ctdb_context *ctdb,
                } else {
                        if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
                                DEBUG(DEBUG_ERR,
-                                     ("IP %s incorrectly on an interface - releasing\n",
+                                     ("IP %s incorrectly on an interface\n",
                                       ctdb_addr_to_str(&ips->ips[j].addr)));
-                               ret = ctdb_ctrl_release_ip(ctdb,
-                                                          CONTROL_TIMEOUT(),
-                                                          CTDB_CURRENT_NODE,
-                                                          &ips->ips[j]);
-                               if (ret != 0) {
-                                       DEBUG(DEBUG_ERR,
-                                             ("Failed to release IP address\n"));
-                               }
+                               need_takeover_run = true;
                        }
                }
        }
@@ -3434,6 +2657,13 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
                return;
        }
 
+       ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(),
+                                  CTDB_CURRENT_NODE, &ctdb->recovery_mode);
+       if (ret != 0) {
+               D_ERR("Failed to read recmode from local node\n");
+               return;
+       }
+
        /* if the local daemon is STOPPED or BANNED, we verify that the databases are
           also frozen and that the recmode is set to active.
        */
@@ -3446,10 +2676,6 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
                 */
                rec->priority_time = timeval_current();
 
-               ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
-               }
                if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
                        DEBUG(DEBUG_ERR,("Node is stopped or banned but recovery mode is not active. Activate recovery mode and lock databases\n"));
 
@@ -3493,9 +2719,11 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
                return;
        }
 
-       /* Check if an IP takeover run is needed and trigger one if
-        * necessary */
-       verify_local_ip_allocation(ctdb, rec, pnn, nodemap);
+       if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
+               /* Check if an IP takeover run is needed and trigger one if
+                * necessary */
+               verify_local_ip_allocation(ctdb, rec, pnn, nodemap);
+       }
 
        /* if we are not the recmaster then we do not need to check
           if recovery is needed
@@ -3564,7 +2792,7 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
 
         if (ctdb->recovery_lock != NULL) {
                /* We must already hold the recovery lock */
-               if (!ctdb_recovery_have_lock(ctdb)) {
+               if (!ctdb_recovery_have_lock(rec)) {
                        DEBUG(DEBUG_ERR,("Failed recovery lock sanity check.  Force a recovery\n"));
                        ctdb_set_culprit(rec, ctdb->pnn);
                        do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
@@ -3787,6 +3015,9 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
                }
        }
 
+       /* FIXME: Add remote public IP checking to ensure that nodes
+        * have the IP addresses that are allocated to them. */
+
 takeover_run_checks:
 
        /* If there are IP takeover runs requested or the previous one
@@ -3797,11 +3028,26 @@ takeover_run_checks:
        }
 }
 
+static void recd_sig_term_handler(struct tevent_context *ev, /* tevent handler for SIGTERM; installed by monitor_cluster() */
+                                 struct tevent_signal *se, int signum,
+                                 int count, void *dont_care, /* ev, se, signum, count and dont_care are not used here */
+                                 void *private_data) /* the struct ctdb_recoverd supplied when the handler was registered */
+{
+       struct ctdb_recoverd *rec = talloc_get_type_abort(
+               private_data, struct ctdb_recoverd); /* typed view of private_data; talloc_get_type_abort() aborts on a type mismatch */
+
+       DEBUG(DEBUG_ERR, ("Received SIGTERM, exiting\n"));
+       ctdb_recovery_unlock(rec); /* NOTE(review): presumably releases the recovery lock tracked in rec before exiting -- confirm */
+       exit(0); /* terminate the process with status 0; this handler never returns */
+}
+
+
 /*
   the main monitoring loop
  */
 static void monitor_cluster(struct ctdb_context *ctdb)
 {
+       struct tevent_signal *se;
        struct ctdb_recoverd *rec;
 
        DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
@@ -3811,6 +3057,7 @@ static void monitor_cluster(struct ctdb_context *ctdb)
 
        rec->ctdb = ctdb;
        rec->recmaster = CTDB_UNKNOWN_PNN;
+       rec->recovery_lock_handle = NULL;
 
        rec->takeover_run = ctdb_op_init(rec, "takeover runs");
        CTDB_NO_MEMORY_FATAL(ctdb, rec->takeover_run);
@@ -3821,6 +3068,13 @@ static void monitor_cluster(struct ctdb_context *ctdb)
        rec->priority_time = timeval_current();
        rec->frozen_on_inactive = false;
 
+       se = tevent_add_signal(ctdb->ev, ctdb, SIGTERM, 0,
+                              recd_sig_term_handler, rec);
+       if (se == NULL) {
+               DEBUG(DEBUG_ERR, ("Failed to install SIGTERM handler\n"));
+               exit(1);
+       }
+
        /* register a message port for sending memory dumps */
        ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
 
@@ -3849,9 +3103,6 @@ static void monitor_cluster(struct ctdb_context *ctdb)
        /* register a message port for disabling the ip check for a short while */
        ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
 
-       /* register a message port for updating the recovery daemons node assignment for an ip */
-       ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
-
        /* register a message port for forcing a rebalance of a node next
           reallocation */
        ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);
@@ -3960,6 +3211,7 @@ int ctdb_start_recoverd(struct ctdb_context *ctdb)
        int fd[2];
        struct tevent_signal *se;
        struct tevent_fd *fde;
+       int ret;
 
        if (pipe(fd) != 0) {
                return -1;
@@ -3986,8 +3238,13 @@ int ctdb_start_recoverd(struct ctdb_context *ctdb)
 
        srandom(getpid() ^ time(NULL));
 
-       prctl_set_comment("ctdb_recovered");
-       if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
+       ret = logging_init(ctdb, NULL, NULL, "ctdb-recoverd");
+       if (ret != 0) {
+               return -1;
+       }
+
+       prctl_set_comment("ctdb_recoverd");
+       if (switch_from_server_to_client(ctdb) != 0) {
                DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
                exit(1);
        }