tweak some timeouts so that we do trigger a banning even if the control hangs/timesout
[sahlberg/ctdb.git] / server / ctdb_recoverd.c
index 8031dbe3ecf3a821bf9ab4327ccbbb504087d8f8..c6d0a7a5f4a22725c875c0475f7bf95db755d739 100644 (file)
@@ -41,8 +41,10 @@ struct ban_state {
  */
 struct ctdb_recoverd {
        struct ctdb_context *ctdb;
-       int rec_file_fd;
+       uint32_t recmaster;
        uint32_t num_active;
+       uint32_t num_connected;
+       struct ctdb_node_map *nodemap;
        uint32_t last_culprit;
        uint32_t culprit_counter;
        struct timeval first_recover_time;
@@ -200,17 +202,22 @@ enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEED
 /*
   run the "recovered" eventscript on all nodes
  */
-static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
+static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
 {
        TALLOC_CTX *tmp_ctx;
+       uint32_t *nodes;
 
        tmp_ctx = talloc_new(ctdb);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);
 
+       nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
-                       list_of_active_nodes(ctdb, nodemap, tmp_ctx, true),
-                       CONTROL_TIMEOUT(), false, tdb_null) != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event. Recovery failed.\n"));
+                                       nodes,
+                                       CONTROL_TIMEOUT(), false, tdb_null,
+                                       NULL, NULL,
+                                       NULL) != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
+
                talloc_free(tmp_ctx);
                return -1;
        }
@@ -219,19 +226,73 @@ static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node
        return 0;
 }
 
+/*
+  remember the trouble maker
+ */
+static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
+{
+       struct ctdb_context *ctdb = rec->ctdb;
+
+       if (rec->last_culprit != culprit ||
+           timeval_elapsed(&rec->first_recover_time) > ctdb->tunable.recovery_grace_period) {
+               DEBUG(DEBUG_NOTICE,("New recovery culprit %u\n", culprit));
+               /* either a new node is the culprit, or we've decided to forgive them */
+               rec->last_culprit = culprit;
+               rec->first_recover_time = timeval_current();
+               rec->culprit_counter = 0;
+       }
+       rec->culprit_counter++;
+}
+
+/*
+  remember the trouble maker
+ */
+static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
+{
+       struct ctdb_context *ctdb = rec->ctdb;
+
+       if (rec->last_culprit != culprit ||
+           timeval_elapsed(&rec->first_recover_time) > ctdb->tunable.recovery_grace_period) {
+               DEBUG(DEBUG_NOTICE,("New recovery culprit %u\n", culprit));
+               /* either a new node is the culprit, or we've decided to forgive them */
+               rec->last_culprit = culprit;
+               rec->first_recover_time = timeval_current();
+               rec->culprit_counter = 0;
+       }
+       rec->culprit_counter += count;
+}
+
+/* this callback is called for every node that failed to execute the
+   start recovery event
+*/
+static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
+{
+       struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
+
+       DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
+
+       ctdb_set_culprit(rec, node_pnn);
+}
+
 /*
   run the "startrecovery" eventscript on all nodes
  */
-static int run_startrecovery_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
+static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
 {
        TALLOC_CTX *tmp_ctx;
+       uint32_t *nodes;
+       struct ctdb_context *ctdb = rec->ctdb;
 
        tmp_ctx = talloc_new(ctdb);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);
 
+       nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
-                       list_of_active_nodes(ctdb, nodemap, tmp_ctx, true),
-                       CONTROL_TIMEOUT(), false, tdb_null) != 0) {
+                                       nodes,
+                                       CONTROL_TIMEOUT(), false, tdb_null,
+                                       NULL,
+                                       startrecovery_fail_callback,
+                                       rec) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
                talloc_free(tmp_ctx);
                return -1;
@@ -241,6 +302,43 @@ static int run_startrecovery_eventscript(struct ctdb_context *ctdb, struct ctdb_
        return 0;
 }
 
+static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
+{
+       if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
+               DEBUG(DEBUG_ERR, (__location__ " Invalid lenght/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
+               return;
+       }
+       if (node_pnn < ctdb->num_nodes) {
+               ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
+       }
+}
+
+/*
+  update the node capabilities for all connected nodes
+ */
+static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
+{
+       uint32_t *nodes;
+       TALLOC_CTX *tmp_ctx;
+
+       tmp_ctx = talloc_new(ctdb);
+       CTDB_NO_MEMORY(ctdb, tmp_ctx);
+
+       nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
+       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
+                                       nodes, CONTROL_TIMEOUT(),
+                                       false, tdb_null,
+                                       async_getcap_callback, NULL,
+                                       NULL) != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       talloc_free(tmp_ctx);
+       return 0;
+}
+
 /*
   change recovery mode on all nodes
  */
@@ -253,13 +351,14 @@ static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *no
        tmp_ctx = talloc_new(ctdb);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);
 
-       nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
-
        /* freeze all nodes */
+       nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (rec_mode == CTDB_RECOVERY_ACTIVE) {
                if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
                                                nodes, CONTROL_TIMEOUT(),
-                                               false, tdb_null) != 0) {
+                                               false, tdb_null,
+                                               NULL, NULL,
+                                               NULL) != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
                        talloc_free(tmp_ctx);
                        return -1;
@@ -272,22 +371,14 @@ static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *no
 
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
                                        nodes, CONTROL_TIMEOUT(),
-                                       false, data) != 0) {
+                                       false, data,
+                                       NULL, NULL,
+                                       NULL) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
                talloc_free(tmp_ctx);
                return -1;
        }
 
-       if (rec_mode == CTDB_RECOVERY_NORMAL) {
-               if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
-                                               nodes, CONTROL_TIMEOUT(),
-                                               false, tdb_null) != 0) {
-                       DEBUG(DEBUG_ERR, (__location__ " Unable to thaw nodes. Recovery failed.\n"));
-                       talloc_free(tmp_ctx);
-                       return -1;
-               }
-       }
-
        talloc_free(tmp_ctx);
        return 0;
 }
@@ -299,6 +390,7 @@ static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *
 {
        TDB_DATA data;
        TALLOC_CTX *tmp_ctx;
+       uint32_t *nodes;
 
        tmp_ctx = talloc_new(ctdb);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);
@@ -306,9 +398,12 @@ static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *
        data.dsize = sizeof(uint32_t);
        data.dptr = (unsigned char *)&pnn;
 
+       nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
-                       list_of_active_nodes(ctdb, nodemap, tmp_ctx, true),
-                       CONTROL_TIMEOUT(), false, data) != 0) {
+                                       nodes,
+                                       CONTROL_TIMEOUT(), false, data,
+                                       NULL, NULL,
+                                       NULL) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
                talloc_free(tmp_ctx);
                return -1;
