If we can not pull a database from a node during recovery, mark this node as a "culpr...
[sahlberg/ctdb.git] / server / ctdb_recoverd.c
index b67a9262d12e1ee4810b4f18741b592dfc08c694..c812f916959ffa72d261939e88b3a242a551efcc 100644 (file)
@@ -27,6 +27,8 @@
 #include "cmdline.h"
 #include "../include/ctdb.h"
 #include "../include/ctdb_private.h"
+#include "db_wrap.h"
+#include "dlinklist.h"
 
 
 struct ban_state {
@@ -39,6 +41,10 @@ struct ban_state {
  */
 struct ctdb_recoverd {
        struct ctdb_context *ctdb;
+       uint32_t recmaster;
+       uint32_t num_active;
+       uint32_t num_connected;
+       struct ctdb_node_map *nodemap;
        uint32_t last_culprit;
        uint32_t culprit_counter;
        struct timeval first_recover_time;
@@ -49,61 +55,13 @@ struct ctdb_recoverd {
        uint32_t node_flags;
        struct timed_event *send_election_te;
        struct timed_event *election_timeout;
+       struct vacuum_info *vacuum_info;
 };
 
 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
 
 
-struct async_data {
-       uint32_t count;
-       uint32_t fail_count;
-};
-
-static void async_callback(struct ctdb_client_control_state *state)
-{
-       struct async_data *data = talloc_get_type(state->async.private_data, struct async_data);
-
-       /* one more node has responded with recmode data */
-       data->count--;
-
-       /* if we failed to push the db, then return an error and let
-          the main loop try again.
-       */
-       if (state->state != CTDB_CONTROL_DONE) {
-               DEBUG(0,("Async operation failed with state %d\n", state->state));
-               data->fail_count++;
-       }
-}
-
-
-static void async_add(struct async_data *data, struct ctdb_client_control_state *state)
-{
-       /* set up the callback functions */
-       state->async.fn = async_callback;
-       state->async.private_data = data;
-       
-       /* one more control to wait for to complete */
-       data->count++;
-}
-
-
-/* wait for up to the maximum number of seconds allowed
-   or until all nodes we expect a response from has replied
-*/
-static int async_wait(struct ctdb_context *ctdb, struct async_data *data)
-{
-       while (data->count > 0) {
-               event_loop_once(ctdb->ev);
-       }
-       if (data->fail_count != 0) {
-               DEBUG(0,("Async wait failed - fail_count=%u\n", data->fail_count));
-               return -1;
-       }
-       return 0;
-}
-
-
 /*
   unban a node
  */
@@ -111,10 +69,10 @@ static void ctdb_unban_node(struct ctdb_recoverd *rec, uint32_t pnn)
 {
        struct ctdb_context *ctdb = rec->ctdb;
 
-       DEBUG(0,("Unbanning node %u\n", pnn));
+       DEBUG(DEBUG_NOTICE,("Unbanning node %u\n", pnn));
 
        if (!ctdb_validate_pnn(ctdb, pnn)) {
-               DEBUG(0,("Bad pnn %u in ctdb_unban_node\n", pnn));
+               DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_unban_node\n", pnn));
                return;
        }
 
@@ -123,14 +81,14 @@ static void ctdb_unban_node(struct ctdb_recoverd *rec, uint32_t pnn)
                TDB_DATA data;
                int ret;
                
-               DEBUG(0,("Unanning remote node %u. Passing the ban request on to the remote node.\n", pnn));
+               DEBUG(DEBUG_NOTICE,("Unanning remote node %u. Passing the ban request on to the remote node.\n", pnn));
 
                data.dptr = (uint8_t *)&pnn;
                data.dsize = sizeof(uint32_t);
 
                ret = ctdb_send_message(ctdb, pnn, CTDB_SRVID_UNBAN_NODE, data);
                if (ret != 0) {
-                       DEBUG(0,("Failed to unban node %u\n", pnn));
+                       DEBUG(DEBUG_ERR,("Failed to unban node %u\n", pnn));
                        return;
                }
 
@@ -141,11 +99,11 @@ static void ctdb_unban_node(struct ctdb_recoverd *rec, uint32_t pnn)
           there is an election */
        rec->node_flags &= ~NODE_FLAGS_BANNED;
 
-       DEBUG(0,("Clearing ban flag on node %u\n", pnn));
+       DEBUG(DEBUG_INFO,("Clearing ban flag on node %u\n", pnn));
        ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, 0, NODE_FLAGS_BANNED);
 
        if (rec->banned_nodes[pnn] == NULL) {
-               DEBUG(0,("No ban recorded for this node. ctdb_unban_node() request ignored\n"));
+               DEBUG(DEBUG_INFO,("No ban recorded for this node. ctdb_unban_node() request ignored\n"));
                return;
        }
 
@@ -163,7 +121,7 @@ static void ctdb_ban_timeout(struct event_context *ev, struct timed_event *te, s
        struct ctdb_recoverd *rec = state->rec;
        uint32_t pnn = state->banned_node;
 
-       DEBUG(0,("Ban timeout. Node %u is now unbanned\n", pnn));
+       DEBUG(DEBUG_NOTICE,("Ban timeout. Node %u is now unbanned\n", pnn));
        ctdb_unban_node(rec, pnn);
 }
 
@@ -174,15 +132,15 @@ static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_
 {
        struct ctdb_context *ctdb = rec->ctdb;
 
-       DEBUG(0,("Banning node %u for %u seconds\n", pnn, ban_time));
+       DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
 
        if (!ctdb_validate_pnn(ctdb, pnn)) {
-               DEBUG(0,("Bad pnn %u in ctdb_ban_node\n", pnn));
+               DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
                return;
        }
 
        if (0 == ctdb->tunable.enable_bans) {
-               DEBUG(0,("Bans are disabled - ignoring ban of node %u\n", pnn));
+               DEBUG(DEBUG_INFO,("Bans are disabled - ignoring ban of node %u\n", pnn));
                return;
        }
 
@@ -192,7 +150,7 @@ static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_
                TDB_DATA data;
                int ret;
                
-               DEBUG(0,("Banning remote node %u for %u seconds. Passing the ban request on to the remote node.\n", pnn, ban_time));
+               DEBUG(DEBUG_NOTICE,("Banning remote node %u for %u seconds. Passing the ban request on to the remote node.\n", pnn, ban_time));
 
                b.pnn = pnn;
                b.ban_time = ban_time;
@@ -202,14 +160,14 @@ static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_
 
                ret = ctdb_send_message(ctdb, pnn, CTDB_SRVID_BAN_NODE, data);
                if (ret != 0) {
-                       DEBUG(0,("Failed to ban node %u\n", pnn));
+                       DEBUG(DEBUG_ERR,("Failed to ban node %u\n", pnn));
                        return;
                }
 
                return;
        }
 
-       DEBUG(0,("self ban - lowering our election priority\n"));
+       DEBUG(DEBUG_NOTICE,("self ban - lowering our election priority\n"));
        ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, NODE_FLAGS_BANNED, 0);
 
        /* banning ourselves - lower our election priority */
@@ -220,7 +178,7 @@ static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_
        rec->node_flags |= NODE_FLAGS_BANNED;
 
        if (rec->banned_nodes[pnn] != NULL) {
-               DEBUG(0,("Re-banning an already banned node. Remove previous ban and set a new ban.\n"));               
+               DEBUG(DEBUG_NOTICE,("Re-banning an already banned node. Remove previous ban and set a new ban.\n"));            
                talloc_free(rec->banned_nodes[pnn]);
                rec->banned_nodes[pnn] = NULL;
        }
@@ -241,90 +199,187 @@ static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_
 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
 
 
+/* run the "recovered" eventscript on all nodes: sends
+   CTDB_CONTROL_END_RECOVERY to the nodes from list_of_active_nodes();
+   'caller' only labels the error log. returns 0 on success, -1 on failure */
+static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
+{
+       TALLOC_CTX *tmp_ctx;
+       uint32_t *nodes;
+       /* scratch talloc context handed to list_of_active_nodes(); freed before both returns below */
+       tmp_ctx = talloc_new(ctdb);
+       CTDB_NO_MEMORY(ctdb, tmp_ctx);
+       /* NOTE(review): the meaning of the trailing 'true' is not visible here - confirm in list_of_active_nodes() */
+       nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
+       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
+                                       nodes,
+                                       CONTROL_TIMEOUT(), false, tdb_null,
+                                       NULL, NULL,
+                                       NULL) != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
+
+               talloc_free(tmp_ctx);
+               return -1;
+       }
 
+       talloc_free(tmp_ctx);
+       return 0;
+}
 
-/* freeze all nodes */
-static enum monitor_result freeze_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
+/* remember the trouble maker, adding one to its culprit counter.
+   the counter restarts from 0 when the culprit changes or when more than
+   tunable.recovery_grace_period has elapsed since first_recover_time */
+static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
 {
-       struct async_data *async_data;
-       TALLOC_CTX *mem_ctx = talloc_new(ctdb);
-       struct ctdb_client_control_state *state;
-       int j;
-       
-       async_data = talloc_zero(mem_ctx, struct async_data);
-       CTDB_NO_MEMORY_FATAL(ctdb, async_data);
+       struct ctdb_context *ctdb = rec->ctdb; /* only needed for the grace-period tunable */
 
-       /* loop over all active nodes and send an async freeze call to 
-          them*/
-       for (j=0; j<nodemap->num; j++) {
-               if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
-                       continue;
-               }
-               state = ctdb_ctrl_freeze_send(ctdb, mem_ctx, 
-                                             CONTROL_TIMEOUT(), 
-                                             nodemap->nodes[j].pnn);
-               if (state == NULL) {
-                       /* we failed to send the control, treat this as 
-                          an error and try again next iteration
-                       */                      
-                       DEBUG(0,("Failed to call ctdb_ctrl_freeze_send during recovery\n"));
-                       talloc_free(mem_ctx);
-                       return MONITOR_RECOVERY_NEEDED;
-               }
-               
-               async_add(async_data, state);
+       if (rec->last_culprit != culprit ||
+           timeval_elapsed(&rec->first_recover_time) > ctdb->tunable.recovery_grace_period) {
+               DEBUG(DEBUG_NOTICE,("New recovery culprit %u\n", culprit));
+               /* either a new node is the culprit, or we've decided to forgive them */
+               rec->last_culprit = culprit;
+               rec->first_recover_time = timeval_current(); /* restart the grace-period clock */
+               rec->culprit_counter = 0;
         }
+       rec->culprit_counter++; /* one more strike against the current culprit */
+}
 
