ctdb-daemon: Rename struct ctdb_dbid_map to ctdb_dbid_map_old
[obnox/samba/samba-obnox.git] / ctdb / server / ctdb_recoverd.c
index 3d7c43240adfd44cc086e1b63418c5b06bb98576..e1f580a4fb2bc9114e3a078e67ce0c005977b476 100644 (file)
    along with this program; if not, see <http://www.gnu.org/licenses/>.
 */
 
-#include "includes.h"
+#include "replace.h"
 #include "system/filesys.h"
 #include "system/time.h"
 #include "system/network.h"
 #include "system/wait.h"
-#include "popt.h"
-#include "cmdline.h"
-#include "../include/ctdb_client.h"
-#include "../include/ctdb_private.h"
+
+#include <popt.h>
+#include <talloc.h>
+#include <tevent.h>
+#include <tdb.h>
+
 #include "lib/tdb_wrap/tdb_wrap.h"
 #include "lib/util/dlinklist.h"
+#include "lib/util/debug.h"
+#include "lib/util/samba_util.h"
+
+#include "ctdb_private.h"
+#include "ctdb_client.h"
+#include "ctdb_logging.h"
+
+#include "common/system.h"
+#include "common/cmdline.h"
+#include "common/common.h"
 
 
 /* List of SRVID requests that need to be processed */
@@ -117,6 +129,103 @@ nomem:
        srvid_request_reply(ctdb, request, result);
 }
 
+/* An abstraction to allow an operation (takeover runs, recoveries,
+ * ...) to be disabled for a given timeout */
+struct ctdb_op_state {
+       struct tevent_timer *timer;
+       bool in_progress;
+       const char *name;
+};
+
+static struct ctdb_op_state *ctdb_op_init(TALLOC_CTX *mem_ctx, const char *name)
+{
+       struct ctdb_op_state *state = talloc_zero(mem_ctx, struct ctdb_op_state);
+
+       if (state != NULL) {
+               state->in_progress = false;
+               state->name = name;
+       }
+
+       return state;
+}
+
+static bool ctdb_op_is_disabled(struct ctdb_op_state *state)
+{
+       return state->timer != NULL;
+}
+
+static bool ctdb_op_begin(struct ctdb_op_state *state)
+{
+       if (ctdb_op_is_disabled(state)) {
+               DEBUG(DEBUG_NOTICE,
+                     ("Unable to begin - %s are disabled\n", state->name));
+               return false;
+       }
+
+       state->in_progress = true;
+       return true;
+}
+
+static bool ctdb_op_end(struct ctdb_op_state *state)
+{
+       return state->in_progress = false;
+}
+
+static bool ctdb_op_is_in_progress(struct ctdb_op_state *state)
+{
+       return state->in_progress;
+}
+
+static void ctdb_op_enable(struct ctdb_op_state *state)
+{
+       TALLOC_FREE(state->timer);
+}
+
+static void ctdb_op_timeout_handler(struct tevent_context *ev,
+                                   struct tevent_timer *te,
+                                   struct timeval yt, void *p)
+{
+       struct ctdb_op_state *state =
+               talloc_get_type(p, struct ctdb_op_state);
+
+       DEBUG(DEBUG_NOTICE,("Reenabling %s after timeout\n", state->name));
+       ctdb_op_enable(state);
+}
+
+static int ctdb_op_disable(struct ctdb_op_state *state,
+                          struct tevent_context *ev,
+                          uint32_t timeout)
+{
+       if (timeout == 0) {
+               DEBUG(DEBUG_NOTICE,("Reenabling %s\n", state->name));
+               ctdb_op_enable(state);
+               return 0;
+       }
+
+       if (state->in_progress) {
+               DEBUG(DEBUG_ERR,
+                     ("Unable to disable %s - in progress\n", state->name));
+               return -EAGAIN;
+       }
+
+       DEBUG(DEBUG_NOTICE,("Disabling %s for %u seconds\n",
+                           state->name, timeout));
+
+       /* Clear any old timers */
+       talloc_free(state->timer);
+
+       /* Arrange for the timeout to occur */
+       state->timer = tevent_add_timer(ev, state,
+                                       timeval_current_ofs(timeout, 0),
+                                       ctdb_op_timeout_handler, state);
+       if (state->timer == NULL) {
+               DEBUG(DEBUG_ERR,(__location__ " Unable to setup timer\n"));
+               return -ENOMEM;
+       }
+
+       return 0;
+}
+
 struct ctdb_banning_state {
        uint32_t count;
        struct timeval last_reported_time;
@@ -128,29 +237,28 @@ struct ctdb_banning_state {
 struct ctdb_recoverd {
        struct ctdb_context *ctdb;
        uint32_t recmaster;
-       uint32_t num_active;
-       uint32_t num_lmasters;
-       uint32_t num_connected;
        uint32_t last_culprit_node;
-       struct ctdb_node_map *nodemap;
+       struct ctdb_node_map_old *nodemap;
        struct timeval priority_time;
        bool need_takeover_run;
        bool need_recovery;
        uint32_t node_flags;
-       struct timed_event *send_election_te;
-       struct timed_event *election_timeout;
-       struct vacuum_info *vacuum_info;
+       struct tevent_timer *send_election_te;
+       struct tevent_timer *election_timeout;
        struct srvid_requests *reallocate_requests;
-       bool takeover_run_in_progress;
-       TALLOC_CTX *takeover_runs_disable_ctx;
+       struct ctdb_op_state *takeover_run;
+       struct ctdb_op_state *recovery;
        struct ctdb_control_get_ifaces *ifaces;
        uint32_t *force_rebalance_nodes;
+       struct ctdb_node_capabilities *caps;
 };
 
 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
 
-static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data);
+static void ctdb_restart_recd(struct tevent_context *ev,
+                             struct tevent_timer *te, struct timeval t,
+                             void *private_data);
 
 /*
   ban a node for a period of time
@@ -244,7 +352,7 @@ static void recovered_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn
 /*
   run the "recovered" eventscript on all nodes
  */
-static int run_recovered_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, const char *caller)
+static int run_recovered_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap, const char *caller)
 {
        TALLOC_CTX *tmp_ctx;
        uint32_t *nodes;
@@ -284,7 +392,7 @@ static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node
 /*
   run the "startrecovery" eventscript on all nodes
  */
-static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
+static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap)
 {
        TALLOC_CTX *tmp_ctx;
        uint32_t *nodes;
@@ -309,43 +417,42 @@ static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_
        return 0;
 }
 
-static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
-{
-       if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
-               DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
-               return;
-       }
-       if (node_pnn < ctdb->num_nodes) {
-               ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
-       }
-
-       if (node_pnn == ctdb->pnn) {
-               ctdb->capabilities = ctdb->nodes[node_pnn]->capabilities;
-       }
-}
-
 /*
   update the node capabilities for all connected nodes
  */
-static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
+static int update_capabilities(struct ctdb_recoverd *rec,
+                              struct ctdb_node_map_old *nodemap)
 {
-       uint32_t *nodes;
+       uint32_t *capp;
        TALLOC_CTX *tmp_ctx;
+       struct ctdb_node_capabilities *caps;
+       struct ctdb_context *ctdb = rec->ctdb;
 
-       tmp_ctx = talloc_new(ctdb);
+       tmp_ctx = talloc_new(rec);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);
 
-       nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
-       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
-                                       nodes, 0,
-                                       CONTROL_TIMEOUT(),
-                                       false, tdb_null,
-                                       async_getcap_callback, NULL,
-                                       NULL) != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
+       caps = ctdb_get_capabilities(ctdb, tmp_ctx,
+                                    CONTROL_TIMEOUT(), nodemap);
+
+       if (caps == NULL) {
+               DEBUG(DEBUG_ERR,
+                     (__location__ " Failed to get node capabilities\n"));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       capp = ctdb_get_node_capabilities(caps, ctdb_get_pnn(ctdb));
+       if (capp == NULL) {
+               DEBUG(DEBUG_ERR,
+                     (__location__
+                      " Capabilities don't include current node.\n"));
                talloc_free(tmp_ctx);
                return -1;
        }
+       ctdb->capabilities = *capp;
+
+       TALLOC_FREE(rec->caps);
+       rec->caps = talloc_steal(rec, caps);
 
        talloc_free(tmp_ctx);
        return 0;
@@ -370,7 +477,10 @@ static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t
 /*
   change recovery mode on all nodes
  */
-static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
+static int set_recovery_mode(struct ctdb_context *ctdb,
+                            struct ctdb_recoverd *rec,
+                            struct ctdb_node_map_old *nodemap,
+                            uint32_t rec_mode, bool freeze)
 {
        TDB_DATA data;
        uint32_t *nodes;
@@ -396,7 +506,7 @@ static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *re
        }
 
        /* freeze all nodes */
-       if (rec_mode == CTDB_RECOVERY_ACTIVE) {
+       if (freeze && rec_mode == CTDB_RECOVERY_ACTIVE) {
                int i;
 
                for (i=1; i<=NUM_DB_PRIORITIES; i++) {
@@ -421,7 +531,7 @@ static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *re
 /*
   change recovery master on all node
  */
-static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
+static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, uint32_t pnn)
 {
        TDB_DATA data;
        TALLOC_CTX *tmp_ctx;
@@ -454,8 +564,8 @@ static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *
    a recovery if this call fails.
 */
 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
-       struct ctdb_node_map *nodemap, 
-       uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
+       struct ctdb_node_map_old *nodemap, 
+       uint32_t pnn, struct ctdb_dbid_map_old *dbmap, TALLOC_CTX *mem_ctx)
 {
        int db;
 
@@ -464,14 +574,14 @@ static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
                struct ctdb_db_priority db_prio;
                int ret;
 
-               db_prio.db_id     = dbmap->dbs[db].dbid;
-               ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
+               db_prio.db_id     = dbmap->dbs[db].db_id;
+               ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].db_id, &db_prio.priority);
                if (ret != 0) {
-                       DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
+                       DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].db_id));
                        continue;
                }
 
-               DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
+               DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].db_id, db_prio.priority)); 
 
                ret = ctdb_ctrl_set_db_priority(ctdb, CONTROL_TIMEOUT(),
                                                CTDB_CURRENT_NODE, &db_prio);
@@ -487,11 +597,11 @@ static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
 /*
   ensure all other nodes have attached to any databases that we have
  */
