merge from ronnie
[vlendec/samba-autobuild/.git] / ctdb / server / ctdb_recoverd.c
index 82d6f14a4b887617d773b0869587e40313541b2f..7367093810be7e7655eecb81238bd00bb902516d 100644 (file)
 #include "lib/events/events.h"
 #include "system/filesys.h"
 #include "system/time.h"
+#include "system/network.h"
+#include "system/wait.h"
 #include "popt.h"
 #include "cmdline.h"
 #include "../include/ctdb.h"
 #include "../include/ctdb_private.h"
+#include "db_wrap.h"
+#include "dlinklist.h"
 
 
 struct ban_state {
@@ -43,11 +47,17 @@ struct ctdb_recoverd {
        struct ban_state **banned_nodes;
        struct timeval priority_time;
        bool need_takeover_run;
+       bool need_recovery;
+       uint32_t node_flags;
+       struct timed_event *send_election_te;
+       struct timed_event *election_timeout;
+       struct vacuum_info *vacuum_info;
 };
 
 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
 
+
 /*
   unban a node
  */
@@ -55,17 +65,44 @@ static void ctdb_unban_node(struct ctdb_recoverd *rec, uint32_t pnn)
 {
        struct ctdb_context *ctdb = rec->ctdb;
 
+       DEBUG(DEBUG_NOTICE,("Unbanning node %u\n", pnn));
+
        if (!ctdb_validate_pnn(ctdb, pnn)) {
-               DEBUG(0,("Bad pnn %u in ctdb_ban_node\n", pnn));
+               DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_unban_node\n", pnn));
                return;
        }
 
-       if (rec->banned_nodes[pnn] == NULL) {
+       /* If we are unbanning a different node then just pass the ban info on */
+       if (pnn != ctdb->pnn) {
+               TDB_DATA data;
+               int ret;
+               
+               DEBUG(DEBUG_NOTICE,("Unbanning remote node %u. Passing the ban request on to the remote node.\n", pnn));
+
+               data.dptr = (uint8_t *)&pnn;
+               data.dsize = sizeof(uint32_t);
+
+               ret = ctdb_send_message(ctdb, pnn, CTDB_SRVID_UNBAN_NODE, data);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,("Failed to unban node %u\n", pnn));
+                       return;
+               }
+
                return;
        }
 
+       /* make sure we remember we are no longer banned in case 
+          there is an election */
+       rec->node_flags &= ~NODE_FLAGS_BANNED;
+
+       DEBUG(DEBUG_INFO,("Clearing ban flag on node %u\n", pnn));
        ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, 0, NODE_FLAGS_BANNED);
 
+       if (rec->banned_nodes[pnn] == NULL) {
+               DEBUG(DEBUG_INFO,("No ban recorded for this node. ctdb_unban_node() request ignored\n"));
+               return;
+       }
+
        talloc_free(rec->banned_nodes[pnn]);
        rec->banned_nodes[pnn] = NULL;
 }
@@ -80,7 +117,7 @@ static void ctdb_ban_timeout(struct event_context *ev, struct timed_event *te, s
        struct ctdb_recoverd *rec = state->rec;
        uint32_t pnn = state->banned_node;
 
-       DEBUG(0,("Node %u is now unbanned\n", pnn));
+       DEBUG(DEBUG_NOTICE,("Ban timeout. Node %u is now unbanned\n", pnn));
        ctdb_unban_node(rec, pnn);
 }
 
@@ -91,20 +128,58 @@ static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_
 {
        struct ctdb_context *ctdb = rec->ctdb;
 
+       DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
+
        if (!ctdb_validate_pnn(ctdb, pnn)) {
-               DEBUG(0,("Bad pnn %u in ctdb_ban_node\n", pnn));
+               DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
                return;
        }
 
-       if (pnn == ctdb->pnn) {
-               DEBUG(0,("self ban - lowering our election priority\n"));
-               /* banning ourselves - lower our election priority */
-               rec->priority_time = timeval_current();
+       if (0 == ctdb->tunable.enable_bans) {
+               DEBUG(DEBUG_INFO,("Bans are disabled - ignoring ban of node %u\n", pnn));
+               return;
        }
 
+       /* If we are banning a different node then just pass the ban info on */
+       if (pnn != ctdb->pnn) {
+               struct ctdb_ban_info b;
+               TDB_DATA data;
+               int ret;
+               
+               DEBUG(DEBUG_NOTICE,("Banning remote node %u for %u seconds. Passing the ban request on to the remote node.\n", pnn, ban_time));
+
+               b.pnn = pnn;
+               b.ban_time = ban_time;
+
+               data.dptr = (uint8_t *)&b;
+               data.dsize = sizeof(b);
+
+               ret = ctdb_send_message(ctdb, pnn, CTDB_SRVID_BAN_NODE, data);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,("Failed to ban node %u\n", pnn));
+                       return;
+               }
+
+               return;
+       }
+
+       DEBUG(DEBUG_NOTICE,("self ban - lowering our election priority\n"));
        ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, NODE_FLAGS_BANNED, 0);
 
-       rec->banned_nodes[pnn] = talloc(rec, struct ban_state);
+       /* banning ourselves - lower our election priority */
+       rec->priority_time = timeval_current();
+
+       /* make sure we remember we are banned in case there is an 
+          election */
+       rec->node_flags |= NODE_FLAGS_BANNED;
+
+       if (rec->banned_nodes[pnn] != NULL) {
+               DEBUG(DEBUG_NOTICE,("Re-banning an already banned node. Remove previous ban and set a new ban.\n"));            
+               talloc_free(rec->banned_nodes[pnn]);
+               rec->banned_nodes[pnn] = NULL;
+       }
+
+       rec->banned_nodes[pnn] = talloc(rec->banned_nodes, struct ban_state);
        CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes[pnn]);
 
        rec->banned_nodes[pnn]->rec = rec;
@@ -120,124 +195,98 @@ static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_
 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
 
 
-struct freeze_node_data {
-       uint32_t count;
-       enum monitor_result status;
-};
-
-
-static void freeze_node_callback(struct ctdb_client_control_state *state)
+/*
+  run the "recovered" eventscript on all nodes
+ */
+static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
 {
-       struct freeze_node_data *fndata = talloc_get_type(state->async.private, struct freeze_node_data);
-
+       TALLOC_CTX *tmp_ctx;
 
-       /* one more node has responded to our freeze node*/
-       fndata->count--;
+       tmp_ctx = talloc_new(ctdb);
+       CTDB_NO_MEMORY(ctdb, tmp_ctx);
 
-       /* if we failed to freeze the node, we must trigger another recovery */
-       if ( (state->state != CTDB_CONTROL_DONE) || (state->status != 0) ) {
-               DEBUG(0, (__location__ " Failed to freeze node:%u. recovery failed\n", state->c->hdr.destnode));
-               fndata->status = MONITOR_RECOVERY_NEEDED;
+       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
+                       list_of_active_nodes(ctdb, nodemap, tmp_ctx, true),
+                       CONTROL_TIMEOUT(), false, tdb_null) != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event. Recovery failed.\n"));
+               talloc_free(tmp_ctx);
+               return -1;
        }
 
-       return;
+       talloc_free(tmp_ctx);
+       return 0;
 }
 
-
-
-/* freeze all nodes */
-static enum monitor_result freeze_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
+/*
+  run the "startrecovery" eventscript on all nodes
+ */
+static int run_startrecovery_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
 {
-       struct freeze_node_data *fndata;
-       TALLOC_CTX *mem_ctx = talloc_new(ctdb);
-       struct ctdb_client_control_state *state;
-       enum monitor_result status;
-       int j;
-       
-       fndata = talloc(mem_ctx, struct freeze_node_data);
-       CTDB_NO_MEMORY_FATAL(ctdb, fndata);
-       fndata->count  = 0;
-       fndata->status = MONITOR_OK;
-
-       /* loop over all active nodes and send an async freeze call to 
-          them*/
-       for (j=0; j<nodemap->num; j++) {
-               if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
-                       continue;
-               }
-               state = ctdb_ctrl_freeze_send(ctdb, mem_ctx, 
-                                       CONTROL_TIMEOUT(), 
-                                       nodemap->nodes[j].pnn);
-               if (state == NULL) {
-                       /* we failed to send the control, treat this as 
-                          an error and try again next iteration
-                       */                      
-                       DEBUG(0,("Failed to call ctdb_ctrl_freeze_send during recovery\n"));
-                       talloc_free(mem_ctx);
-                       return MONITOR_RECOVERY_NEEDED;
-               }
-
-               /* set up the callback functions */
-               state->async.fn = freeze_node_callback;
-               state->async.private = fndata;
-
-               /* one more control to wait for to complete */
-               fndata->count++;
-       }
+       TALLOC_CTX *tmp_ctx;
 
+       tmp_ctx = talloc_new(ctdb);
+       CTDB_NO_MEMORY(ctdb, tmp_ctx);
 
-       /* now wait for up to the maximum number of seconds allowed
-          or until all nodes we expect a response from has replied
-       */
-       while (fndata->count > 0) {
-               event_loop_once(ctdb->ev);
+       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
+                       list_of_active_nodes(ctdb, nodemap, tmp_ctx, true),
+                       CONTROL_TIMEOUT(), false, tdb_null) != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
+               talloc_free(tmp_ctx);
+               return -1;
        }
 