@@ -456,7 +551,7 @@ static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode,
 {
        int ret;
        TDB_DATA outdata;
-       struct ctdb_control_pulldb_reply *reply;
+       struct ctdb_marshall_buffer *reply;
        struct ctdb_rec_data *rec;
        int i;
        TALLOC_CTX *tmp_ctx = talloc_new(recdb);
@@ -469,9 +564,9 @@ static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode,
                return -1;
        }
 
-       reply = (struct ctdb_control_pulldb_reply *)outdata.dptr;
+       reply = (struct ctdb_marshall_buffer *)outdata.dptr;
 
-       if (outdata.dsize < offsetof(struct ctdb_control_pulldb_reply, data)) {
+       if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
                DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
                talloc_free(tmp_ctx);
                return -1;
@@ -534,7 +629,9 @@ static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode,
 /*
   pull all the remote database contents into the recdb
  */
-static int pull_remote_database(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
+static int pull_remote_database(struct ctdb_context *ctdb,
+                               struct ctdb_recoverd *rec, 
+                               struct ctdb_node_map *nodemap, 
                                struct tdb_wrap *recdb, uint32_t dbid)
 {
        int j;
@@ -550,6 +647,7 @@ static int pull_remote_database(struct ctdb_context *ctdb, struct ctdb_node_map
                if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
                        DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
                                 nodemap->nodes[j].pnn));
+                       ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
                        return -1;
                }
        }
@@ -561,28 +659,19 @@ static int pull_remote_database(struct ctdb_context *ctdb, struct ctdb_node_map
 /*
   update flags on all active nodes
  */
-static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
+static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
 {
-       int i;
-       for (i=0;i<nodemap->num;i++) {
-               struct ctdb_node_flag_change c;
-               TDB_DATA data;
-
-               c.pnn = nodemap->nodes[i].pnn;
-               c.old_flags = nodemap->nodes[i].flags;
-               c.new_flags = nodemap->nodes[i].flags;
-
-               data.dptr = (uint8_t *)&c;
-               data.dsize = sizeof(c);
-
-               ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
-                                 CTDB_SRVID_NODE_FLAGS_CHANGED, data);
+       int ret;
 
+       ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
+               if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
+               return -1;
        }
+
        return 0;
 }
 
-
 /*
   ensure all nodes have the same vnnmap we do
  */
@@ -670,7 +759,7 @@ struct vacuum_info {
        struct ctdb_recoverd *rec;
        uint32_t srcnode;
        struct ctdb_db_context *ctdb_db;
-       struct ctdb_control_pulldb_reply *recs;
+       struct ctdb_marshall_buffer *recs;
        struct ctdb_rec_data *r;
 };
 
@@ -681,7 +770,7 @@ static void vacuum_fetch_next(struct vacuum_info *v);
  */
 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
 {
-       struct vacuum_info *v = talloc_get_type(state->async.private, struct vacuum_info);
+       struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
        talloc_free(state);
        vacuum_fetch_next(v);
 }
@@ -747,7 +836,7 @@ static void vacuum_fetch_next(struct vacuum_info *v)
                        return;
                }
                state->async.fn = vacuum_fetch_callback;
-               state->async.private = v;
+               state->async.private_data = v;
                return;
        }
 
@@ -772,7 +861,7 @@ static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid,
                                 TDB_DATA data, void *private_data)
 {
        struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
-       struct ctdb_control_pulldb_reply *recs;
+       struct ctdb_marshall_buffer *recs;
        int ret, i;
        TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
        const char *name;
@@ -783,10 +872,11 @@ static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid,
        uint32_t srcnode;
        struct vacuum_info *v;
 
-       recs = (struct ctdb_control_pulldb_reply *)data.dptr;
+       recs = (struct ctdb_marshall_buffer *)data.dptr;
        r = (struct ctdb_rec_data *)&recs->data[0];
 
        if (recs->count == 0) {
+               talloc_free(tmp_ctx);
                return;
        }
 
@@ -795,6 +885,7 @@ static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid,
        for (v=rec->vacuum_info;v;v=v->next) {
                if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
                        /* we're already working on records from this node */
+                       talloc_free(tmp_ctx);
                        return;
                }
        }
@@ -827,7 +918,7 @@ static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid,
        }
 
        /* attach to it */
-       ctdb_db = ctdb_attach(ctdb, name, persistent);
+       ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
        if (ctdb_db == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
                talloc_free(tmp_ctx);
@@ -837,6 +928,7 @@ static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid,
        v = talloc_zero(rec, struct vacuum_info);
        if (v == NULL) {
                DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
+               talloc_free(tmp_ctx);
                return;
        }
 
@@ -847,6 +939,7 @@ static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid,
        if (v->recs == NULL) {
                DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
                talloc_free(v);
+               talloc_free(tmp_ctx);
                return;         
        }
        v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
@@ -856,6 +949,7 @@ static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid,
        talloc_set_destructor(v, vacuum_info_destructor);
 
        vacuum_fetch_next(v);
+       talloc_free(tmp_ctx);
 }
 
 
@@ -904,24 +998,6 @@ static void ctdb_wait_election(struct ctdb_recoverd *rec)
        }
 }
 
-/*
-  remember the trouble maker
- */
-static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
-{
-       struct ctdb_context *ctdb = rec->ctdb;
-
-       if (rec->last_culprit != culprit ||
-           timeval_elapsed(&rec->first_recover_time) > ctdb->tunable.recovery_grace_period) {
-               DEBUG(DEBUG_NOTICE,("New recovery culprit %u\n", culprit));
-               /* either a new node is the culprit, or we've decided to forgive them */
-               rec->last_culprit = culprit;
-               rec->first_recover_time = timeval_current();
-               rec->culprit_counter = 0;
-       }
-       rec->culprit_counter++;
-}
-
 /*
   Update our local flags from all remote connected nodes. 
   This is only run when we are or we belive we are the recovery master
@@ -956,8 +1032,14 @@ static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *n
                        return MONITOR_FAILED;
                }
                if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
-                       struct ctdb_node_flag_change c;
-                       TDB_DATA data;
+                       int ban_changed = (nodemap->nodes[j].flags ^ remote_nodemap->nodes[j].flags) & NODE_FLAGS_BANNED;
+
+                       if (ban_changed) {
+                               DEBUG(DEBUG_NOTICE,("Remote node %u had different BANNED flags 0x%x, local had 0x%x - trigger a re-election\n",
+                               nodemap->nodes[j].pnn,
+                               remote_nodemap->nodes[j].flags,
+                               nodemap->nodes[j].flags));
+                       }
 
                        /* We should tell our daemon about this so it
                           updates its flags or else we will log the same 
@@ -965,16 +1047,11 @@ static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *n
                           Since we are the recovery master we can just as
                           well update the flags on all nodes.
                        */