-static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
-                                          uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
+static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, 
+                                          uint32_t pnn, struct ctdb_dbid_map_old *dbmap, TALLOC_CTX *mem_ctx)
 {
        int i, j, db, ret;
-       struct ctdb_dbid_map *remote_dbmap;
+       struct ctdb_dbid_map_old *remote_dbmap;
 
        /* verify that all other nodes have all our databases */
        for (j=0; j<nodemap->num; j++) {
@@ -517,7 +627,7 @@ static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctd
 
 
                        for (i=0;i<remote_dbmap->num;i++) {
-                               if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
+                               if (dbmap->dbs[db].db_id == remote_dbmap->dbs[i].db_id) {
                                        break;
                                }
                        }
@@ -527,7 +637,7 @@ static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctd
                        }
                        /* ok so we need to create this database */
                        ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn,
-                                                 dbmap->dbs[db].dbid, mem_ctx,
+                                                 dbmap->dbs[db].db_id, mem_ctx,
                                                  &name);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
@@ -551,11 +661,11 @@ static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctd
 /*
   ensure we are attached to any databases that anyone else is attached to
  */
-static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
-                                         uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
+static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, 
+                                         uint32_t pnn, struct ctdb_dbid_map_old **dbmap, TALLOC_CTX *mem_ctx)
 {
        int i, j, db, ret;
-       struct ctdb_dbid_map *remote_dbmap;
+       struct ctdb_dbid_map_old *remote_dbmap;
 
        /* verify that we have all database any other node has */
        for (j=0; j<nodemap->num; j++) {
@@ -580,7 +690,7 @@ static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb
                        const char *name;
 
                        for (i=0;i<(*dbmap)->num;i++) {
-                               if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
+                               if (remote_dbmap->dbs[db].db_id == (*dbmap)->dbs[i].db_id) {
                                        break;
                                }
                        }
@@ -592,7 +702,7 @@ static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb
                           rebuild dbmap
                         */
                        ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
-                                           remote_dbmap->dbs[db].dbid, mem_ctx, &name);
+                                           remote_dbmap->dbs[db].db_id, mem_ctx, &name);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
                                          nodemap->nodes[j].pnn));
@@ -619,13 +729,13 @@ static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb
 /*
   pull the remote database contents from one node into the recdb
  */
-static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
+static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode,
                                    struct tdb_wrap *recdb, uint32_t dbid)
 {
        int ret;
        TDB_DATA outdata;
        struct ctdb_marshall_buffer *reply;
-       struct ctdb_rec_data *rec;
+       struct ctdb_rec_data_old *recdata;
        int i;
        TALLOC_CTX *tmp_ctx = talloc_new(recdb);
 
@@ -644,21 +754,21 @@ static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode,
                talloc_free(tmp_ctx);
                return -1;
        }
-       
-       rec = (struct ctdb_rec_data *)&reply->data[0];
-       
+
+       recdata = (struct ctdb_rec_data_old *)&reply->data[0];
+
        for (i=0;
             i<reply->count;
-            rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
+            recdata = (struct ctdb_rec_data_old *)(recdata->length + (uint8_t *)recdata), i++) {
                TDB_DATA key, data;
                struct ctdb_ltdb_header *hdr;
                TDB_DATA existing;
-               
-               key.dptr = &rec->data[0];
-               key.dsize = rec->keylen;
-               data.dptr = &rec->data[key.dsize];
-               data.dsize = rec->datalen;
-               
+
+               key.dptr = &recdata->data[0];
+               key.dsize = recdata->keylen;
+               data.dptr = &recdata->data[key.dsize];
+               data.dsize = recdata->datalen;
+
                hdr = (struct ctdb_ltdb_header *)data.dptr;
 
                if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
@@ -669,11 +779,11 @@ static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode,
 
                /* fetch the existing record, if any */
                existing = tdb_fetch(recdb->tdb, key);
-               
+
                if (existing.dptr != NULL) {
                        struct ctdb_ltdb_header header;
                        if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
-                               DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
+                               DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n",
                                         (unsigned)existing.dsize, srcnode));
                                free(existing.dptr);
                                talloc_free(tmp_ctx);
@@ -682,15 +792,16 @@ static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode,
                        header = *(struct ctdb_ltdb_header *)existing.dptr;
                        free(existing.dptr);
                        if (!(header.rsn < hdr->rsn ||
-                             (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
+                             (header.dmaster != ctdb_get_pnn(ctdb) &&
+                              header.rsn == hdr->rsn))) {
                                continue;
                        }
                }
-               
+
                if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
                        DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
                        talloc_free(tmp_ctx);
-                       return -1;                              
+                       return -1;
                }
        }
 
@@ -747,7 +858,7 @@ static void pull_seqnum_fail_cb(struct ctdb_context *ctdb, uint32_t node_pnn, in
 
 static int pull_highest_seqnum_pdb(struct ctdb_context *ctdb,
                                struct ctdb_recoverd *rec, 
-                               struct ctdb_node_map *nodemap, 
+                               struct ctdb_node_map_old *nodemap, 
                                struct tdb_wrap *recdb, uint32_t dbid)
 {
        TALLOC_CTX *tmp_ctx = talloc_new(NULL);
@@ -818,7 +929,7 @@ static int pull_highest_seqnum_pdb(struct ctdb_context *ctdb,
  */
 static int pull_remote_database(struct ctdb_context *ctdb,
                                struct ctdb_recoverd *rec, 
-                               struct ctdb_node_map *nodemap, 
+                               struct ctdb_node_map_old *nodemap, 
                                struct tdb_wrap *recdb, uint32_t dbid,
                                bool persistent)
 {
@@ -855,7 +966,7 @@ static int pull_remote_database(struct ctdb_context *ctdb,
 /*
   update flags on all active nodes
  */
-static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
+static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, uint32_t pnn, uint32_t flags)
 {
        int ret;
 
@@ -871,7 +982,7 @@ static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node
 /*
   ensure all nodes have the same vnnmap we do
  */
-static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
+static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, 
                                      uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
 {
        int j, ret;
@@ -894,17 +1005,6 @@ static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_nod
 }
 
 
-struct vacuum_info {
-       struct vacuum_info *next, *prev;
-       struct ctdb_recoverd *rec;
-       uint32_t srcnode;
-       struct ctdb_db_context *ctdb_db;
-       struct ctdb_marshall_buffer *recs;
-       struct ctdb_rec_data *r;
-};
-
-static void vacuum_fetch_next(struct vacuum_info *v);
-
 /*
   called when a vacuum fetch has completed - just free it and do the next one
  */
@@ -914,179 +1014,138 @@ static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
 }
 
 
-/*
-  process the next element from the vacuum list
-*/
-static void vacuum_fetch_next(struct vacuum_info *v)
+/**
+ * Process one elements of the vacuum fetch list:
+ * Migrate it over to us with the special flag
+ * CTDB_CALL_FLAG_VACUUM_MIGRATION.
+ */
+static bool vacuum_fetch_process_one(struct ctdb_db_context *ctdb_db,
+                                    uint32_t pnn,
+                                    struct ctdb_rec_data_old *r)
 {
+       struct ctdb_client_call_state *state;
+       TDB_DATA data;
+       struct ctdb_ltdb_header *hdr;
        struct ctdb_call call;
-       struct ctdb_rec_data *r;
 
-       while (v->recs->count) {
-               struct ctdb_client_call_state *state;
-               TDB_DATA data;
-               struct ctdb_ltdb_header *hdr;
+       ZERO_STRUCT(call);
+       call.call_id = CTDB_NULL_FUNC;
+       call.flags = CTDB_IMMEDIATE_MIGRATION;
+       call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;
 
-               ZERO_STRUCT(call);
-               call.call_id = CTDB_NULL_FUNC;
-               call.flags = CTDB_IMMEDIATE_MIGRATION;
-               call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;
+       call.key.dptr = &r->data[0];
+       call.key.dsize = r->keylen;
 
-               r = v->r;
-               v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
-               v->recs->count--;
-
-               call.key.dptr = &r->data[0];
-               call.key.dsize = r->keylen;
-
-               /* ensure we don't block this daemon - just skip a record if we can't get
-                  the chainlock */
-               if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
-                       continue;
-               }
-
-               data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
-               if (data.dptr == NULL) {
-                       tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
-                       continue;
-               }
+       /* ensure we don't block this daemon - just skip a record if we can't get
+          the chainlock */
+       if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, call.key) != 0) {
+               return true;
+       }
 
-               if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
-                       free(data.dptr);
-                       tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
-                       continue;
-               }
-               
-               hdr = (struct ctdb_ltdb_header *)data.dptr;
-               if (hdr->dmaster == v->rec->ctdb->pnn) {
-                       /* its already local */
-                       free(data.dptr);
-                       tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
-                       continue;
-               }
+       data = tdb_fetch(ctdb_db->ltdb->tdb, call.key);
+       if (data.dptr == NULL) {
+               tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
+               return true;
+       }
 
+       if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
                free(data.dptr);
+               tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
+               return true;
+       }
 
-               state = ctdb_call_send(v->ctdb_db, &call);
-               tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
-               if (state == NULL) {
-                       DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
-                       talloc_free(v);
-                       return;
-               }
-               state->async.fn = vacuum_fetch_callback;
-               state->async.private_data = NULL;
+       hdr = (struct ctdb_ltdb_header *)data.dptr;
+       if (hdr->dmaster == pnn) {
+               /* its already local */
+               free(data.dptr);
+               tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
+               return true;
        }
 
-       talloc_free(v);
-}
+       free(data.dptr);
 
+       state = ctdb_call_send(ctdb_db, &call);
+       tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
+       if (state == NULL) {
+               DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
+               return false;
+       }
+       state->async.fn = vacuum_fetch_callback;
+       state->async.private_data = NULL;
 
-/*
-  destroy a vacuum info structure
- */
-static int vacuum_info_destructor(struct vacuum_info *v)
-{
-       DLIST_REMOVE(v->rec->vacuum_info, v);
-       return 0;
+       return true;
 }
 
 
 /*
   handler for vacuum fetch
 */
-static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
-                                TDB_DATA data, void *private_data)
+static void vacuum_fetch_handler(uint64_t srvid, TDB_DATA data,
+                                void *private_data)
 {
-       struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
+       struct ctdb_recoverd *rec = talloc_get_type(
+               private_data, struct ctdb_recoverd);
+       struct ctdb_context *ctdb = rec->ctdb;
        struct ctdb_marshall_buffer *recs;
        int ret, i;
        TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
        const char *name;
-       struct ctdb_dbid_map *dbmap=NULL;
+       struct ctdb_dbid_map_old *dbmap=NULL;
        bool persistent = false;
        struct ctdb_db_context *ctdb_db;
-       struct ctdb_rec_data *r;
-       uint32_t srcnode;
-       struct vacuum_info *v;
+       struct ctdb_rec_data_old *r;
 
        recs = (struct ctdb_marshall_buffer *)data.dptr;
-       r = (struct ctdb_rec_data *)&recs->data[0];
 
        if (recs->count == 0) {
-               talloc_free(tmp_ctx);
-               return;
-       }
-
-       srcnode = r->reqid;
-
-       for (v=rec->vacuum_info;v;v=v->next) {
-               if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
-                       /* we're already working on records from this node */
-                       talloc_free(tmp_ctx);
-                       return;
-               }
+               goto done;
        }
 
        /* work out if the database is persistent */
        ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