-       if (async_wait(ctdb, async_data) != 0) {
-               DEBUG(0,(__location__ " Failed async freeze call\n"));
-               talloc_free(mem_ctx);
-               return MONITOR_RECOVERY_NEEDED;
+/* remember the trouble maker, adding 'count' to its culprit counter.
+   the counter restarts from 0 when the culprit changes or when more than
+   tunable.recovery_grace_period has elapsed since first_recover_time */
+static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
+{
+       struct ctdb_context *ctdb = rec->ctdb; /* only needed for the grace-period tunable */
+
+       if (rec->last_culprit != culprit ||
+           timeval_elapsed(&rec->first_recover_time) > ctdb->tunable.recovery_grace_period) {
+               DEBUG(DEBUG_NOTICE,("New recovery culprit %u\n", culprit));
+               /* either a new node is the culprit, or we've decided to forgive them */
+               rec->last_culprit = culprit;
+               rec->first_recover_time = timeval_current(); /* restart the grace-period clock */
+               rec->culprit_counter = 0;
         }
+       rec->culprit_counter += count; /* same bookkeeping as ctdb_set_culprit(), but 'count' strikes at once */
+}
 
-       talloc_free(mem_ctx);
-       return MONITOR_OK;
+/* this callback is called for every node that failed to execute the
+   start recovery event. callback_data is the struct ctdb_recoverd;
+   the failing node is recorded with ctdb_set_culprit() */
+static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
+{
+       struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
+       /* 'ctdb', 'res' and 'outdata' are not used by this callback */
+       DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
+
+       ctdb_set_culprit(rec, node_pnn);
+}
+
+/* run the "startrecovery" eventscript on all nodes by sending
+   CTDB_CONTROL_START_RECOVERY; startrecovery_fail_callback() is passed
+   with 'rec' as its callback data. returns 0 on success, -1 on failure */
+static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
+{
+       TALLOC_CTX *tmp_ctx;
+       uint32_t *nodes;
+       struct ctdb_context *ctdb = rec->ctdb;
+       /* scratch talloc context handed to list_of_active_nodes(); freed before both returns below */
+       tmp_ctx = talloc_new(ctdb);
+       CTDB_NO_MEMORY(ctdb, tmp_ctx);
+
+       nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
+       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
+                                       nodes,
+                                       CONTROL_TIMEOUT(), false, tdb_null,
+                                       NULL,
+                                       startrecovery_fail_callback,
+                                       rec) != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       talloc_free(tmp_ctx);
+       return 0;
+}
+
+static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
+{ /* reply callback handed to ctdb_client_async_control() by update_capabilities(); 'res' and 'callback_data' are unused */
+       if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) { /* reply must be exactly one uint32_t */
+               DEBUG(DEBUG_ERR, (__location__ " Invalid lenght/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
+               return;
+       }
+       if (node_pnn < ctdb->num_nodes) { /* replies from a pnn outside ctdb->nodes[] are silently dropped */
+               ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
+       }
 }
 
+/* update the node capabilities for the nodes from list_of_active_nodes():
+   sends CTDB_CONTROL_GET_CAPABILITIES and lets async_getcap_callback()
+   store each reply in ctdb->nodes[]. returns 0 on success, -1 on failure */
+static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
+{
+       uint32_t *nodes;
+       TALLOC_CTX *tmp_ctx;
+       /* scratch talloc context handed to list_of_active_nodes(); freed before both returns below */
+       tmp_ctx = talloc_new(ctdb);
+       CTDB_NO_MEMORY(ctdb, tmp_ctx);
+
+       nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
+       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
+                                       nodes, CONTROL_TIMEOUT(),
+                                       false, tdb_null,
+                                       async_getcap_callback, NULL,
+                                       NULL) != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       talloc_free(tmp_ctx);
+       return 0;
+}
 
 /*
   change recovery mode on all nodes
  */
 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t rec_mode)
 {
-       int j, ret;
+       TDB_DATA data;
+       uint32_t *nodes;
+       TALLOC_CTX *tmp_ctx;
+       /* scratch talloc context handed to list_of_active_nodes(); freed before each return below */
+       tmp_ctx = talloc_new(ctdb);
+       CTDB_NO_MEMORY(ctdb, tmp_ctx);
 
        /* freeze all nodes */
+       nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true); /* same node list is used for both controls below */
        if (rec_mode == CTDB_RECOVERY_ACTIVE) {
-               ret = freeze_all_nodes(ctdb, nodemap);
-               if (ret != MONITOR_OK) {
-                       DEBUG(0, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
+               if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
+                                               nodes, CONTROL_TIMEOUT(),
+                                               false, tdb_null,
+                                               NULL, NULL,
+                                               NULL) != 0) {
+                       DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
+                       talloc_free(tmp_ctx);
                        return -1;
                }
        }
 
 
-       /* set recovery mode to active on all nodes */
-       for (j=0; j<nodemap->num; j++) {
-               /* dont change it for nodes that are unavailable */
-               if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
-                       continue;
-               }
-
-               ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, rec_mode);
-               if (ret != 0) {
-                       DEBUG(0, (__location__ " Unable to set recmode on node %u\n", nodemap->nodes[j].pnn));
-                       return -1;
-               }
+       data.dsize = sizeof(uint32_t); /* control payload is the raw 32-bit rec_mode value */
+       data.dptr = (unsigned char *)&rec_mode;
 
-               if (rec_mode == CTDB_RECOVERY_NORMAL) {
-                       ret = ctdb_ctrl_thaw(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn);
-                       if (ret != 0) {
-                               DEBUG(0, (__location__ " Unable to thaw node %u\n", nodemap->nodes[j].pnn));
-                               return -1;
-                       }
-               }
+       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
+                                       nodes, CONTROL_TIMEOUT(),
+                                       false, data,
+                                       NULL, NULL,
+                                       NULL) != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
+               talloc_free(tmp_ctx);
+               return -1;
        }
 
+       talloc_free(tmp_ctx);
        return 0;
 }
 
@@ -333,22 +388,28 @@ static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *no
  */
 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
 {
-       int j, ret;
-
-       /* set recovery master to pnn on all nodes */
-       for (j=0; j<nodemap->num; j++) {
-               /* dont change it for nodes that are unavailable */
-               if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
-                       continue;
-               }
+       TDB_DATA data;
+       TALLOC_CTX *tmp_ctx;
+       uint32_t *nodes;
 
-               ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, pnn);
-               if (ret != 0) {
-                       DEBUG(0, (__location__ " Unable to set recmaster on node %u\n", nodemap->nodes[j].pnn));
-                       return -1;
-               }
+       tmp_ctx = talloc_new(ctdb); /* scratch context; freed before both returns below */
+       CTDB_NO_MEMORY(ctdb, tmp_ctx);
+       /* the control payload is the raw 32-bit pnn of the new recovery master */
+       data.dsize = sizeof(uint32_t);
+       data.dptr = (unsigned char *)&pnn;
+       /* one async SET_RECMASTER control to the nodes from list_of_active_nodes() replaces the old per-node loop */
+       nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
+       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
+                                       nodes,
+                                       CONTROL_TIMEOUT(), false, data,
+                                       NULL, NULL,
+                                       NULL) != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
+               talloc_free(tmp_ctx);
+               return -1;
        }
 
+       talloc_free(tmp_ctx);
        return 0;
 }
 
@@ -376,7 +437,7 @@ static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctd
                ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
                                         mem_ctx, &remote_dbmap);
                if (ret != 0) {
-                       DEBUG(0, (__location__ " Unable to get dbids from node %u\n", pnn));
+                       DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
                        return -1;
                }
 
@@ -398,13 +459,13 @@ static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctd
                        ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
                                            mem_ctx, &name);
                        if (ret != 0) {
-                               DEBUG(0, (__location__ " Unable to get dbname from node %u\n", pnn));
+                               DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
                                return -1;
                        }
                        ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
                                           mem_ctx, name, dbmap->dbs[db].persistent);
                        if (ret != 0) {
-                               DEBUG(0, (__location__ " Unable to create remote db:%s\n", name));
+                               DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
                                return -1;
                        }
                }
@@ -437,7 +498,7 @@ static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb
                ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
                                         mem_ctx, &remote_dbmap);
                if (ret != 0) {
-                       DEBUG(0, (__location__ " Unable to get dbids from node %u\n", pnn));
+                       DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
                        return -1;
                }
 
@@ -460,19 +521,19 @@ static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb
                        ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
                                            remote_dbmap->dbs[db].dbid, mem_ctx, &name);
                        if (ret != 0) {
-                               DEBUG(0, (__location__ " Unable to get dbname from node %u\n", 
+                               DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
                                          nodemap->nodes[j].pnn));
                                return -1;
                        }
                        ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
                                           remote_dbmap->dbs[db].persistent);
                        if (ret != 0) {
-                               DEBUG(0, (__location__ " Unable to create local db:%s\n", name));
+                               DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
                                return -1;
                        }
                        ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
                        if (ret != 0) {
-                               DEBUG(0, (__location__ " Unable to reread dbmap on node %u\n", pnn));
+                               DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
                                return -1;
                        }
                }
@@ -483,59 +544,114 @@ static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb
 
 
 /*
-  pull all the remote database contents into ours
+  pull the remote database contents from one node into the recdb
  */
-static int pull_all_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap
-                                    uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
+static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode
+                                   struct tdb_wrap *recdb, uint32_t dbid)
 {
-       int i, j, ret;
+       int ret;
+       TDB_DATA outdata;
+       struct ctdb_marshall_buffer *reply;
+       struct ctdb_rec_data *rec;
+       int i;
+       TALLOC_CTX *tmp_ctx = talloc_new(recdb);
 
-       /* pull all records from all other nodes across onto this node
-          (this merges based on rsn)
-       */
-       for (i=0;i<dbmap->num;i++) {
-               for (j=0; j<nodemap->num; j++) {
-                       /* we dont need to merge with ourselves */
-                       if (nodemap->nodes[j].pnn == pnn) {
-                               continue;
+       ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
+                              CONTROL_TIMEOUT(), &outdata);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       reply = (struct ctdb_marshall_buffer *)outdata.dptr;
+
+       if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
+               DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+       
+       rec = (struct ctdb_rec_data *)&reply->data[0];
+       
+       for (i=0;
+            i<reply->count;
+            rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
+               TDB_DATA key, data;
+               struct ctdb_ltdb_header *hdr;
+               TDB_DATA existing;
+               
+               key.dptr = &rec->data[0];
+               key.dsize = rec->keylen;
+               data.dptr = &rec->data[key.dsize];
+               data.dsize = rec->datalen;
+               
+               hdr = (struct ctdb_ltdb_header *)data.dptr;
+
+               if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
+                       DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
+                       talloc_free(tmp_ctx);
+                       return -1;
+               }
+
+               /* fetch the existing record, if any */
+               existing = tdb_fetch(recdb->tdb, key);
+               
+               if (existing.dptr != NULL) {
+                       struct ctdb_ltdb_header header;
+                       if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
+                               DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
+                                        (unsigned)existing.dsize, srcnode));
+                               free(existing.dptr);
+                               talloc_free(tmp_ctx);
+                               return -1;
                        }
-                       /* dont merge from nodes that are unavailable */
-                       if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
+                       header = *(struct ctdb_ltdb_header *)existing.dptr;
+                       free(existing.dptr);
+                       if (!(header.rsn < hdr->rsn ||
+                             (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
                                continue;
                        }
-                       ret = ctdb_ctrl_copydb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
-                                              pnn, dbmap->dbs[i].dbid, CTDB_LMASTER_ANY, mem_ctx);
-                       if (ret != 0) {
-                               DEBUG(0, (__location__ " Unable to copy db from node %u to node %u\n", 
-                                         nodemap->nodes[j].pnn, pnn));
-                               return -1;
-                       }
+               }
+               
+               if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
+                       DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
+                       talloc_free(tmp_ctx);
+                       return -1;                              
                }
        }
 
+       talloc_free(tmp_ctx);
+
        return 0;
 }
 
-
 /*
-  change the dmaster on all databases to point to us
+  pull all the remote database contents into the recdb; returns 0 on success, or -1 as soon as one node fails (that node is made the culprit)
  */
-static int update_dmaster_on_our_databases(struct ctdb_context *ctdb, uint32_t pnn, 
-                                          struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
+static int pull_remote_database(struct ctdb_context *ctdb,
+                               struct ctdb_recoverd *rec, 
+                               struct ctdb_node_map *nodemap, 
+                               struct tdb_wrap *recdb, uint32_t dbid)
 {
-       int i, ret;
+       int j;
 
-       /* update dmaster to point to this node for all databases/nodes */
-       for (i=0;i<dbmap->num;i++) {
-               ret = ctdb_ctrl_setdmaster(ctdb, CONTROL_TIMEOUT(), pnn, 
-                                          ctdb, dbmap->dbs[i].dbid, pnn);
-               if (ret != 0) {
-                       DEBUG(0, (__location__ " Unable to set dmaster for node %u db:0x%08x\n", 
-                                 pnn, dbmap->dbs[i].dbid));
+       /* pull all records for dbid from every active node in the nodemap
+          into recdb (this merges based on rsn); no pnn is skipped here
+       */
+       for (j=0; j<nodemap->num; j++) {
+               /* dont merge from nodes that are unavailable */
+               if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
+                       continue;
+               }
+               if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
+                       DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
+                                nodemap->nodes[j].pnn));
+                       ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num); /* charge nodemap->num strikes at once */
                        return -1;
                }
        }
-
+       
        return 0;
 }
 
@@ -543,264 +659,298 @@ static int update_dmaster_on_our_databases(struct ctdb_context *ctdb, uint32_t p
 /*
   update flags on all active nodes
  */
-static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
+static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
 {
-       int i;
-       for (i=0;i<nodemap->num;i++) {
-               struct ctdb_node_flag_change c;
-               TDB_DATA data;
-
-               c.pnn = nodemap->nodes[i].pnn;
-               c.old_flags = nodemap->nodes[i].flags;
-               c.new_flags = nodemap->nodes[i].flags;
-
-               data.dptr = (uint8_t *)&c;
-               data.dsize = sizeof(c);
-
-               ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
-                                 CTDB_SRVID_NODE_FLAGS_CHANGED, data);
+       int ret;
 
+       ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
+               if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
+               return -1;
        }
+
        return 0;
 }
 
 /*
-  vacuum one database
+  ensure all nodes have the same vnnmap we do
  */