-                       c.pnn = nodemap->nodes[j].pnn;
-                       c.old_flags = nodemap->nodes[j].flags;
-                       c.new_flags = remote_nodemap->nodes[j].flags;
-
-                       data.dptr = (uint8_t *)&c;
-                       data.dsize = sizeof(c);
-
-                       ctdb_send_message(ctdb, ctdb->pnn,
-                                       CTDB_SRVID_NODE_FLAGS_CHANGED, 
-                                       data);
+                       ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
+                       if (ret != 0) {
+                               DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
+                               return -1;
+                       }
 
                        /* Update our local copy of the flags in the recovery
                           daemon.
@@ -987,10 +1064,7 @@ static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *n
                        /* If the BANNED flag has changed for the node
                           this is a good reason to do a new election.
                         */
-                       if ((c.old_flags ^ c.new_flags) & NODE_FLAGS_BANNED) {
-                               DEBUG(DEBUG_NOTICE,("Remote node %u had different BANNED flags 0x%x, local had 0x%x - trigger a re-election\n",
-                                nodemap->nodes[j].pnn, c.new_flags,
-                                c.old_flags));
+                       if (ban_changed) {
                                talloc_free(mem_ctx);
                                return MONITOR_ELECTION_NEEDED;
                        }
@@ -1029,6 +1103,7 @@ static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_
 {
        char *name;
        struct tdb_wrap *recdb;
+       unsigned tdb_flags;
 
        /* open up the temporary recovery database */
        name = talloc_asprintf(mem_ctx, "%s/recdb.tdb", ctdb->db_directory);
@@ -1036,8 +1111,14 @@ static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_
                return NULL;
        }
        unlink(name);
+
+       tdb_flags = TDB_NOLOCK;
+       if (!ctdb->do_setsched) {
+               tdb_flags |= TDB_NOMMAP;
+       }
+
        recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
-                             TDB_NOLOCK, O_RDWR|O_CREAT|O_EXCL, 0600);
+                             tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
        if (recdb == NULL) {
                DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
        }
@@ -1053,7 +1134,7 @@ static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_
  */
 struct recdb_data {
        struct ctdb_context *ctdb;
-       struct ctdb_control_pulldb_reply *recdata;
+       struct ctdb_marshall_buffer *recdata;
        uint32_t len;
        bool failed;
 };
@@ -1101,21 +1182,22 @@ static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
                               struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
 {
        struct recdb_data params;
-       struct ctdb_control_pulldb_reply *recdata;
+       struct ctdb_marshall_buffer *recdata;
        TDB_DATA outdata;
        TALLOC_CTX *tmp_ctx;
+       uint32_t *nodes;
 
        tmp_ctx = talloc_new(ctdb);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);
 
-       recdata = talloc_zero(recdb, struct ctdb_control_pulldb_reply);
+       recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
        CTDB_NO_MEMORY(ctdb, recdata);
 
        recdata->db_id = dbid;
 
        params.ctdb = ctdb;
        params.recdata = recdata;
-       params.len = offsetof(struct ctdb_control_pulldb_reply, data);
+       params.len = offsetof(struct ctdb_marshall_buffer, data);
        params.failed = false;
 
        if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
@@ -1137,9 +1219,12 @@ static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
        outdata.dptr = (void *)recdata;
        outdata.dsize = params.len;
 
+       nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
-                       list_of_active_nodes(ctdb, nodemap, tmp_ctx, true),
-                       CONTROL_TIMEOUT(), false, outdata) != 0) {
+                                       nodes,
+                                       CONTROL_TIMEOUT(), false, outdata,
+                                       NULL, NULL,
+                                       NULL) != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
                talloc_free(recdata);
                talloc_free(tmp_ctx);
@@ -1171,6 +1256,7 @@ static int recover_database(struct ctdb_recoverd *rec,
        struct ctdb_context *ctdb = rec->ctdb;
        TDB_DATA data;
        struct ctdb_control_wipe_database w;
+       uint32_t *nodes;
 
        recdb = create_recdb(ctdb, mem_ctx);
        if (recdb == NULL) {
@@ -1178,7 +1264,7 @@ static int recover_database(struct ctdb_recoverd *rec,
        }
 
        /* pull all remote databases onto the recdb */
-       ret = pull_remote_database(ctdb, nodemap, recdb, dbid);
+       ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
                return -1;
@@ -1193,9 +1279,12 @@ static int recover_database(struct ctdb_recoverd *rec,
        data.dptr = (void *)&w;
        data.dsize = sizeof(w);
 
+       nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
-                       list_of_active_nodes(ctdb, nodemap, recdb, true),
-                       CONTROL_TIMEOUT(), false, data) != 0) {
+                                       nodes,
+                                       CONTROL_TIMEOUT(), false, data,
+                                       NULL, NULL,
+                                       NULL) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
                talloc_free(recdb);
                return -1;
@@ -1215,33 +1304,51 @@ static int recover_database(struct ctdb_recoverd *rec,
        return 0;
 }
 
-               
+/*
+  reload the nodes file 
+*/
+static void reload_nodes_file(struct ctdb_context *ctdb)
+{
+       ctdb->nodes = NULL;
+       ctdb_load_nodes_file(ctdb);
+}
+
+       
 /*
   we are the recmaster, and recovery is needed - start a recovery run
  */
 static int do_recovery(struct ctdb_recoverd *rec, 
                       TALLOC_CTX *mem_ctx, uint32_t pnn,
                       struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap,
-                      uint32_t culprit)
+                      int32_t culprit)
 {
        struct ctdb_context *ctdb = rec->ctdb;
        int i, j, ret;
        uint32_t generation;
        struct ctdb_dbid_map *dbmap;
        TDB_DATA data;
+       uint32_t *nodes;
 
        DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
 
+       if (ctdb->num_nodes != nodemap->num) {
+               DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
+               reload_nodes_file(ctdb);
+               return -1;
+       }
+
        /* if recovery fails, force it again */
        rec->need_recovery = true;
 
-       ctdb_set_culprit(rec, culprit);
+       if (culprit != -1) {
+               ctdb_set_culprit(rec, culprit);
+       }
 
        if (rec->culprit_counter > 2*nodemap->num) {
                DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries in %.0f seconds - banning it for %u seconds\n",
-                        culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
+                        rec->last_culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
                         ctdb->tunable.recovery_ban_period));