-               talloc_free(tmp_ctx);
-               return;
+               goto done;
        }
 
        for (i=0;i<dbmap->num;i++) {
-               if (dbmap->dbs[i].dbid == recs->db_id) {
+               if (dbmap->dbs[i].db_id == recs->db_id) {
                        persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
                        break;
                }
        }
        if (i == dbmap->num) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
-               talloc_free(tmp_ctx);
-               return;         
+               goto done;
        }
 
        /* find the name of this database */
        if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
-               talloc_free(tmp_ctx);
-               return;
+               goto done;
        }
 
        /* attach to it */
        ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, persistent, 0);
        if (ctdb_db == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
-               talloc_free(tmp_ctx);
-               return;
-       }
-
-       v = talloc_zero(rec, struct vacuum_info);
-       if (v == NULL) {
-               DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
-               talloc_free(tmp_ctx);
-               return;
+               goto done;
        }
 
-       v->rec = rec;
-       v->srcnode = srcnode;
-       v->ctdb_db = ctdb_db;
-       v->recs = talloc_memdup(v, recs, data.dsize);
-       if (v->recs == NULL) {
-               DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
-               talloc_free(v);
-               talloc_free(tmp_ctx);
-               return;         
-       }
-       v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
+       r = (struct ctdb_rec_data_old *)&recs->data[0];
+       while (recs->count) {
+               bool ok;
 
-       DLIST_ADD(rec->vacuum_info, v);
+               ok = vacuum_fetch_process_one(ctdb_db, rec->ctdb->pnn, r);
+               if (!ok) {
+                       break;
+               }
 
-       talloc_set_destructor(v, vacuum_info_destructor);
+               r = (struct ctdb_rec_data_old *)(r->length + (uint8_t *)r);
+               recs->count--;
+       }
 
-       vacuum_fetch_next(v);
+done:
        talloc_free(tmp_ctx);
 }
 
@@ -1094,13 +1153,13 @@ static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid,
 /*
  * handler for database detach
  */