-static int vacuum_db(struct ctdb_context *ctdb, uint32_t db_id, struct ctdb_node_map *nodemap)
+static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
+                                     uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
 {
-       uint64_t max_rsn;
-       int ret, i;
-       TALLOC_CTX *mem_ctx = talloc_new(ctdb);
-       struct async_data *async_data;
-       struct ctdb_client_control_state *state;
-
-       /* find max rsn on our local node for this db */
-       ret = ctdb_ctrl_get_max_rsn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, db_id, &max_rsn);
-       if (ret != 0) {
-               talloc_free(mem_ctx);
-               return -1;
-       }
-
-       async_data = talloc_zero(mem_ctx, struct async_data);
-       CTDB_NO_MEMORY_FATAL(ctdb, async_data);
+       int j, ret;
 
-       /* set rsn on non-empty records to max_rsn+1 */
-       for (i=0;i<nodemap->num;i++) {
-               if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
+       /* push the new vnn map out to all the nodes */
+       for (j=0; j<nodemap->num; j++) {
+               /* dont push to nodes that are unavailable */
+               if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }
-               state = ctdb_ctrl_set_rsn_nonempty_send(ctdb, async_data, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn,
-                                                       db_id, max_rsn+1);
-               if (state == NULL) {
-                       DEBUG(0,(__location__ " Failed to set rsn on node %u to %llu\n",
-                                nodemap->nodes[i].pnn, (unsigned long long)max_rsn+1));
-                       talloc_free(mem_ctx);
-                       return -1;
-               }
-               async_add(async_data, state);
-       }
-       
-       if (async_wait(ctdb, async_data) != 0) {
-               DEBUG(0,(__location__ " Failed async calls to set rsn nonempty\n"));
-               talloc_free(mem_ctx);
-               return -1;
-       }
-
 
-       /* delete records with rsn < max_rsn+1 on all nodes */
-       for (i=0;i<nodemap->num;i++) {
-               if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
-                       continue;
-               }
-               state = ctdb_ctrl_delete_low_rsn_send(ctdb, async_data, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn,
-                                                     db_id, max_rsn+1);
-               if (state == NULL) {
-                       DEBUG(0,(__location__ " Failed to delete records on node %u with rsn below %llu\n",
-                                nodemap->nodes[i].pnn, (unsigned long long)max_rsn+1));
-                       talloc_free(mem_ctx);
+               ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
                        return -1;
                }
-               async_add(async_data, state);
        }
 
-       if (async_wait(ctdb, async_data) != 0) {
-               DEBUG(0,(__location__ " Failed async calls to delete low rsn\n"));
+       return 0;
+}
+
+
+/*
+  handler for when the admin bans a node; only acts when the request names our own pnn
+*/
+static void ban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
+                       TDB_DATA data, void *private_data)
+{
+       struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
+       struct ctdb_ban_info *b = (struct ctdb_ban_info *)data.dptr;
+       TALLOC_CTX *mem_ctx = talloc_new(ctdb);
+
+       if (data.dsize != sizeof(*b)) {
+               DEBUG(DEBUG_ERR,("Bad data in ban_handler\n"));
                talloc_free(mem_ctx);
-               return -1;
+               return;
        }
 
-       return 0;
-}
+       if (b->pnn != ctdb->pnn) {
+               DEBUG(DEBUG_ERR,("Got a ban request for pnn:%u but our pnn is %u. Ignoring ban request\n", b->pnn, ctdb->pnn));
+               talloc_free(mem_ctx); /* every exit must release mem_ctx, it hangs off the long-lived ctdb context */
+               return;
+       }
 
+       DEBUG(DEBUG_NOTICE,("Node %u has been banned for %u seconds\n", b->pnn, b->ban_time));
+
+       ctdb_ban_node(rec, b->pnn, b->ban_time);
+       talloc_free(mem_ctx);
+}
 
 /*
-  vacuum all attached databases
- */
-static int vacuum_all_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap
-                               struct ctdb_dbid_map *dbmap)
+  handler for when the admin unbans a node
+*/
+static void unban_handler(struct ctdb_context *ctdb, uint64_t srvid
+                         TDB_DATA data, void *private_data)
 {
-       int i;
+       struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
+       TALLOC_CTX *mem_ctx = talloc_new(ctdb);
+       uint32_t pnn;
 
-       /* update dmaster to point to this node for all databases/nodes */
-       for (i=0;i<dbmap->num;i++) {
-               if (vacuum_db(ctdb, dbmap->dbs[i].dbid, nodemap) != 0) {
-                       return -1;
-               }
+       if (data.dsize != sizeof(uint32_t)) {
+               DEBUG(DEBUG_ERR,("Bad data in unban_handler\n"));
+               talloc_free(mem_ctx);
+               return;
        }
-       return 0;
+       pnn = *(uint32_t *)data.dptr;
+
+       if (pnn != ctdb->pnn) {
+               DEBUG(DEBUG_ERR,("Got an unban request for pnn:%u but our pnn is %u. Ignoring unban request\n", pnn, ctdb->pnn));
+               return;
+       }
+
+       DEBUG(DEBUG_NOTICE,("Node %u has been unbanned.\n", pnn));
+       ctdb_unban_node(rec, pnn);
+       talloc_free(mem_ctx);
 }
 
+
+struct vacuum_info { /* one in-progress vacuum fetch run, created by vacuum_fetch_handler */
+       struct vacuum_info *next, *prev; /* links for the rec->vacuum_info list (DLIST_ADD/DLIST_REMOVE) */
+       struct ctdb_recoverd *rec; /* recovery daemon state whose list we are on */
+       uint32_t srcnode; /* taken from the reqid field of the first record in the message */
+       struct ctdb_db_context *ctdb_db; /* database the records belong to */
+       struct ctdb_marshall_buffer *recs; /* private copy of the received buffer; count is decremented as records are consumed */
+       struct ctdb_rec_data *r; /* cursor: next record inside recs to process */
+};
+
+static void vacuum_fetch_next(struct vacuum_info *v);
+
 /*
-  push out all our database contents to all other nodes
+  called when a vacuum fetch call has completed - free the call state and move on to the next record
  */
-static int push_all_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
-                                   uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
+static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
 {
-       int i;
+       struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info); /* read v before state is freed */
+       talloc_free(state);
+       vacuum_fetch_next(v);
+}
 
-       /* push all records out to the nodes again */
-       for (i=0;i<dbmap->num;i++) {
-               int j, ret;
-               TDB_DATA outdata;
-               struct async_data *async_data;
-               struct ctdb_client_control_state *state;
 
-               DEBUG(3,("pulling dbid 0x%x from local node %u\n",
-                       dbmap->dbs[i].dbid, pnn));
+/*
+  process records from the vacuum list until one fetch call is in flight or the list is empty (then v is freed)
+*/
+static void vacuum_fetch_next(struct vacuum_info *v)
+{
+       struct ctdb_call call;
+       struct ctdb_rec_data *r;
 
-               async_data = talloc_zero(mem_ctx, struct async_data);
-               CTDB_NO_MEMORY_FATAL(ctdb, async_data);
+       while (v->recs->count) {
+               struct ctdb_client_call_state *state;
+               TDB_DATA data;
+               struct ctdb_ltdb_header *hdr;
 
-               ret = ctdb_ctrl_pulldb(ctdb, pnn, dbmap->dbs[i].dbid,
-                                      CTDB_LMASTER_ANY,
-                                      async_data, CONTROL_TIMEOUT(), &outdata);
-               if (ret != 0) {
-                       DEBUG(0,(__location__ " ctdb_control for pulldb failed\n"));
-                       return -1;
-               }
+               ZERO_STRUCT(call);
+               call.call_id = CTDB_NULL_FUNC;
+               call.flags = CTDB_IMMEDIATE_MIGRATION;
 
-               for (j=0; j<nodemap->num; j++) {
-                       /* we dont need to push to ourselves */
-                       if (nodemap->nodes[j].pnn == pnn) {
-                               continue;
-                       }
-                       /* dont push to nodes that are unavailable */
-                       if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
-                               continue;
-                       }
+               r = v->r;
+               v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r); /* advance the cursor by this record's length */
+               v->recs->count--; /* the record counts as consumed even if it is skipped below */
 
-                       DEBUG(3,("starting async push of dbid 0x%x to %u\n",
-                                dbmap->dbs[i].dbid,
-                                nodemap->nodes[j].pnn));
+               call.key.dptr = &r->data[0];
+               call.key.dsize = r->keylen;
 
-                       state = ctdb_ctrl_pushdb_send(ctdb,
-                                                     nodemap->nodes[j].pnn, 
-                                                     dbmap->dbs[i].dbid, async_data, 
-                                                     CONTROL_TIMEOUT(), outdata);
-                       if (state == NULL) {
-                               DEBUG(0,(__location__ " async control for pushdb for dbid 0x%08x to node %u failed\n", dbmap->dbs[i].dbid, nodemap->nodes[j].pnn));
-                               talloc_free(async_data);
-                               return -1;
-                       }
+               /* ensure we don't block this daemon - just skip a record if we can't get
+                  the chainlock */
+               if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
+                       continue;
+               }
 
-                       async_add(async_data, state);
+               data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
+               if (data.dptr == NULL) {
+                       tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
+                       continue;
                }
 
-               if (async_wait(ctdb, async_data) != 0) {
-                       DEBUG(0,("Async push of database 0x%08x failed\n", dbmap->dbs[i].dbid));
-                       talloc_free(async_data);
-                       return -1;
+               if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
+                       free(data.dptr);
+                       tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
+                       continue;
+               }
+               
+               hdr = (struct ctdb_ltdb_header *)data.dptr;
+               if (hdr->dmaster == v->rec->ctdb->pnn) {
+                       /* the record's dmaster is already this node - nothing to fetch */
+                       free(data.dptr);
+                       tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
+                       continue;
                }
 
-               talloc_free(async_data);
+               free(data.dptr);
+
+               state = ctdb_call_send(v->ctdb_db, &call);
+               tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
+               if (state == NULL) {
+                       DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
+                       talloc_free(v);
+                       return;
+               }
+               state->async.fn = vacuum_fetch_callback;
+               state->async.private_data = v;
+               return; /* vacuum_fetch_callback re-enters this function when the call completes */
        }
 
-       return 0;
+       talloc_free(v); /* no records left */
 }
 
 
 /*
-  ensure all nodes have the same vnnmap we do
+  destructor for a vacuum info structure: unlink it from its recovery daemon's vacuum_info list
  */
-static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
-                                     uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
+static int vacuum_info_destructor(struct vacuum_info *v)
 {
-       int j, ret;
-
-       /* push the new vnn map out to all the nodes */
-       for (j=0; j<nodemap->num; j++) {
-               /* dont push to nodes that are unavailable */
-               if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
-                       continue;
-               }
-
-               ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
-               if (ret != 0) {
-                       DEBUG(0, (__location__ " Unable to set vnnmap for node %u\n", pnn));
-                       return -1;
-               }
-       }
-
+       DLIST_REMOVE(v->rec->vacuum_info, v);
        return 0;
 }
 
 
 /*
-  handler for when the admin bans a node
+  handler for vacuum fetch
 */
-static void ban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
-                       TDB_DATA data, void *private_data)
+static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
+                                TDB_DATA data, void *private_data)
 {
        struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
-       struct ctdb_ban_info *b = (struct ctdb_ban_info *)data.dptr;
-       TALLOC_CTX *mem_ctx = talloc_new(ctdb);
-
-       if (data.dsize != sizeof(*b)) {
-               DEBUG(0,("Bad data in ban_handler\n"));
-               talloc_free(mem_ctx);
+       struct ctdb_marshall_buffer *recs;
+       int ret, i;
+       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
+       const char *name;
+       struct ctdb_dbid_map *dbmap=NULL;
+       bool persistent = false;
+       struct ctdb_db_context *ctdb_db;
+       struct ctdb_rec_data *r;
+       uint32_t srcnode;
+       struct vacuum_info *v;
+
+       recs = (struct ctdb_marshall_buffer *)data.dptr;
+       r = (struct ctdb_rec_data *)&recs->data[0];
+
+       if (recs->count == 0) {
+               talloc_free(tmp_ctx);
                return;
        }
 
-       if (b->pnn != ctdb->pnn) {
-               DEBUG(0,("Got a ban request for pnn:%u but our pnn is %u. Ignoring ban request\n", b->pnn, ctdb->pnn));
-               return;
+       srcnode = r->reqid;
+
+       for (v=rec->vacuum_info;v;v=v->next) {
+               if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
+                       /* we're already working on records from this node */
+                       talloc_free(tmp_ctx);
+                       return;
+               }
        }
 
-       DEBUG(0,("Node %u has been banned for %u seconds\n", 
-                b->pnn, b->ban_time));
+       /* work out if the database is persistent */
+       ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
+               talloc_free(tmp_ctx);
+               return;
+       }
 
-       ctdb_ban_node(rec, b->pnn, b->ban_time);
-       talloc_free(mem_ctx);
-}
+       for (i=0;i<dbmap->num;i++) {
+               if (dbmap->dbs[i].dbid == recs->db_id) {
+                       persistent = dbmap->dbs[i].persistent;
+                       break;
+               }
+       }
+       if (i == dbmap->num) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
+               talloc_free(tmp_ctx);
+               return;         
+       }
 