-               ctdb_ban_node(rec, culprit, ctdb->tunable.recovery_ban_period);
+               ctdb_ban_node(rec, rec->last_culprit, ctdb->tunable.recovery_ban_period);
        }
 
        if (!ctdb_recovery_lock(ctdb, true)) {
@@ -1281,13 +1388,13 @@ static int do_recovery(struct ctdb_recoverd *rec,
 
        /* set recovery mode to active on all nodes */
        ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
-       if (ret!=0) {
+       if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
                return -1;
        }
 
        /* execute the "startrecovery" event script on all nodes */
-       ret = run_startrecovery_eventscript(ctdb, nodemap);
+       ret = run_startrecovery_eventscript(rec, nodemap);
        if (ret!=0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
                return -1;
@@ -1316,9 +1423,12 @@ static int do_recovery(struct ctdb_recoverd *rec,
        data.dptr = (void *)&generation;
        data.dsize = sizeof(uint32_t);
 
+       nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
-                       list_of_active_nodes(ctdb, nodemap, mem_ctx, true),
-                       CONTROL_TIMEOUT(), false, data) != 0) {
+                                       nodes,
+                                       CONTROL_TIMEOUT(), false, data,
+                                       NULL, NULL,
+                                       NULL) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
                return -1;
        }
@@ -1336,8 +1446,10 @@ static int do_recovery(struct ctdb_recoverd *rec,
 
        /* commit all the changes */
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
-                       list_of_active_nodes(ctdb, nodemap, mem_ctx, true),
-                       CONTROL_TIMEOUT(), false, data) != 0) {
+                                       nodes,
+                                       CONTROL_TIMEOUT(), false, data,
+                                       NULL, NULL,
+                                       NULL) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
                return -1;
        }
@@ -1345,19 +1457,45 @@ static int do_recovery(struct ctdb_recoverd *rec,
        DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
        
 
+       /* update the capabilities for all nodes */
+       ret = update_capabilities(ctdb, nodemap);
+       if (ret!=0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
+               return -1;
+       }
+
        /* build a new vnn map with all the currently active and
           unbanned nodes */
        generation = new_generation();
        vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
        CTDB_NO_MEMORY(ctdb, vnnmap);
        vnnmap->generation = generation;
-       vnnmap->size = rec->num_active;
+       vnnmap->size = 0;
        vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
+       CTDB_NO_MEMORY(ctdb, vnnmap->map);
        for (i=j=0;i<nodemap->num;i++) {
-               if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
-                       vnnmap->map[j++] = nodemap->nodes[i].pnn;
+               if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
+                       continue;
+               }
+               if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
+                       /* this node can not be an lmaster */
+                       DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
+                       continue;
                }
+
+               vnnmap->size++;
+               vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
+               CTDB_NO_MEMORY(ctdb, vnnmap->map);
+               vnnmap->map[j++] = nodemap->nodes[i].pnn;
+
        }
+       if (vnnmap->size == 0) {
+               DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
+               vnnmap->size++;
+               vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
+               CTDB_NO_MEMORY(ctdb, vnnmap->map);
+               vnnmap->map[0] = pnn;
+       }       
 
        /* update to the new vnnmap on all nodes */
        ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
@@ -1380,46 +1518,48 @@ static int do_recovery(struct ctdb_recoverd *rec,
        /*
          update all nodes to have the same flags that we have
         */
-       ret = update_flags_on_all_nodes(ctdb, nodemap);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes\n"));
-               return -1;
-       }
-       
-       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
+       for (i=0;i<nodemap->num;i++) {
+               if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
+                       continue;
+               }
 
-       /*
-         if enabled, tell nodes to takeover their public IPs
-        */
-       if (ctdb->vnn) {
-               rec->need_takeover_run = false;
-               ret = ctdb_takeover_run(ctdb, nodemap);
+               ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
                if (ret != 0) {
-                       DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses\n"));
+                       DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
                        return -1;
                }
-               DEBUG(DEBUG_INFO, (__location__ " Recovery - done takeover\n"));
        }
 
-       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - takeip finished\n"));
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
 
-       /* execute the "recovered" event script on all nodes */
-       ret = run_recovered_eventscript(ctdb, nodemap);
-       if (ret!=0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster\n"));
+       /* disable recovery mode */
+       ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_NORMAL);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
                return -1;
        }
 
-       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
 
-       /* disable recovery mode */
-       ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_NORMAL);
+       /*
+         tell nodes to takeover their public IPs
+        */
+       rec->need_takeover_run = false;
+       ret = ctdb_takeover_run(ctdb, nodemap);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses\n"));
+               return -1;
+       }
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - takeip finished\n"));
+
+       /* execute the "recovered" event script on all nodes */
+       ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
        if (ret!=0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
+               DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
                return -1;
        }
 
-       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
 
        /* send a message to all clients telling them that the cluster 
           has been reconfigured */
@@ -1478,6 +1618,13 @@ static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_messag
                        em->num_connected++;
                }
        }
+
+       /* we shouldnt try to win this election if we cant be a recmaster */
+       if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
+               em->num_connected = 0;
+               em->priority_time = timeval_current();
+       }
+
        talloc_free(nodemap);
 }
 
@@ -1491,6 +1638,11 @@ static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message
 
        ctdb_election_data(rec, &myem);
 
+       /* we cant win if we dont have the recmaster capability */
+       if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
+               return false;
+       }
+
        /* we cant win if we are banned */
        if (rec->node_flags & NODE_FLAGS_BANNED) {
                return false;
@@ -1521,7 +1673,7 @@ static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message
 /*
   send out an election request
  */
-static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn)
+static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
 {
        int ret;
        TDB_DATA election_data;
@@ -1537,19 +1689,26 @@ static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn)
        election_data.dptr  = (unsigned char *)&emsg;
 
 
-       /* first we assume we will win the election and set 
-          recoverymaster to be ourself on the current node
-        */
-       ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
-               return -1;
-       }
-
-
        /* send an election message to all active nodes */
        ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
 
+
+       /* A new node that is already frozen has entered the cluster.
+          The existing nodes are not frozen and dont need to be frozen
+          until the election has ended and we start the actual recovery
+       */
+       if (update_recmaster == true) {
+               /* first we assume we will win the election and set 
+                  recoverymaster to be ourself on the current node
+                */
+               ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
+                       return -1;
+               }
+       }
+
+
        return 0;
 }
 
@@ -1587,7 +1746,7 @@ static void election_send_request(struct event_context *ev, struct timed_event *
        struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
        int ret;
 
-       ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb));
+       ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
        if (ret != 0) {
                DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
        }
@@ -1596,6 +1755,49 @@ static void election_send_request(struct event_context *ev, struct timed_event *
        rec->send_election_te = NULL;
 }
 
+/*
+  handler for memory dumps
+*/
+static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
+                            TDB_DATA data, void *private_data)
+{
+       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
+       TDB_DATA *dump;
+       int ret;
+       struct rd_memdump_reply *rd;
+
+       if (data.dsize != sizeof(struct rd_memdump_reply)) {
+               DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
+               talloc_free(tmp_ctx);
+               return;
+       }
+       rd = (struct rd_memdump_reply *)data.dptr;
+
+       dump = talloc_zero(tmp_ctx, TDB_DATA);
+       if (dump == NULL) {
+               DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
+               talloc_free(tmp_ctx);
+               return;
+       }
+       ret = ctdb_dump_memory(ctdb, dump);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
+               talloc_free(tmp_ctx);
+               return;
+       }
+
+DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));           
+
+       ret = ctdb_send_message(ctdb, rd->pnn, rd->srvid, *dump);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
+               talloc_free(tmp_ctx);
+               return;
+       }
+
+       talloc_free(tmp_ctx);
+}
+
 /*
   handler for recovery master elections
 */