-static void detach_database_handler(struct ctdb_context *ctdb, uint64_t srvid,
-                                   TDB_DATA data, void *private_data)
+static void detach_database_handler(uint64_t srvid, TDB_DATA data,
+                                   void *private_data)
 {
-       struct ctdb_recoverd *rec = talloc_get_type(private_data,
-                                                   struct ctdb_recoverd);
+       struct ctdb_recoverd *rec = talloc_get_type(
+               private_data, struct ctdb_recoverd);
+       struct ctdb_context *ctdb = rec->ctdb;
        uint32_t db_id;
-       struct vacuum_info *v, *vnext;
        struct ctdb_db_context *ctdb_db;
 
        if (data.dsize != sizeof(db_id)) {
@@ -1114,17 +1173,6 @@ static void detach_database_handler(struct ctdb_context *ctdb, uint64_t srvid,
                return;
        }
 
-       /* Stop any active vacuum fetch */
-       v = rec->vacuum_info;
-       while (v != NULL) {
-               vnext = v->next;
-
-               if (v->ctdb_db->db_id == db_id) {
-                       talloc_free(v);
-               }
-               v = vnext;
-       }
-
        DLIST_REMOVE(ctdb->db_list, ctdb_db);
 
        DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
@@ -1135,7 +1183,8 @@ static void detach_database_handler(struct ctdb_context *ctdb, uint64_t srvid,
 /*
   called when ctdb_wait_timeout should finish
  */
-static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
+static void ctdb_wait_handler(struct tevent_context *ev,
+                             struct tevent_timer *te,
                              struct timeval yt, void *p)
 {
        uint32_t *timed_out = (uint32_t *)p;
@@ -1149,16 +1198,18 @@ static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
 {
        uint32_t timed_out = 0;
        time_t usecs = (secs - (time_t)secs) * 1000000;
-       event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
+       tevent_add_timer(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs),
+                        ctdb_wait_handler, &timed_out);
        while (!timed_out) {
-               event_loop_once(ctdb->ev);
+               tevent_loop_once(ctdb->ev);
        }
 }
 
 /*
   called when an election times out (ends)
  */
-static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
+static void ctdb_election_timeout(struct tevent_context *ev,
+                                 struct tevent_timer *te,
                                  struct timeval t, void *p)
 {
        struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
@@ -1177,7 +1228,7 @@ static void ctdb_wait_election(struct ctdb_recoverd *rec)
 {
        struct ctdb_context *ctdb = rec->ctdb;
        while (rec->election_timeout) {
-               event_loop_once(ctdb->ev);
+               tevent_loop_once(ctdb->ev);
        }
 }
 
@@ -1185,7 +1236,7 @@ static void ctdb_wait_election(struct ctdb_recoverd *rec)
   Update our local flags from all remote connected nodes. 
   This is only run when we are or we belive we are the recovery master
  */
-static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
+static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap)
 {
        int j;
        struct ctdb_context *ctdb = rec->ctdb;
@@ -1195,7 +1246,7 @@ static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *n
           they are the same as for this node
         */
        for (j=0; j<nodemap->num; j++) {
-               struct ctdb_node_map *remote_nodemap=NULL;
+               struct ctdb_node_map_old *remote_nodemap=NULL;
                int ret;
 
                if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
@@ -1242,7 +1293,7 @@ static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *n
 }
 
 
-/* Create a new random generation ip. 
+/* Create a new random generation id.
    The generation id can not be the INVALID_GENERATION id
 */
 static uint32_t new_generation(void)
@@ -1312,7 +1363,7 @@ struct recdb_data {
 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
 {
        struct recdb_data *params = (struct recdb_data *)p;
-       struct ctdb_rec_data *rec;
+       struct ctdb_rec_data_old *recdata;
        struct ctdb_ltdb_header *hdr;
 
        /*
@@ -1357,25 +1408,25 @@ static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data,
        }
 
        /* add the record to the blob ready to send to the nodes */
-       rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
-       if (rec == NULL) {
+       recdata = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
+       if (recdata == NULL) {
                params->failed = true;
                return -1;
        }
-       if (params->len + rec->length >= params->allocated_len) {
-               params->allocated_len = rec->length + params->len + params->ctdb->tunable.pulldb_preallocation_size;
+       if (params->len + recdata->length >= params->allocated_len) {
+               params->allocated_len = recdata->length + params->len + params->ctdb->tunable.pulldb_preallocation_size;
                params->recdata = talloc_realloc_size(NULL, params->recdata, params->allocated_len);
        }
        if (params->recdata == NULL) {
                DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u\n",
-                        rec->length + params->len));
+                        recdata->length + params->len));
                params->failed = true;
                return -1;
        }
        params->recdata->count++;
-       memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
-       params->len += rec->length;
-       talloc_free(rec);
+       memcpy(params->len+(uint8_t *)params->recdata, recdata, recdata->length);
+       params->len += recdata->length;
+       talloc_free(recdata);
 
        return 0;
 }
@@ -1385,7 +1436,7 @@ static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data,
  */
 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
                               bool persistent,
-                              struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
+                              struct tdb_wrap *recdb, struct ctdb_node_map_old *nodemap)
 {
        struct recdb_data params;
        struct ctdb_marshall_buffer *recdata;
@@ -1457,14 +1508,14 @@ static int recover_database(struct ctdb_recoverd *rec,
                            uint32_t dbid,
                            bool persistent,
                            uint32_t pnn, 
-                           struct ctdb_node_map *nodemap,
+                           struct ctdb_node_map_old *nodemap,
                            uint32_t transaction_id)
 {
        struct tdb_wrap *recdb;
        int ret;
        struct ctdb_context *ctdb = rec->ctdb;
        TDB_DATA data;
-       struct ctdb_control_wipe_database w;
+       struct ctdb_control_transdb w;
        uint32_t *nodes;
 
        recdb = create_recdb(ctdb, mem_ctx);
@@ -1515,7 +1566,7 @@ static int recover_database(struct ctdb_recoverd *rec,
 
 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
                                         struct ctdb_recoverd *rec,
-                                        struct ctdb_node_map *nodemap,
+                                        struct ctdb_node_map_old *nodemap,
                                         uint32_t *culprit)
 {
        int j;
@@ -1566,7 +1617,7 @@ static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
                }
 
                if (ctdb->do_checkpublicip &&
-                   rec->takeover_runs_disable_ctx == NULL &&
+                   !ctdb_op_is_disabled(rec->takeover_run) &&
                    verify_remote_ip_allocation(ctdb,
                                                 node->known_public_ips,
                                                 node->pnn)) {
@@ -1678,7 +1729,7 @@ static void ban_misbehaving_nodes(struct ctdb_recoverd *rec, bool *self_ban)
 }
 
 static bool do_takeover_run(struct ctdb_recoverd *rec,
-                           struct ctdb_node_map *nodemap,
+                           struct ctdb_node_map_old *nodemap,
                            bool banning_credits_on_fail)
 {
        uint32_t *nodes = NULL;
@@ -1691,19 +1742,14 @@ static bool do_takeover_run(struct ctdb_recoverd *rec,
 
        DEBUG(DEBUG_NOTICE, ("Takeover run starting\n"));
 
-       if (rec->takeover_run_in_progress) {
+       if (ctdb_op_is_in_progress(rec->takeover_run)) {
                DEBUG(DEBUG_ERR, (__location__
                                  " takeover run already in progress \n"));
                ok = false;
                goto done;
        }
 
-       rec->takeover_run_in_progress = true;
-
-       /* If takeover runs are in disabled then fail... */
-       if (rec->takeover_runs_disable_ctx != NULL) {
-               DEBUG(DEBUG_ERR,
-                     ("Takeover runs are disabled so refusing to run one\n"));
+       if (!ctdb_op_begin(rec->takeover_run)) {
                ok = false;
                goto done;
        }
@@ -1767,113 +1813,145 @@ static bool do_takeover_run(struct ctdb_recoverd *rec,
 done:
        rec->need_takeover_run = !ok;
        talloc_free(nodes);
-       rec->takeover_run_in_progress = false;
+       ctdb_op_end(rec->takeover_run);
 
        DEBUG(DEBUG_NOTICE, ("Takeover run %s\n", ok ? "completed successfully" : "unsuccessful"));
        return ok;
 }
 
+struct recovery_helper_state {
+       int fd[2];
+       pid_t pid;
+       int result;
+       bool done;
+};
 
-/*
-  we are the recmaster, and recovery is needed - start a recovery run
- */
-static int do_recovery(struct ctdb_recoverd *rec, 
-                      TALLOC_CTX *mem_ctx, uint32_t pnn,
-                      struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
+static void ctdb_recovery_handler(struct tevent_context *ev,
+                                 struct tevent_fd *fde,
+                                 uint16_t flags, void *private_data)
 {
-       struct ctdb_context *ctdb = rec->ctdb;
-       int i, j, ret;
-       uint32_t generation;
-       struct ctdb_dbid_map *dbmap;
-       TDB_DATA data;
-       uint32_t *nodes;
-       struct timeval start_time;
-       uint32_t culprit = (uint32_t)-1;
-       bool self_ban;
+       struct recovery_helper_state *state = talloc_get_type_abort(
+               private_data, struct recovery_helper_state);
+       int ret;
 
-       DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
+       ret = sys_read(state->fd[0], &state->result, sizeof(state->result));
+       if (ret != sizeof(state->result)) {
+               state->result = EPIPE;
+       }
 
-       /* if recovery fails, force it again */
-       rec->need_recovery = true;
+       state->done = true;
+}
 
-       if (rec->election_timeout) {
-               /* an election is in progress */
-               DEBUG(DEBUG_ERR, ("do_recovery called while election in progress - try again later\n"));
-               return -1;
+
+static int db_recovery_parallel(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx)
+{
+       static char prog[PATH_MAX+1] = "";
+       const char **args;
+       struct recovery_helper_state *state;
+       struct tevent_fd *fde;
+       int nargs, ret;
+
+       if (!ctdb_set_helper("recovery_helper", prog, sizeof(prog),
+                            "CTDB_RECOVERY_HELPER", CTDB_HELPER_BINDIR,
+                            "ctdb_recovery_helper")) {
+               ctdb_die(rec->ctdb, "Unable to set recovery helper\n");
        }
 
-       ban_misbehaving_nodes(rec, &self_ban);
-       if (self_ban) {
-               DEBUG(DEBUG_NOTICE, ("This node was banned, aborting recovery\n"));
+       state = talloc_zero(mem_ctx, struct recovery_helper_state);
+       if (state == NULL) {
+               DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
                return -1;
        }
 
-        if (ctdb->recovery_lock_file != NULL) {
-               DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
-               start_time = timeval_current();
-               if (!ctdb_recovery_lock(ctdb, true)) {
-                       if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
-                               /* If ctdb is trying first recovery, it's
-                                * possible that current node does not know yet
-                                * who the recmaster is.
-                                */
-                               DEBUG(DEBUG_ERR, ("Unable to get recovery lock"
-                                               " - retrying recovery\n"));
-                               return -1;
-                       }
+       state->pid = -1;
 
-                       DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
-                                        "and ban ourself for %u seconds\n",
-                                        ctdb->tunable.recovery_ban_period));
-                       ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
-                       return -1;
-               }
-               ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
-               DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
+       ret = pipe(state->fd);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,
+                     ("Failed to create pipe for recovery helper\n"));
+               goto fail;
        }
 
-       DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
+       set_close_on_exec(state->fd[0]);
 
-       /* get a list of all databases */
-       ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
-               return -1;
+       nargs = 4;
+       args = talloc_array(state, const char *, nargs);
+       if (args == NULL) {
+               DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
+               goto fail;
        }
 
-       /* we do the db creation before we set the recovery mode, so the freeze happens
-          on all databases we will be dealing with. */
+       args[0] = talloc_asprintf(args, "%d", state->fd[1]);
+       args[1] = rec->ctdb->daemon.name;
+       args[2] = talloc_asprintf(args, "%u", new_generation());
+       args[3] = NULL;
 
-       /* verify that we have all the databases any other node has */
-       ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
-               return -1;
+       if (args[0] == NULL || args[2] == NULL) {
+               DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
+               goto fail;
        }
 
-       /* verify that all other nodes have all our databases */
-       ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
-               return -1;
+       if (!ctdb_vfork_with_logging(state, rec->ctdb, "recovery", prog, nargs,
+                                    args, NULL, NULL, &state->pid)) {
+               DEBUG(DEBUG_ERR,
+                     ("Failed to create child for recovery helper\n"));
+               goto fail;
        }
-       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
 
-       /* update the database priority for all remote databases */
-       ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
+       close(state->fd[1]);
+       state->fd[1] = -1;
+
+       state->done = false;
+
+       fde = tevent_add_fd(rec->ctdb->ev, rec->ctdb, state->fd[0],
+                           TEVENT_FD_READ, ctdb_recovery_handler, state);
+       if (fde == NULL) {
+               goto fail;
        }
-       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
+       tevent_fd_set_auto_close(fde);
 
+       while (!state->done) {
+               tevent_loop_once(rec->ctdb->ev);
+       }
 
-       /* update all other nodes to use the same setting for reclock files
-          as the local recovery master.
-       */
-       sync_recovery_lock_file_across_cluster(rec);
+       close(state->fd[0]);
+       state->fd[0] = -1;
+
+       if (state->result != 0) {
+               goto fail;
+       }
+
+       ctdb_kill(rec->ctdb, state->pid, SIGKILL);
+       talloc_free(state);
+       return 0;
+
+fail:
+       if (state->fd[0] != -1) {
+               close(state->fd[0]);
+       }
+       if (state->fd[1] != -1) {
+               close(state->fd[1]);
+       }
+       if (state->pid != -1) {
+               ctdb_kill(rec->ctdb, state->pid, SIGKILL);
+       }
+       talloc_free(state);
+       return -1;
+}
+
+static int db_recovery_serial(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx,
+                             uint32_t pnn, struct ctdb_node_map_old *nodemap,
+                             struct ctdb_vnn_map *vnnmap,
+                             struct ctdb_dbid_map_old *dbmap)
+{
+       struct ctdb_context *ctdb = rec->ctdb;
+       uint32_t generation;
+       TDB_DATA data;
+       uint32_t *nodes;
+       int ret, i, j;
 
        /* set recovery mode to active on all nodes */
-       ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
+       ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE, true);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
                return -1;
@@ -1886,27 +1964,6 @@ static int do_recovery(struct ctdb_recoverd *rec,
                return -1;
        }
 
-       /*
-         update all nodes to have the same flags that we have
-        */
-       for (i=0;i<nodemap->num;i++) {
-               if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
-                       continue;
-               }
-
-               ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
-               if (ret != 0) {
-                       if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
-                               DEBUG(DEBUG_WARNING, (__location__ "Unable to update flags on inactive node %d\n", i));
-                       } else {
-                               DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
-                               return -1;
-                       }
-               }
-       }
-
-       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
-
        /* pick a new generation number */
        generation = new_generation();
 
@@ -1927,6 +1984,12 @@ static int do_recovery(struct ctdb_recoverd *rec,
                return -1;
        }
 
+       /* Database generations are updated when the transaction is commited to
+        * the databases.  So make sure to use the final generation as the
+        * transaction id
+        */
+       generation = new_generation();
+
        data.dptr = (void *)&generation;
        data.dsize = sizeof(uint32_t);
 
@@ -1953,11 +2016,11 @@ static int do_recovery(struct ctdb_recoverd *rec,
 
        for (i=0;i<dbmap->num;i++) {
                ret = recover_database(rec, mem_ctx,
-                                      dbmap->dbs[i].dbid,
+                                      dbmap->dbs[i].db_id,
                                       dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT,
                                       pnn, nodemap, generation);
                if (ret != 0) {
-                       DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
+                       DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].db_id));
                        return -1;
                }
        }
@@ -1974,83 +2037,258 @@ static int do_recovery(struct ctdb_recoverd *rec,
                return -1;
        }
 
-       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
-       
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
+
+       /* build a new vnn map with all the currently active and
+          unbanned nodes */
+       vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
+       CTDB_NO_MEMORY(ctdb, vnnmap);
+       vnnmap->generation = generation;
+       vnnmap->size = 0;
+       vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
+       CTDB_NO_MEMORY(ctdb, vnnmap->map);
+       for (i=j=0;i<nodemap->num;i++) {
+               if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
+                       continue;
+               }
+               if (!ctdb_node_has_capabilities(rec->caps,
+                                               ctdb->nodes[i]->pnn,
+                                               CTDB_CAP_LMASTER)) {
+                       /* this node can not be an lmaster */
+                       DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
+                       continue;
+               }
+
+               vnnmap->size++;
+               vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
+               CTDB_NO_MEMORY(ctdb, vnnmap->map);
+               vnnmap->map[j++] = nodemap->nodes[i].pnn;
+
+       }
+       if (vnnmap->size == 0) {
+               DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
+               vnnmap->size++;
+               vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
+               CTDB_NO_MEMORY(ctdb, vnnmap->map);
+               vnnmap->map[0] = pnn;
+       }
+
+       /* update to the new vnnmap on all nodes */
+       ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
+               return -1;
+       }
+
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
+
+       /* update recmaster to point to us for all nodes */
+       ret = set_recovery_master(ctdb, nodemap, pnn);
+       if (ret!=0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
+               return -1;
+       }
+
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
+
+       /* disable recovery mode */
+       ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL, false);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
+               return -1;
+       }
+
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
+
+       return 0;
+}
+
+/*
+  we are the recmaster, and recovery is needed - start a recovery run
+ */
+static int do_recovery(struct ctdb_recoverd *rec,
+                      TALLOC_CTX *mem_ctx, uint32_t pnn,
+                      struct ctdb_node_map_old *nodemap, struct ctdb_vnn_map *vnnmap)
+{
+       struct ctdb_context *ctdb = rec->ctdb;
+       int i, ret;
+       struct ctdb_dbid_map_old *dbmap;
+       struct timeval start_time;
+       uint32_t culprit = (uint32_t)-1;
+       bool self_ban;
+       bool par_recovery;
+
+       DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
+
+       /* Check if the current node is still the recmaster.  It's possible that
+        * re-election has changed the recmaster, but we have not yet updated
+        * that information.
+        */
+       ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(),
+                                    pnn, &ctdb->recovery_master);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster\n"));
+               return -1;
+       }
+
+       if (pnn != ctdb->recovery_master) {
+               DEBUG(DEBUG_NOTICE,
+                     ("Recovery master changed to %u, aborting recovery\n",
+                      ctdb->recovery_master));
+               return -1;
+       }
+
+       /* if recovery fails, force it again */
+       rec->need_recovery = true;
+
+       if (!ctdb_op_begin(rec->recovery)) {
+               return -1;
+       }
+
+       if (rec->election_timeout) {
+               /* an election is in progress */
+               DEBUG(DEBUG_ERR, ("do_recovery called while election in progress - try again later\n"));
+               goto fail;
+       }
+
+       ban_misbehaving_nodes(rec, &self_ban);
+       if (self_ban) {
+               DEBUG(DEBUG_NOTICE, ("This node was banned, aborting recovery\n"));
+               goto fail;
+       }
+
+        if (ctdb->recovery_lock_file != NULL) {
+               if (ctdb_recovery_have_lock(ctdb)) {
+                       DEBUG(DEBUG_NOTICE, ("Already holding recovery lock\n"));
+               } else {
+                       start_time = timeval_current();
+                       DEBUG(DEBUG_NOTICE, ("Attempting to take recovery lock (%s)\n",
+                                            ctdb->recovery_lock_file));
+                       if (!ctdb_recovery_lock(ctdb)) {
+                               if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
+                                       /* If ctdb is trying first recovery, it's
+                                        * possible that current node does not know
+                                        * yet who the recmaster is.
+                                        */
+                                       DEBUG(DEBUG_ERR, ("Unable to get recovery lock"
+                                                         " - retrying recovery\n"));
+                                       goto fail;
+                               }
+
+                               DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
+                                                "and ban ourself for %u seconds\n",
+                                                ctdb->tunable.recovery_ban_period));
+                               ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
+                               goto fail;
+                       }
+                       ctdb_ctrl_report_recd_lock_latency(ctdb,
+                                                          CONTROL_TIMEOUT(),
+                                                          timeval_elapsed(&start_time));
+                       DEBUG(DEBUG_NOTICE,
+                             ("Recovery lock taken successfully by recovery daemon\n"));
+               }
+       }
+
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
+
+       /* get a list of all databases */
+       ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
+               goto fail;
+       }
+
+       /* we do the db creation before we set the recovery mode, so the freeze happens
+          on all databases we will be dealing with. */
+
+       /* verify that we have all the databases any other node has */
+       ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
+               goto fail;
+       }
+
+       /* verify that all other nodes have all our databases */
+       ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
+               goto fail;
+       }
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
+
+       /* update the database priority for all remote databases */
+       ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
+       }
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
+
+
+       /* update all other nodes to use the same setting for reclock files
+          as the local recovery master.
+       */
+       sync_recovery_lock_file_across_cluster(rec);
 
        /* update the capabilities for all nodes */