-/*
-  handler for when the admin unbans a node
-*/
-static void unban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
-                         TDB_DATA data, void *private_data)
-{
-       struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
-       TALLOC_CTX *mem_ctx = talloc_new(ctdb);
-       uint32_t pnn;
+       /* find the name of this database */
+       if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
+               DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
+               talloc_free(tmp_ctx);
+               return;
+       }
 
-       if (data.dsize != sizeof(uint32_t)) {
-               DEBUG(0,("Bad data in unban_handler\n"));
-               talloc_free(mem_ctx);
+       /* attach to it */
+       ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
+       if (ctdb_db == NULL) {
+               DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
+               talloc_free(tmp_ctx);
                return;
        }
-       pnn = *(uint32_t *)data.dptr;
 
-       if (pnn != ctdb->pnn) {
-               DEBUG(0,("Got an unban request for pnn:%u but our pnn is %u. Ignoring unban request\n", pnn, ctdb->pnn));
+       v = talloc_zero(rec, struct vacuum_info);
+       if (v == NULL) {
+               DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
+               talloc_free(tmp_ctx);
                return;
        }
 
-       DEBUG(0,("Node %u has been unbanned.\n", pnn));
-       ctdb_unban_node(rec, pnn);
-       talloc_free(mem_ctx);
-}
+       v->rec = rec;
+       v->srcnode = srcnode;
+       v->ctdb_db = ctdb_db;
+       v->recs = talloc_memdup(v, recs, data.dsize);
+       if (v->recs == NULL) {
+               DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
+               talloc_free(v);
+               talloc_free(tmp_ctx);
+               return;         
+       }
+       v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
 
+       DLIST_ADD(rec->vacuum_info, v);
+
+       talloc_set_destructor(v, vacuum_info_destructor);
+
+       vacuum_fetch_next(v);
+       talloc_free(tmp_ctx);
+}
 
 
 /*
@@ -848,24 +998,6 @@ static void ctdb_wait_election(struct ctdb_recoverd *rec)
        }
 }
 
-/*
-  remember the trouble maker
- */
-static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
-{
-       struct ctdb_context *ctdb = rec->ctdb;
-
-       if (rec->last_culprit != culprit ||
-           timeval_elapsed(&rec->first_recover_time) > ctdb->tunable.recovery_grace_period) {
-               DEBUG(0,("New recovery culprit %u\n", culprit));
-               /* either a new node is the culprit, or we've decided to forgive them */
-               rec->last_culprit = culprit;
-               rec->first_recover_time = timeval_current();
-               rec->culprit_counter = 0;
-       }
-       rec->culprit_counter++;
-}
-
 /*
   Update our local flags from all remote connected nodes. 
   This is only run when we are or we belive we are the recovery master
@@ -893,15 +1025,21 @@ static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *n
                ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
                                           mem_ctx, &remote_nodemap);
                if (ret != 0) {
-                       DEBUG(0, (__location__ " Unable to get nodemap from remote node %u\n", 
+                       DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
                                  nodemap->nodes[j].pnn));
                        ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
                        talloc_free(mem_ctx);
                        return MONITOR_FAILED;
                }
                if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
-                       struct ctdb_node_flag_change c;
-                       TDB_DATA data;
+                       int ban_changed = (nodemap->nodes[j].flags ^ remote_nodemap->nodes[j].flags) & NODE_FLAGS_BANNED;
+
+                       if (ban_changed) {
+                               DEBUG(DEBUG_NOTICE,("Remote node %u had different BANNED flags 0x%x, local had 0x%x - trigger a re-election\n",
+                               nodemap->nodes[j].pnn,
+                               remote_nodemap->nodes[j].flags,
+                               nodemap->nodes[j].flags));
+                       }
 
                        /* We should tell our daemon about this so it
                           updates its flags or else we will log the same 
@@ -909,21 +1047,16 @@ static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *n
                           Since we are the recovery master we can just as
                           well update the flags on all nodes.
                        */
-                       c.pnn = nodemap->nodes[j].pnn;
-                       c.old_flags = nodemap->nodes[j].flags;
-                       c.new_flags = remote_nodemap->nodes[j].flags;
-
-                       data.dptr = (uint8_t *)&c;
-                       data.dsize = sizeof(c);
-
-                       ctdb_send_message(ctdb, ctdb->pnn,
-                                       CTDB_SRVID_NODE_FLAGS_CHANGED, 
-                                       data);
+                       ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
+                       if (ret != 0) {
+                               DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
+                               return -1;
+                       }
 
                        /* Update our local copy of the flags in the recovery
                           daemon.
                        */
-                       DEBUG(0,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
+                       DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
                                 nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
                                 nodemap->nodes[j].flags));
                        nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
@@ -931,10 +1064,7 @@ static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *n
                        /* If the BANNED flag has changed for the node
                           this is a good reason to do a new election.
                         */
-                       if ((c.old_flags ^ c.new_flags) & NODE_FLAGS_BANNED) {
-                               DEBUG(0,("Remote node %u had different BANNED flags 0x%x, local had 0x%x - trigger a re-election\n",
-                                nodemap->nodes[j].pnn, c.new_flags,
-                                c.old_flags));
+                       if (ban_changed) {
                                talloc_free(mem_ctx);
                                return MONITOR_ELECTION_NEEDED;
                        }
@@ -965,29 +1095,257 @@ static uint32_t new_generation(void)
        return generation;
 }
 
-               
+
+/*
+  create a temporary working database "<db_directory>/recdb.tdb" under mem_ctx; returns NULL on failure
+ */
+static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
+{
+       char *name;
+       struct tdb_wrap *recdb;
+       unsigned tdb_flags;
+
+       /* open up the temporary recovery database */
+       name = talloc_asprintf(mem_ctx, "%s/recdb.tdb", ctdb->db_directory);
+       if (name == NULL) {
+               return NULL;
+       }
+       unlink(name); /* remove any existing file first: the open below uses O_CREAT|O_EXCL */
+
+       tdb_flags = TDB_NOLOCK;
+       if (!ctdb->do_setsched) { /* NOTE(review): reason for disabling mmap in this mode is not visible here */
+               tdb_flags |= TDB_NOMMAP;
+       }
+
+       recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
+                             tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
+       if (recdb == NULL) {
+               DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
+       }
+
+       talloc_free(name); /* the name is only needed for the open and the log message */
+
+       return recdb;
+}
+
+
+/* 
+   state passed to traverse_recdb() while marshalling all relevant records from recdb
+ */
+struct recdb_data {
+       struct ctdb_context *ctdb; /* supplies the pnn written into each record's dmaster */
+       struct ctdb_marshall_buffer *recdata; /* growing blob of marshalled records */
+       uint32_t len; /* bytes of recdata in use so far */
+       bool failed; /* set by traverse_recdb when marshalling or growing the blob fails */
+};
+
+static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
+{ /* traverse callback: append one record to params->recdata; returns 0 to continue, -1 (with params->failed set) to abort */
+       struct recdb_data *params = (struct recdb_data *)p;
+       struct ctdb_rec_data *rec;
+       struct ctdb_ltdb_header *hdr;
+       struct ctdb_marshall_buffer *expanded;
+
+       /* skip empty records */
+       if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
+               return 0;
+       }
+
+       /* update the dmaster field to point to us */
+       hdr = (struct ctdb_ltdb_header *)data.dptr;
+       hdr->dmaster = params->ctdb->pnn;
+
+       /* add the record to the blob ready to send to the nodes */
+       rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
+       if (rec == NULL) {
+               params->failed = true;
+               return -1;
+       }
+       expanded = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
+       if (expanded == NULL) {
+               DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", rec->length + params->len, params->recdata->count));
+               params->failed = true;
+               return -1;
+       }
+       params->recdata = expanded; /* only replace the pointer on success so a failed realloc never leaves it NULL for the log above or the caller */
+       params->recdata->count++;
+       memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
+       params->len += rec->length;
+       talloc_free(rec);
+       return 0;
+}
+
+/*
+  push the recdb database out to the nodes: marshall every non-empty record and send the blob with CTDB_CONTROL_PUSH_DB; returns 0 or -1
+ */
+static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
+                              struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
+{
+       struct recdb_data params;
+       struct ctdb_marshall_buffer *recdata;
+       TDB_DATA outdata;
+       TALLOC_CTX *tmp_ctx;
+       uint32_t *nodes;
+
+       tmp_ctx = talloc_new(ctdb);
+       CTDB_NO_MEMORY(ctdb, tmp_ctx);
+
+       recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
+       CTDB_NO_MEMORY(ctdb, recdata); /* NOTE(review): if this macro returns on NULL, tmp_ctx is not freed on that path - confirm */
+
+       recdata->db_id = dbid;
+
+       params.ctdb = ctdb;
+       params.recdata = recdata;
+       params.len = offsetof(struct ctdb_marshall_buffer, data); /* records are appended after the buffer header */
+       params.failed = false;
+
+       if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
+               DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
+               talloc_free(params.recdata);
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       if (params.failed) {
+               DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
+               talloc_free(params.recdata);
+               talloc_free(tmp_ctx);
+               return -1;              
+       }
+
+       recdata = params.recdata; /* traverse_recdb reallocates the buffer, so pick up its current address */
+
+       outdata.dptr = (void *)recdata;
+       outdata.dsize = params.len;
+
+       nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true); /* NOTE(review): result is not checked for NULL - confirm the callee tolerates that */
+       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
+                                       nodes,
+                                       CONTROL_TIMEOUT(), false, outdata,
+                                       NULL, NULL,
+                                       NULL) != 0) {
+               DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
+               talloc_free(recdata);
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
+                 dbid, recdata->count)); /* the "size" logged is the record count */
+
+       talloc_free(recdata);
+       talloc_free(tmp_ctx);
+
+       return 0;
+}
+
+
+/*
+  go through a full recovery on one database: pull it into a temporary recdb, wipe it on the nodes, push the merged copy back; returns 0 or -1
+ */
+static int recover_database(struct ctdb_recoverd *rec, 
+                           TALLOC_CTX *mem_ctx,
+                           uint32_t dbid,
+                           uint32_t pnn, 
+                           struct ctdb_node_map *nodemap,
+                           uint32_t transaction_id)
+{
+       struct tdb_wrap *recdb;
+       int ret;
+       struct ctdb_context *ctdb = rec->ctdb;
+       TDB_DATA data;
+       struct ctdb_control_wipe_database w;
+       uint32_t *nodes;
+
+       recdb = create_recdb(ctdb, mem_ctx);
+       if (recdb == NULL) {
+               return -1;
+       }
+
+       /* pull all remote databases onto the recdb */
+       ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
+               talloc_free(recdb); /* release the temporary db here too, as every other failure path does */
+               return -1;
+       }
+
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
+
+       /* wipe all the remote databases. This is safe as we are in a transaction */
+       w.db_id = dbid;
+       w.transaction_id = transaction_id;
+
+       data.dptr = (void *)&w;
+       data.dsize = sizeof(w);
+
+       nodes = list_of_active_nodes(ctdb, nodemap, recdb, true); /* allocated under recdb, so freed together with it */
+       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
+                                       nodes,
+                                       CONTROL_TIMEOUT(), false, data,
+                                       NULL, NULL,
+                                       NULL) != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
+               talloc_free(recdb);
+               return -1;
+       }
+       
+       /* push out the correct database. This sets the dmaster and skips the empty records */
+       ret = push_recdb_database(ctdb, dbid, recdb, nodemap);
+       if (ret != 0) {
+               talloc_free(recdb);
+               return -1;
+       }
+
+       /* all done with this database */
+       talloc_free(recdb);
+
+       return 0;
+}
+
+/*
+  reload the nodes file: drop the current node list pointer and load it again via ctdb_load_nodes_file()
+*/
+static void reload_nodes_file(struct ctdb_context *ctdb)
+{
+       ctdb->nodes = NULL; /* NOTE(review): the old array is not freed here - confirm who owns it */
+       ctdb_load_nodes_file(ctdb);
+}
+
+       
 /*
   we are the recmaster, and recovery is needed - start a recovery run
  */
 static int do_recovery(struct ctdb_recoverd *rec, 
-                      TALLOC_CTX *mem_ctx, uint32_t pnn, uint32_t num_active,
+                      TALLOC_CTX *mem_ctx, uint32_t pnn,
                       struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap,
-                      uint32_t culprit)
+                      int32_t culprit)
 {
        struct ctdb_context *ctdb = rec->ctdb;
        int i, j, ret;
        uint32_t generation;
        struct ctdb_dbid_map *dbmap;
+       TDB_DATA data;
+       uint32_t *nodes;
+
+       DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
 
-       DEBUG(0, (__location__ " Starting do_recovery\n"));
+       if (ctdb->num_nodes != nodemap->num) {
+               DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
+               reload_nodes_file(ctdb);
+               return -1;
+       }
 
        /* if recovery fails, force it again */
        rec->need_recovery = true;
 
-       ctdb_set_culprit(rec, culprit);
+       if (culprit != -1) {
+               ctdb_set_culprit(rec, culprit);
+       }
 
        if (rec->culprit_counter > 2*nodemap->num) {
-               DEBUG(0,("Node %u has caused %u recoveries in %.0f seconds - banning it for %u seconds\n",
+               DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries in %.0f seconds - banning it for %u seconds\n",
                         culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
                         ctdb->tunable.recovery_ban_period));
                ctdb_ban_node(rec, culprit, ctdb->tunable.recovery_ban_period);
@@ -995,18 +1353,52 @@ static int do_recovery(struct ctdb_recoverd *rec,
 
        if (!ctdb_recovery_lock(ctdb, true)) {
                ctdb_set_culprit(rec, pnn);
-               DEBUG(0,("Unable to get recovery lock - aborting recovery\n"));
+               DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery\n"));
+               return -1;
+       }
+
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", culprit));
+
+       /* get a list of all databases */
+       ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
                return -1;
        }
 
+       /* we do the db creation before we set the recovery mode, so the freeze happens
+          on all databases we will be dealing with. */
+
+       /* verify that we have all the databases any other node has */
+       ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
+               return -1;
+       }
+
+       /* verify that all other nodes have all our databases */
+       ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
+               return -1;
+       }
+
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
+
+
        /* set recovery mode to active on all nodes */
        ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