@@ -1664,7 +1866,7 @@ static void election_handler(struct ctdb_context *ctdb, uint64_t srvid,
 /*
   force the start of the election process
  */
-static void force_election(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx, uint32_t pnn, 
+static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
                           struct ctdb_node_map *nodemap)
 {
        int ret;
@@ -1672,7 +1874,7 @@ static void force_election(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx, uint3
 
        /* set all nodes to recovery mode to stop all internode traffic */
        ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
-       if (ret!=0) {
+       if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
                return;
        }
@@ -1682,7 +1884,7 @@ static void force_election(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx, uint3
                                                timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
                                                ctdb_election_timeout, rec);
 
-       ret = send_election_request(rec, pnn);
+       ret = send_election_request(rec, pnn, true);
        if (ret!=0) {
                DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
                return;
@@ -1736,15 +1938,6 @@ static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid,
 
        changed_flags = c->old_flags ^ c->new_flags;
 
-       /* Dont let messages from remote nodes change the DISCONNECTED flag. 
-          This flag is handled locally based on whether the local node
-          can communicate with the node or not.
-       */
-       c->new_flags &= ~NODE_FLAGS_DISCONNECTED;
-       if (nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED) {
-               c->new_flags |= NODE_FLAGS_DISCONNECTED;
-       }
-
        if (nodemap->nodes[i].flags != c->new_flags) {
                DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
        }
@@ -1761,8 +1954,7 @@ static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid,
        
        if (ret == 0 &&
            ctdb->recovery_master == ctdb->pnn &&
-           ctdb->recovery_mode == CTDB_RECOVERY_NORMAL &&
-           ctdb->vnn) {
+           ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
                /* Only do the takeover run if the perm disabled or unhealthy
                   flags changed since these will cause an ip failover but not
                   a recovery.
@@ -1778,6 +1970,20 @@ static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid,
        talloc_free(tmp_ctx);
 }
 
+/*
+  handler for when we need to push out flag changes ot all other nodes
+*/
+static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
+                           TDB_DATA data, void *private_data)
+{
+       int ret;
+       struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
+
+       ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), c->pnn, c->new_flags, ~c->new_flags);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
+       }
+}
 
 
 struct verify_recmode_normal_data {
@@ -1870,6 +2076,7 @@ static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb
 
 
 struct verify_recmaster_data {
+       struct ctdb_recoverd *rec;
        uint32_t count;
        uint32_t pnn;
        enum monitor_result status;
@@ -1898,6 +2105,7 @@ static void verify_recmaster_callback(struct ctdb_client_control_state *state)
        */
        if (state->status != rmdata->pnn) {
                DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
+               ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
                rmdata->status = MONITOR_ELECTION_NEEDED;
        }
 
@@ -1906,8 +2114,9 @@ static void verify_recmaster_callback(struct ctdb_client_control_state *state)
 
 
 /* verify that all nodes agree that we are the recmaster */
-static enum monitor_result verify_recmaster(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
+static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
 {
+       struct ctdb_context *ctdb = rec->ctdb;
        struct verify_recmaster_data *rmdata;
        TALLOC_CTX *mem_ctx = talloc_new(ctdb);
        struct ctdb_client_control_state *state;
@@ -1916,6 +2125,7 @@ static enum monitor_result verify_recmaster(struct ctdb_context *ctdb, struct ct
        
        rmdata = talloc(mem_ctx, struct verify_recmaster_data);
        CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
+       rmdata->rec    = rec;
        rmdata->count  = 0;
        rmdata->pnn    = pnn;
        rmdata->status = MONITOR_OK;
@@ -1959,71 +2169,152 @@ static enum monitor_result verify_recmaster(struct ctdb_context *ctdb, struct ct
        return status;
 }
 
-/*
-  this function writes the number of connected nodes we have for this pnn
-  to the pnn slot in the reclock file
+
+/* called to check that the allocation of public ip addresses is ok.
 */
-static void
-ctdb_recoverd_write_pnn_connect_count(struct ctdb_recoverd *rec, const char count)
+static int verify_ip_allocation(struct ctdb_context *ctdb, uint32_t pnn)
 {
-       struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
+       TALLOC_CTX *mem_ctx = talloc_new(NULL);
+       struct ctdb_all_public_ips *ips = NULL;
+       struct ctdb_uptime *uptime1 = NULL;
+       struct ctdb_uptime *uptime2 = NULL;
+       int ret, j;
+
+       ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
+                               CTDB_CURRENT_NODE, &uptime1);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
+               talloc_free(mem_ctx);
+               return -1;
+       }
 
-       if (pwrite(rec->rec_file_fd, &count, 1, ctdb->pnn) == -1) {
-               DEBUG(DEBUG_CRIT, (__location__ " Failed to write pnn count\n"));
+       /* read the ip allocation from the local node */
+       ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
+               talloc_free(mem_ctx);
+               return -1;
        }
-}
 
-/* 
-  this function opens the reclock file and sets a byterage lock for the single
-  byte at position pnn+1.
-  the existence/non-existence of such a lock provides an alternative mechanism
-  to know whether a remote node(recovery daemon) is running or not.
-*/
-static void
-ctdb_recoverd_get_pnn_lock(struct ctdb_recoverd *rec)
-{
-       struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
-       struct flock lock;
-       char *pnnfile = NULL;
+       ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
+                               CTDB_CURRENT_NODE, &uptime2);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
+               talloc_free(mem_ctx);
+               return -1;
+       }
+
+       /* skip the check if the startrecovery time has changed */
+       if (timeval_compare(&uptime1->last_recovery_started,
+                           &uptime2->last_recovery_started) != 0) {
+               DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
+               talloc_free(mem_ctx);
+               return 0;
+       }
 
-       DEBUG(DEBUG_INFO, ("Setting PNN lock for pnn:%d\n", ctdb->pnn));
+       /* skip the check if the endrecovery time has changed */
+       if (timeval_compare(&uptime1->last_recovery_finished,
+                           &uptime2->last_recovery_finished) != 0) {
+               DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
+               talloc_free(mem_ctx);
+               return 0;
+       }
 
-       if (rec->rec_file_fd != -1) {
-               DEBUG(DEBUG_CRIT, (__location__ " rec_lock_fd is already open. Aborting\n"));
-               exit(10);
+       /* skip the check if we have started but not finished recovery */
+       if (timeval_compare(&uptime1->last_recovery_finished,
+                           &uptime1->last_recovery_started) != 1) {
+               DEBUG(DEBUG_NOTICE, (__location__ " in the middle of recovery. skipping public ip address check\n"));
+               talloc_free(mem_ctx);
+
+               return 0;
        }
 
-       pnnfile = talloc_asprintf(rec, "%s.pnn", ctdb->recovery_lock_file);
-       CTDB_NO_MEMORY_FATAL(ctdb, pnnfile);
+       /* verify that we have the ip addresses we should have
+          and we dont have ones we shouldnt have.
+          if we find an inconsistency we set recmode to
+          active on the local node and wait for the recmaster
+          to do a full blown recovery
+       */
+       for (j=0; j<ips->num; j++) {
+               if (ips->ips[j].pnn == pnn) {
+                       if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
+                               DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
+                                       ctdb_addr_to_str(&ips->ips[j].addr)));
+                               ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
+                               if (ret != 0) {
+                                       DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
+
+                                       talloc_free(mem_ctx);
+                                       return -1;
+                               }
+                               ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
+                               if (ret != 0) {
+                                       DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
+
+                                       talloc_free(mem_ctx);
+                                       return -1;
+                               }
+                       }
+               } else {
+                       if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
+                               DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
+                                       ctdb_addr_to_str(&ips->ips[j].addr)));
+
+                               ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
+                               if (ret != 0) {
+                                       DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
 
-       rec->rec_file_fd = open(pnnfile, O_RDWR|O_CREAT, 0600);
-       if (rec->rec_file_fd == -1) {
-               DEBUG(DEBUG_CRIT,(__location__ " Unable to open %s - (%s)\n", 
-                        pnnfile, strerror(errno)));
-               exit(10);
+                                       talloc_free(mem_ctx);
+                                       return -1;
+                               }
+                               ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
+                               if (ret != 0) {
+                                       DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
+
+                                       talloc_free(mem_ctx);
+                                       return -1;
+                               }
+                       }
+               }
        }
 