-       ret = update_capabilities(ctdb, nodemap);
+       ret = update_capabilities(rec, nodemap);
        if (ret!=0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
                return -1;
        }
 
-       /* build a new vnn map with all the currently active and
-          unbanned nodes */
-       generation = new_generation();
-       vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
-       CTDB_NO_MEMORY(ctdb, vnnmap);
-       vnnmap->generation = generation;
-       vnnmap->size = 0;
-       vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
-       CTDB_NO_MEMORY(ctdb, vnnmap->map);
-       for (i=j=0;i<nodemap->num;i++) {
-               if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
-                       continue;
-               }
-               if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
-                       /* this node can not be an lmaster */
-                       DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
+       /*
+         update all nodes to have the same flags that we have
+        */
+       for (i=0;i<nodemap->num;i++) {
+               if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
                        continue;
                }
 
-               vnnmap->size++;
-               vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
-               CTDB_NO_MEMORY(ctdb, vnnmap->map);
-               vnnmap->map[j++] = nodemap->nodes[i].pnn;
-
+               ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
+               if (ret != 0) {
+                       if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
+                               DEBUG(DEBUG_WARNING, (__location__ "Unable to update flags on inactive node %d\n", i));
+                       } else {
+                               DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
+                               return -1;
+                       }
+               }
        }
-       if (vnnmap->size == 0) {
-               DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
-               vnnmap->size++;
-               vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
-               CTDB_NO_MEMORY(ctdb, vnnmap->map);
-               vnnmap->map[0] = pnn;
-       }       
 
-       /* update to the new vnnmap on all nodes */
-       ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
-               return -1;
-       }
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
 
-       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
+       /* Check if all participating nodes have parallel recovery capability */
+       par_recovery = true;
+       for (i=0; i<nodemap->num; i++) {
+               if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
+                       continue;
+               }
 
-       /* update recmaster to point to us for all nodes */
-       ret = set_recovery_master(ctdb, nodemap, pnn);
-       if (ret!=0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
-               return -1;
+               if (!(rec->caps[i].capabilities &
+                     CTDB_CAP_PARALLEL_RECOVERY)) {
+                       par_recovery = false;
+                       break;
+               }
        }
 
-       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
+       if (par_recovery) {
+               ret = db_recovery_parallel(rec, mem_ctx);
+       } else {
+               ret = db_recovery_serial(rec, mem_ctx, pnn, nodemap, vnnmap,
+                                        dbmap);
+       }
 
-       /* disable recovery mode */
-       ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
        if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
-               return -1;
+               goto fail;
        }
 
-       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
-
        /* Fetch known/available public IPs from each active node */
        ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
        if (ret != 0) {
                DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
                                 culprit));
                rec->need_takeover_run = true;
-               return -1;
+               goto fail;
        }
 
        do_takeover_run(rec, nodemap, false);
@@ -2059,7 +2297,7 @@ static int do_recovery(struct ctdb_recoverd *rec,
        ret = run_recovered_eventscript(rec, nodemap, "do_recovery");
        if (ret!=0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
-               return -1;
+               goto fail;
        }
 
        DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
@@ -2070,12 +2308,13 @@ static int do_recovery(struct ctdb_recoverd *rec,
                                       CTDB_SRVID_RECONFIGURE, tdb_null);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Failed to send reconfigure message\n"));
-               return -1;
+               goto fail;
        }
 
        DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
 
        rec->need_recovery = false;
+       ctdb_op_end(rec->recovery);
 
        /* we managed to complete a full recovery, make sure to forgive
           any past sins by the nodes that could now participate in the
@@ -2097,16 +2336,18 @@ static int do_recovery(struct ctdb_recoverd *rec,
                ban_state->count = 0;
        }
 
-
-       /* We just finished a recovery successfully. 
-          We now wait for rerecovery_timeout before we allow 
+       /* We just finished a recovery successfully.
+          We now wait for rerecovery_timeout before we allow
           another recovery to take place.
        */
        DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be supressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
-       ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
-       DEBUG(DEBUG_NOTICE, ("The rerecovery timeout has elapsed. We now allow recoveries to trigger again.\n"));
-
+       ctdb_op_disable(rec->recovery, ctdb->ev,
+                       ctdb->tunable.rerecovery_timeout);
        return 0;
+
+fail:
+       ctdb_op_end(rec->recovery);
+       return -1;
 }
 
 