-       if (ret!=0) {
-               DEBUG(0, (__location__ " Unable to set recovery mode to active on cluster\n"));
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
                return -1;
        }
 
-       DEBUG(0, (__location__ " Recovery initiated due to problem with node %u\n", culprit));
+       /* execute the "startrecovery" event script on all nodes */
+       ret = run_startrecovery_eventscript(rec, nodemap);
+       if (ret!=0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
+               return -1;
+       }
 
        /* pick a new generation number */
        generation = new_generation();
@@ -1024,155 +1416,156 @@ static int do_recovery(struct ctdb_recoverd *rec,
        vnnmap->generation = generation;
        ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
        if (ret != 0) {
-               DEBUG(0, (__location__ " Unable to set vnnmap for node %u\n", pnn));
-               return -1;
-       }
-
-       /* get a list of all databases */
-       ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
-       if (ret != 0) {
-               DEBUG(0, (__location__ " Unable to get dbids from node :%u\n", pnn));
+               DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
                return -1;
        }
 
+       data.dptr = (void *)&generation;
+       data.dsize = sizeof(uint32_t);
 
-       /* verify that all other nodes have all our databases */
-       ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
-       if (ret != 0) {
-               DEBUG(0, (__location__ " Unable to create missing remote databases\n"));
+       nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
+       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
+                                       nodes,
+                                       CONTROL_TIMEOUT(), false, data,
+                                       NULL, NULL,
+                                       NULL) != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
                return -1;
        }
 
-       /* verify that we have all the databases any other node has */
-       ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
-       if (ret != 0) {
-               DEBUG(0, (__location__ " Unable to create missing local databases\n"));
-               return -1;
-       }
+       DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
 
-       /* verify that all other nodes have all our databases */
-       ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
-       if (ret != 0) {
-               DEBUG(0, (__location__ " Unable to create missing remote databases\n"));
-               return -1;
-       }
-
-
-       DEBUG(0, (__location__ " Recovery - created remote databases\n"));
-
-       /* pull all remote databases onto the local node */
-       ret = pull_all_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
-       if (ret != 0) {
-               DEBUG(0, (__location__ " Unable to pull remote databases\n"));
-               return -1;
+       for (i=0;i<dbmap->num;i++) {
+               if (recover_database(rec, mem_ctx, dbmap->dbs[i].dbid, pnn, nodemap, generation) != 0) {
+                       DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
+                       return -1;
+               }
        }
 
-       DEBUG(0, (__location__ " Recovery - pulled remote databases\n"));
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
 
-       /* repoint all local database records to the local node as
-          being dmaster
-        */
-       ret = update_dmaster_on_our_databases(ctdb, pnn, dbmap, mem_ctx);
-       if (ret != 0) {
-               DEBUG(0, (__location__ " Unable to update dmaster on all databases\n"));
+       /* commit all the changes */
+       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
+                                       nodes,
+                                       CONTROL_TIMEOUT(), false, data,
+                                       NULL, NULL,
+                                       NULL) != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
                return -1;
        }
 
-       DEBUG(0, (__location__ " Recovery - updated dmaster on our databases\n"));
-
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
+       
 
-       /* push all local databases to the remote nodes */
-       ret = push_all_local_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
-       if (ret != 0) {
-               DEBUG(0, (__location__ " Unable to push local databases\n"));
+       /* update the capabilities for all nodes */
+       ret = update_capabilities(ctdb, nodemap);
+       if (ret!=0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
                return -1;
        }
 
-       DEBUG(0, (__location__ " Recovery - pushed remote databases\n"));
-
        /* build a new vnn map with all the currently active and
           unbanned nodes */
        generation = new_generation();
        vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
        CTDB_NO_MEMORY(ctdb, vnnmap);
        vnnmap->generation = generation;
-       vnnmap->size = num_active;
+       vnnmap->size = 0;
        vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
+       CTDB_NO_MEMORY(ctdb, vnnmap->map);
        for (i=j=0;i<nodemap->num;i++) {
-               if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
-                       vnnmap->map[j++] = nodemap->nodes[i].pnn;
+               if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
+                       continue;
                }
+               if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
+                       /* this node can not be an lmaster */
+                       DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
+                       continue;
+               }
+
+               vnnmap->size++;
+               vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
+               CTDB_NO_MEMORY(ctdb, vnnmap->map);
+               vnnmap->map[j++] = nodemap->nodes[i].pnn;
+
        }
+       if (vnnmap->size == 0) {
+               DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
+               vnnmap->size++;
+               vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
+               CTDB_NO_MEMORY(ctdb, vnnmap->map);
+               vnnmap->map[0] = pnn;
+       }       
 
        /* update to the new vnnmap on all nodes */
        ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
        if (ret != 0) {
-               DEBUG(0, (__location__ " Unable to update vnnmap on all nodes\n"));
+               DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
                return -1;
        }
 
-       DEBUG(0, (__location__ " Recovery - updated vnnmap\n"));
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
 
        /* update recmaster to point to us for all nodes */
        ret = set_recovery_master(ctdb, nodemap, pnn);
        if (ret!=0) {
-               DEBUG(0, (__location__ " Unable to set recovery master\n"));
+               DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
                return -1;
        }
 
-       DEBUG(0, (__location__ " Recovery - updated recmaster\n"));
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
 
        /*
          update all nodes to have the same flags that we have
         */
-       ret = update_flags_on_all_nodes(ctdb, nodemap);
-       if (ret != 0) {
-               DEBUG(0, (__location__ " Unable to update flags on all nodes\n"));
-               return -1;
+       for (i=0;i<nodemap->num;i++) {
+               if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
+                       continue;
+               }
+
+               ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
+                       return -1;
+               }
        }
-       
-       DEBUG(0, (__location__ " Recovery - updated flags\n"));
 
-       /*
-         run a vacuum operation on empty records
-        */
-       ret = vacuum_all_databases(ctdb, nodemap, dbmap);
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
+
+       /* disable recovery mode */
+       ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_NORMAL);
        if (ret != 0) {
-               DEBUG(0, (__location__ " Unable to vacuum all databases\n"));
+               DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
                return -1;
        }
 
-       DEBUG(0, (__location__ " Recovery - vacuumed all databases\n"));
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
 
        /*
-         if enabled, tell nodes to takeover their public IPs
+         tell nodes to takeover their public IPs
         */
-       if (ctdb->vnn) {
-               rec->need_takeover_run = false;
-               ret = ctdb_takeover_run(ctdb, nodemap);
-               if (ret != 0) {
-                       DEBUG(0, (__location__ " Unable to setup public takeover addresses\n"));
-                       return -1;
-               }
-               DEBUG(1, (__location__ " Recovery - done takeover\n"));
-       }
-
-       for (i=0;i<dbmap->num;i++) {
-               DEBUG(2,("Recovered database with db_id 0x%08x\n", dbmap->dbs[i].dbid));
+       rec->need_takeover_run = false;
+       ret = ctdb_takeover_run(ctdb, nodemap);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses\n"));
+               return -1;
        }
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - takeip finished\n"));
 
-       /* disable recovery mode */
-       ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_NORMAL);
+       /* execute the "recovered" event script on all nodes */
+       ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
        if (ret!=0) {
-               DEBUG(0, (__location__ " Unable to set recovery mode to normal on cluster\n"));
+               DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
                return -1;
        }
 
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
+
        /* send a message to all clients telling them that the cluster 
           has been reconfigured */
        ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
 
-       DEBUG(0, (__location__ " Recovery complete\n"));
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
 
        rec->need_recovery = false;
 
@@ -1180,9 +1573,9 @@ static int do_recovery(struct ctdb_recoverd *rec,
           We now wait for rerecovery_timeout before we allow 
           another recovery to take place.
        */
-       DEBUG(0, (__location__ " New recoveries supressed for the rerecovery timeout\n"));
+       DEBUG(DEBUG_NOTICE, (__location__ " New recoveries supressed for the rerecovery timeout\n"));
        ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
-       DEBUG(0, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
+       DEBUG(DEBUG_NOTICE, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
 
        return 0;
 }
@@ -1216,7 +1609,7 @@ static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_messag
 
        ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
        if (ret != 0) {
-               DEBUG(0,(__location__ " unable to get election data\n"));
+               DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
                return;
        }
 
@@ -1225,6 +1618,13 @@ static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_messag
                        em->num_connected++;
                }
        }
+
+       /* we shouldnt try to win this election if we cant be a recmaster */
+       if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
+               em->num_connected = 0;
+               em->priority_time = timeval_current();
+       }
+
        talloc_free(nodemap);
 }
 
@@ -1238,6 +1638,11 @@ static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message
 
        ctdb_election_data(rec, &myem);
 
+       /* we cant win if we dont have the recmaster capability */
+       if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
+               return false;
+       }
+
        /* we cant win if we are banned */
        if (rec->node_flags & NODE_FLAGS_BANNED) {
                return false;
@@ -1268,7 +1673,7 @@ static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message
 /*
   send out an election request
  */
-static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn)
+static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
 {
        int ret;
        TDB_DATA election_data;
@@ -1284,19 +1689,26 @@ static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn)
        election_data.dptr  = (unsigned char *)&emsg;
 
 
-       /* first we assume we will win the election and set 
-          recoverymaster to be ourself on the current node
-        */
-       ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
-       if (ret != 0) {
-               DEBUG(0, (__location__ " failed to send recmaster election request\n"));
-               return -1;
-       }
-
-
        /* send an election message to all active nodes */
        ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
 
+
+       /* A new node that is already frozen has entered the cluster.
+          The existing nodes are not frozen and dont need to be frozen
+          until the election has ended and we start the actual recovery
+       */
+       if (update_recmaster == true) {
+               /* first we assume we will win the election and set 
+                  recoverymaster to be ourself on the current node
+                */
+               ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
+                       return -1;
+               }
+       }
+
+
        return 0;
 }
 
@@ -1311,7 +1723,7 @@ static void unban_all_nodes(struct ctdb_context *ctdb)
        
        ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
        if (ret != 0) {
-               DEBUG(0,(__location__ " failed to get nodemap to unban all nodes\n"));
+               DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
                return;
        }
 
@@ -1334,15 +1746,58 @@ static void election_send_request(struct event_context *ev, struct timed_event *
        struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
        int ret;
 
-       ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb));
+       ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
        if (ret != 0) {
-               DEBUG(0,("Failed to send election request!\n"));
+               DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
        }
 
        talloc_free(rec->send_election_te);
        rec->send_election_te = NULL;
 }
 