-       status = fndata->status;
-       talloc_free(mem_ctx);
-       return status;
+       talloc_free(tmp_ctx);
+       return 0;
 }
 
-
 /*
   change recovery mode on all nodes
  */
 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t rec_mode)
 {
-       int j, ret;
+       TDB_DATA data;
+       uint32_t *nodes;
+       TALLOC_CTX *tmp_ctx;
+
+       tmp_ctx = talloc_new(ctdb);
+       CTDB_NO_MEMORY(ctdb, tmp_ctx);
+
+       nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
 
        /* freeze all nodes */
        if (rec_mode == CTDB_RECOVERY_ACTIVE) {
-               ret = freeze_all_nodes(ctdb, nodemap);
-               if (ret != MONITOR_OK) {
-                       DEBUG(0, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
+               if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
+                                               nodes, CONTROL_TIMEOUT(),
+                                               false, tdb_null) != 0) {
+                       DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
+                       talloc_free(tmp_ctx);
                        return -1;
                }
        }
 
 
-       /* set recovery mode to active on all nodes */
-       for (j=0; j<nodemap->num; j++) {
-               /* dont change it for nodes that are unavailable */
-               if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
-                       continue;
-               }
+       data.dsize = sizeof(uint32_t);
+       data.dptr = (unsigned char *)&rec_mode;
 
-               ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, rec_mode);
-               if (ret != 0) {
-                       DEBUG(0, (__location__ " Unable to set recmode on node %u\n", nodemap->nodes[j].pnn));
-                       return -1;
-               }
+       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
+                                       nodes, CONTROL_TIMEOUT(),
+                                       false, data) != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
 
-               if (rec_mode == CTDB_RECOVERY_NORMAL) {
-                       ret = ctdb_ctrl_thaw(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn);
-                       if (ret != 0) {
-                               DEBUG(0, (__location__ " Unable to thaw node %u\n", nodemap->nodes[j].pnn));
-                               return -1;
-                       }
+       if (rec_mode == CTDB_RECOVERY_NORMAL) {
+               if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
+                                               nodes, CONTROL_TIMEOUT(),
+                                               false, tdb_null) != 0) {
+                       DEBUG(DEBUG_ERR, (__location__ " Unable to thaw nodes. Recovery failed.\n"));
+                       talloc_free(tmp_ctx);
+                       return -1;
                }
        }
 
+       talloc_free(tmp_ctx);
        return 0;
 }
 
@@ -246,22 +295,24 @@ static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *no
  */
 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
 {
-       int j, ret;
+       TDB_DATA data;
+       TALLOC_CTX *tmp_ctx;
 
-       /* set recovery master to pnn on all nodes */
-       for (j=0; j<nodemap->num; j++) {
-               /* dont change it for nodes that are unavailable */
-               if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
-                       continue;
-               }
+       tmp_ctx = talloc_new(ctdb);
+       CTDB_NO_MEMORY(ctdb, tmp_ctx);
 
-               ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, pnn);
-               if (ret != 0) {
-                       DEBUG(0, (__location__ " Unable to set recmaster on node %u\n", nodemap->nodes[j].pnn));
-                       return -1;
-               }
+       data.dsize = sizeof(uint32_t);
+       data.dptr = (unsigned char *)&pnn;
+
+       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
+                       list_of_active_nodes(ctdb, nodemap, tmp_ctx, true),
+                       CONTROL_TIMEOUT(), false, data) != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
+               talloc_free(tmp_ctx);
+               return -1;
        }
 
+       talloc_free(tmp_ctx);
        return 0;
 }
 
@@ -289,7 +340,7 @@ static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctd
                ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
                                         mem_ctx, &remote_dbmap);
                if (ret != 0) {
-                       DEBUG(0, (__location__ " Unable to get dbids from node %u\n", pnn));
+                       DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
                        return -1;
                }
 
@@ -299,7 +350,7 @@ static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctd
 
 
                        for (i=0;i<remote_dbmap->num;i++) {
-                               if (dbmap->dbids[db] == remote_dbmap->dbids[i]) {
+                               if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
                                        break;
                                }
                        }
@@ -308,14 +359,16 @@ static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctd
                                continue;
                        }
                        /* ok so we need to create this database */
-                       ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbids[db], mem_ctx, &name);
+                       ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
+                                           mem_ctx, &name);
                        if (ret != 0) {
-                               DEBUG(0, (__location__ " Unable to get dbname from node %u\n", pnn));
+                               DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
                                return -1;
                        }
-                       ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, name);
+                       ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
+                                          mem_ctx, name, dbmap->dbs[db].persistent);
                        if (ret != 0) {
-                               DEBUG(0, (__location__ " Unable to create remote db:%s\n", name));
+                               DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
                                return -1;
                        }
                }
@@ -348,7 +401,7 @@ static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb
                ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
                                         mem_ctx, &remote_dbmap);
                if (ret != 0) {
-                       DEBUG(0, (__location__ " Unable to get dbids from node %u\n", pnn));
+                       DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
                        return -1;
                }
 
@@ -357,7 +410,7 @@ static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb
                        const char *name;
 
                        for (i=0;i<(*dbmap)->num;i++) {
-                               if (remote_dbmap->dbids[db] == (*dbmap)->dbids[i]) {
+                               if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
                                        break;
                                }
                        }
@@ -369,20 +422,21 @@ static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb
                           rebuild dbmap
                         */
                        ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
-                                           remote_dbmap->dbids[db], mem_ctx, &name);
+                                           remote_dbmap->dbs[db].dbid, mem_ctx, &name);
                        if (ret != 0) {
-                               DEBUG(0, (__location__ " Unable to get dbname from node %u\n", 
+                               DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
                                          nodemap->nodes[j].pnn));
                                return -1;
                        }
-                       ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name);
+                       ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
+                                          remote_dbmap->dbs[db].persistent);
                        if (ret != 0) {
-                               DEBUG(0, (__location__ " Unable to create local db:%s\n", name));
+                               DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
                                return -1;
                        }
                        ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
                        if (ret != 0) {
-                               DEBUG(0, (__location__ " Unable to reread dbmap on node %u\n", pnn));
+                               DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
                                return -1;
                        }
                }
@@ -393,63 +447,111 @@ static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb
 
 
 /*
-  pull all the remote database contents into ours
+  pull the remote database contents from one node into the recdb
  */
-static int pull_all_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap,
-                                    uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
+static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode,
+                                   struct tdb_wrap *recdb, uint32_t dbid)
 {
-       int i, j, ret;
+       int ret;
+       TDB_DATA outdata;
+       struct ctdb_control_pulldb_reply *reply;
+       struct ctdb_rec_data *rec;
+       int i;
+       TALLOC_CTX *tmp_ctx = talloc_new(recdb);
 
-       /* pull all records from all other nodes across onto this node
-          (this merges based on rsn)
-       */
-       for (i=0;i<dbmap->num;i++) {
-               for (j=0; j<nodemap->num; j++) {
-                       /* we dont need to merge with ourselves */
-                       if (nodemap->nodes[j].pnn == pnn) {
-                               continue;
+       ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
+                              CONTROL_TIMEOUT(), &outdata);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       reply = (struct ctdb_control_pulldb_reply *)outdata.dptr;
+
+       if (outdata.dsize < offsetof(struct ctdb_control_pulldb_reply, data)) {
+               DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+       
+       rec = (struct ctdb_rec_data *)&reply->data[0];
+       
+       for (i=0;
+            i<reply->count;
+            rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
+               TDB_DATA key, data;
+               struct ctdb_ltdb_header *hdr;
+               TDB_DATA existing;
+               
+               key.dptr = &rec->data[0];
+               key.dsize = rec->keylen;
+               data.dptr = &rec->data[key.dsize];
+               data.dsize = rec->datalen;
+               
+               hdr = (struct ctdb_ltdb_header *)data.dptr;
+
+               if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
+                       DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
+                       talloc_free(tmp_ctx);
+                       return -1;
+               }
+
+               /* fetch the existing record, if any */
+               existing = tdb_fetch(recdb->tdb, key);
+               
+               if (existing.dptr != NULL) {
+                       struct ctdb_ltdb_header header;
+                       if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
+                               DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
+                                        (unsigned)existing.dsize, srcnode));
+                               free(existing.dptr);
+                               talloc_free(tmp_ctx);
+                               return -1;
                        }
-                       /* dont merge from nodes that are unavailable */
-                       if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
+                       header = *(struct ctdb_ltdb_header *)existing.dptr;
+                       free(existing.dptr);
+                       if (!(header.rsn < hdr->rsn ||
+                             (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
                                continue;
                        }
-                       ret = ctdb_ctrl_copydb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
-                                              pnn, dbmap->dbids[i], CTDB_LMASTER_ANY, mem_ctx);
-                       if (ret != 0) {
-                               DEBUG(0, (__location__ " Unable to copy db from node %u to node %u\n", 
-                                         nodemap->nodes[j].pnn, pnn));
-                               return -1;
-                       }
+               }
+               
+               if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
+                       DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
+                       talloc_free(tmp_ctx);
+                       return -1;                              
                }
        }
 
+       talloc_free(tmp_ctx);
+
        return 0;
 }
 
-
 /*
-  change the dmaster on all databases to point to us
+  pull all the remote database contents into the recdb
  */