@@ -2127,7 +2368,7 @@ struct election_message {
 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
 {
        int ret, i;
-       struct ctdb_node_map *nodemap;
+       struct ctdb_node_map_old *nodemap;
        struct ctdb_context *ctdb = rec->ctdb;
 
        ZERO_STRUCTP(em);
@@ -2194,11 +2435,6 @@ static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message
                return true;
        }
 
-       /* try to use the most connected node */
-       if (cmp == 0) {
-               cmp = (int)myem.num_connected - (int)em->num_connected;
-       }
-
        /* then the longest running node */
        if (cmp == 0) {
                cmp = timeval_compare(&em->priority_time, &myem.priority_time);
@@ -2251,7 +2487,7 @@ static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn)
 static void unban_all_nodes(struct ctdb_context *ctdb)
 {
        int ret, i;
-       struct ctdb_node_map *nodemap;
+       struct ctdb_node_map_old *nodemap;
        TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
        
        ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
@@ -2279,7 +2515,9 @@ static void unban_all_nodes(struct ctdb_context *ctdb)
 /*
   we think we are winning the election - send a broadcast election request
  */
-static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
+static void election_send_request(struct tevent_context *ev,
+                                 struct tevent_timer *te,
+                                 struct timeval t, void *p)
 {
        struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
        int ret;
@@ -2296,9 +2534,11 @@ static void election_send_request(struct event_context *ev, struct timed_event *
 /*
   handler for memory dumps
 */
-static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
-                            TDB_DATA data, void *private_data)
+static void mem_dump_handler(uint64_t srvid, TDB_DATA data, void *private_data)
 {
+       struct ctdb_recoverd *rec = talloc_get_type(
+               private_data, struct ctdb_recoverd);
+       struct ctdb_context *ctdb = rec->ctdb;
        TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
        TDB_DATA *dump;
        int ret;
@@ -2339,10 +2579,11 @@ DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));
 /*
   handler for reload_nodes
 */
-static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
-                            TDB_DATA data, void *private_data)
+static void reload_nodes_handler(uint64_t srvid, TDB_DATA data,
+                                void *private_data)
 {
-       struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
+       struct ctdb_recoverd *rec = talloc_get_type(
+               private_data, struct ctdb_recoverd);
 
        DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
 
@@ -2350,8 +2591,8 @@ static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid,
 }
 
 
-static void ctdb_rebalance_timeout(struct event_context *ev,
-                                  struct timed_event *te,
+static void ctdb_rebalance_timeout(struct tevent_context *ev,
+                                  struct tevent_timer *te,
                                   struct timeval t, void *p)
 {
        struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
@@ -2367,16 +2608,17 @@ static void ctdb_rebalance_timeout(struct event_context *ev,
        do_takeover_run(rec, rec->nodemap, false);
 }
 
-       
-static void recd_node_rebalance_handler(struct ctdb_context *ctdb,
-                                       uint64_t srvid,
-                                       TDB_DATA data, void *private_data)
+
+static void recd_node_rebalance_handler(uint64_t srvid, TDB_DATA data,
+                                       void *private_data)
 {
+       struct ctdb_recoverd *rec = talloc_get_type(
+               private_data, struct ctdb_recoverd);
+       struct ctdb_context *ctdb = rec->ctdb;
        uint32_t pnn;
        uint32_t *t;
        int len;
        uint32_t deferred_rebalance;
-       struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
 
        if (rec->recmaster != ctdb_get_pnn(ctdb)) {
                return;
@@ -2424,18 +2666,19 @@ static void recd_node_rebalance_handler(struct ctdb_context *ctdb,
         */
        deferred_rebalance = ctdb->tunable.deferred_rebalance_on_node_add;
        if (deferred_rebalance != 0) {
-               event_add_timed(ctdb->ev, rec->force_rebalance_nodes,
-                               timeval_current_ofs(deferred_rebalance, 0),
-                               ctdb_rebalance_timeout, rec);
+               tevent_add_timer(ctdb->ev, rec->force_rebalance_nodes,
+                                timeval_current_ofs(deferred_rebalance, 0),
+                                ctdb_rebalance_timeout, rec);
        }
 }
 
 
 
-static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
-                            TDB_DATA data, void *private_data)
+static void recd_update_ip_handler(uint64_t srvid, TDB_DATA data,
+                                  void *private_data)
 {
-       struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
+       struct ctdb_recoverd *rec = talloc_get_type(
+               private_data, struct ctdb_recoverd);
        struct ctdb_public_ip *ip;
 
        if (rec->recmaster != rec->ctdb->pnn) {
@@ -2453,28 +2696,10 @@ static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid,
        update_ip_assignment_tree(rec->ctdb, ip);
 }
 
-
-static void clear_takeover_runs_disable(struct ctdb_recoverd *rec)
-{
-       TALLOC_FREE(rec->takeover_runs_disable_ctx);
-}
-
-static void reenable_takeover_runs(struct event_context *ev,
-                                  struct timed_event *te,
-                                  struct timeval yt, void *p)
+static void srvid_disable_and_reply(struct ctdb_context *ctdb,
+                                   TDB_DATA data,
+                                   struct ctdb_op_state *op_state)
 {
-       struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
-
-       DEBUG(DEBUG_NOTICE,("Reenabling takeover runs after timeout\n"));
-       clear_takeover_runs_disable(rec);
-}
-
-static void disable_takeover_runs_handler(struct ctdb_context *ctdb,
-                                         uint64_t srvid, TDB_DATA data,
-                                         void *private_data)
-{
-       struct ctdb_recoverd *rec = talloc_get_type(private_data,
-                                                   struct ctdb_recoverd);
        struct srvid_request_data *r;
        uint32_t timeout;
        TDB_DATA result;
@@ -2495,41 +2720,11 @@ static void disable_takeover_runs_handler(struct ctdb_context *ctdb,
        r = (struct srvid_request_data *)data.dptr;
        timeout = r->data;
 
-       if (timeout == 0) {
-               DEBUG(DEBUG_NOTICE,("Reenabling takeover runs\n"));
-               clear_takeover_runs_disable(rec);
-               ret = ctdb_get_pnn(ctdb);
-               goto done;
-       }
-
-       if (rec->takeover_run_in_progress) {
-               DEBUG(DEBUG_ERR,
-                     ("Unable to disable takeover runs - in progress\n"));
-               ret = -EAGAIN;
-               goto done;
-       }
-
-       DEBUG(DEBUG_NOTICE,("Disabling takeover runs for %u seconds\n", timeout));
-
-       /* Clear any old timers */
-       clear_takeover_runs_disable(rec);
-
-       /* When this is non-NULL it indicates that takeover runs are
-        * disabled.  This context also holds the timeout timer.
-        */
-       rec->takeover_runs_disable_ctx = talloc_new(rec);
-       if (rec->takeover_runs_disable_ctx == NULL) {
-               DEBUG(DEBUG_ERR,(__location__ " Unable to allocate memory\n"));
-               ret = -ENOMEM;
+       ret = ctdb_op_disable(op_state, ctdb->ev, timeout);
+       if (ret != 0) {
                goto done;
        }
 
-       /* Arrange for the timeout to occur */
-       event_add_timed(ctdb->ev, rec->takeover_runs_disable_ctx,
-                       timeval_current_ofs(timeout, 0),
-                       reenable_takeover_runs,
-                       rec);
-
        /* Returning our PNN tells the caller that we succeeded */
        ret = ctdb_get_pnn(ctdb);
 done:
@@ -2538,16 +2733,22 @@ done:
        srvid_request_reply(ctdb, (struct srvid_request *)r, result);
 }
 
-/* Backward compatibility for this SRVID - call
- * disable_takeover_runs_handler() instead
- */
-static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid,
-                                    TDB_DATA data, void *private_data)
+static void disable_takeover_runs_handler(uint64_t srvid, TDB_DATA data,
+                                         void *private_data)
+{
+       struct ctdb_recoverd *rec = talloc_get_type(
+               private_data, struct ctdb_recoverd);
+
+       srvid_disable_and_reply(rec->ctdb, data, rec->takeover_run);
+}
+
+/* Backward compatibility for this SRVID */
+static void disable_ip_check_handler(uint64_t srvid, TDB_DATA data,
+                                    void *private_data)
 {
-       struct ctdb_recoverd *rec = talloc_get_type(private_data,
-                                                   struct ctdb_recoverd);
-       TDB_DATA data2;
-       struct srvid_request_data *req;
+       struct ctdb_recoverd *rec = talloc_get_type(
+               private_data, struct ctdb_recoverd);
+       uint32_t timeout;
 
        if (data.dsize != sizeof(uint32_t)) {
                DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
@@ -2560,19 +2761,18 @@ static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid,
                return;
        }
 
-       req = talloc(ctdb, struct srvid_request_data);
-       CTDB_NO_MEMORY_VOID(ctdb, req);
+       timeout = *((uint32_t *)data.dptr);
 
-       req->srvid = 0; /* No reply */
-       req->pnn = -1;
-       req->data = *((uint32_t *)data.dptr); /* Timeout */
+       ctdb_op_disable(rec->takeover_run, rec->ctdb->ev, timeout);
+}
 
-       data2.dsize = sizeof(*req);
-       data2.dptr = (uint8_t *)req;
+static void disable_recoveries_handler(uint64_t srvid, TDB_DATA data,
+                                      void *private_data)
+{
+       struct ctdb_recoverd *rec = talloc_get_type(
+               private_data, struct ctdb_recoverd);
 
-       disable_takeover_runs_handler(rec->ctdb,
-                                     CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
-                                     data2, rec);
+       srvid_disable_and_reply(rec->ctdb, data, rec->recovery);
 }
 
 /*
@@ -2580,12 +2780,12 @@ static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid,
   handle this later in the monitor_cluster loop so we do not recurse
   with other requests to takeover_run()
 */
-static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid,
-                                 TDB_DATA data, void *private_data)
+static void ip_reallocate_handler(uint64_t srvid, TDB_DATA data,
+                                 void *private_data)
 {
        struct srvid_request *request;
-       struct ctdb_recoverd *rec = talloc_get_type(private_data,
-                                                   struct ctdb_recoverd);
+       struct ctdb_recoverd *rec = talloc_get_type(
+               private_data, struct ctdb_recoverd);
 
        if (data.dsize != sizeof(struct srvid_request)) {
                DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
@@ -2594,7 +2794,7 @@ static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid,
 
        request = (struct srvid_request *)data.dptr;
 
-       srvid_request_add(ctdb, &rec->reallocate_requests, request);
+       srvid_request_add(rec->ctdb, &rec->reallocate_requests, request);
 }
 
 static void process_ipreallocate_requests(struct ctdb_context *ctdb,
@@ -2642,13 +2842,13 @@ static void process_ipreallocate_requests(struct ctdb_context *ctdb,
 /*
   handler for recovery master elections
 */
-static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
-                            TDB_DATA data, void *private_data)
+static void election_handler(uint64_t srvid, TDB_DATA data, void *private_data)
 {
-       struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
+       struct ctdb_recoverd *rec = talloc_get_type(
+               private_data, struct ctdb_recoverd);
+       struct ctdb_context *ctdb = rec->ctdb;
        int ret;
        struct election_message *em = (struct election_message *)data.dptr;
-       TALLOC_CTX *mem_ctx;
 
        /* Ignore election packets from ourself */
        if (ctdb->pnn == em->pnn) {
@@ -2657,13 +2857,12 @@ static void election_handler(struct ctdb_context *ctdb, uint64_t srvid,
 
        /* we got an election packet - update the timeout for the election */
        talloc_free(rec->election_timeout);
-       rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
-                                               fast_start ?
-                                               timeval_current_ofs(0, 500000) :
-                                               timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
-                                               ctdb_election_timeout, rec);
-
-       mem_ctx = talloc_new(ctdb);
+       rec->election_timeout = tevent_add_timer(
+                       ctdb->ev, ctdb,
+                       fast_start ?
+                               timeval_current_ofs(0, 500000) :
+                               timeval_current_ofs(ctdb->tunable.election_timeout, 0),
+                       ctdb_election_timeout, rec);
 
        /* someone called an election. check their election data
           and if we disagree and we would rather be the elected node, 
@@ -2671,39 +2870,33 @@ static void election_handler(struct ctdb_context *ctdb, uint64_t srvid,
         */
        if (ctdb_election_win(rec, em)) {
                if (!rec->send_election_te) {
-                       rec->send_election_te = event_add_timed(ctdb->ev, rec, 
-                                                               timeval_current_ofs(0, 500000),
-                                                               election_send_request, rec);
+                       rec->send_election_te = tevent_add_timer(
+                                       ctdb->ev, rec,
+                                       timeval_current_ofs(0, 500000),
+                                       election_send_request, rec);
                }
-               talloc_free(mem_ctx);
                /*unban_all_nodes(ctdb);*/
                return;
        }
 
        /* we didn't win */
-       talloc_free(rec->send_election_te);
-       rec->send_election_te = NULL;
+       TALLOC_FREE(rec->send_election_te);
 
-        if (ctdb->recovery_lock_file != NULL) {
-               /* Release the recovery lock file */
-               if (em->pnn != ctdb->pnn &&
-                   ctdb_recovery_have_lock(ctdb)) {
-                       DEBUG(DEBUG_NOTICE, ("Release the recovery lock\n"));
-                       close(ctdb->recovery_lock_fd);
-                       ctdb->recovery_lock_fd = -1;
-                       unban_all_nodes(ctdb);
-               }
+       /* Release the recovery lock file */
+       if (ctdb_recovery_have_lock(ctdb)) {
+               ctdb_recovery_unlock(ctdb);
+               unban_all_nodes(ctdb);
        }
 
+       clear_ip_assignment_tree(ctdb);
+
        /* ok, let that guy become recmaster then */
        ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
-               talloc_free(mem_ctx);
                return;
        }
 
-       talloc_free(mem_ctx);
        return;
 }
 
@@ -2712,7 +2905,7 @@ static void election_handler(struct ctdb_context *ctdb, uint64_t srvid,
   force the start of the election process
  */
 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
-                          struct ctdb_node_map *nodemap)
+                          struct ctdb_node_map_old *nodemap)
 {
        int ret;
        struct ctdb_context *ctdb = rec->ctdb;
@@ -2720,18 +2913,19 @@ static void force_election(struct ctdb_recoverd *rec, uint32_t pnn,
        DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
 
        /* set all nodes to recovery mode to stop all internode traffic */
-       ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
+       ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE, false);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
                return;
        }
 
        talloc_free(rec->election_timeout);
-       rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
-                                               fast_start ?
-                                               timeval_current_ofs(0, 500000) :
-                                               timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
-                                               ctdb_election_timeout, rec);
+       rec->election_timeout = tevent_add_timer(
+                       ctdb->ev, ctdb,
+                       fast_start ?
+                               timeval_current_ofs(0, 500000) :
+                               timeval_current_ofs(ctdb->tunable.election_timeout, 0),
+                       ctdb_election_timeout, rec);
 
        ret = send_election_request(rec, pnn);
        if (ret!=0) {
@@ -2748,15 +2942,16 @@ static void force_election(struct ctdb_recoverd *rec, uint32_t pnn,
 /*
   handler for when a node changes its flags
 */
-static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
-                           TDB_DATA data, void *private_data)
+static void monitor_handler(uint64_t srvid, TDB_DATA data, void *private_data)
 {
+       struct ctdb_recoverd *rec = talloc_get_type(
+               private_data, struct ctdb_recoverd);
+       struct ctdb_context *ctdb = rec->ctdb;
        int ret;
        struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
-       struct ctdb_node_map *nodemap=NULL;
+       struct ctdb_node_map_old *nodemap=NULL;
        TALLOC_CTX *tmp_ctx;
        int i;
-       struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
        int disabled_flag_changed;
 
        if (data.dsize != sizeof(*c)) {
@@ -2822,12 +3017,15 @@ static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid,
 /*
   handler for when we need to push out flag changes ot all other nodes
 */
-static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
-                           TDB_DATA data, void *private_data)
+static void push_flags_handler(uint64_t srvid, TDB_DATA data,
+                              void *private_data)
 {
+       struct ctdb_recoverd *rec = talloc_get_type(
+               private_data, struct ctdb_recoverd);
+       struct ctdb_context *ctdb = rec->ctdb;
        int ret;
        struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
-       struct ctdb_node_map *nodemap=NULL;
+       struct ctdb_node_map_old *nodemap=NULL;
        TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
        uint32_t recmaster;
        uint32_t *nodes;
@@ -2907,7 +3105,7 @@ static void verify_recmode_normal_callback(struct ctdb_client_control_state *sta
 
 
 /* verify that all nodes are in normal recovery mode */
-static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
+static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap)
 {
        struct verify_recmode_normal_data *rmdata;
        TALLOC_CTX *mem_ctx = talloc_new(ctdb);
@@ -2951,7 +3149,7 @@ static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb
           or until all nodes we expect a response from has replied
        */
        while (rmdata->count > 0) {
-               event_loop_once(ctdb->ev);
+               tevent_loop_once(ctdb->ev);
        }
 
        status = rmdata->status;
@@ -2999,7 +3197,7 @@ static void verify_recmaster_callback(struct ctdb_client_control_state *state)
 
 
 /* verify that all nodes agree that we are the recmaster */
-static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
+static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap, uint32_t pnn)
 {
        struct ctdb_context *ctdb = rec->ctdb;
        struct verify_recmaster_data *rmdata;
@@ -3046,7 +3244,7 @@ static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ct
           or until all nodes we expect a response from has replied
        */
        while (rmdata->count > 0) {
-               event_loop_once(ctdb->ev);
+               tevent_loop_once(ctdb->ev);
        }
 
        status = rmdata->status;
@@ -3116,7 +3314,7 @@ static bool interfaces_have_changed(struct ctdb_context *ctdb,
 
 /* called to check that the local allocation of public ip addresses is ok.
 */
-static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map *nodemap)
+static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map_old *nodemap)
 {
        TALLOC_CTX *mem_ctx = talloc_new(NULL);
        struct ctdb_uptime *uptime1 = NULL;
@@ -3181,7 +3379,7 @@ static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_rec
           we also request a ip reallocation.
        */
        if (ctdb->tunable.disable_ip_failover == 0) {
-               struct ctdb_all_public_ips *ips = NULL;
+               struct ctdb_public_ip_list_old *ips = NULL;
 
                /* read the *available* IPs from the local node */
                ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE, &ips);
@@ -3255,20 +3453,20 @@ static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_rec
 
 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
 {
-       struct ctdb_node_map **remote_nodemaps = callback_data;
+       struct ctdb_node_map_old **remote_nodemaps = callback_data;
 
        if (node_pnn >= ctdb->num_nodes) {
                DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
                return;
        }
 
-       remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
+       remote_nodemaps[node_pnn] = (struct ctdb_node_map_old *)talloc_steal(remote_nodemaps, outdata.dptr);
 
 }
 
 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
-       struct ctdb_node_map *nodemap,
-       struct ctdb_node_map **remote_nodemaps)
+       struct ctdb_node_map_old *nodemap,
+       struct ctdb_node_map_old **remote_nodemaps)
 {
        uint32_t *nodes;
 
@@ -3287,181 +3485,6 @@ static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
        return 0;
 }
 
-enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
-struct ctdb_check_reclock_state {
-       struct ctdb_context *ctdb;
-       struct timeval start_time;
-       int fd[2];
-       pid_t child;
-       struct timed_event *te;
-       struct fd_event *fde;
-       enum reclock_child_status status;
-};
-
-/* when we free the reclock state we must kill any child process.
-*/
-static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
-{
-       struct ctdb_context *ctdb = state->ctdb;
-
-       ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
-
-       if (state->fd[0] != -1) {
-               close(state->fd[0]);
-               state->fd[0] = -1;
-       }
-       if (state->fd[1] != -1) {
-               close(state->fd[1]);
-               state->fd[1] = -1;
-       }
-       ctdb_kill(ctdb, state->child, SIGKILL);
-       return 0;
-}
-
-/*
-  called if our check_reclock child times out. this would happen if
-  i/o to the reclock file blocks.
- */
-static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
-                                        struct timeval t, void *private_data)
-{
-       struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
-                                          struct ctdb_check_reclock_state);
-
-       DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timedout CFS slow to grant locks?\n"));
-       state->status = RECLOCK_TIMEOUT;
-}
-
-/* this is called when the child process has completed checking the reclock
-   file and has written data back to us through the pipe.
-*/
-static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
-                            uint16_t flags, void *private_data)
-{
-       struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
-                                            struct ctdb_check_reclock_state);
-       char c = 0;
-       int ret;
-
-       /* we got a response from our child process so we can abort the
-          timeout.
-       */
-       talloc_free(state->te);
-       state->te = NULL;
-
-       ret = sys_read(state->fd[0], &c, 1);
-       if (ret != 1 || c != RECLOCK_OK) {
-               DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
-               state->status = RECLOCK_FAILED;
-
-               return;
-       }
-
-       state->status = RECLOCK_OK;
-       return;
-}
-
-static int check_recovery_lock(struct ctdb_context *ctdb)
-{
-       int ret;
-       struct ctdb_check_reclock_state *state;
-       pid_t parent = getpid();
-
-       if (ctdb->recovery_lock_fd == -1) {
-               DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
-               return -1;
-       }
-
-       state = talloc(ctdb, struct ctdb_check_reclock_state);
-       CTDB_NO_MEMORY(ctdb, state);
-
-       state->ctdb = ctdb;
-       state->start_time = timeval_current();
-       state->status = RECLOCK_CHECKING;
-       state->fd[0] = -1;
-       state->fd[1] = -1;
-
-       ret = pipe(state->fd);
-       if (ret != 0) {
-               talloc_free(state);
-               DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
-               return -1;
-       }
-
-       state->child = ctdb_fork(ctdb);
-       if (state->child == (pid_t)-1) {
-               DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
-               close(state->fd[0]);
-               state->fd[0] = -1;
-               close(state->fd[1]);
-               state->fd[1] = -1;
-               talloc_free(state);
-               return -1;
-       }
-
-       if (state->child == 0) {
-               char cc = RECLOCK_OK;
-               close(state->fd[0]);
-               state->fd[0] = -1;
-
-               ctdb_set_process_name("ctdb_rec_reclock");
-               debug_extra = talloc_asprintf(NULL, "recovery-lock:");
-               if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
-                       DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
-                       cc = RECLOCK_FAILED;
-               }
-
-               sys_write(state->fd[1], &cc, 1);
-               /* make sure we die when our parent dies */
-               while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
-                       sleep(5);
-               }
-               _exit(0);
-       }
-       close(state->fd[1]);
-       state->fd[1] = -1;
-       set_close_on_exec(state->fd[0]);
-
-       DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
-
-       talloc_set_destructor(state, check_reclock_destructor);
-
-       state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
-                                   ctdb_check_reclock_timeout, state);
-       if (state->te == NULL) {
-               DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
-               talloc_free(state);
-               return -1;
-       }
-
-       state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
-                               EVENT_FD_READ,
-                               reclock_child_handler,
-                               (void *)state);
-
-       if (state->fde == NULL) {
-               DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
-               talloc_free(state);
-               return -1;
-       }
-       tevent_fd_set_auto_close(state->fde);
-
-       while (state->status == RECLOCK_CHECKING) {
-               event_loop_once(ctdb->ev);
-       }
-
-       if (state->status == RECLOCK_FAILED) {
-               DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
-               close(ctdb->recovery_lock_fd);
-               ctdb->recovery_lock_fd = -1;
-               talloc_free(state);
-               return -1;
-       }
-
-       talloc_free(state);
-       return 0;
-}
-
 static int update_recovery_lock_file(struct ctdb_context *ctdb)
 {
        TALLOC_CTX *tmp_ctx = talloc_new(NULL);
@@ -3475,24 +3498,20 @@ static int update_recovery_lock_file(struct ctdb_context *ctdb)
 
        if (reclockfile == NULL) {
                if (ctdb->recovery_lock_file != NULL) {
-                       DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
+                       DEBUG(DEBUG_NOTICE,("Recovery lock file disabled\n"));
                        talloc_free(ctdb->recovery_lock_file);
                        ctdb->recovery_lock_file = NULL;
-                       if (ctdb->recovery_lock_fd != -1) {
-                               close(ctdb->recovery_lock_fd);
-                               ctdb->recovery_lock_fd = -1;
-                       }
+                       ctdb_recovery_unlock(ctdb);
                }
                talloc_free(tmp_ctx);
                return 0;
        }
 
        if (ctdb->recovery_lock_file == NULL) {
+               DEBUG(DEBUG_NOTICE,
+                     ("Recovery lock file enabled (%s)\n", reclockfile));
                ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
-               if (ctdb->recovery_lock_fd != -1) {
-                       close(ctdb->recovery_lock_fd);
-                       ctdb->recovery_lock_fd = -1;
-               }
+               ctdb_recovery_unlock(ctdb);
                talloc_free(tmp_ctx);
                return 0;
        }
@@ -3503,12 +3522,11 @@ static int update_recovery_lock_file(struct ctdb_context *ctdb)
                return 0;
        }
 