+/*
+  message handler for memory dump requests
+
+  The payload must be exactly one struct rd_memdump_reply; its pnn and
+  srvid fields say where the reply goes. The dump itself is produced
+  by ctdb_dump_memory() and sent back with ctdb_send_message().
+
+  Every failure is logged and the request is dropped; nothing is
+  reported back to the requester in that case.
+*/
+static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
+                            TDB_DATA data, void *private_data)
+{
+       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
+       struct rd_memdump_reply *reply_to;
+       TDB_DATA *memdump;
+
+       if (data.dsize != sizeof(struct rd_memdump_reply)) {
+               DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
+               goto done;
+       }
+       reply_to = (struct rd_memdump_reply *)data.dptr;
+
+       memdump = talloc_zero(tmp_ctx, TDB_DATA);
+       if (memdump == NULL) {
+               DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
+               goto done;
+       }
+
+       if (ctdb_dump_memory(ctdb, memdump) != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
+               goto done;
+       }
+
+       DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));
+
+       if (ctdb_send_message(ctdb, reply_to->pnn, reply_to->srvid, *memdump) != 0) {
+               DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
+       }
+
+done:
+       /* single exit: memdump was allocated from tmp_ctx, so this
+          releases it on success and on every error path alike */
+       talloc_free(tmp_ctx);
+}
+
 /*
   handler for recovery master elections
 */