-static int update_dmaster_on_all_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
-                                          uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
+static int pull_remote_database(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
+                               struct tdb_wrap *recdb, uint32_t dbid)
 {
-       int i, j, ret;
+       int j;
 
-       /* update dmaster to point to this node for all databases/nodes */
-       for (i=0;i<dbmap->num;i++) {
-               for (j=0; j<nodemap->num; j++) {
-                       /* dont repoint nodes that are unavailable */
-                       if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
-                               continue;
-                       }
-                       ret = ctdb_ctrl_setdmaster(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, ctdb, dbmap->dbids[i], pnn);
-                       if (ret != 0) {
-                               DEBUG(0, (__location__ " Unable to set dmaster for node %u db:0x%08x\n", nodemap->nodes[j].pnn, dbmap->dbids[i]));
-                               return -1;
-                       }
+       /* pull all records from all other nodes across onto this node
+          (this merges based on rsn)
+       */
+       for (j=0; j<nodemap->num; j++) {
+               /* dont merge from nodes that are unavailable */
+               if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
+                       continue;
+               }
+               if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
+                       DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
+                                nodemap->nodes[j].pnn));
+                       return -1;
                }
        }
-
+       
        return 0;
 }
 
@@ -478,103 +580,6 @@ static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node
        return 0;
 }
 
-/*
-  vacuum one database
- */
-static int vacuum_db(struct ctdb_context *ctdb, uint32_t db_id, struct ctdb_node_map *nodemap)
-{
-       uint64_t max_rsn;
-       int ret, i;
-
-       /* find max rsn on our local node for this db */
-       ret = ctdb_ctrl_get_max_rsn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, db_id, &max_rsn);
-       if (ret != 0) {
-               return -1;
-       }
-
-       /* set rsn on non-empty records to max_rsn+1 */
-       for (i=0;i<nodemap->num;i++) {
-               if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
-                       continue;
-               }
-               ret = ctdb_ctrl_set_rsn_nonempty(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn,
-                                                db_id, max_rsn+1);
-               if (ret != 0) {
-                       DEBUG(0,(__location__ " Failed to set rsn on node %u to %llu\n",
-                                nodemap->nodes[i].pnn, (unsigned long long)max_rsn+1));
-                       return -1;
-               }
-       }
-
-       /* delete records with rsn < max_rsn+1 on all nodes */
-       for (i=0;i<nodemap->num;i++) {
-               if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
-                       continue;
-               }
-               ret = ctdb_ctrl_delete_low_rsn(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn,
-                                                db_id, max_rsn+1);
-               if (ret != 0) {
-                       DEBUG(0,(__location__ " Failed to delete records on node %u with rsn below %llu\n",
-                                nodemap->nodes[i].pnn, (unsigned long long)max_rsn+1));
-                       return -1;
-               }
-       }
-
-
-       return 0;
-}
-
-
-/*
-  vacuum all attached databases
- */
-static int vacuum_all_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
-                               struct ctdb_dbid_map *dbmap)
-{
-       int i;
-
-       /* update dmaster to point to this node for all databases/nodes */
-       for (i=0;i<dbmap->num;i++) {
-               if (vacuum_db(ctdb, dbmap->dbids[i], nodemap) != 0) {
-                       return -1;
-               }
-       }
-       return 0;
-}
-
-
-/*
-  push out all our database contents to all other nodes
- */
-static int push_all_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
-                                   uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
-{
-       int i, j, ret;
-
-       /* push all records out to the nodes again */
-       for (i=0;i<dbmap->num;i++) {
-               for (j=0; j<nodemap->num; j++) {
-                       /* we dont need to push to ourselves */
-                       if (nodemap->nodes[j].pnn == pnn) {
-                               continue;
-                       }
-                       /* dont push to nodes that are unavailable */
-                       if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
-                               continue;
-                       }
-                       ret = ctdb_ctrl_copydb(ctdb, CONTROL_TIMEOUT(), pnn, nodemap->nodes[j].pnn, 
-                                              dbmap->dbids[i], CTDB_LMASTER_ANY, mem_ctx);
-                       if (ret != 0) {
-                               DEBUG(0, (__location__ " Unable to copy db from node %u to node %u\n", 
-                                         pnn, nodemap->nodes[j].pnn));
-                               return -1;
-                       }
-               }
-       }
-
-       return 0;
-}
-
 
 /*
   ensure all nodes have the same vnnmap we do
@@ -593,7 +598,7 @@ static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_nod
 
                ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
                if (ret != 0) {
-                       DEBUG(0, (__location__ " Unable to set vnnmap for node %u\n", pnn));
+                       DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
                        return -1;
                }
        }
@@ -611,30 +616,21 @@ static void ban_handler(struct ctdb_context *ctdb, uint64_t srvid,
        struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
        struct ctdb_ban_info *b = (struct ctdb_ban_info *)data.dptr;
        TALLOC_CTX *mem_ctx = talloc_new(ctdb);
-       uint32_t recmaster;
-       int ret;
 
        if (data.dsize != sizeof(*b)) {
-               DEBUG(0,("Bad data in ban_handler\n"));
+               DEBUG(DEBUG_ERR,("Bad data in ban_handler\n"));
                talloc_free(mem_ctx);
                return;
        }
 
-       ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
-       if (ret != 0) {
-               DEBUG(0,(__location__ " Failed to find the recmaster\n"));
-               talloc_free(mem_ctx);
-               return;
-       }
-
-       if (recmaster != ctdb->pnn) {
-               DEBUG(0,("We are not the recmaster - ignoring ban request\n"));
-               talloc_free(mem_ctx);
+       if (b->pnn != ctdb->pnn) {
+               DEBUG(DEBUG_ERR,("Got a ban request for pnn:%u but our pnn is %u. Ignoring ban request\n", b->pnn, ctdb->pnn));
                return;
        }
 
-       DEBUG(0,("Node %u has been banned for %u seconds by the administrator\n", 
+       DEBUG(DEBUG_NOTICE,("Node %u has been banned for %u seconds\n", 
                 b->pnn, b->ban_time));
+
        ctdb_ban_node(rec, b->pnn, b->ban_time);
        talloc_free(mem_ctx);
 }
@@ -648,35 +644,218 @@ static void unban_handler(struct ctdb_context *ctdb, uint64_t srvid,
        struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
        TALLOC_CTX *mem_ctx = talloc_new(ctdb);
        uint32_t pnn;
-       int ret;
-       uint32_t recmaster;
 
        if (data.dsize != sizeof(uint32_t)) {
-               DEBUG(0,("Bad data in unban_handler\n"));
+               DEBUG(DEBUG_ERR,("Bad data in unban_handler\n"));
                talloc_free(mem_ctx);
                return;
        }
        pnn = *(uint32_t *)data.dptr;
 
-       ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
-       if (ret != 0) {
-               DEBUG(0,(__location__ " Failed to find the recmaster\n"));
-               talloc_free(mem_ctx);
+       if (pnn != ctdb->pnn) {
+               DEBUG(DEBUG_ERR,("Got an unban request for pnn:%u but our pnn is %u. Ignoring unban request\n", pnn, ctdb->pnn));
                return;
        }
 
-       if (recmaster != ctdb->pnn) {
-               DEBUG(0,("We are not the recmaster - ignoring unban request\n"));
-               talloc_free(mem_ctx);
+       DEBUG(DEBUG_NOTICE,("Node %u has been unbanned.\n", pnn));
+       ctdb_unban_node(rec, pnn);
+       talloc_free(mem_ctx);
+}
+
+
/*
  state for one in-progress set of vacuum fetches: records received from
  a single source node for a single database
 */
struct vacuum_info {
	struct vacuum_info *next, *prev;	/* DLIST links on rec->vacuum_info */
	struct ctdb_recoverd *rec;		/* owning recovery daemon state */
	uint32_t srcnode;			/* node the records came from */
	struct ctdb_db_context *ctdb_db;	/* database the records belong to */
	struct ctdb_control_pulldb_reply *recs;	/* our copy of the records still to process */
	struct ctdb_rec_data *r;		/* cursor: next record within recs */
};
+
+static void vacuum_fetch_next(struct vacuum_info *v);
+
+/*
+  called when a vacuum fetch has completed - just free it and do the next one
+ */
+static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
+{
+       struct vacuum_info *v = talloc_get_type(state->async.private, struct vacuum_info);
+       talloc_free(state);
+       vacuum_fetch_next(v);
+}
+
+
+/*
+  process the next element from the vacuum list
+*/
+static void vacuum_fetch_next(struct vacuum_info *v)
+{
+       struct ctdb_call call;
+       struct ctdb_rec_data *r;
+
+       while (v->recs->count) {
+               struct ctdb_client_call_state *state;
+               TDB_DATA data;
+               struct ctdb_ltdb_header *hdr;
+
+               ZERO_STRUCT(call);
+               call.call_id = CTDB_NULL_FUNC;
+               call.flags = CTDB_IMMEDIATE_MIGRATION;
+
+               r = v->r;
+               v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
+               v->recs->count--;
+
+               call.key.dptr = &r->data[0];
+               call.key.dsize = r->keylen;
+
+               /* ensure we don't block this daemon - just skip a record if we can't get
+                  the chainlock */
+               if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
+                       continue;
+               }
+
+               data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
+               if (data.dptr == NULL) {
+                       tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
+                       continue;
+               }
+
+               if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
+                       free(data.dptr);
+                       tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
+                       continue;
+               }
+               
+               hdr = (struct ctdb_ltdb_header *)data.dptr;
+               if (hdr->dmaster == v->rec->ctdb->pnn) {
+                       /* its already local */
+                       free(data.dptr);
+                       tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
+                       continue;
+               }
+
+               free(data.dptr);
+
+               state = ctdb_call_send(v->ctdb_db, &call);
+               tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
+               if (state == NULL) {
+                       DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
+                       talloc_free(v);
+                       return;
+               }
+               state->async.fn = vacuum_fetch_callback;
+               state->async.private = v;
                return;
        }
 