-       set_close_on_exec(rec->rec_file_fd);
-       lock.l_type = F_WRLCK;
-       lock.l_whence = SEEK_SET;
-       lock.l_start = ctdb->pnn;
-       lock.l_len = 1;
-       lock.l_pid = 0;
+       talloc_free(mem_ctx);
+       return 0;
+}
+
+
+static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
+{
+       struct ctdb_node_map **remote_nodemaps = callback_data;
 
-       if (fcntl(rec->rec_file_fd, F_SETLK, &lock) != 0) {
-               close(rec->rec_file_fd);
-               rec->rec_file_fd = -1;
-               DEBUG(DEBUG_CRIT,(__location__ " Failed to get pnn lock on '%s'\n", pnnfile));
-               exit(10);
+       if (node_pnn >= ctdb->num_nodes) {
+               DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
+               return;
        }
 
+       remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
 
-       DEBUG(DEBUG_NOTICE,(__location__ " Got pnn lock on '%s'\n", pnnfile));
+}
 
-       talloc_free(pnnfile);
+static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
+       struct ctdb_node_map *nodemap,
+       struct ctdb_node_map **remote_nodemaps)
+{
+       uint32_t *nodes;
+
+       nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
+       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
+                                       nodes,
+                                       CONTROL_TIMEOUT(), false, tdb_null,
+                                       async_getnodemap_callback,
+                                       NULL,
+                                       remote_nodemaps) != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
+
+               return -1;
+       }
 
-       /* we start out with 0 connected nodes */
-       ctdb_recoverd_write_pnn_connect_count(rec, 0);
+       return 0;
 }
 
 /*
@@ -2031,16 +2322,16 @@ ctdb_recoverd_get_pnn_lock(struct ctdb_recoverd *rec)
  */
 static void monitor_cluster(struct ctdb_context *ctdb)
 {
-       uint32_t pnn, recmaster;
+       uint32_t pnn;
        TALLOC_CTX *mem_ctx=NULL;
        struct ctdb_node_map *nodemap=NULL;
-       struct ctdb_node_map *remote_nodemap=NULL;
+       struct ctdb_node_map *recmaster_nodemap=NULL;
+       struct ctdb_node_map **remote_nodemaps=NULL;
        struct ctdb_vnn_map *vnnmap=NULL;
        struct ctdb_vnn_map *remote_vnnmap=NULL;
        int32_t debug_level;
        int i, j, ret;
        struct ctdb_recoverd *rec;
-       struct ctdb_all_public_ips *ips;
        char c;
 
        DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
@@ -2054,17 +2345,19 @@ static void monitor_cluster(struct ctdb_context *ctdb)
 
        rec->priority_time = timeval_current();
 
-       /* open the rec file fd and lock our slot */
-       rec->rec_file_fd = -1;
-       ctdb_recoverd_get_pnn_lock(rec);
+       /* register a message port for sending memory dumps */
+       ctdb_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
 
        /* register a message port for recovery elections */
        ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
 
-       /* and one for when nodes are disabled/enabled */
-       ctdb_set_message_handler(ctdb, CTDB_SRVID_NODE_FLAGS_CHANGED, monitor_handler, rec);
+       /* when nodes are disabled/enabled */
+       ctdb_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
+
+       /* when we are asked to puch out a flag change */
+       ctdb_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
 
-       /* and one for when nodes are banned */
+       /* when nodes are banned */
        ctdb_set_message_handler(ctdb, CTDB_SRVID_BAN_NODE, ban_handler, rec);
 
        /* and one for when nodes are unbanned */
@@ -2072,7 +2365,7 @@ static void monitor_cluster(struct ctdb_context *ctdb)
 
        /* register a message port for vacuum fetch */
        ctdb_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
-       
+
 again:
        if (mem_ctx) {
                talloc_free(mem_ctx);
@@ -2093,6 +2386,9 @@ again:
                exit(-1);
        }
 
+       /* ping the local daemon to tell it we are alive */
+       ctdb_ctrl_recd_ping(ctdb);
+
        if (rec->election_timeout) {
                /* an election is in progress */
                goto again;
@@ -2140,22 +2436,28 @@ again:
 
 
        /* get number of nodes */
-       ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &nodemap);
+       if (rec->nodemap) {
+               talloc_free(rec->nodemap);
+               rec->nodemap = NULL;
+               nodemap=NULL;
+       }
+       ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
                goto again;
        }
+       nodemap = rec->nodemap;
 
        /* check which node is the recovery master */
-       ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &recmaster);
+       ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
                goto again;
        }
 