@@ -1392,7 +1847,7 @@ static void election_handler(struct ctdb_context *ctdb, uint64_t srvid,
        /* ok, let that guy become recmaster then */
        ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
        if (ret != 0) {
-               DEBUG(0, (__location__ " failed to send recmaster election request"));
+               DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
                talloc_free(mem_ctx);
                return;
        }
@@ -1411,7 +1866,7 @@ static void election_handler(struct ctdb_context *ctdb, uint64_t srvid,
 /*
   force the start of the election process
  */
-static void force_election(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx, uint32_t pnn, 
+static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
                           struct ctdb_node_map *nodemap)
 {
        int ret;
@@ -1419,8 +1874,8 @@ static void force_election(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx, uint3
 
        /* set all nodes to recovery mode to stop all internode traffic */
        ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
-       if (ret!=0) {
-               DEBUG(0, (__location__ " Unable to set recovery mode to active on cluster\n"));
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
                return;
        }
 
@@ -1429,9 +1884,9 @@ static void force_election(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx, uint3
                                                timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
                                                ctdb_election_timeout, rec);
 
-       ret = send_election_request(rec, pnn);
+       ret = send_election_request(rec, pnn, true);
        if (ret!=0) {
-               DEBUG(0, (__location__ " failed to initiate recmaster election"));
+               DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
                return;
        }
 
@@ -1456,7 +1911,7 @@ static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid,
        struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
 
        if (data.dsize != sizeof(*c)) {
-               DEBUG(0,(__location__ "Invalid data in ctdb_node_flag_change\n"));
+               DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
                return;
        }
 
@@ -1465,7 +1920,7 @@ static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid,
 
        ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
        if (ret != 0) {
-               DEBUG(0,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
+               DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
                talloc_free(tmp_ctx);
                return;         
        }
@@ -1476,24 +1931,15 @@ static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid,
        }
 
        if (i == nodemap->num) {
-               DEBUG(0,(__location__ "Flag change for non-existant node %u\n", c->pnn));
+               DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
                talloc_free(tmp_ctx);
                return;
        }
 
        changed_flags = c->old_flags ^ c->new_flags;
 
-       /* Dont let messages from remote nodes change the DISCONNECTED flag. 
-          This flag is handled locally based on whether the local node
-          can communicate with the node or not.
-       */
-       c->new_flags &= ~NODE_FLAGS_DISCONNECTED;
-       if (nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED) {
-               c->new_flags |= NODE_FLAGS_DISCONNECTED;
-       }
-
        if (nodemap->nodes[i].flags != c->new_flags) {
-               DEBUG(0,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
+               DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
        }
 
        nodemap->nodes[i].flags = c->new_flags;
@@ -1508,8 +1954,7 @@ static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid,
        
        if (ret == 0 &&
            ctdb->recovery_master == ctdb->pnn &&
-           ctdb->recovery_mode == CTDB_RECOVERY_NORMAL &&
-           ctdb->vnn) {
+           ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
                /* Only do the takeover run if the perm disabled or unhealthy
                   flags changed since these will cause an ip failover but not
                   a recovery.
@@ -1525,6 +1970,20 @@ static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid,
        talloc_free(tmp_ctx);
 }
 
+/*
+  handler for when we need to push out flag changes to all other nodes
+
+  The payload must be exactly one struct ctdb_node_flag_change. Its
+  pnn and new_flags fields are handed to ctdb_ctrl_modflags() as
+  (c->pnn, c->new_flags, ~c->new_flags) - presumably a set mask and a
+  clear mask, so that the resulting flags equal new_flags; confirm
+  against ctdb_ctrl_modflags().
+
+  Malformed payloads and failed controls are logged and otherwise
+  ignored.
+*/
+static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
+                           TDB_DATA data, void *private_data)
+{
+       int ret;
+       struct ctdb_node_flag_change *c;
+
+       /* the payload comes from a message, so make sure it really is
+          a full ctdb_node_flag_change before reading any field from
+          it (monitor_handler applies the same check) */
+       if (data.dsize != sizeof(*c)) {
+               DEBUG(DEBUG_ERR, (__location__ " Invalid data in ctdb_node_flag_change\n"));
+               return;
+       }
+       c = (struct ctdb_node_flag_change *)data.dptr;
+
+       ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), c->pnn, c->new_flags, ~c->new_flags);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
+       }
+}
 
 
 struct verify_recmode_normal_data {
@@ -1554,7 +2013,7 @@ static void verify_recmode_normal_callback(struct ctdb_client_control_state *sta
           status field
        */
        if (state->status != CTDB_RECOVERY_NORMAL) {
-               DEBUG(0, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
+               DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
                rmdata->status = MONITOR_RECOVERY_NEEDED;
        }
 
@@ -1589,7 +2048,7 @@ static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb
                        /* we failed to send the control, treat this as 
                           an error and try again next iteration
                        */                      
-                       DEBUG(0,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
+                       DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
                        talloc_free(mem_ctx);
                        return MONITOR_FAILED;
                }
@@ -1617,6 +2076,7 @@ static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb
 
 
 struct verify_recmaster_data {
+       struct ctdb_recoverd *rec;
        uint32_t count;
        uint32_t pnn;
        enum monitor_result status;
@@ -1644,7 +2104,8 @@ static void verify_recmaster_callback(struct ctdb_client_control_state *state)
           status field
        */
        if (state->status != rmdata->pnn) {
-               DEBUG(0,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
+               DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
+               ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
                rmdata->status = MONITOR_ELECTION_NEEDED;
        }
 
@@ -1653,8 +2114,9 @@ static void verify_recmaster_callback(struct ctdb_client_control_state *state)
 
 
 /* verify that all nodes agree that we are the recmaster */
-static enum monitor_result verify_recmaster(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
+static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
 {
+       struct ctdb_context *ctdb = rec->ctdb;
        struct verify_recmaster_data *rmdata;
        TALLOC_CTX *mem_ctx = talloc_new(ctdb);
        struct ctdb_client_control_state *state;
@@ -1663,6 +2125,7 @@ static enum monitor_result verify_recmaster(struct ctdb_context *ctdb, struct ct
        
        rmdata = talloc(mem_ctx, struct verify_recmaster_data);
        CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
+       rmdata->rec    = rec;
        rmdata->count  = 0;
        rmdata->pnn    = pnn;
        rmdata->status = MONITOR_OK;
@@ -1680,7 +2143,7 @@ static enum monitor_result verify_recmaster(struct ctdb_context *ctdb, struct ct
                        /* we failed to send the control, treat this as 
                           an error and try again next iteration
                        */                      
-                       DEBUG(0,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
+                       DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
                        talloc_free(mem_ctx);
                        return MONITOR_FAILED;
                }
@@ -1707,23 +2170,171 @@ static enum monitor_result verify_recmaster(struct ctdb_context *ctdb, struct ct
 }
 
 
+/* Check that the local node holds exactly the public ips assigned to pnn; on a mismatch freeze it and set recovery mode active.
+   Returns 0 when the check passes, is skipped, or recovery mode was raised; -1 when any control call fails. mem_ctx is freed on every path. */
+static int verify_ip_allocation(struct ctdb_context *ctdb, uint32_t pnn)
+{
+       TALLOC_CTX *mem_ctx = talloc_new(NULL);
+       struct ctdb_all_public_ips *ips = NULL;
+       struct ctdb_uptime *uptime1 = NULL;
+       struct ctdb_uptime *uptime2 = NULL;
+       int ret, j;
+       /* first uptime sample, taken before the public ip list is read */
+       ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
+                               CTDB_CURRENT_NODE, &uptime1);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
+               talloc_free(mem_ctx);
+               return -1;
+       }
+
+       /* read the public ip allocation as seen by the local node */
+       ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
+               talloc_free(mem_ctx);
+               return -1;
+       }
+       /* second uptime sample, compared with the first to detect a recovery that ran while the ip list was read */
+       ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
+                               CTDB_CURRENT_NODE, &uptime2);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
+               talloc_free(mem_ctx);
+               return -1;
+       }
+
+       /* skip the check if last_recovery_started differs between the two uptime samples */
+       if (timeval_compare(&uptime1->last_recovery_started,
+                           &uptime2->last_recovery_started) != 0) {
+               DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
+               talloc_free(mem_ctx);
+               return 0;
+       }
+
+       /* skip the check if last_recovery_finished differs between the two uptime samples */
+       if (timeval_compare(&uptime1->last_recovery_finished,
+                           &uptime2->last_recovery_finished) != 0) {
+               DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
+               talloc_free(mem_ctx);
+               return 0;
+       }
+
+       /* skip the check if we have started but not finished recovery (presumably timeval_compare() returns 1 only when finished is later than started - TODO confirm) */
+       if (timeval_compare(&uptime1->last_recovery_finished,
+                           &uptime1->last_recovery_started) != 1) {
+               DEBUG(DEBUG_NOTICE, (__location__ " in the middle of recovery. skipping public ip address check\n"));
+               talloc_free(mem_ctx);
+
+               return 0;
+       }
+
+       /* verify that we have the ip addresses we should have
+          and we don't have ones we shouldn't have.
+          if we find an inconsistency we freeze the local node and
+          set its recmode to active, then keep scanning the list;
+          the recmaster is expected to do a full blown recovery
+       */
+       for (j=0; j<ips->num; j++) {
+               if (ips->ips[j].pnn == pnn) {
+                       if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
+                               DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
+                                       ctdb_addr_to_str(&ips->ips[j].addr)));
+                               ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
+                               if (ret != 0) {
+                                       DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
+
+                                       talloc_free(mem_ctx);
+                                       return -1;
+                               }
+                               ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
+                               if (ret != 0) {
+                                       DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
+
+                                       talloc_free(mem_ctx);
+                                       return -1;
+                               }
+                       }
+               } else {
+                       if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
+                               DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
+                                       ctdb_addr_to_str(&ips->ips[j].addr)));
+
+                               ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
+                               if (ret != 0) {
+                                       DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
+
+                                       talloc_free(mem_ctx);
+                                       return -1;
+                               }
+                               ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
+                               if (ret != 0) {
+                                       DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
+
+                                       talloc_free(mem_ctx);
+                                       return -1;
+                               }
+                       }
+               }
+       }
+
+       talloc_free(mem_ctx);
+       return 0;
+}
+
+/* Per-node callback passed to ctdb_client_async_control() by get_remote_nodemaps(): stores the reply payload from node_pnn in remote_nodemaps[node_pnn]; res is not examined. */
+static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
+{
+       struct ctdb_node_map **remote_nodemaps = callback_data;
+       /* drop replies whose pnn is not below ctdb->num_nodes so the array index stays in range - assumes the array has at least that many slots, TODO confirm against caller */
+       if (node_pnn >= ctdb->num_nodes) {
+               DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
+               return;
+       }
+       /* reparent the reply buffer onto the array with talloc_steal() and record it in this node's slot */
+       remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
+
+}
+/* Send CTDB_CONTROL_GET_NODEMAP to the nodes returned by list_of_active_nodes(); async_getnodemap_callback fills remote_nodemaps[]. Returns 0 on success, -1 if ctdb_client_async_control() returns non-zero. */
+static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
+       struct ctdb_node_map *nodemap,
+       struct ctdb_node_map **remote_nodemaps)
+{
+       uint32_t *nodes;
+       /* NOTE(review): the final 'true' presumably makes the list include the local node - confirm against list_of_active_nodes() */
+       nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
+       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
+                                       nodes,
+                                       CONTROL_TIMEOUT(), false, tdb_null,
+                                       async_getnodemap_callback,
+                                       NULL,
+                                       remote_nodemaps) != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
+
+               return -1;
+       }
+
+       return 0;
+}
+
 /*
   the main monitoring loop
  */
 static void monitor_cluster(struct ctdb_context *ctdb)
 {
-       uint32_t pnn, num_active, recmaster;
+       uint32_t pnn;
        TALLOC_CTX *mem_ctx=NULL;
        struct ctdb_node_map *nodemap=NULL;
-       struct ctdb_node_map *remote_nodemap=NULL;
+       struct ctdb_node_map *recmaster_nodemap=NULL;
+       struct ctdb_node_map **remote_nodemaps=NULL;
        struct ctdb_vnn_map *vnnmap=NULL;
        struct ctdb_vnn_map *remote_vnnmap=NULL;
+       int32_t debug_level;
        int i, j, ret;
        struct ctdb_recoverd *rec;
-       struct ctdb_all_public_ips *ips;
        char c;
 
-       DEBUG(0,("monitor_cluster starting\n"));
+       DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
 
        rec = talloc_zero(ctdb, struct ctdb_recoverd);
        CTDB_NO_MEMORY_FATAL(ctdb, rec);
@@ -1734,18 +2345,27 @@ static void monitor_cluster(struct ctdb_context *ctdb)
 
        rec->priority_time = timeval_current();
 
+       /* register a message port for sending memory dumps */
+       ctdb_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
+
        /* register a message port for recovery elections */
        ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
 
-       /* and one for when nodes are disabled/enabled */
-       ctdb_set_message_handler(ctdb, CTDB_SRVID_NODE_FLAGS_CHANGED, monitor_handler, rec);
+       /* when nodes are disabled/enabled */
+       ctdb_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
+
+       /* when we are asked to push out a flag change */
+       ctdb_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
 
-       /* and one for when nodes are banned */
+       /* when nodes are banned */
        ctdb_set_message_handler(ctdb, CTDB_SRVID_BAN_NODE, ban_handler, rec);
 
        /* and one for when nodes are unbanned */
        ctdb_set_message_handler(ctdb, CTDB_SRVID_UNBAN_NODE, unban_handler, rec);
-       
+
+       /* register a message port for vacuum fetch */
+       ctdb_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
+
 again:
        if (mem_ctx) {
                talloc_free(mem_ctx);
@@ -1753,25 +2373,42 @@ again:
        }
        mem_ctx = talloc_new(ctdb);
        if (!mem_ctx) {
-               DEBUG(0,("Failed to create temporary context\n"));
+               DEBUG(DEBUG_CRIT,(__location__ " Failed to create temporary context\n"));
                exit(-1);
        }
 
        /* we only check for recovery once every second */
        ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
 
+       /* verify that the main daemon is still running */
+       if (kill(ctdb->ctdbd_pid, 0) != 0) {
+               DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
+               exit(-1);
+       }
+
+       /* ping the local daemon to tell it we are alive */
+       ctdb_ctrl_recd_ping(ctdb);
+
        if (rec->election_timeout) {
                /* an election is in progress */
                goto again;
        }
 
+       /* read the debug level from the parent and update locally */
+       ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
+       if (ret !=0) {
+               DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
+               goto again;
+       }
+       LogLevel = debug_level;
+
 
        /* We must check if we need to ban a node here but we want to do this
           as early as possible so we dont wait until we have pulled the node
           map from the local node. thats why we have the hardcoded value 20
        */
        if (rec->culprit_counter > 20) {
-               DEBUG(0,("Node %u has caused %u failures in %.0f seconds - banning it for %u seconds\n",
+               DEBUG(DEBUG_NOTICE,("Node %u has caused %u failures in %.0f seconds - banning it for %u seconds\n",
                         rec->last_culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
                         ctdb->tunable.recovery_ban_period));
                ctdb_ban_node(rec, rec->last_culprit, ctdb->tunable.recovery_ban_period);
@@ -1780,41 +2417,47 @@ again:
        /* get relevant tunables */
        ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
        if (ret != 0) {
-               DEBUG(0,("Failed to get tunables - retrying\n"));
+               DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
                goto again;
        }
 
        pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
        if (pnn == (uint32_t)-1) {
-               DEBUG(0,("Failed to get local pnn - retrying\n"));
+               DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
                goto again;
        }
 
        /* get the vnnmap */
        ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
        if (ret != 0) {
-               DEBUG(0, (__location__ " Unable to get vnnmap from node %u\n", pnn));
+               DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
                goto again;
        }
 
 
        /* get number of nodes */
-       ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &nodemap);
+       if (rec->nodemap) {
+               talloc_free(rec->nodemap);
+               rec->nodemap = NULL;
+               nodemap=NULL;
+       }
+       ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
        if (ret != 0) {
-               DEBUG(0, (__location__ " Unable to get nodemap from node %u\n", pnn));
+               DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
                goto again;
        }
+       nodemap = rec->nodemap;
 
        /* check which node is the recovery master */
-       ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &recmaster);
+       ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
        if (ret != 0) {
-               DEBUG(0, (__location__ " Unable to get recmaster from node %u\n", pnn));
+               DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
                goto again;
        }
 
-       if (recmaster == (uint32_t)-1) {
-               DEBUG(0,(__location__ " Initial recovery master set - forcing election\n"));
-               force_election(rec, mem_ctx, pnn, nodemap);
+       if (rec->recmaster == (uint32_t)-1) {
+               DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
+               force_election(rec, pnn, nodemap);
                goto again;
        }
        
@@ -1823,12 +2466,12 @@ again:
        */
        if (nodemap->nodes[pnn].flags & NODE_FLAGS_BANNED) {
                if (rec->banned_nodes[pnn] == NULL) {
-                       if (recmaster == pnn) {
-                               DEBUG(0,("Local ctdb daemon on recmaster thinks this node is BANNED but the recovery master disagrees. Unbanning the node\n"));
+                       if (rec->recmaster == pnn) {
+                               DEBUG(DEBUG_NOTICE,("Local ctdb daemon on recmaster thinks this node is BANNED but the recovery master disagrees. Unbanning the node\n"));
 
                                ctdb_unban_node(rec, pnn);
                        } else {
-                               DEBUG(0,("Local ctdb daemon on non-recmaster thinks this node is BANNED but the recovery master disagrees. Re-banning the node\n"));
+                               DEBUG(DEBUG_NOTICE,("Local ctdb daemon on non-recmaster thinks this node is BANNED but the recovery master disagrees. Re-banning the node\n"));
                                ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
                                ctdb_set_culprit(rec, pnn);
                        }
@@ -1836,12 +2479,12 @@ again:
                }
        } else {
                if (rec->banned_nodes[pnn] != NULL) {
-                       if (recmaster == pnn) {
-                               DEBUG(0,("Local ctdb daemon on recmaster does not think this node is BANNED but the recovery master disagrees. Unbanning the node\n"));
+                       if (rec->recmaster == pnn) {
+                               DEBUG(DEBUG_NOTICE,("Local ctdb daemon on recmaster does not think this node is BANNED but the recovery master disagrees. Unbanning the node\n"));
 
                                ctdb_unban_node(rec, pnn);
                        } else {
-                               DEBUG(0,("Local ctdb daemon on non-recmaster does not think this node is BANNED but the recovery master disagrees. Re-banning the node\n"));
+                               DEBUG(DEBUG_NOTICE,("Local ctdb daemon on non-recmaster does not think this node is BANNED but the recovery master disagrees. Re-banning the node\n"));
 
                                ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
                                ctdb_set_culprit(rec, pnn);
@@ -1854,100 +2497,70 @@ again:
        rec->node_flags = nodemap->nodes[pnn].flags;
 
        /* count how many active nodes there are */
-       num_active = 0;
+       rec->num_active    = 0;
+       rec->num_connected = 0;
        for (i=0; i<nodemap->num; i++) {
                if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
-                       num_active++;
+                       rec->num_active++;
+               }
+               if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
+                       rec->num_connected++;
                }
        }
 
 
        /* verify that the recmaster node is still active */
        for (j=0; j<nodemap->num; j++) {
-               if (nodemap->nodes[j].pnn==recmaster) {
+               if (nodemap->nodes[j].pnn==rec->recmaster) {
                        break;
                }
        }
 
        if (j == nodemap->num) {
-               DEBUG(0, ("Recmaster node %u not in list. Force reelection\n", recmaster));
-               force_election(rec, mem_ctx, pnn, nodemap);
+               DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
+               force_election(rec, pnn, nodemap);
                goto again;
        }
 
        /* if recovery master is disconnected we must elect a new recmaster */
        if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
-               DEBUG(0, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
-               force_election(rec, mem_ctx, pnn, nodemap);
+               DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
+               force_election(rec, pnn, nodemap);
                goto again;
        }
 
        /* grap the nodemap from the recovery master to check if it is banned */
        ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
-                                  mem_ctx, &remote_nodemap);
+                                  mem_ctx, &recmaster_nodemap);
        if (ret != 0) {
-               DEBUG(0, (__location__ " Unable to get nodemap from recovery master %u\n", 
+               DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
                          nodemap->nodes[j].pnn));
                goto again;
        }
 
 
-       if (remote_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
-               DEBUG(0, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
-               force_election(rec, mem_ctx, pnn, nodemap);
+       if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
+               DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
+               force_election(rec, pnn, nodemap);
                goto again;
        }
 
-       /* verify that the public ip address allocation is consistent */
-       if (ctdb->vnn != NULL) {
-               ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
-               if (ret != 0) {
-                       DEBUG(0, ("Unable to get public ips from node %u\n", i));
+
+       /* verify that we have all ip addresses we should have and we dont
+        * have addresses we shouldnt have.
+        */ 
+       if (ctdb->do_checkpublicip) {
+               if (verify_ip_allocation(ctdb, pnn) != 0) {
+                       DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
                        goto again;
                }
-               for (j=0; j<ips->num; j++) {
-                       /* verify that we have the ip addresses we should have
-                          and we dont have ones we shouldnt have.
-                          if we find an inconsistency we set recmode to
-                          active on the local node and wait for the recmaster
-                          to do a full blown recovery
-                       */
-                       if (ips->ips[j].pnn == pnn) {
-                               if (!ctdb_sys_have_ip(ips->ips[j].sin)) {
-                                       DEBUG(0,("Public address '%s' is missing and we should serve this ip\n", inet_ntoa(ips->ips[j].sin.sin_addr)));
-                                       ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
-                                       if (ret != 0) {
-                                               DEBUG(0,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
-                                               goto again;
-                                       }
-                                       ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
-                                       if (ret != 0) {
-                                               DEBUG(0,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
-                                               goto again;
-                                       }
-                               }
-                       } else {
-                               if (ctdb_sys_have_ip(ips->ips[j].sin)) {
-                                       DEBUG(0,("We are still serving a public address '%s' that we should not be serving.\n", inet_ntoa(ips->ips[j].sin.sin_addr)));
-                                       ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
-                                       if (ret != 0) {
-                                               DEBUG(0,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
-                                               goto again;
-                                       }
-                                       ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
-                                       if (ret != 0) {
-                                               DEBUG(0,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
-                                               goto again;
-                                       }
-                               }
-                       }
-               }
        }
 
+
        /* if we are not the recmaster then we do not need to check
           if recovery is needed
         */
-       if (pnn != recmaster) {
+       if (pnn != rec->recmaster) {
                goto again;
        }
 
@@ -1955,18 +2568,23 @@ again:
        /* ensure our local copies of flags are right */
        ret = update_local_flags(rec, nodemap);
        if (ret == MONITOR_ELECTION_NEEDED) {
-               DEBUG(0,("update_local_flags() called for a re-election.\n"));
-               force_election(rec, mem_ctx, pnn, nodemap);
+               DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
+               force_election(rec, pnn, nodemap);
                goto again;
        }
        if (ret != MONITOR_OK) {
-               DEBUG(0,("Unable to update local flags\n"));
+               DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
                goto again;
        }
 
        /* update the list of public ips that a node can handle for
           all connected nodes
        */
+       if (ctdb->num_nodes != nodemap->num) {
+               DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
+               reload_nodes_file(ctdb);
+               goto again;
+       }
        for (j=0; j<nodemap->num; j++) {
                if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
@@ -1981,7 +2599,7 @@ again:
                        ctdb->nodes[j]->pnn, 
                        ctdb->nodes,
                        &ctdb->nodes[j]->public_ips)) {
-                       DEBUG(0,("Failed to read public ips from node : %u\n", 
+                       DEBUG(DEBUG_ERR,("Failed to read public ips from node : %u\n", 
                                ctdb->nodes[j]->pnn));
                        goto again;
                }
@@ -1989,12 +2607,12 @@ again:
 
 
        /* verify that all active nodes agree that we are the recmaster */
-       switch (verify_recmaster(ctdb, nodemap, pnn)) {
+       switch (verify_recmaster(rec, nodemap, pnn)) {
        case MONITOR_RECOVERY_NEEDED:
                /* can not happen */
                goto again;
        case MONITOR_ELECTION_NEEDED:
-               force_election(rec, mem_ctx, pnn, nodemap);
+               force_election(rec, pnn, nodemap);
                goto again;
        case MONITOR_OK:
                break;
@@ -2005,7 +2623,7 @@ again:
 
        if (rec->need_recovery) {
                /* a previous recovery didn't finish */
-               do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
+               do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, -1);
                goto again;             
        }
 
@@ -2014,7 +2632,7 @@ again:
         */
        switch (verify_recmode(ctdb, nodemap)) {
        case MONITOR_RECOVERY_NEEDED:
-               do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
+               do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
                goto again;
        case MONITOR_FAILED:
                goto again;
@@ -2027,45 +2645,56 @@ again:
 
        /* we should have the reclock - check its not stale */
        if (ctdb->recovery_lock_fd == -1) {
-               DEBUG(0,("recovery master doesn't have the recovery lock\n"));
-               do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
+               DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
+               do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
                goto again;
        }
 
-       if (read(ctdb->recovery_lock_fd, &c, 1) == -1) {
-               DEBUG(0,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
+       if (pread(ctdb->recovery_lock_fd, &c, 1, 0) == -1) {
+               DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
                close(ctdb->recovery_lock_fd);
                ctdb->recovery_lock_fd = -1;
-               do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
+               do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
                goto again;
        }
 
-       /* get the nodemap for all active remote nodes and verify
-          they are the same as for this node
+
+       /* get the nodemap for all active remote nodes
         */
+       remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
+       if (remote_nodemaps == NULL) {
+               DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
+               goto again;
+       }
+       for(i=0; i<nodemap->num; i++) {
+               remote_nodemaps[i] = NULL;
+       }
+       if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
+               DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
+               goto again;
+       } 
+
+       /* verify that all other nodes have the same nodemap as we have
+       */
        for (j=0; j<nodemap->num; j++) {
                if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }
-               if (nodemap->nodes[j].pnn == pnn) {
-                       continue;
-               }
 
-               ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
-                                          mem_ctx, &remote_nodemap);
-               if (ret != 0) {
-                       DEBUG(0, (__location__ " Unable to get nodemap from remote node %u\n", 
-                                 nodemap->nodes[j].pnn));
+               if (remote_nodemaps[j] == NULL) {
+                       DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
+                       ctdb_set_culprit(rec, j);
+
                        goto again;
                }
 
-               /* if the nodes disagree on how many nodes there are
+               /* if the nodes disagree on how many nodes there are
                   then this is a good reason to try recovery
                 */
-               if (remote_nodemap->num != nodemap->num) {
-                       DEBUG(0, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
-                                 nodemap->nodes[j].pnn, remote_nodemap->num, nodemap->num));
-                       do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
+               if (remote_nodemaps[j]->num != nodemap->num) {
+                       DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
+                                 nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
+                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
                        goto again;
                }
 
@@ -2073,35 +2702,54 @@ again:
                   active, then that is also a good reason to do recovery
                 */
                for (i=0;i<nodemap->num;i++) {
-                       if (remote_nodemap->nodes[i].pnn != nodemap->nodes[i].pnn) {
-                               DEBUG(0, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
+                       if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
+                               DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
                                          nodemap->nodes[j].pnn, i, 
-                                         remote_nodemap->nodes[i].pnn, nodemap->nodes[i].pnn));
-                               do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
-                                           vnnmap, nodemap->nodes[j].pnn);
-                               goto again;
-                       }
-                       if ((remote_nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) != 
-                           (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
-                               DEBUG(0, (__location__ " Remote node:%u has different nodemap flag for %d (0x%x vs 0x%x)\n", 
-                                         nodemap->nodes[j].pnn, i,
-                                         remote_nodemap->nodes[i].flags, nodemap->nodes[i].flags));
-                               do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
+                                         remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
+                               do_recovery(rec, mem_ctx, pnn, nodemap, 
                                            vnnmap, nodemap->nodes[j].pnn);
                                goto again;
                        }
                }
 
+               /* verify the flags are consistent
+               */
+               for (i=0; i<nodemap->num; i++) {
+                       if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
+                               continue;
+                       }
+                       
+                       if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
+                               DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
+                                 nodemap->nodes[j].pnn, 
+                                 nodemap->nodes[i].pnn, 
+                                 remote_nodemaps[j]->nodes[i].flags,
+                                 nodemap->nodes[j].flags));
+                               if (i == j) {
+                                       DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
+                                       update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
+                                       do_recovery(rec, mem_ctx, pnn, nodemap, 
+                                                   vnnmap, nodemap->nodes[j].pnn);
+                                       goto again;
+                               } else {
+                                       DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
+                                       update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
+                                       do_recovery(rec, mem_ctx, pnn, nodemap, 
+                                                   vnnmap, nodemap->nodes[j].pnn);
+                                       goto again;
+                               }
+                       }
+               }
        }
 
 
        /* there better be the same number of lmasters in the vnn map
           as there are active nodes or we will have to do a recovery
         */
-       if (vnnmap->size != num_active) {
-               DEBUG(0, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
-                         vnnmap->size, num_active));
-               do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
+       if (vnnmap->size != rec->num_active) {
+               DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
+                         vnnmap->size, rec->num_active));
+               do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
                goto again;
        }
 
@@ -2122,9 +2770,9 @@ again:
                        }
                }
                if (i == vnnmap->size) {
-                       DEBUG(0, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
+                       DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
                                  nodemap->nodes[j].pnn));
-                       do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
+                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
                        goto again;
                }
        }
@@ -2144,33 +2792,33 @@ again:
                ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
                                          mem_ctx, &remote_vnnmap);
                if (ret != 0) {
-                       DEBUG(0, (__location__ " Unable to get vnnmap from remote node %u\n", 
+                       DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
                                  nodemap->nodes[j].pnn));
                        goto again;
                }
 
                /* verify the vnnmap generation is the same */
                if (vnnmap->generation != remote_vnnmap->generation) {
-                       DEBUG(0, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
+                       DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
                                  nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
-                       do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
+                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
                        goto again;
                }
 
                /* verify the vnnmap size is the same */
                if (vnnmap->size != remote_vnnmap->size) {
-                       DEBUG(0, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
+                       DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
                                  nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
-                       do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
+                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
                        goto again;
                }
 
                /* verify the vnnmap is the same */
                for (i=0;i<vnnmap->size;i++) {
                        if (remote_vnnmap->map[i] != vnnmap->map[i]) {
-                               DEBUG(0, (__location__ " Remote node %u has different vnnmap.\n", 
+                               DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
                                          nodemap->nodes[j].pnn));
-                               do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
+                               do_recovery(rec, mem_ctx, pnn, nodemap, 
                                            vnnmap, nodemap->nodes[j].pnn);
                                goto again;
                        }
@@ -2180,14 +2828,38 @@ again:
        /* we might need to change who has what IP assigned */
        if (rec->need_takeover_run) {
                rec->need_takeover_run = false;
+
+               /* execute the "startrecovery" event script on all nodes */
+               ret = run_startrecovery_eventscript(rec, nodemap);
+               if (ret!=0) {
+                       DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
+                       do_recovery(rec, mem_ctx, pnn, nodemap, 
+                                   vnnmap, ctdb->pnn);
+               }
+
                ret = ctdb_takeover_run(ctdb, nodemap);
                if (ret != 0) {
-                       DEBUG(0, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
-                       do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
+                       DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
+                       do_recovery(rec, mem_ctx, pnn, nodemap, 
+                                   vnnmap, ctdb->pnn);
+               }
+
+               /* execute the "recovered" event script on all nodes */
+               ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
+#if 0
+// we can't check whether the event completed successfully
+// since this script WILL fail if the node is in recovery mode
+// and if that race happens, the code here would just cause a second
+// cascading recovery.
+               if (ret!=0) {
+                       DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
+                       do_recovery(rec, mem_ctx, pnn, nodemap, 
                                    vnnmap, ctdb->pnn);
                }
+#endif
        }
 
+
        goto again;
 
 }
@@ -2198,10 +2870,59 @@ again:
 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
                                 uint16_t flags, void *private_data)
 {
-       DEBUG(0,("recovery daemon parent died - exiting\n"));
+       DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
        _exit(1);
 }
 
+/*
+  timer callback in the main daemon: check that the recovery daemon is
+  still alive and shut the main daemon down if it is not
+ */
+static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
+                             struct timeval yt, void *p)
+{
+       struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
+
+       /* signal 0 delivers nothing; it only tests whether the pid can be signalled */
+       if (kill(ctdb->recoverd_pid, 0) == 0) {
+               /* still there: re-arm ourselves to check again in 30 seconds */
+               event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(30, 0),
+                               ctdb_check_recd, ctdb);
+               return;
+       }
+       DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Shutting down main daemon\n", (int)ctdb->recoverd_pid));
+       ctdb_stop_recoverd(ctdb);
+       ctdb_stop_keepalive(ctdb);
+       ctdb_stop_monitoring(ctdb);
+       ctdb_release_all_ips(ctdb);
+       if (ctdb->methods != NULL) {
+               ctdb->methods->shutdown(ctdb);
+       }
+       ctdb_event_script(ctdb, "shutdown");
+       exit(10);
+}
+
+/*
+  SIGCHLD handler for the recovery daemon: reap every child that has
+  exited so that none is left behind as a zombie.
+ */
+static void recd_sig_child_handler(struct event_context *ev,
+       struct signal_event *se, int signum, int count,
+       void *dont_care, void *private_data)
+{
+       int status;
+       pid_t pid;
+
+       /* WNOHANG: returns 0 once no further child has exited yet */
+       while ((pid = waitpid(-1, &status, WNOHANG)) > 0) {
+               DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
+       }
+       /* ECHILD only means there are no children left; that is normal */
+       if (pid == -1 && errno != ECHILD) {
+               DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
+       }
+}
+
 /*
   startup the recovery daemon as a child of the main ctdb daemon
  */
@@ -2209,11 +2930,14 @@ int ctdb_start_recoverd(struct ctdb_context *ctdb)
 {
        int ret;
        int fd[2];
+       struct signal_event *se;
 
        if (pipe(fd) != 0) {
                return -1;
        }
 
+       ctdb->ctdbd_pid = getpid();
+
        ctdb->recoverd_pid = fork();
        if (ctdb->recoverd_pid == -1) {
                return -1;
@@ -2221,13 +2945,18 @@ int ctdb_start_recoverd(struct ctdb_context *ctdb)
        
        if (ctdb->recoverd_pid != 0) {
                close(fd[0]);
+               event_add_timed(ctdb->ev, ctdb, 
+                               timeval_current_ofs(30, 0),
+                               ctdb_check_recd, ctdb);
                return 0;
        }
 
        close(fd[1]);
 
        /* shutdown the transport */
-       ctdb->methods->shutdown(ctdb);
+       if (ctdb->methods) {
+               ctdb->methods->shutdown(ctdb);
+       }
 
        /* get a new event context */
        talloc_free(ctdb->ev);
@@ -2241,16 +2970,31 @@ int ctdb_start_recoverd(struct ctdb_context *ctdb)
 
        srandom(getpid() ^ time(NULL));
 
+       /* the recovery daemon does not need to be realtime */
+       if (ctdb->do_setsched) {
+               ctdb_restore_scheduler(ctdb);
+       }
+
        /* initialise ctdb */
        ret = ctdb_socket_connect(ctdb);
        if (ret != 0) {
-               DEBUG(0, (__location__ " Failed to init ctdb\n"));
+               DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb\n"));
+               exit(1);
+       }
+
+       /* set up a handler to pick up sigchld */
+       se = event_add_signal(ctdb->ev, ctdb,
+                                    SIGCHLD, 0,
+                                    recd_sig_child_handler,
+                                    ctdb);
+       if (se == NULL) {
+               DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
                exit(1);
        }
 
        monitor_cluster(ctdb);
 
-       DEBUG(0,("ERROR: ctdb_recoverd finished!?\n"));
+       DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
        return -1;
 }
 
@@ -2263,6 +3007,6 @@ void ctdb_stop_recoverd(struct ctdb_context *ctdb)
                return;
        }
 
-       DEBUG(0,("Shutting down recovery daemon\n"));
+       DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
        kill(ctdb->recoverd_pid, SIGTERM);
 }