-       DEBUG(0,("Node %u has been unbanned by the administrator\n", pnn));
-       ctdb_unban_node(rec, pnn);
-       talloc_free(mem_ctx);
+       talloc_free(v);
 }
 
 
/*
  destroy a vacuum info structure
 */
static int vacuum_info_destructor(struct vacuum_info *v)
{
	/* unlink ourselves from the recoverd's active-vacuum list so that
	   vacuum_fetch_handler cannot find a freed entry */
	DLIST_REMOVE(v->rec->vacuum_info, v);
	return 0;
}
+
+
+/*
+  handler for vacuum fetch
+*/
+static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
+                                TDB_DATA data, void *private_data)
+{
+       struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
+       struct ctdb_control_pulldb_reply *recs;
+       int ret, i;
+       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
+       const char *name;
+       struct ctdb_dbid_map *dbmap=NULL;
+       bool persistent = false;
+       struct ctdb_db_context *ctdb_db;
+       struct ctdb_rec_data *r;
+       uint32_t srcnode;
+       struct vacuum_info *v;
+
+       recs = (struct ctdb_control_pulldb_reply *)data.dptr;
+       r = (struct ctdb_rec_data *)&recs->data[0];
+
+       if (recs->count == 0) {
+               return;
+       }
+
+       srcnode = r->reqid;
+
+       for (v=rec->vacuum_info;v;v=v->next) {
+               if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
+                       /* we're already working on records from this node */
+                       return;
+               }
+       }
+
+       /* work out if the database is persistent */
+       ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
+               talloc_free(tmp_ctx);
+               return;
+       }
+
+       for (i=0;i<dbmap->num;i++) {
+               if (dbmap->dbs[i].dbid == recs->db_id) {
+                       persistent = dbmap->dbs[i].persistent;
+                       break;
+               }
+       }
+       if (i == dbmap->num) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
+               talloc_free(tmp_ctx);
+               return;         
+       }
+
+       /* find the name of this database */
+       if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
+               DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
+               talloc_free(tmp_ctx);
+               return;
+       }
+
+       /* attach to it */
+       ctdb_db = ctdb_attach(ctdb, name, persistent);
+       if (ctdb_db == NULL) {
+               DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
+               talloc_free(tmp_ctx);
+               return;
+       }
+
+       v = talloc_zero(rec, struct vacuum_info);
+       if (v == NULL) {
+               DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
+               return;
+       }
+
+       v->rec = rec;
+       v->srcnode = srcnode;
+       v->ctdb_db = ctdb_db;
+       v->recs = talloc_memdup(v, recs, data.dsize);
+       if (v->recs == NULL) {
+               DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
+               talloc_free(v);
+               return;         
+       }
+       v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
+
+       DLIST_ADD(rec->vacuum_info, v);
+
+       talloc_set_destructor(v, vacuum_info_destructor);
+
+       vacuum_fetch_next(v);
+}
+
 
 /*
   called when ctdb_wait_timeout should finish
@@ -700,23 +879,340 @@ static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
        }
 }
 
-/* Create a new random generation ip. 
-   The generation id can not be the INVALID_GENERATION id
-*/
-static uint32_t new_generation(void)
+/*
+  called when an election times out (ends)
+ */
+static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
+                                 struct timeval t, void *p)
+{
+       struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
+       rec->election_timeout = NULL;
+}
+
+
+/*
+  wait for an election to finish. It finished election_timeout seconds after
+  the last election packet is received
+ */
+static void ctdb_wait_election(struct ctdb_recoverd *rec)
+{
+       struct ctdb_context *ctdb = rec->ctdb;
+       while (rec->election_timeout) {
+               event_loop_once(ctdb->ev);
+       }
+}
+
+/*
+  remember the trouble maker
+ */
+static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
+{
+       struct ctdb_context *ctdb = rec->ctdb;
+
+       if (rec->last_culprit != culprit ||
+           timeval_elapsed(&rec->first_recover_time) > ctdb->tunable.recovery_grace_period) {
+               DEBUG(DEBUG_NOTICE,("New recovery culprit %u\n", culprit));
+               /* either a new node is the culprit, or we've decided to forgive them */
+               rec->last_culprit = culprit;
+               rec->first_recover_time = timeval_current();
+               rec->culprit_counter = 0;
+       }
+       rec->culprit_counter++;
+}
+
+/*
+  Update our local flags from all remote connected nodes. 
+  This is only run when we are or we belive we are the recovery master
+ */
+static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
+{
+       int j;
+       struct ctdb_context *ctdb = rec->ctdb;
+       TALLOC_CTX *mem_ctx = talloc_new(ctdb);
+
+       /* get the nodemap for all active remote nodes and verify
+          they are the same as for this node
+        */
+       for (j=0; j<nodemap->num; j++) {
+               struct ctdb_node_map *remote_nodemap=NULL;
+               int ret;
+
+               if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
+                       continue;
+               }
+               if (nodemap->nodes[j].pnn == ctdb->pnn) {
+                       continue;
+               }
+
+               ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
+                                          mem_ctx, &remote_nodemap);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
+                                 nodemap->nodes[j].pnn));
+                       ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
+                       talloc_free(mem_ctx);
+                       return MONITOR_FAILED;
+               }
+               if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
+                       struct ctdb_node_flag_change c;
+                       TDB_DATA data;
+
+                       /* We should tell our daemon about this so it
+                          updates its flags or else we will log the same 
+                          message again in the next iteration of recovery.
+                          Since we are the recovery master we can just as
+                          well update the flags on all nodes.
+                       */
+                       c.pnn = nodemap->nodes[j].pnn;
+                       c.old_flags = nodemap->nodes[j].flags;
+                       c.new_flags = remote_nodemap->nodes[j].flags;
+
+                       data.dptr = (uint8_t *)&c;
+                       data.dsize = sizeof(c);
+
+                       ctdb_send_message(ctdb, ctdb->pnn,
+                                       CTDB_SRVID_NODE_FLAGS_CHANGED, 
+                                       data);
+
+                       /* Update our local copy of the flags in the recovery
+                          daemon.
+                       */
+                       DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
+                                nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
+                                nodemap->nodes[j].flags));
+                       nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
+
+                       /* If the BANNED flag has changed for the node
+                          this is a good reason to do a new election.
+                        */
+                       if ((c.old_flags ^ c.new_flags) & NODE_FLAGS_BANNED) {
+                               DEBUG(DEBUG_NOTICE,("Remote node %u had different BANNED flags 0x%x, local had 0x%x - trigger a re-election\n",
+                                nodemap->nodes[j].pnn, c.new_flags,
+                                c.old_flags));
+                               talloc_free(mem_ctx);
+                               return MONITOR_ELECTION_NEEDED;
+                       }
+
+               }
+               talloc_free(remote_nodemap);
+       }
+       talloc_free(mem_ctx);
+       return MONITOR_OK;
+}
+
+
+/* Create a new random generation ip. 
+   The generation id can not be the INVALID_GENERATION id
+*/
+static uint32_t new_generation(void)
+{
+       uint32_t generation;
+
+       while (1) {
+               generation = random();
+
+               if (generation != INVALID_GENERATION) {
+                       break;
+               }
+       }
+
+       return generation;
+}
+
+
+/*
+  create a temporary working database
+ */
+static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
+{
+       char *name;
+       struct tdb_wrap *recdb;
+
+       /* open up the temporary recovery database */
+       name = talloc_asprintf(mem_ctx, "%s/recdb.tdb", ctdb->db_directory);
+       if (name == NULL) {
+               return NULL;
+       }
+       unlink(name);
+       recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
+                             TDB_NOLOCK, O_RDWR|O_CREAT|O_EXCL, 0600);
+       if (recdb == NULL) {
+               DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
+       }
+
+       talloc_free(name);
+
+       return recdb;
+}
+
+
/* 
   a traverse function for pulling all relevant records from recdb
 */