-       if (recmaster == (uint32_t)-1) {
+       if (rec->recmaster == (uint32_t)-1) {
                DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
-               force_election(rec, mem_ctx, pnn, nodemap);
+               force_election(rec, pnn, nodemap);
                goto again;
        }
        
@@ -2164,7 +2466,7 @@ again:
        */
        if (nodemap->nodes[pnn].flags & NODE_FLAGS_BANNED) {
                if (rec->banned_nodes[pnn] == NULL) {
-                       if (recmaster == pnn) {
+                       if (rec->recmaster == pnn) {
                                DEBUG(DEBUG_NOTICE,("Local ctdb daemon on recmaster thinks this node is BANNED but the recovery master disagrees. Unbanning the node\n"));
 
                                ctdb_unban_node(rec, pnn);
@@ -2177,7 +2479,7 @@ again:
                }
        } else {
                if (rec->banned_nodes[pnn] != NULL) {
-                       if (recmaster == pnn) {
+                       if (rec->recmaster == pnn) {
                                DEBUG(DEBUG_NOTICE,("Local ctdb daemon on recmaster does not think this node is BANNED but the recovery master disagrees. Unbanning the node\n"));
 
                                ctdb_unban_node(rec, pnn);
@@ -2195,37 +2497,41 @@ again:
        rec->node_flags = nodemap->nodes[pnn].flags;
 
        /* count how many active nodes there are */
-       rec->num_active = 0;
+       rec->num_active    = 0;
+       rec->num_connected = 0;
        for (i=0; i<nodemap->num; i++) {
                if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
                        rec->num_active++;
                }
+               if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
+                       rec->num_connected++;
+               }
        }
 
 
        /* verify that the recmaster node is still active */
        for (j=0; j<nodemap->num; j++) {
-               if (nodemap->nodes[j].pnn==recmaster) {
+               if (nodemap->nodes[j].pnn==rec->recmaster) {
                        break;
                }
        }
 
        if (j == nodemap->num) {
-               DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", recmaster));
-               force_election(rec, mem_ctx, pnn, nodemap);
+               DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
+               force_election(rec, pnn, nodemap);
                goto again;
        }
 
        /* if recovery master is disconnected we must elect a new recmaster */
        if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
                DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
-               force_election(rec, mem_ctx, pnn, nodemap);
+               force_election(rec, pnn, nodemap);
                goto again;
        }
 
        /* grap the nodemap from the recovery master to check if it is banned */
        ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
-                                  mem_ctx, &remote_nodemap);
+                                  mem_ctx, &recmaster_nodemap);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
                          nodemap->nodes[j].pnn));
@@ -2233,62 +2539,28 @@ again:
        }
 
 
-       if (remote_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
+       if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
-               force_election(rec, mem_ctx, pnn, nodemap);
+               force_election(rec, pnn, nodemap);
                goto again;
        }
 
-       /* verify that the public ip address allocation is consistent */
-       if (ctdb->vnn != NULL) {
-               ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR, ("Unable to get public ips from node %u\n", i));
+
+       /* verify that we have all ip addresses we should have and we dont
+        * have addresses we shouldnt have.
+        */ 
+       if (ctdb->do_checkpublicip) {
+               if (verify_ip_allocation(ctdb, pnn) != 0) {
+                       DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
                        goto again;
                }
-               for (j=0; j<ips->num; j++) {
-                       /* verify that we have the ip addresses we should have
-                          and we dont have ones we shouldnt have.
-                          if we find an inconsistency we set recmode to
-                          active on the local node and wait for the recmaster
-                          to do a full blown recovery
-                       */
-                       if (ips->ips[j].pnn == pnn) {
-                               if (!ctdb_sys_have_ip(ips->ips[j].sin)) {
-                                       DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n", inet_ntoa(ips->ips[j].sin.sin_addr)));
-                                       ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
-                                       if (ret != 0) {
-                                               DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
-                                               goto again;
-                                       }
-                                       ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
-                                       if (ret != 0) {
-                                               DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
-                                               goto again;
-                                       }
-                               }
-                       } else {
-                               if (ctdb_sys_have_ip(ips->ips[j].sin)) {
-                                       DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", inet_ntoa(ips->ips[j].sin.sin_addr)));
-                                       ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
-                                       if (ret != 0) {
-                                               DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
-                                               goto again;
-                                       }
-                                       ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
-                                       if (ret != 0) {
-                                               DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
-                                               goto again;
-                                       }
-                               }
-                       }
-               }
        }
 
+
        /* if we are not the recmaster then we do not need to check
           if recovery is needed
         */
-       if (pnn != recmaster) {
+       if (pnn != rec->recmaster) {
                goto again;
        }
 
@@ -2297,7 +2569,7 @@ again:
        ret = update_local_flags(rec, nodemap);
        if (ret == MONITOR_ELECTION_NEEDED) {
                DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
-               force_election(rec, mem_ctx, pnn, nodemap);
+               force_election(rec, pnn, nodemap);
                goto again;
        }
        if (ret != MONITOR_OK) {
@@ -2308,6 +2580,11 @@ again:
        /* update the list of public ips that a node can handle for
           all connected nodes
        */
+       if (ctdb->num_nodes != nodemap->num) {
+               DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
+               reload_nodes_file(ctdb);
+               goto again;
+       }
        for (j=0; j<nodemap->num; j++) {
                if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
@@ -2330,12 +2607,12 @@ again:
 
 
        /* verify that all active nodes agree that we are the recmaster */
-       switch (verify_recmaster(ctdb, nodemap, pnn)) {
+       switch (verify_recmaster(rec, nodemap, pnn)) {
        case MONITOR_RECOVERY_NEEDED:
                /* can not happen */
                goto again;
        case MONITOR_ELECTION_NEEDED:
-               force_election(rec, mem_ctx, pnn, nodemap);
+               force_election(rec, pnn, nodemap);
                goto again;
        case MONITOR_OK:
                break;
@@ -2346,7 +2623,7 @@ again:
 
        if (rec->need_recovery) {
                /* a previous recovery didn't finish */
-               do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
+               do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, -1);
                goto again;             
        }
 
@@ -2381,31 +2658,42 @@ again:
                goto again;
        }
 
-       /* get the nodemap for all active remote nodes and verify
-          they are the same as for this node
+
+       /* get the nodemap for all active remote nodes
         */
+       remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
+       if (remote_nodemaps == NULL) {
+               DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
+               goto again;
+       }
+       for(i=0; i<nodemap->num; i++) {
+               remote_nodemaps[i] = NULL;
+       }
+       if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
+               DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
+               goto again;
+       } 
+
+       /* verify that all other nodes have the same nodemap as we have
+       */
        for (j=0; j<nodemap->num; j++) {
                if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }
-               if (nodemap->nodes[j].pnn == pnn) {
-                       continue;
-               }
 
-               ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
-                                          mem_ctx, &remote_nodemap);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
-                                 nodemap->nodes[j].pnn));
+               if (remote_nodemaps[j] == NULL) {
+                       DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
+                       ctdb_set_culprit(rec, j);
+
                        goto again;
                }
 