+       DEBUG(DEBUG_NOTICE,
+             ("Recovery lock file changed (now %s)\n", reclockfile));
        talloc_free(ctdb->recovery_lock_file);
        ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
-       if (ctdb->recovery_lock_fd != -1) {
-               close(ctdb->recovery_lock_fd);
-               ctdb->recovery_lock_fd = -1;
-       }
+       ctdb_recovery_unlock(ctdb);
 
        talloc_free(tmp_ctx);
        return 0;
@@ -3518,11 +3536,12 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
                      TALLOC_CTX *mem_ctx)
 {
        uint32_t pnn;
-       struct ctdb_node_map *nodemap=NULL;
-       struct ctdb_node_map *recmaster_nodemap=NULL;
-       struct ctdb_node_map **remote_nodemaps=NULL;
+       struct ctdb_node_map_old *nodemap=NULL;
+       struct ctdb_node_map_old *recmaster_nodemap=NULL;
+       struct ctdb_node_map_old **remote_nodemaps=NULL;
        struct ctdb_vnn_map *vnnmap=NULL;
        struct ctdb_vnn_map *remote_vnnmap=NULL;
+       uint32_t num_lmasters;
        int32_t debug_level;
        int i, j, ret;
        bool self_ban;
@@ -3575,10 +3594,7 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
           we close the file
        */
         if (ctdb->recovery_lock_file == NULL) {
-               if (ctdb->recovery_lock_fd != -1) {
-                       close(ctdb->recovery_lock_fd);
-                       ctdb->recovery_lock_fd = -1;
-               }
+               ctdb_recovery_unlock(ctdb);
        }
 
        pnn = ctdb_get_pnn(ctdb);
@@ -3684,7 +3700,7 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
        }
 
        /* update the capabilities for all nodes */