struct recdb_data {
	struct ctdb_context *ctdb;
	struct ctdb_control_pulldb_reply *recdata;	/* marshalled records collected so far */
	uint32_t len;		/* total size in bytes of recdata, including header */
	bool failed;		/* set when marshalling or allocation fails */
};
+
+static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
+{
+       struct recdb_data *params = (struct recdb_data *)p;
+       struct ctdb_rec_data *rec;
+       struct ctdb_ltdb_header *hdr;
+
+       /* skip empty records */
+       if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
+               return 0;
+       }
+
+       /* update the dmaster field to point to us */
+       hdr = (struct ctdb_ltdb_header *)data.dptr;
+       hdr->dmaster = params->ctdb->pnn;
+
+       /* add the record to the blob ready to send to the nodes */
+       rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
+       if (rec == NULL) {
+               params->failed = true;
+               return -1;
+       }
+       params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
+       if (params->recdata == NULL) {
+               DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
+                        rec->length + params->len, params->recdata->count));
+               params->failed = true;
+               return -1;
+       }
+       params->recdata->count++;
+       memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
+       params->len += rec->length;
+       talloc_free(rec);
+
+       return 0;
+}
+
+/*
+  push the recdb database out to all nodes
+ */
+static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
+                              struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
+{
+       struct recdb_data params;
+       struct ctdb_control_pulldb_reply *recdata;
+       TDB_DATA outdata;
+       TALLOC_CTX *tmp_ctx;
+
+       tmp_ctx = talloc_new(ctdb);
+       CTDB_NO_MEMORY(ctdb, tmp_ctx);
+
+       recdata = talloc_zero(recdb, struct ctdb_control_pulldb_reply);
+       CTDB_NO_MEMORY(ctdb, recdata);
+
+       recdata->db_id = dbid;
+
+       params.ctdb = ctdb;
+       params.recdata = recdata;
+       params.len = offsetof(struct ctdb_control_pulldb_reply, data);
+       params.failed = false;
+
+       if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
+               DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
+               talloc_free(params.recdata);
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       if (params.failed) {
+               DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
+               talloc_free(params.recdata);
+               talloc_free(tmp_ctx);
+               return -1;              
+       }
+
+       recdata = params.recdata;
+
+       outdata.dptr = (void *)recdata;
+       outdata.dsize = params.len;
+
+       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
+                       list_of_active_nodes(ctdb, nodemap, tmp_ctx, true),
+                       CONTROL_TIMEOUT(), false, outdata) != 0) {
+               DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
+               talloc_free(recdata);
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
+                 dbid, recdata->count));
+
+       talloc_free(recdata);
+       talloc_free(tmp_ctx);
+
+       return 0;
+}
+
+
+/*
+  go through a full recovery on one database 
+ */
+static int recover_database(struct ctdb_recoverd *rec, 
+                           TALLOC_CTX *mem_ctx,
+                           uint32_t dbid,
+                           uint32_t pnn, 
+                           struct ctdb_node_map *nodemap,
+                           uint32_t transaction_id)
 {
-       uint32_t generation;
+       struct tdb_wrap *recdb;
+       int ret;
+       struct ctdb_context *ctdb = rec->ctdb;
+       TDB_DATA data;
+       struct ctdb_control_wipe_database w;
 
-       while (1) {
-               generation = random();
+       recdb = create_recdb(ctdb, mem_ctx);
+       if (recdb == NULL) {
+               return -1;
+       }
 
-               if (generation != INVALID_GENERATION) {
-                       break;
-               }
+       /* pull all remote databases onto the recdb */
+       ret = pull_remote_database(ctdb, nodemap, recdb, dbid);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
+               return -1;
        }
 
-       return generation;
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
+
+       /* wipe all the remote databases. This is safe as we are in a transaction */
+       w.db_id = dbid;
+       w.transaction_id = transaction_id;
+
+       data.dptr = (void *)&w;
+       data.dsize = sizeof(w);
+
+       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
+                       list_of_active_nodes(ctdb, nodemap, recdb, true),
+                       CONTROL_TIMEOUT(), false, data) != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
+               talloc_free(recdb);
+               return -1;
+       }
+       
+       /* push out the correct database. This sets the dmaster and skips 
+          the empty records */
+       ret = push_recdb_database(ctdb, dbid, recdb, nodemap);
+       if (ret != 0) {
+               talloc_free(recdb);
+               return -1;
+       }
+
+       /* all done with this database */
+       talloc_free(recdb);
+
+       return 0;
 }
+
                
 /*
   we are the recmaster, and recovery is needed - start a recovery run
@@ -730,36 +1226,70 @@ static int do_recovery(struct ctdb_recoverd *rec,
        int i, j, ret;
        uint32_t generation;
        struct ctdb_dbid_map *dbmap;
+       TDB_DATA data;
 
-       if (rec->last_culprit != culprit ||
-           timeval_elapsed(&rec->first_recover_time) > ctdb->tunable.recovery_grace_period) {
-               /* either a new node is the culprit, or we've decide to forgive them */
-               rec->last_culprit = culprit;
-               rec->first_recover_time = timeval_current();
-               rec->culprit_counter = 0;
-       }
-       rec->culprit_counter++;
+       DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
+
+       /* if recovery fails, force it again */
+       rec->need_recovery = true;
+
+       ctdb_set_culprit(rec, culprit);
 
        if (rec->culprit_counter > 2*nodemap->num) {
-               DEBUG(0,("Node %u has caused %u recoveries in %.0f seconds - banning it for %u seconds\n",
+               DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries in %.0f seconds - banning it for %u seconds\n",
                         culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
                         ctdb->tunable.recovery_ban_period));
                ctdb_ban_node(rec, culprit, ctdb->tunable.recovery_ban_period);
        }
 
        if (!ctdb_recovery_lock(ctdb, true)) {
-               DEBUG(0,("Unable to get recovery lock - aborting recovery\n"));
+               ctdb_set_culprit(rec, pnn);
+               DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery\n"));
+               return -1;
+       }
+
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", culprit));
+
+       /* get a list of all databases */
+       ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
+               return -1;
+       }
+
+       /* we do the db creation before we set the recovery mode, so the freeze happens
+          on all databases we will be dealing with. */
+
+       /* verify that we have all the databases any other node has */
+       ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
+               return -1;
+       }
+
+       /* verify that all other nodes have all our databases */
+       ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
                return -1;
        }
 
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
+
+
        /* set recovery mode to active on all nodes */
        ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
        if (ret!=0) {
-               DEBUG(0, (__location__ " Unable to set recovery mode to active on cluster\n"));
+               DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
                return -1;
        }
 
-       DEBUG(0, (__location__ " Recovery initiated due to problem with node %u\n", culprit));
+       /* execute the "startrecovery" event script on all nodes */
+       ret = run_startrecovery_eventscript(ctdb, nodemap);
+       if (ret!=0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
+               return -1;
+       }
 
        /* pick a new generation number */
        generation = new_generation();
@@ -777,62 +1307,41 @@ static int do_recovery(struct ctdb_recoverd *rec,
        vnnmap->generation = generation;
        ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
        if (ret != 0) {
-               DEBUG(0, (__location__ " Unable to set vnnmap for node %u\n", pnn));
-               return -1;
-       }
-
-       /* get a list of all databases */
-       ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
-       if (ret != 0) {
-               DEBUG(0, (__location__ " Unable to get dbids from node :%u\n", pnn));
-               return -1;
-       }
-
-
-
-       /* verify that all other nodes have all our databases */
-       ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
-       if (ret != 0) {
-               DEBUG(0, (__location__ " Unable to create missing remote databases\n"));
-               return -1;
-       }
-
-       /* verify that we have all the databases any other node has */
-       ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
-       if (ret != 0) {
-               DEBUG(0, (__location__ " Unable to create missing local databases\n"));
+               DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
                return -1;
        }
 
+       data.dptr = (void *)&generation;
+       data.dsize = sizeof(uint32_t);
 
-
-       /* verify that all other nodes have all our databases */
-       ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
-       if (ret != 0) {
-               DEBUG(0, (__location__ " Unable to create missing remote databases\n"));
+       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
+                       list_of_active_nodes(ctdb, nodemap, mem_ctx, true),
+                       CONTROL_TIMEOUT(), false, data) != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
                return -1;
        }
 
+       DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
 
-       DEBUG(1, (__location__ " Recovery - created remote databases\n"));
-
-       /* pull all remote databases onto the local node */
-       ret = pull_all_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
-       if (ret != 0) {
-               DEBUG(0, (__location__ " Unable to pull remote databases\n"));
-               return -1;
+       for (i=0;i<dbmap->num;i++) {
+               if (recover_database(rec, mem_ctx, dbmap->dbs[i].dbid, pnn, nodemap, generation) != 0) {
+                       DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
+                       return -1;
+               }
        }
 
-       DEBUG(1, (__location__ " Recovery - pulled remote databases\n"));
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
 
-       /* push all local databases to the remote nodes */
-       ret = push_all_local_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
-       if (ret != 0) {
-               DEBUG(0, (__location__ " Unable to push local databases\n"));
+       /* commit all the changes */
+       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
+                       list_of_active_nodes(ctdb, nodemap, mem_ctx, true),
+                       CONTROL_TIMEOUT(), false, data) != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
                return -1;
        }
 
-       DEBUG(1, (__location__ " Recovery - pushed remote databases\n"));
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
+       
 
        /* build a new vnn map with all the currently active and
           unbanned nodes */
@@ -848,58 +1357,34 @@ static int do_recovery(struct ctdb_recoverd *rec,
                }
        }
 
-
-
        /* update to the new vnnmap on all nodes */
        ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
        if (ret != 0) {
-               DEBUG(0, (__location__ " Unable to update vnnmap on all nodes\n"));
+               DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
                return -1;
        }
 