-               /* if the nodes disagree on how many nodes there are
+               /* if the nodes disagree on how many nodes there are
                   then this is a good reason to try recovery
                 */
-               if (remote_nodemap->num != nodemap->num) {
+               if (remote_nodemaps[j]->num != nodemap->num) {
                        DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
-                                 nodemap->nodes[j].pnn, remote_nodemap->num, nodemap->num));
+                                 nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
                        do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
                        goto again;
                }
@@ -2414,25 +2702,44 @@ again:
                   active, then that is also a good reason to do recovery
                 */
                for (i=0;i<nodemap->num;i++) {
-                       if (remote_nodemap->nodes[i].pnn != nodemap->nodes[i].pnn) {
+                       if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
                                DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
                                          nodemap->nodes[j].pnn, i, 
-                                         remote_nodemap->nodes[i].pnn, nodemap->nodes[i].pnn));
-                               do_recovery(rec, mem_ctx, pnn, nodemap, 
-                                           vnnmap, nodemap->nodes[j].pnn);
-                               goto again;
-                       }
-                       if ((remote_nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) != 
-                           (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
-                               DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap flag for %d (0x%x vs 0x%x)\n", 
-                                         nodemap->nodes[j].pnn, i,
-                                         remote_nodemap->nodes[i].flags, nodemap->nodes[i].flags));
+                                         remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
                                do_recovery(rec, mem_ctx, pnn, nodemap, 
                                            vnnmap, nodemap->nodes[j].pnn);
                                goto again;
                        }
                }
 
+               /* verify the flags are consistent
+               */
+               for (i=0; i<nodemap->num; i++) {
+                       if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
+                               continue;
+                       }
+                       
+                       if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
+                               DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
+                                 nodemap->nodes[j].pnn, 
+                                 nodemap->nodes[i].pnn, 
+                                 remote_nodemaps[j]->nodes[i].flags,
+                                 nodemap->nodes[j].flags));
+                               if (i == j) {
+                                       DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
+                                       update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
+                                       do_recovery(rec, mem_ctx, pnn, nodemap, 
+                                                   vnnmap, nodemap->nodes[j].pnn);
+                                       goto again;
+                               } else {
+                                       DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
+                                       update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
+                                       do_recovery(rec, mem_ctx, pnn, nodemap, 
+                                                   vnnmap, nodemap->nodes[j].pnn);
+                                       goto again;
+                               }
+                       }
+               }
        }
 
 
@@ -2523,7 +2830,7 @@ again:
                rec->need_takeover_run = false;
 
                /* execute the "startrecovery" event script on all nodes */
-               ret = run_startrecovery_eventscript(ctdb, nodemap);
+               ret = run_startrecovery_eventscript(rec, nodemap);
                if (ret!=0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
                        do_recovery(rec, mem_ctx, pnn, nodemap, 
@@ -2538,14 +2845,21 @@ again:
                }
 
                /* execute the "recovered" event script on all nodes */
-               ret = run_recovered_eventscript(ctdb, nodemap);
+               ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
+#if 0
+// we cant check whether the event completed successfully
+// since this script WILL fail if the node is in recovery mode
+// and if that race happens, the code here would just cause a second
+// cascading recovery.
                if (ret!=0) {
-                       DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster\n"));
+                       DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
                        do_recovery(rec, mem_ctx, pnn, nodemap, 
                                    vnnmap, ctdb->pnn);
                }
+#endif
        }
 
+
        goto again;
 
 }
@@ -2560,13 +2874,62 @@ static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde,
        _exit(1);
 }
 
+/*
+  called regularly to verify that the recovery daemon is still running
+ */
+static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
+                             struct timeval yt, void *p)
+{
+       struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
+
+       if (kill(ctdb->recoverd_pid, 0) != 0) {
+               DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Shutting down main daemon\n", (int)ctdb->recoverd_pid));
+
+               ctdb_stop_recoverd(ctdb);
+               ctdb_stop_keepalive(ctdb);
+               ctdb_stop_monitoring(ctdb);
+               ctdb_release_all_ips(ctdb);
+               if (ctdb->methods != NULL) {
+                       ctdb->methods->shutdown(ctdb);
+               }
+               ctdb_event_script(ctdb, "shutdown");
+
+               exit(10);       
+       }
+
+       event_add_timed(ctdb->ev, ctdb, 
+                       timeval_current_ofs(30, 0),
+                       ctdb_check_recd, ctdb);
+}
+
+static void recd_sig_child_handler(struct event_context *ev,
+       struct signal_event *se, int signum, int count,
+       void *dont_care, 
+       void *private_data)
+{
+//     struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
+       int status;
+       pid_t pid = -1;
+
+       while (pid != 0) {
+               pid = waitpid(-1, &status, WNOHANG);
+               if (pid == -1) {
+                       DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
+                       return;
+               }
+               if (pid > 0) {
+                       DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
+               }
+       }
+}
+
 /*
   startup the recovery daemon as a child of the main ctdb daemon
  */
 int ctdb_start_recoverd(struct ctdb_context *ctdb)
 {
-       int ret;
        int fd[2];
+       struct signal_event *se;
 
        if (pipe(fd) != 0) {
                return -1;
@@ -2581,35 +2944,31 @@ int ctdb_start_recoverd(struct ctdb_context *ctdb)
        
        if (ctdb->recoverd_pid != 0) {
                close(fd[0]);
+               event_add_timed(ctdb->ev, ctdb, 
+                               timeval_current_ofs(30, 0),
+                               ctdb_check_recd, ctdb);
                return 0;
        }
 
        close(fd[1]);
 
-       /* shutdown the transport */
-       ctdb->methods->shutdown(ctdb);
+       srandom(getpid() ^ time(NULL));
 
-       /* get a new event context */
-       talloc_free(ctdb->ev);
-       ctdb->ev = event_context_init(ctdb);
+       if (switch_from_server_to_client(ctdb) != 0) {
+               DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
+               exit(1);
+       }
 
        event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
                     ctdb_recoverd_parent, &fd[0]);     
 
-       close(ctdb->daemon.sd);
-       ctdb->daemon.sd = -1;
-
-       srandom(getpid() ^ time(NULL));
-
-       /* the recovery daemon does not need to be realtime */
-       if (ctdb->do_setsched) {
-               ctdb_restore_scheduler(ctdb);
-       }
-
-       /* initialise ctdb */
-       ret = ctdb_socket_connect(ctdb);
-       if (ret != 0) {
-               DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb\n"));
+       /* set up a handler to pick up sigchld */
+       se = event_add_signal(ctdb->ev, ctdb,
+                                    SIGCHLD, 0,
+                                    recd_sig_child_handler,
+                                    ctdb);
+       if (se == NULL) {
+               DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
                exit(1);
        }