-       ret = update_capabilities(ctdb, nodemap);
+       ret = update_capabilities(rec, nodemap);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
                return;
@@ -3695,9 +3711,11 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
         * but we have, then force an election and try to become the new
         * recmaster.
         */
-       if ((rec->ctdb->nodes[rec->recmaster]->capabilities & CTDB_CAP_RECMASTER) == 0 &&
+       if (!ctdb_node_has_capabilities(rec->caps,
+                                       rec->recmaster,
+                                       CTDB_CAP_RECMASTER) &&
            (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
-            !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
+           !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
                DEBUG(DEBUG_ERR, (__location__ " Current recmaster node %u does not have CAP_RECMASTER,"
                                  " but we (node %u) have - force an election\n",
                                  rec->recmaster, pnn));
@@ -3705,23 +3723,6 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
                return;
        }
 
-       /* count how many active nodes there are */
-       rec->num_active    = 0;
-       rec->num_lmasters  = 0;
-       rec->num_connected = 0;
-       for (i=0; i<nodemap->num; i++) {
-               if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
-                       rec->num_active++;
-                       if (rec->ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER) {
-                               rec->num_lmasters++;
-                       }
-               }
-               if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
-                       rec->num_connected++;
-               }
-       }
-
-
        /* verify that the recmaster node is still active */
        for (j=0; j<nodemap->num; j++) {
                if (nodemap->nodes[j].pnn==rec->recmaster) {
@@ -3769,7 +3770,7 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
         * have addresses we shouldnt have.
         */ 
        if (ctdb->tunable.disable_ip_failover == 0 &&
-           rec->takeover_runs_disable_ctx == NULL) {
+           !ctdb_op_is_disabled(rec->takeover_run)) {
                if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
                }
@@ -3840,10 +3841,9 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
 
 
         if (ctdb->recovery_lock_file != NULL) {
-               /* we should have the reclock - check its not stale */
-               ret = check_recovery_lock(ctdb);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
+               /* We must already hold the recovery lock */
+               if (!ctdb_recovery_have_lock(ctdb)) {
+                       DEBUG(DEBUG_ERR,("Failed recovery lock sanity check.  Force a recovery\n"));
                        ctdb_set_culprit(rec, ctdb->pnn);
                        do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
                        return;
@@ -3852,14 +3852,22 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
 
 
        /* if there are takeovers requested, perform it and notify the waiters */
-       if (rec->takeover_runs_disable_ctx == NULL &&
+       if (!ctdb_op_is_disabled(rec->takeover_run) &&
            rec->reallocate_requests) {
                process_ipreallocate_requests(ctdb, rec);
        }
 
+       /* If recoveries are disabled then there is no use doing any
+        * nodemap or flags checks.  Recoveries might be disabled due
+        * to "reloadnodes", so doing these checks might cause an
+        * unnecessary recovery.  */
+       if (ctdb_op_is_disabled(rec->recovery)) {
+               return;
+       }
+
        /* get the nodemap for all active remote nodes
         */
-       remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
+       remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map_old *, nodemap->num);
        if (remote_nodemaps == NULL) {
                DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
                return;
@@ -3962,13 +3970,26 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
        }
 
 
+       /* count how many active nodes there are */
+       num_lmasters  = 0;
+       for (i=0; i<nodemap->num; i++) {
+               if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
+                       if (ctdb_node_has_capabilities(rec->caps,
+                                                      ctdb->nodes[i]->pnn,
+                                                      CTDB_CAP_LMASTER)) {
+                               num_lmasters++;
+                       }
+               }
+       }
+
+
        /* There must be the same number of lmasters in the vnn map as
         * there are active nodes with the lmaster capability...  or
         * do a recovery.
         */
-       if (vnnmap->size != rec->num_lmasters) {
+       if (vnnmap->size != num_lmasters) {
                DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active lmaster nodes: %u vs %u\n",
-                         vnnmap->size, rec->num_lmasters));
+                         vnnmap->size, num_lmasters));
                ctdb_set_culprit(rec, ctdb->pnn);
                do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
                return;
@@ -4119,7 +4140,11 @@ static void monitor_cluster(struct ctdb_context *ctdb)
 
        rec->ctdb = ctdb;
 
-       rec->takeover_run_in_progress = false;
+       rec->takeover_run = ctdb_op_init(rec, "takeover runs");
+       CTDB_NO_MEMORY_FATAL(ctdb, rec->takeover_run);
+
+       rec->recovery = ctdb_op_init(rec, "recoveries");
+       CTDB_NO_MEMORY_FATAL(ctdb, rec->recovery);
 
        rec->priority_time = timeval_current();
 
@@ -4159,6 +4184,11 @@ static void monitor_cluster(struct ctdb_context *ctdb)
                                        CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
                                        disable_takeover_runs_handler, rec);
 
+       /* Register a message port for disabling recoveries */
+       ctdb_client_set_message_handler(ctdb,
+                                       CTDB_SRVID_DISABLE_RECOVERIES,
+                                       disable_recoveries_handler, rec);
+
        /* register a message port for detaching database */
        ctdb_client_set_message_handler(ctdb,
                                        CTDB_SRVID_DETACH_DATABASE,
@@ -4191,7 +4221,8 @@ static void monitor_cluster(struct ctdb_context *ctdb)
 /*
   event handler for when the main ctdbd dies
  */
-static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
+static void ctdb_recoverd_parent(struct tevent_context *ev,
+                                struct tevent_fd *fde,
                                 uint16_t flags, void *private_data)
 {
        DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
@@ -4201,29 +4232,30 @@ static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde,
 /*
   called regularly to verify that the recovery daemon is still running
  */
-static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
-                             struct timeval yt, void *p)
+static void ctdb_check_recd(struct tevent_context *ev,
+                           struct tevent_timer *te,
+                           struct timeval yt, void *p)
 {
        struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
 
        if (ctdb_kill(ctdb, ctdb->recoverd_pid, 0) != 0) {
                DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
 
-               event_add_timed(ctdb->ev, ctdb, timeval_zero(), 
-                               ctdb_restart_recd, ctdb);
+               tevent_add_timer(ctdb->ev, ctdb, timeval_zero(),
+                                ctdb_restart_recd, ctdb);
 
                return;
        }
 
-       event_add_timed(ctdb->ev, ctdb->recd_ctx,
-                       timeval_current_ofs(30, 0),
-                       ctdb_check_recd, ctdb);
+       tevent_add_timer(ctdb->ev, ctdb->recd_ctx,
+                        timeval_current_ofs(30, 0),
+                        ctdb_check_recd, ctdb);
 }
 
-static void recd_sig_child_handler(struct event_context *ev,
-       struct signal_event *se, int signum, int count,
-       void *dont_care, 
-       void *private_data)
+static void recd_sig_child_handler(struct tevent_context *ev,
+                                  struct tevent_signal *se, int signum,
+                                  int count, void *dont_care,
+                                  void *private_data)
 {
 //     struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
        int status;
@@ -4249,7 +4281,7 @@ static void recd_sig_child_handler(struct event_context *ev,
 int ctdb_start_recoverd(struct ctdb_context *ctdb)
 {
        int fd[2];
-       struct signal_event *se;
+       struct tevent_signal *se;
        struct tevent_fd *fde;
 
        if (pipe(fd) != 0) {
@@ -4267,9 +4299,9 @@ int ctdb_start_recoverd(struct ctdb_context *ctdb)
                CTDB_NO_MEMORY(ctdb, ctdb->recd_ctx);
 
                close(fd[0]);
-               event_add_timed(ctdb->ev, ctdb->recd_ctx,
-                               timeval_current_ofs(30, 0),
-                               ctdb_check_recd, ctdb);
+               tevent_add_timer(ctdb->ev, ctdb->recd_ctx,
+                                timeval_current_ofs(30, 0),
+                                ctdb_check_recd, ctdb);
                return 0;
        }
 
@@ -4285,15 +4317,13 @@ int ctdb_start_recoverd(struct ctdb_context *ctdb)
 
        DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
 
-       fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ,
-                    ctdb_recoverd_parent, &fd[0]);
+       fde = tevent_add_fd(ctdb->ev, ctdb, fd[0], TEVENT_FD_READ,
+                           ctdb_recoverd_parent, &fd[0]);
        tevent_fd_set_auto_close(fde);
 
        /* set up a handler to pick up sigchld */
-       se = event_add_signal(ctdb->ev, ctdb,
-                                    SIGCHLD, 0,
-                                    recd_sig_child_handler,
-                                    ctdb);
+       se = tevent_add_signal(ctdb->ev, ctdb, SIGCHLD, 0,
+                              recd_sig_child_handler, ctdb);
        if (se == NULL) {
                DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
                exit(1);
@@ -4321,8 +4351,9 @@ void ctdb_stop_recoverd(struct ctdb_context *ctdb)
        TALLOC_FREE(ctdb->recd_ping_count);
 }
 
-static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, 
-                      struct timeval t, void *private_data)
+static void ctdb_restart_recd(struct tevent_context *ev,
+                             struct tevent_timer *te,
+                             struct timeval t, void *private_data)
 {
        struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);