-       DEBUG(1, (__location__ " Recovery - updated vnnmap\n"));
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
 
        /* update recmaster to point to us for all nodes */
        ret = set_recovery_master(ctdb, nodemap, pnn);
        if (ret!=0) {
-               DEBUG(0, (__location__ " Unable to set recovery master\n"));
-               return -1;
-       }
-
-       DEBUG(1, (__location__ " Recovery - updated recmaster\n"));
-
-       /* repoint all local and remote database records to the local
-          node as being dmaster
-        */
-       ret = update_dmaster_on_all_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
-       if (ret != 0) {
-               DEBUG(0, (__location__ " Unable to update dmaster on all databases\n"));
+               DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
                return -1;
        }
 
-       DEBUG(1, (__location__ " Recovery - updated dmaster on all databases\n"));
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
 
        /*
          update all nodes to have the same flags that we have
         */
        ret = update_flags_on_all_nodes(ctdb, nodemap);
        if (ret != 0) {
-               DEBUG(0, (__location__ " Unable to update flags on all nodes\n"));
+               DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes\n"));
                return -1;
        }
        
-       DEBUG(1, (__location__ " Recovery - updated flags\n"));
-
-       /*
-         run a vacuum operation on empty records
-        */
-       ret = vacuum_all_databases(ctdb, nodemap, dbmap);
-       if (ret != 0) {
-               DEBUG(0, (__location__ " Unable to vacuum all databases\n"));
-               return -1;
-       }
-
-       DEBUG(1, (__location__ " Recovery - vacuumed all databases\n"));
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
 
        /*
          if enabled, tell nodes to takeover their public IPs
@@ -908,17 +1393,23 @@ static int do_recovery(struct ctdb_recoverd *rec,
                rec->need_takeover_run = false;
                ret = ctdb_takeover_run(ctdb, nodemap);
                if (ret != 0) {
-                       DEBUG(0, (__location__ " Unable to setup public takeover addresses\n"));
+                       DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses\n"));
                        return -1;
                }
-               DEBUG(1, (__location__ " Recovery - done takeover\n"));
+               DEBUG(DEBUG_INFO, (__location__ " Recovery - done takeover\n"));
        }
 
+       /* execute the "recovered" event script on all nodes */
+       ret = run_recovered_eventscript(ctdb, nodemap);
+       if (ret!=0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster\n"));
+               return -1;
+       }
 
        /* disable recovery mode */
        ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_NORMAL);
        if (ret!=0) {
-               DEBUG(0, (__location__ " Unable to set recovery mode to normal on cluster\n"));
+               DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
                return -1;
        }
 
@@ -926,15 +1417,17 @@ static int do_recovery(struct ctdb_recoverd *rec,
           has been reconfigured */
        ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
 
-       DEBUG(0, (__location__ " Recovery complete\n"));
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
+
+       rec->need_recovery = false;
 
        /* We just finished a recovery successfully. 
           We now wait for rerecovery_timeout before we allow 
           another recovery to take place.
        */
-       DEBUG(0, (__location__ " New recoveries supressed for the rerecovery timeout\n"));
+       DEBUG(DEBUG_NOTICE, (__location__ " New recoveries supressed for the rerecovery timeout\n"));
        ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
-       DEBUG(0, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
+       DEBUG(DEBUG_NOTICE, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
 
        return 0;
 }
@@ -948,6 +1441,7 @@ struct election_message {
        uint32_t num_connected;
        struct timeval priority_time;
        uint32_t pnn;
+       uint32_t node_flags;
 };
 
 /*
@@ -963,9 +1457,11 @@ static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_messag
 
        em->pnn = rec->ctdb->pnn;
        em->priority_time = rec->priority_time;
+       em->node_flags = rec->node_flags;
 
        ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
        if (ret != 0) {
+               DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
                return;
        }
 
@@ -983,12 +1479,24 @@ static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_messag
 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
 {
        struct election_message myem;
-       int cmp;
+       int cmp = 0;
 
        ctdb_election_data(rec, &myem);
 
+       /* we can't win if we are banned */
+       if (rec->node_flags & NODE_FLAGS_BANNED) {
+               return false;
+       }       
+
+       /* we will automatically win if the other node is banned */
+       if (em->node_flags & NODE_FLAGS_BANNED) {
+               return true;
+       }
+
        /* try to use the most connected node */
-       cmp = (int)myem.num_connected - (int)em->num_connected;
+       if (cmp == 0) {
+               cmp = (int)myem.num_connected - (int)em->num_connected;
+       }
 
        /* then the longest running node */
        if (cmp == 0) {
@@ -1005,14 +1513,14 @@ static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message
 /*
   send out an election request
  */
-static int send_election_request(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx, uint32_t pnn)
+static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn)
 {
        int ret;
        TDB_DATA election_data;
        struct election_message emsg;
        uint64_t srvid;
        struct ctdb_context *ctdb = rec->ctdb;
-       
+
        srvid = CTDB_SRVID_RECOVERY;
 
        ctdb_election_data(rec, &emsg);
@@ -1026,7 +1534,7 @@ static int send_election_request(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx,
         */
        ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
        if (ret != 0) {
-               DEBUG(0, (__location__ " failed to send recmaster election request\n"));
+               DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
                return -1;
        }
 
@@ -1048,7 +1556,7 @@ static void unban_all_nodes(struct ctdb_context *ctdb)
        
        ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
        if (ret != 0) {
-               DEBUG(0,(__location__ " failed to get nodemap to unban all nodes\n"));
+               DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
                return;
        }
 
@@ -1062,6 +1570,24 @@ static void unban_all_nodes(struct ctdb_context *ctdb)
        talloc_free(tmp_ctx);
 }
 
+
+/*
+  we think we are winning the election - send a broadcast election request
+ */
+static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
+{
+       struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
+       int ret;
+
+       ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb));
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
+       }
+
+       talloc_free(rec->send_election_te);
+       rec->send_election_te = NULL;
+}
+
 /*
   handler for recovery master elections
 */
@@ -1073,6 +1599,12 @@ static void election_handler(struct ctdb_context *ctdb, uint64_t srvid,
        struct election_message *em = (struct election_message *)data.dptr;
        TALLOC_CTX *mem_ctx;
 
+       /* we got an election packet - update the timeout for the election */
+       talloc_free(rec->election_timeout);
+       rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
+                                               timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
+                                               ctdb_election_timeout, rec);
+
        mem_ctx = talloc_new(ctdb);
 
        /* someone called an election. check their election data
@@ -1080,14 +1612,19 @@ static void election_handler(struct ctdb_context *ctdb, uint64_t srvid,
           send a new election message to all other nodes
         */
        if (ctdb_election_win(rec, em)) {
-               ret = send_election_request(rec, mem_ctx, ctdb_get_pnn(ctdb));
-               if (ret!=0) {
-                       DEBUG(0, (__location__ " failed to initiate recmaster election"));
+               if (!rec->send_election_te) {
+                       rec->send_election_te = event_add_timed(ctdb->ev, rec, 
+                                                               timeval_current_ofs(0, 500000),
+                                                               election_send_request, rec);
                }
                talloc_free(mem_ctx);
                /*unban_all_nodes(ctdb);*/
                return;
        }
+       
+       /* we didn't win */
+       talloc_free(rec->send_election_te);
+       rec->send_election_te = NULL;
 
        /* release the recmaster lock */
        if (em->pnn != ctdb->pnn &&
@@ -1100,7 +1637,7 @@ static void election_handler(struct ctdb_context *ctdb, uint64_t srvid,
        /* ok, let that guy become recmaster then */
        ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
        if (ret != 0) {
-               DEBUG(0, (__location__ " failed to send recmaster election request"));
+               DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
                talloc_free(mem_ctx);
                return;
        }
@@ -1128,18 +1665,23 @@ static void force_election(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx, uint3
        /* set all nodes to recovery mode to stop all internode traffic */
        ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
        if (ret!=0) {
-               DEBUG(0, (__location__ " Unable to set recovery mode to active on cluster\n"));
+               DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
                return;
        }
-       
-       ret = send_election_request(rec, mem_ctx, pnn);
+
+       talloc_free(rec->election_timeout);
+       rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
+                                               timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
+                                               ctdb_election_timeout, rec);
+
+       ret = send_election_request(rec, pnn);
        if (ret!=0) {
-               DEBUG(0, (__location__ " failed to initiate recmaster election"));
+               DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
                return;
        }
 
        /* wait for a few seconds to collect all responses */
-       ctdb_wait_timeout(ctdb, ctdb->tunable.election_timeout);
+       ctdb_wait_election(rec);
 }
 
 
@@ -1159,7 +1701,7 @@ static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid,
        struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
 
        if (data.dsize != sizeof(*c)) {
-               DEBUG(0,(__location__ "Invalid data in ctdb_node_flag_change\n"));
+               DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
                return;
        }
 
@@ -1167,13 +1709,19 @@ static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid,
        CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
 
        ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
+               talloc_free(tmp_ctx);
+               return;         
+       }
+
 
        for (i=0;i<nodemap->num;i++) {
                if (nodemap->nodes[i].pnn == c->pnn) break;
        }
 
        if (i == nodemap->num) {
-               DEBUG(0,(__location__ "Flag change for non-existant node %u\n", c->pnn));
+               DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
                talloc_free(tmp_ctx);
                return;
        }
@@ -1190,7 +1738,7 @@ static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid,
        }
 
        if (nodemap->nodes[i].flags != c->new_flags) {
-               DEBUG(0,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
+               DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
        }
 
        nodemap->nodes[i].flags = c->new_flags;
@@ -1231,7 +1779,7 @@ struct verify_recmode_normal_data {
 
 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
 {
-       struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private, struct verify_recmode_normal_data);
+       struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
 
 
        /* one more node has responded with recmode data*/
@@ -1251,7 +1799,7 @@ static void verify_recmode_normal_callback(struct ctdb_client_control_state *sta
           status field
        */
        if (state->status != CTDB_RECOVERY_NORMAL) {
-               DEBUG(0, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
+               DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
                rmdata->status = MONITOR_RECOVERY_NEEDED;
        }
 
@@ -1286,14 +1834,14 @@ static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb
                        /* we failed to send the control, treat this as 
                           an error and try again next iteration
                        */                      
-                       DEBUG(0,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
+                       DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
                        talloc_free(mem_ctx);
                        return MONITOR_FAILED;
                }
 
                /* set up the callback functions */
                state->async.fn = verify_recmode_normal_callback;
-               state->async.private = rmdata;
+               state->async.private_data = rmdata;
 
                /* one more control to wait for to complete */
                rmdata->count++;
@@ -1321,7 +1869,7 @@ struct verify_recmaster_data {
 
 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
 {
-       struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private, struct verify_recmaster_data);
+       struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
 
 
        /* one more node has responded with recmaster data*/
@@ -1341,7 +1889,7 @@ static void verify_recmaster_callback(struct ctdb_client_control_state *state)
           status field
        */
        if (state->status != rmdata->pnn) {
-               DEBUG(0,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
+               DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
                rmdata->status = MONITOR_ELECTION_NEEDED;
        }
 
@@ -1377,14 +1925,14 @@ static enum monitor_result verify_recmaster(struct ctdb_context *ctdb, struct ct
                        /* we failed to send the control, treat this as 
                           an error and try again next iteration
                        */                      
-                       DEBUG(0,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
+                       DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
                        talloc_free(mem_ctx);
                        return MONITOR_FAILED;
                }
 
                /* set up the callback functions */
                state->async.fn = verify_recmaster_callback;
-               state->async.private = rmdata;
+               state->async.private_data = rmdata;
 
                /* one more control to wait for to complete */
                rmdata->count++;
@@ -1417,6 +1965,10 @@ static void monitor_cluster(struct ctdb_context *ctdb)
        struct ctdb_vnn_map *remote_vnnmap=NULL;
        int i, j, ret;
        struct ctdb_recoverd *rec;
+       struct ctdb_all_public_ips *ips;
+       char c;
+
+       DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
 
        rec = talloc_zero(ctdb, struct ctdb_recoverd);
        CTDB_NO_MEMORY_FATAL(ctdb, rec);
@@ -1438,6 +1990,9 @@ static void monitor_cluster(struct ctdb_context *ctdb)
 
        /* and one for when nodes are unbanned */
        ctdb_set_message_handler(ctdb, CTDB_SRVID_UNBAN_NODE, unban_handler, rec);
+
+       /* register a message port for vacuum fetch */
+       ctdb_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
        
 again:
        if (mem_ctx) {
@@ -1446,30 +2001,53 @@ again:
        }
        mem_ctx = talloc_new(ctdb);
        if (!mem_ctx) {
-               DEBUG(0,("Failed to create temporary context\n"));
+               DEBUG(DEBUG_CRIT,(__location__ " Failed to create temporary context\n"));
                exit(-1);
        }
 
        /* we only check for recovery once every second */
        ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
 
+       /* verify that the main daemon is still running */
+       if (kill(ctdb->ctdbd_pid, 0) != 0) {
+               DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
+               exit(-1);
+       }
+
+       if (rec->election_timeout) {
+               /* an election is in progress */
+               goto again;
+       }
+
+
+       /* We must check if we need to ban a node here but we want to do this
+          as early as possible so we don't wait until we have pulled the node
+          map from the local node. That's why we have the hardcoded value 20
+       */
+       if (rec->culprit_counter > 20) {
+               DEBUG(DEBUG_NOTICE,("Node %u has caused %u failures in %.0f seconds - banning it for %u seconds\n",
+                        rec->last_culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
+                        ctdb->tunable.recovery_ban_period));
+               ctdb_ban_node(rec, rec->last_culprit, ctdb->tunable.recovery_ban_period);
+       }
+
        /* get relevant tunables */
        ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
        if (ret != 0) {
-               DEBUG(0,("Failed to get tunables - retrying\n"));
+               DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
                goto again;
        }
 
        pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
        if (pnn == (uint32_t)-1) {
-               DEBUG(0,("Failed to get local pnn - retrying\n"));
+               DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
                goto again;
        }
 
        /* get the vnnmap */
        ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
        if (ret != 0) {
-               DEBUG(0, (__location__ " Unable to get vnnmap from node %u\n", pnn));
+               DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
                goto again;
        }
 
@@ -1477,38 +2055,67 @@ again:
        /* get number of nodes */
        ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &nodemap);
        if (ret != 0) {
-               DEBUG(0, (__location__ " Unable to get nodemap from node %u\n", pnn));
+               DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
                goto again;
        }
 
-
-       /* count how many active nodes there are */
-       num_active = 0;
-       for (i=0; i<nodemap->num; i++) {
-               if (rec->banned_nodes[nodemap->nodes[i].pnn] != NULL) {
-                       nodemap->nodes[i].flags |= NODE_FLAGS_BANNED;
-               } else {
-                       nodemap->nodes[i].flags &= ~NODE_FLAGS_BANNED;
-               }
-               if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
-                       num_active++;
-               }
-       }
-
-
        /* check which node is the recovery master */
        ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &recmaster);
        if (ret != 0) {
-               DEBUG(0, (__location__ " Unable to get recmaster from node %u\n", pnn));
+               DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
                goto again;
        }
 
        if (recmaster == (uint32_t)-1) {
-               DEBUG(0,(__location__ " Initial recovery master set - forcing election\n"));
+               DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
                force_election(rec, mem_ctx, pnn, nodemap);
                goto again;
        }
        
+       /* check that we (recovery daemon) and the local ctdb daemon
+          agree on whether we are banned or not
+       */
+       if (nodemap->nodes[pnn].flags & NODE_FLAGS_BANNED) {
+               if (rec->banned_nodes[pnn] == NULL) {
+                       if (recmaster == pnn) {
+                               DEBUG(DEBUG_NOTICE,("Local ctdb daemon on recmaster thinks this node is BANNED but the recovery master disagrees. Unbanning the node\n"));
+
+                               ctdb_unban_node(rec, pnn);
+                       } else {
+                               DEBUG(DEBUG_NOTICE,("Local ctdb daemon on non-recmaster thinks this node is BANNED but the recovery master disagrees. Re-banning the node\n"));
+                               ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
+                               ctdb_set_culprit(rec, pnn);
+                       }
+                       goto again;
+               }
+       } else {
+               if (rec->banned_nodes[pnn] != NULL) {
+                       if (recmaster == pnn) {
+                               DEBUG(DEBUG_NOTICE,("Local ctdb daemon on recmaster does not think this node is BANNED but the recovery master disagrees. Unbanning the node\n"));
+
+                               ctdb_unban_node(rec, pnn);
+                       } else {
+                               DEBUG(DEBUG_NOTICE,("Local ctdb daemon on non-recmaster does not think this node is BANNED but the recovery master disagrees. Re-banning the node\n"));
+
+                               ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
+                               ctdb_set_culprit(rec, pnn);
+                       }
+                       goto again;
+               }
+       }
+
+       /* remember our own node flags */
+       rec->node_flags = nodemap->nodes[pnn].flags;
+
+       /* count how many active nodes there are */
+       num_active = 0;
+       for (i=0; i<nodemap->num; i++) {
+               if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
+                       num_active++;
+               }
+       }
+
+
        /* verify that the recmaster node is still active */
        for (j=0; j<nodemap->num; j++) {
                if (nodemap->nodes[j].pnn==recmaster) {
@@ -1517,17 +2124,79 @@ again:
        }
 
        if (j == nodemap->num) {
-               DEBUG(0, ("Recmaster node %u not in list. Force reelection\n", recmaster));
+               DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", recmaster));
                force_election(rec, mem_ctx, pnn, nodemap);
                goto again;
        }
 
-       if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
-               DEBUG(0, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
+       /* if recovery master is disconnected we must elect a new recmaster */
+       if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
+               DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
                force_election(rec, mem_ctx, pnn, nodemap);
                goto again;
        }
-       
+
+       /* grab the nodemap from the recovery master to check if it is banned */
+       ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
+                                  mem_ctx, &remote_nodemap);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
+                         nodemap->nodes[j].pnn));
+               goto again;
+       }
+
+
+       if (remote_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
+               DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
+               force_election(rec, mem_ctx, pnn, nodemap);
+               goto again;
+       }
+
+       /* verify that the public ip address allocation is consistent */
+       if (ctdb->vnn != NULL) {
+               ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR, ("Unable to get public ips from node %u\n", i));
+                       goto again;
+               }
+               for (j=0; j<ips->num; j++) {
+                       /* verify that we have the ip addresses we should have
+                          and we don't have ones we shouldn't have.
+                          if we find an inconsistency we set recmode to
+                          active on the local node and wait for the recmaster
+                          to do a full blown recovery
+                       */
+                       if (ips->ips[j].pnn == pnn) {
+                               if (!ctdb_sys_have_ip(ips->ips[j].sin)) {
+                                       DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n", inet_ntoa(ips->ips[j].sin.sin_addr)));
+                                       ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
+                                       if (ret != 0) {
+                                               DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
+                                               goto again;
+                                       }
+                                       ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
+                                       if (ret != 0) {
+                                               DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
+                                               goto again;
+                                       }
+                               }
+                       } else {
+                               if (ctdb_sys_have_ip(ips->ips[j].sin)) {
+                                       DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", inet_ntoa(ips->ips[j].sin.sin_addr)));
+                                       ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
+                                       if (ret != 0) {
+                                               DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
+                                               goto again;
+                                       }
+                                       ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
+                                       if (ret != 0) {
+                                               DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
+                                               goto again;
+                                       }
+                               }
+                       }
+               }
+       }
 
        /* if we are not the recmaster then we do not need to check
           if recovery is needed
@@ -1537,6 +2206,18 @@ again:
        }
 
 
+       /* ensure our local copies of flags are right */
+       ret = update_local_flags(rec, nodemap);
+       if (ret == MONITOR_ELECTION_NEEDED) {
+               DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
+               force_election(rec, mem_ctx, pnn, nodemap);
+               goto again;
+       }
+       if (ret != MONITOR_OK) {
+               DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
+               goto again;
+       }
+
        /* update the list of public ips that a node can handle for
           all connected nodes
        */
@@ -1554,7 +2235,7 @@ again:
                        ctdb->nodes[j]->pnn, 
                        ctdb->nodes,
                        &ctdb->nodes[j]->public_ips)) {
-                       DEBUG(0,("Failed to read public ips from node : %u\n", 
+                       DEBUG(DEBUG_ERR,("Failed to read public ips from node : %u\n", 
                                ctdb->nodes[j]->pnn));
                        goto again;
                }
@@ -1576,12 +2257,18 @@ again:
        }
 
 
+       if (rec->need_recovery) {
+               /* a previous recovery didn't finish */
+               do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
+               goto again;             
+       }
+
        /* verify that all active nodes are in normal mode 
           and not in recovery mode 
         */
        switch (verify_recmode(ctdb, nodemap)) {
        case MONITOR_RECOVERY_NEEDED:
-               do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
+               do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
                goto again;
        case MONITOR_FAILED:
                goto again;
@@ -1592,6 +2279,20 @@ again:
        }
 
 
+       /* we should have the reclock - check it's not stale */
+       if (ctdb->recovery_lock_fd == -1) {
+               DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
+               do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
+               goto again;
+       }
+
+       if (read(ctdb->recovery_lock_fd, &c, 1) == -1) {
+               DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
+               close(ctdb->recovery_lock_fd);
+               ctdb->recovery_lock_fd = -1;
+               do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
+               goto again;
+       }
 
        /* get the nodemap for all active remote nodes and verify
           they are the same as for this node
@@ -1607,7 +2308,7 @@ again:
                ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
                                           mem_ctx, &remote_nodemap);
                if (ret != 0) {
-                       DEBUG(0, (__location__ " Unable to get nodemap from remote node %u\n", 
+                       DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
                                  nodemap->nodes[j].pnn));
                        goto again;
                }
@@ -1616,7 +2317,7 @@ again:
                   then this is a good reason to try recovery
                 */
                if (remote_nodemap->num != nodemap->num) {
-                       DEBUG(0, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
+                       DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
                                  nodemap->nodes[j].pnn, remote_nodemap->num, nodemap->num));
                        do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
                        goto again;
@@ -1627,7 +2328,7 @@ again:
                 */
                for (i=0;i<nodemap->num;i++) {
                        if (remote_nodemap->nodes[i].pnn != nodemap->nodes[i].pnn) {
-                               DEBUG(0, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
+                               DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
                                          nodemap->nodes[j].pnn, i, 
                                          remote_nodemap->nodes[i].pnn, nodemap->nodes[i].pnn));
                                do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
@@ -1636,7 +2337,7 @@ again:
                        }
                        if ((remote_nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) != 
                            (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
-                               DEBUG(0, (__location__ " Remote node:%u has different nodemap flag for %d (0x%x vs 0x%x)\n", 
+                               DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap flag for %d (0x%x vs 0x%x)\n", 
                                          nodemap->nodes[j].pnn, i,
                                          remote_nodemap->nodes[i].flags, nodemap->nodes[i].flags));
                                do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
@@ -1645,15 +2346,6 @@ again:
                        }
                }
 
-               /* update our nodemap flags according to the other
-                  server - this gets the NODE_FLAGS_DISABLED
-                  flag. Note that the remote node is authoritative
-                  for its flags (except CONNECTED, which we know
-                  matches in this code) */
-               if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
-                       nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
-                       rec->need_takeover_run = true;
-               }
        }
 
 
@@ -1661,7 +2353,7 @@ again:
           as there are active nodes or we will have to do a recovery
         */
        if (vnnmap->size != num_active) {
-               DEBUG(0, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
+               DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
                          vnnmap->size, num_active));
                do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
                goto again;
@@ -1684,7 +2376,7 @@ again:
                        }
                }
                if (i == vnnmap->size) {
-                       DEBUG(0, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
+                       DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
                                  nodemap->nodes[j].pnn));
                        do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
                        goto again;
@@ -1706,14 +2398,14 @@ again:
                ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
                                          mem_ctx, &remote_vnnmap);
                if (ret != 0) {
-                       DEBUG(0, (__location__ " Unable to get vnnmap from remote node %u\n", 
+                       DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
                                  nodemap->nodes[j].pnn));
                        goto again;
                }
 
                /* verify the vnnmap generation is the same */
                if (vnnmap->generation != remote_vnnmap->generation) {
-                       DEBUG(0, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
+                       DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
                                  nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
                        do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
                        goto again;
@@ -1721,7 +2413,7 @@ again:
 
                /* verify the vnnmap size is the same */
                if (vnnmap->size != remote_vnnmap->size) {
-                       DEBUG(0, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
+                       DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
                                  nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
                        do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
                        goto again;
@@ -1730,7 +2422,7 @@ again:
                /* verify the vnnmap is the same */
                for (i=0;i<vnnmap->size;i++) {
                        if (remote_vnnmap->map[i] != vnnmap->map[i]) {
-                               DEBUG(0, (__location__ " Remote node %u has different vnnmap.\n", 
+                               DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
                                          nodemap->nodes[j].pnn));
                                do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
                                            vnnmap, nodemap->nodes[j].pnn);
@@ -1742,11 +2434,28 @@ again:
        /* we might need to change who has what IP assigned */
        if (rec->need_takeover_run) {
                rec->need_takeover_run = false;
+
+               /* execute the "startrecovery" event script on all nodes */
+               ret = run_startrecovery_eventscript(ctdb, nodemap);
+               if (ret!=0) {
+                       DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
+                       do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
+                                   vnnmap, ctdb->pnn);
+               }
+
                ret = ctdb_takeover_run(ctdb, nodemap);
                if (ret != 0) {
-                       DEBUG(0, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
+                       DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
+                       do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
+                                   vnnmap, ctdb->pnn);
+               }
+
+               /* execute the "recovered" event script on all nodes */
+               ret = run_recovered_eventscript(ctdb, nodemap);
+               if (ret!=0) {
+                       DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster\n"));
                        do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
-                                   vnnmap, nodemap->nodes[j].pnn);
+                                   vnnmap, ctdb->pnn);
                }
        }
 
@@ -1760,12 +2469,10 @@ again:
 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
                                 uint16_t flags, void *private_data)
 {
-       DEBUG(0,("recovery daemon parent died - exiting\n"));
+       DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
        _exit(1);
 }
 
-
-
 /*
   startup the recovery daemon as a child of the main ctdb daemon
  */
@@ -1773,18 +2480,19 @@ int ctdb_start_recoverd(struct ctdb_context *ctdb)
 {
        int ret;
        int fd[2];
-       pid_t child;
 
        if (pipe(fd) != 0) {
                return -1;
        }
 
-       child = fork();
-       if (child == -1) {
+       ctdb->ctdbd_pid = getpid();
+
+       ctdb->recoverd_pid = fork();
+       if (ctdb->recoverd_pid == -1) {
                return -1;
        }
        
-       if (child != 0) {
+       if (ctdb->recoverd_pid != 0) {
                close(fd[0]);
                return 0;
        }
@@ -1806,15 +2514,33 @@ int ctdb_start_recoverd(struct ctdb_context *ctdb)
 
        srandom(getpid() ^ time(NULL));
 
+       /* the recovery daemon does not need to be realtime */
+       if (ctdb->do_setsched) {
+               ctdb_restore_scheduler(ctdb);
+       }
+
        /* initialise ctdb */
        ret = ctdb_socket_connect(ctdb);
        if (ret != 0) {
-               DEBUG(0, (__location__ " Failed to init ctdb\n"));
+               DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb\n"));
                exit(1);
        }
 
        monitor_cluster(ctdb);
 
-       DEBUG(0,("ERROR: ctdb_recoverd finished!?\n"));
+       DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
        return -1;
 }
+
+/*
+  shutdown the recovery daemon
+ */
+void ctdb_stop_recoverd(struct ctdb_context *ctdb)
+{
+       if (ctdb->recoverd_pid == 0) {
+               return;
+       }
+
+       DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
+       kill(ctdb->recoverd_pid, SIGTERM);
+}