The recent change to the recovery daemon to keep track of and
[sahlberg/ctdb.git] / server / ctdb_recoverd.c
index 4faa2f898b2b5f784ee703927f0ad8ff59a8febf..f04d6091fecc4d82c68301cb5f798624badfb144 100644 (file)
 #include "dlinklist.h"
 
 
-struct ban_state {
-       struct ctdb_recoverd *rec;
-       uint32_t banned_node;
+/* list of "ctdb ipreallocate" processes to call back when we have
+   finished the takeover run.
+*/
+struct ip_reallocate_list {
+       struct ip_reallocate_list *next;
+       struct rd_memdump_reply *rd;
+};
+
+struct ctdb_banning_state {
+       uint32_t count;
+       struct timeval last_reported_time;
 };
 
 /*
@@ -44,11 +52,8 @@ struct ctdb_recoverd {
        uint32_t recmaster;
        uint32_t num_active;
        uint32_t num_connected;
+       uint32_t last_culprit_node;
        struct ctdb_node_map *nodemap;
-       uint32_t last_culprit;
-       uint32_t culprit_counter;
-       struct timeval first_recover_time;
-       struct ban_state **banned_nodes;
        struct timeval priority_time;
        bool need_takeover_run;
        bool need_recovery;
@@ -56,82 +61,25 @@ struct ctdb_recoverd {
        struct timed_event *send_election_te;
        struct timed_event *election_timeout;
        struct vacuum_info *vacuum_info;
+       TALLOC_CTX *ip_reallocate_ctx;
+       struct ip_reallocate_list *reallocate_callers;
+       TALLOC_CTX *ip_check_disable_ctx;
+       struct ctdb_control_get_ifaces *ifaces;
 };
 
 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
 
 
-/*
-  unban a node
- */
-static void ctdb_unban_node(struct ctdb_recoverd *rec, uint32_t pnn)
-{
-       struct ctdb_context *ctdb = rec->ctdb;
-
-       DEBUG(DEBUG_NOTICE,("Unbanning node %u\n", pnn));
-
-       if (!ctdb_validate_pnn(ctdb, pnn)) {
-               DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_unban_node\n", pnn));
-               return;
-       }
-
-       /* If we are unbanning a different node then just pass the ban info on */
-       if (pnn != ctdb->pnn) {
-               TDB_DATA data;
-               int ret;
-               
-               DEBUG(DEBUG_NOTICE,("Unanning remote node %u. Passing the ban request on to the remote node.\n", pnn));
-
-               data.dptr = (uint8_t *)&pnn;
-               data.dsize = sizeof(uint32_t);
-
-               ret = ctdb_send_message(ctdb, pnn, CTDB_SRVID_UNBAN_NODE, data);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR,("Failed to unban node %u\n", pnn));
-                       return;
-               }
-
-               return;
-       }
-
-       /* make sure we remember we are no longer banned in case 
-          there is an election */
-       rec->node_flags &= ~NODE_FLAGS_BANNED;
-
-       DEBUG(DEBUG_INFO,("Clearing ban flag on node %u\n", pnn));
-       ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, 0, NODE_FLAGS_BANNED);
-
-       if (rec->banned_nodes[pnn] == NULL) {
-               DEBUG(DEBUG_INFO,("No ban recorded for this node. ctdb_unban_node() request ignored\n"));
-               return;
-       }
-
-       talloc_free(rec->banned_nodes[pnn]);
-       rec->banned_nodes[pnn] = NULL;
-}
-
-
-/*
-  called when a ban has timed out
- */
-static void ctdb_ban_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
-{
-       struct ban_state *state = talloc_get_type(p, struct ban_state);
-       struct ctdb_recoverd *rec = state->rec;
-       uint32_t pnn = state->banned_node;
-
-       DEBUG(DEBUG_NOTICE,("Ban timeout. Node %u is now unbanned\n", pnn));
-       ctdb_unban_node(rec, pnn);
-}
-
 /*
   ban a node for a period of time
  */
 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
 {
+       int ret;
        struct ctdb_context *ctdb = rec->ctdb;
-
+       struct ctdb_ban_time bantime;
+       
        DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
 
        if (!ctdb_validate_pnn(ctdb, pnn)) {
@@ -139,61 +87,15 @@ static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_
                return;
        }
 
-       if (0 == ctdb->tunable.enable_bans) {
-               DEBUG(DEBUG_INFO,("Bans are disabled - ignoring ban of node %u\n", pnn));
-               return;
-       }
-
-       /* If we are banning a different node then just pass the ban info on */
-       if (pnn != ctdb->pnn) {
-               struct ctdb_ban_info b;
-               TDB_DATA data;
-               int ret;
-               
-               DEBUG(DEBUG_NOTICE,("Banning remote node %u for %u seconds. Passing the ban request on to the remote node.\n", pnn, ban_time));
-
-               b.pnn = pnn;
-               b.ban_time = ban_time;
-
-               data.dptr = (uint8_t *)&b;
-               data.dsize = sizeof(b);
-
-               ret = ctdb_send_message(ctdb, pnn, CTDB_SRVID_BAN_NODE, data);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR,("Failed to ban node %u\n", pnn));
-                       return;
-               }
+       bantime.pnn  = pnn;
+       bantime.time = ban_time;
 
+       ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
                return;
        }
 
-       DEBUG(DEBUG_NOTICE,("self ban - lowering our election priority\n"));
-       ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, NODE_FLAGS_BANNED, 0);
-
-       /* banning ourselves - lower our election priority */
-       rec->priority_time = timeval_current();
-
-       /* make sure we remember we are banned in case there is an 
-          election */
-       rec->node_flags |= NODE_FLAGS_BANNED;
-
-       if (rec->banned_nodes[pnn] != NULL) {
-               DEBUG(DEBUG_NOTICE,("Re-banning an already banned node. Remove previous ban and set a new ban.\n"));            
-               talloc_free(rec->banned_nodes[pnn]);
-               rec->banned_nodes[pnn] = NULL;
-       }
-
-       rec->banned_nodes[pnn] = talloc(rec->banned_nodes, struct ban_state);
-       CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes[pnn]);
-
-       rec->banned_nodes[pnn]->rec = rec;
-       rec->banned_nodes[pnn]->banned_node = pnn;
-
-       if (ban_time != 0) {
-               event_add_timed(ctdb->ev, rec->banned_nodes[pnn], 
-                               timeval_current_ofs(ban_time, 0),
-                               ctdb_ban_timeout, rec->banned_nodes[pnn]);
-       }
 }
 
 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
@@ -212,7 +114,7 @@ static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node
 
        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
-                                       nodes,
+                                       nodes, 0,
                                        CONTROL_TIMEOUT(), false, tdb_null,
                                        NULL, NULL,
                                        NULL) != 0) {
@@ -229,19 +131,41 @@ static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node
 /*
   remember the trouble maker
  */
-static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
+static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
 {
-       struct ctdb_context *ctdb = rec->ctdb;
+       struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
+       struct ctdb_banning_state *ban_state;
+
+       if (culprit > ctdb->num_nodes) {
+               DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
+               return;
+       }
+
+       if (ctdb->nodes[culprit]->ban_state == NULL) {
+               ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
+               CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
 
-       if (rec->last_culprit != culprit ||
-           timeval_elapsed(&rec->first_recover_time) > ctdb->tunable.recovery_grace_period) {
-               DEBUG(DEBUG_NOTICE,("New recovery culprit %u\n", culprit));
-               /* either a new node is the culprit, or we've decided to forgive them */
-               rec->last_culprit = culprit;
-               rec->first_recover_time = timeval_current();
-               rec->culprit_counter = 0;
+               
+       }
+       ban_state = ctdb->nodes[culprit]->ban_state;
+       if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
+               /* this was the first time in a long while this node
+                  misbehaved so we will forgive any old transgressions.
+               */
+               ban_state->count = 0;
        }
-       rec->culprit_counter++;
+
+       ban_state->count += count;
+       ban_state->last_reported_time = timeval_current();
+       rec->last_culprit_node = culprit;
+}
+
+/*
+  remember the trouble maker
+ */
+static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
+{
+       ctdb_set_culprit_count(rec, culprit, 1);
 }
 
 
@@ -271,7 +195,7 @@ static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_
 
        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
-                                       nodes,
+                                       nodes, 0,
                                        CONTROL_TIMEOUT(), false, tdb_null,
                                        NULL,
                                        startrecovery_fail_callback,
@@ -288,7 +212,7 @@ static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_
 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
 {
        if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
-               DEBUG(DEBUG_ERR, (__location__ " Invalid lenght/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
+               DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
                return;
        }
        if (node_pnn < ctdb->num_nodes) {
@@ -309,7 +233,8 @@ static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *
 
        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
-                                       nodes, CONTROL_TIMEOUT(),
+                                       nodes, 0,
+                                       CONTROL_TIMEOUT(),
                                        false, tdb_null,
                                        async_getcap_callback, NULL,
                                        NULL) != 0) {
@@ -322,10 +247,26 @@ static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *
        return 0;
 }
 
+static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
+{
+       struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
+
+       DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
+       ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
+}
+
+static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
+{
+       struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
+
+       DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
+       ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
+}
+
 /*
   change recovery mode on all nodes
  */
-static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t rec_mode)
+static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
 {
        TDB_DATA data;
        uint32_t *nodes;
@@ -337,14 +278,20 @@ static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *no
        /* freeze all nodes */
        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (rec_mode == CTDB_RECOVERY_ACTIVE) {
-               if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
-                                               nodes, CONTROL_TIMEOUT(),
+               int i;
+
+               for (i=1; i<=NUM_DB_PRIORITIES; i++) {
+                       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
+                                               nodes, i,
+                                               CONTROL_TIMEOUT(),
                                                false, tdb_null,
-                                               NULL, NULL,
-                                               NULL) != 0) {
-                       DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
-                       talloc_free(tmp_ctx);
-                       return -1;
+                                               NULL,
+                                               set_recmode_fail_callback,
+                                               rec) != 0) {
+                               DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
+                               talloc_free(tmp_ctx);
+                               return -1;
+                       }
                }
        }
 
@@ -353,7 +300,8 @@ static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *no
        data.dptr = (unsigned char *)&rec_mode;
 
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
-                                       nodes, CONTROL_TIMEOUT(),
+                                       nodes, 0,
+                                       CONTROL_TIMEOUT(),
                                        false, data,
                                        NULL, NULL,
                                        NULL) != 0) {
@@ -383,7 +331,7 @@ static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *
 
        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
-                                       nodes,
+                                       nodes, 0,
                                        CONTROL_TIMEOUT(), false, data,
                                        NULL, NULL,
                                        NULL) != 0) {
@@ -396,6 +344,50 @@ static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *
        return 0;
 }
 
+/* update all remote nodes to use the same db priority that we have
+   this can fail if the remove node has not yet been upgraded to 
+   support this function, so we always return success and never fail
+   a recovery if this call fails.
+*/
+static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
+       struct ctdb_node_map *nodemap, 
+       uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
+{
+       int db;
+       uint32_t *nodes;
+
+       nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
+
+       /* step through all local databases */
+       for (db=0; db<dbmap->num;db++) {
+               TDB_DATA data;
+               struct ctdb_db_priority db_prio;
+               int ret;
+
+               db_prio.db_id     = dbmap->dbs[db].dbid;
+               ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
+                       continue;
+               }
+
+               DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
+
+               data.dptr  = (uint8_t *)&db_prio;
+               data.dsize = sizeof(db_prio);
+
+               if (ctdb_client_async_control(ctdb,
+                                       CTDB_CONTROL_SET_DB_PRIORITY,
+                                       nodes, 0,
+                                       CONTROL_TIMEOUT(), false, data,
+                                       NULL, NULL,
+                                       NULL) != 0) {
+                       DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
+               }
+       }
+
+       return 0;
+}                      
 
 /*
   ensure all other nodes have attached to any databases that we have
@@ -530,7 +522,8 @@ static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb
   pull the remote database contents from one node into the recdb
  */
 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
-                                   struct tdb_wrap *recdb, uint32_t dbid)
+                                   struct tdb_wrap *recdb, uint32_t dbid,
+                                   bool persistent)
 {
        int ret;
        TDB_DATA outdata;
@@ -612,8 +605,11 @@ static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode,
 /*
   pull all the remote database contents into the recdb
  */
-static int pull_remote_database(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
-                               struct tdb_wrap *recdb, uint32_t dbid)
+static int pull_remote_database(struct ctdb_context *ctdb,
+                               struct ctdb_recoverd *rec, 
+                               struct ctdb_node_map *nodemap, 
+                               struct tdb_wrap *recdb, uint32_t dbid,
+                               bool persistent)
 {
        int j;
 
@@ -625,9 +621,10 @@ static int pull_remote_database(struct ctdb_context *ctdb, struct ctdb_node_map
                if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }
-               if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
+               if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid, persistent) != 0) {
                        DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
                                 nodemap->nodes[j].pnn));
+                       ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
                        return -1;
                }
        }
@@ -639,27 +636,12 @@ static int pull_remote_database(struct ctdb_context *ctdb, struct ctdb_node_map
 /*
   update flags on all active nodes
  */
-static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
-{
-       int i;
-       for (i=0;i<nodemap->num;i++) {
-               int ret;
-
-               ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, nodemap->nodes[i].flags, ~nodemap->nodes[i].flags);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
-                       return -1;
-               }
-       }
-       return 0;
-}
-
-static int update_our_flags_on_all_nodes(struct ctdb_context *ctdb, uint32_t pnn, struct ctdb_node_map *nodemap)
+static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
 {
        int ret;
 
-       ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[pnn].pnn, nodemap->nodes[pnn].flags, ~nodemap->nodes[pnn].flags);
-       if (ret != 0) {
+       ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
+               if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
                return -1;
        }
@@ -693,62 +675,6 @@ static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_nod
 }
 
 
-/*
-  handler for when the admin bans a node
-*/
-static void ban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
-                       TDB_DATA data, void *private_data)
-{
-       struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
-       struct ctdb_ban_info *b = (struct ctdb_ban_info *)data.dptr;
-       TALLOC_CTX *mem_ctx = talloc_new(ctdb);
-
-       if (data.dsize != sizeof(*b)) {
-               DEBUG(DEBUG_ERR,("Bad data in ban_handler\n"));
-               talloc_free(mem_ctx);
-               return;
-       }
-
-       if (b->pnn != ctdb->pnn) {
-               DEBUG(DEBUG_ERR,("Got a ban request for pnn:%u but our pnn is %u. Ignoring ban request\n", b->pnn, ctdb->pnn));
-               return;
-       }
-
-       DEBUG(DEBUG_NOTICE,("Node %u has been banned for %u seconds\n", 
-                b->pnn, b->ban_time));
-
-       ctdb_ban_node(rec, b->pnn, b->ban_time);
-       talloc_free(mem_ctx);
-}
-
-/*
-  handler for when the admin unbans a node
-*/
-static void unban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
-                         TDB_DATA data, void *private_data)
-{
-       struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
-       TALLOC_CTX *mem_ctx = talloc_new(ctdb);
-       uint32_t pnn;
-
-       if (data.dsize != sizeof(uint32_t)) {
-               DEBUG(DEBUG_ERR,("Bad data in unban_handler\n"));
-               talloc_free(mem_ctx);
-               return;
-       }
-       pnn = *(uint32_t *)data.dptr;
-
-       if (pnn != ctdb->pnn) {
-               DEBUG(DEBUG_ERR,("Got an unban request for pnn:%u but our pnn is %u. Ignoring unban request\n", pnn, ctdb->pnn));
-               return;
-       }
-
-       DEBUG(DEBUG_NOTICE,("Node %u has been unbanned.\n", pnn));
-       ctdb_unban_node(rec, pnn);
-       talloc_free(mem_ctx);
-}
-
-
 struct vacuum_info {
        struct vacuum_info *next, *prev;
        struct ctdb_recoverd *rec;
@@ -978,6 +904,8 @@ static void ctdb_election_timeout(struct event_context *ev, struct timed_event *
 {
        struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
        rec->election_timeout = NULL;
+
+       DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
 }
 
 
@@ -1027,15 +955,6 @@ static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *n
                        return MONITOR_FAILED;
                }
                if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
-                       int ban_changed = (nodemap->nodes[j].flags ^ remote_nodemap->nodes[j].flags) & NODE_FLAGS_BANNED;
-
-                       if (ban_changed) {
-                               DEBUG(DEBUG_NOTICE,("Remote node %u had different BANNED flags 0x%x, local had 0x%x - trigger a re-election\n",
-                               nodemap->nodes[j].pnn,
-                               remote_nodemap->nodes[j].flags,
-                               nodemap->nodes[j].flags));
-                       }
-
                        /* We should tell our daemon about this so it
                           updates its flags or else we will log the same 
                           message again in the next iteration of recovery.
@@ -1055,15 +974,6 @@ static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *n
                                 nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
                                 nodemap->nodes[j].flags));
                        nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
-
-                       /* If the BANNED flag has changed for the node
-                          this is a good reason to do a new election.
-                        */
-                       if (ban_changed) {
-                               talloc_free(mem_ctx);
-                               return MONITOR_ELECTION_NEEDED;
-                       }
-
                }
                talloc_free(remote_nodemap);
        }
@@ -1101,16 +1011,19 @@ static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_
        unsigned tdb_flags;
 
        /* open up the temporary recovery database */
-       name = talloc_asprintf(mem_ctx, "%s/recdb.tdb", ctdb->db_directory);
+       name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
+                              ctdb->db_directory_state,
+                              ctdb->pnn);
        if (name == NULL) {
                return NULL;
        }
        unlink(name);
 
        tdb_flags = TDB_NOLOCK;
-       if (!ctdb->do_setsched) {
+       if (ctdb->valgrinding) {
                tdb_flags |= TDB_NOMMAP;
        }
+       tdb_flags |= TDB_DISALLOW_NESTING;
 
        recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
                              tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
@@ -1132,6 +1045,7 @@ struct recdb_data {
        struct ctdb_marshall_buffer *recdata;
        uint32_t len;
        bool failed;
+       bool persistent;
 };
 
 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
@@ -1147,7 +1061,9 @@ static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data,
 
        /* update the dmaster field to point to us */
        hdr = (struct ctdb_ltdb_header *)data.dptr;
-       hdr->dmaster = params->ctdb->pnn;
+       if (!params->persistent) {
+               hdr->dmaster = params->ctdb->pnn;
+       }
 
        /* add the record to the blob ready to send to the nodes */
        rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
@@ -1174,6 +1090,7 @@ static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data,
   push the recdb database out to all nodes
  */
 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
+                              bool persistent,
                               struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
 {
        struct recdb_data params;
@@ -1194,6 +1111,7 @@ static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
        params.recdata = recdata;
        params.len = offsetof(struct ctdb_marshall_buffer, data);
        params.failed = false;
+       params.persistent = persistent;
 
        if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
@@ -1216,7 +1134,7 @@ static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
 
        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
-                                       nodes,
+                                       nodes, 0,
                                        CONTROL_TIMEOUT(), false, outdata,
                                        NULL, NULL,
                                        NULL) != 0) {
@@ -1242,6 +1160,7 @@ static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
 static int recover_database(struct ctdb_recoverd *rec, 
                            TALLOC_CTX *mem_ctx,
                            uint32_t dbid,
+                           bool persistent,
                            uint32_t pnn, 
                            struct ctdb_node_map *nodemap,
                            uint32_t transaction_id)
@@ -1259,7 +1178,7 @@ static int recover_database(struct ctdb_recoverd *rec,
        }
 
        /* pull all remote databases onto the recdb */
-       ret = pull_remote_database(ctdb, nodemap, recdb, dbid);
+       ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
                return -1;
@@ -1276,7 +1195,7 @@ static int recover_database(struct ctdb_recoverd *rec,
 
        nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
-                                       nodes,
+                                       nodes, 0,
                                        CONTROL_TIMEOUT(), false, data,
                                        NULL, NULL,
                                        NULL) != 0) {
@@ -1287,7 +1206,7 @@ static int recover_database(struct ctdb_recoverd *rec,
        
        /* push out the correct database. This sets the dmaster and skips 
           the empty records */
-       ret = push_recdb_database(ctdb, dbid, recdb, nodemap);
+       ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
        if (ret != 0) {
                talloc_free(recdb);
                return -1;
@@ -1308,14 +1227,85 @@ static void reload_nodes_file(struct ctdb_context *ctdb)
        ctdb_load_nodes_file(ctdb);
 }
 
-       
+static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
+                                        struct ctdb_recoverd *rec,
+                                        struct ctdb_node_map *nodemap,
+                                        uint32_t *culprit)
+{
+       int j;
+       int ret;
+
+       if (ctdb->num_nodes != nodemap->num) {
+               DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
+                                 ctdb->num_nodes, nodemap->num));
+               if (culprit) {
+                       *culprit = ctdb->pnn;
+               }
+               return -1;
+       }
+
+       for (j=0; j<nodemap->num; j++) {
+               /* release any existing data */
+               if (ctdb->nodes[j]->known_public_ips) {
+                       talloc_free(ctdb->nodes[j]->known_public_ips);
+                       ctdb->nodes[j]->known_public_ips = NULL;
+               }
+               if (ctdb->nodes[j]->available_public_ips) {
+                       talloc_free(ctdb->nodes[j]->available_public_ips);
+                       ctdb->nodes[j]->available_public_ips = NULL;
+               }
+
+               if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
+                       continue;
+               }
+
+               /* grab a new shiny list of public ips from the node */
+               ret = ctdb_ctrl_get_public_ips_flags(ctdb,
+                                       CONTROL_TIMEOUT(),
+                                       ctdb->nodes[j]->pnn,
+                                       ctdb->nodes,
+                                       0,
+                                       &ctdb->nodes[j]->known_public_ips);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,("Failed to read known public ips from node : %u\n",
+                               ctdb->nodes[j]->pnn));
+                       if (culprit) {
+                               *culprit = ctdb->nodes[j]->pnn;
+                       }
+                       return -1;
+               }
+
+               if (verify_remote_ip_allocation(ctdb, ctdb->nodes[j]->known_public_ips)) {
+                       DEBUG(DEBUG_ERR,("Node %d has inconsistent public ip allocation and needs update.\n", ctdb->nodes[j]->pnn));
+                       rec->need_takeover_run = true;
+               }
+
+               /* grab a new shiny list of public ips from the node */
+               ret = ctdb_ctrl_get_public_ips_flags(ctdb,
+                                       CONTROL_TIMEOUT(),
+                                       ctdb->nodes[j]->pnn,
+                                       ctdb->nodes,
+                                       CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
+                                       &ctdb->nodes[j]->available_public_ips);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,("Failed to read available public ips from node : %u\n",
+                               ctdb->nodes[j]->pnn));
+                       if (culprit) {
+                               *culprit = ctdb->nodes[j]->pnn;
+                       }
+                       return -1;
+               }
+       }
+
+       return 0;
+}
+
 /*
   we are the recmaster, and recovery is needed - start a recovery run
  */
 static int do_recovery(struct ctdb_recoverd *rec, 
                       TALLOC_CTX *mem_ctx, uint32_t pnn,
-                      struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap,
-                      int32_t culprit)
+                      struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
 {
        struct ctdb_context *ctdb = rec->ctdb;
        int i, j, ret;
@@ -1323,36 +1313,45 @@ static int do_recovery(struct ctdb_recoverd *rec,
        struct ctdb_dbid_map *dbmap;
        TDB_DATA data;
        uint32_t *nodes;
+       struct timeval start_time;
+       uint32_t culprit = (uint32_t)-1;
 
        DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
 
-       if (ctdb->num_nodes != nodemap->num) {
-               DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
-               reload_nodes_file(ctdb);
-               return -1;
-       }
-
        /* if recovery fails, force it again */
        rec->need_recovery = true;
 
-       if (culprit != -1) {
-               ctdb_set_culprit(rec, culprit);
-       }
+       for (i=0; i<ctdb->num_nodes; i++) {
+               struct ctdb_banning_state *ban_state;
 
-       if (rec->culprit_counter > 2*nodemap->num) {
-               DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries in %.0f seconds - banning it for %u seconds\n",
-                        culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
-                        ctdb->tunable.recovery_ban_period));
-               ctdb_ban_node(rec, culprit, ctdb->tunable.recovery_ban_period);
+               if (ctdb->nodes[i]->ban_state == NULL) {
+                       continue;
+               }
+               ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
+               if (ban_state->count < 2*ctdb->num_nodes) {
+                       continue;
+               }
+               DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
+                       ctdb->nodes[i]->pnn, ban_state->count,
+                       ctdb->tunable.recovery_ban_period));
+               ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
+               ban_state->count = 0;
        }
 
-       if (!ctdb_recovery_lock(ctdb, true)) {
-               ctdb_set_culprit(rec, pnn);
-               DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery\n"));
-               return -1;
+
+        if (ctdb->tunable.verify_recovery_lock != 0) {
+               DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
+               start_time = timeval_current();
+               if (!ctdb_recovery_lock(ctdb, true)) {
+                       ctdb_set_culprit(rec, pnn);
+                       DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery\n"));
+                       return -1;
+               }
+               ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
+               DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
        }
 
-       DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", culprit));
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
 
        /* get a list of all databases */
        ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
@@ -1377,12 +1376,18 @@ static int do_recovery(struct ctdb_recoverd *rec,
                DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
                return -1;
        }
-
        DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
 
+       /* update the database priority for all remote databases */
+       ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
+       }
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
+
 
        /* set recovery mode to active on all nodes */
-       ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
+       ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
                return -1;
@@ -1395,14 +1400,31 @@ static int do_recovery(struct ctdb_recoverd *rec,
                return -1;
        }
 
-       /* pick a new generation number */
-       generation = new_generation();
+       /*
+         update all nodes to have the same flags that we have
+        */
+       for (i=0;i<nodemap->num;i++) {
+               if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
+                       continue;
+               }
 
-       /* change the vnnmap on this node to use the new generation 
-          number but not on any other nodes.
-          this guarantees that if we abort the recovery prematurely
-          for some reason (a node stops responding?)
-          that we can just return immediately and we will reenter
+               ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
+                       return -1;
+               }
+       }
+
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
+
+       /* pick a new generation number */
+       generation = new_generation();
+
+       /* change the vnnmap on this node to use the new generation 
+          number but not on any other nodes.
+          this guarantees that if we abort the recovery prematurely
+          for some reason (a node stops responding?)
+          that we can just return immediately and we will reenter
           recovery shortly again.
           I.e. we deliberately leave the cluster with an inconsistent
           generation id to allow us to abort recovery at any stage and
@@ -1420,18 +1442,31 @@ static int do_recovery(struct ctdb_recoverd *rec,
 
        nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
-                                       nodes,
+                                       nodes, 0,
                                        CONTROL_TIMEOUT(), false, data,
-                                       NULL, NULL,
-                                       NULL) != 0) {
+                                       NULL,
+                                       transaction_start_fail_callback,
+                                       rec) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
+               if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
+                                       nodes, 0,
+                                       CONTROL_TIMEOUT(), false, tdb_null,
+                                       NULL,
+                                       NULL,
+                                       NULL) != 0) {
+                       DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
+               }
                return -1;
        }
 
        DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
 
        for (i=0;i<dbmap->num;i++) {
-               if (recover_database(rec, mem_ctx, dbmap->dbs[i].dbid, pnn, nodemap, generation) != 0) {
+               ret = recover_database(rec, mem_ctx,
+                                      dbmap->dbs[i].dbid,
+                                      dbmap->dbs[i].persistent,
+                                      pnn, nodemap, generation);
+               if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
                        return -1;
                }
@@ -1441,7 +1476,7 @@ static int do_recovery(struct ctdb_recoverd *rec,
 
        /* commit all the changes */
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
-                                       nodes,
+                                       nodes, 0,
                                        CONTROL_TIMEOUT(), false, data,
                                        NULL, NULL,
                                        NULL) != 0) {
@@ -1513,16 +1548,22 @@ static int do_recovery(struct ctdb_recoverd *rec,
        /*
          update all nodes to have the same flags that we have
         */
-       ret = update_flags_on_all_nodes(ctdb, nodemap);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes\n"));
-               return -1;
+       for (i=0;i<nodemap->num;i++) {
+               if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
+                       continue;
+               }
+
+               ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
+                       return -1;
+               }
        }
-       
+
        DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
 
        /* disable recovery mode */
-       ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_NORMAL);
+       ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
                return -1;
@@ -1533,6 +1574,12 @@ static int do_recovery(struct ctdb_recoverd *rec,
        /*
          tell nodes to takeover their public IPs
         */
+       ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
+                                culprit));
+               return -1;
+       }
        rec->need_takeover_run = false;
        ret = ctdb_takeover_run(ctdb, nodemap);
        if (ret != 0) {
@@ -1558,6 +1605,27 @@ static int do_recovery(struct ctdb_recoverd *rec,
 
        rec->need_recovery = false;
 
+       /* we managed to complete a full recovery, make sure to forgive
+          any past sins by the nodes that could now participate in the
+          recovery.
+       */
+       DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
+       for (i=0;i<nodemap->num;i++) {
+               struct ctdb_banning_state *ban_state;
+
+               if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
+                       continue;
+               }
+
+               ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
+               if (ban_state == NULL) {
+                       continue;
+               }
+
+               ban_state->count = 0;
+       }
+
+
        /* We just finished a recovery successfully. 
           We now wait for rerecovery_timeout before we allow 
           another recovery to take place.
@@ -1594,7 +1662,6 @@ static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_messag
 
        em->pnn = rec->ctdb->pnn;
        em->priority_time = rec->priority_time;
-       em->node_flags = rec->node_flags;
 
        ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
        if (ret != 0) {
@@ -1602,6 +1669,9 @@ static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_messag
                return;
        }
 
+       rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
+       em->node_flags = rec->node_flags;
+
        for (i=0;i<nodemap->num;i++) {
                if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
                        em->num_connected++;
@@ -1637,11 +1707,21 @@ static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message
                return false;
        }       
 
+       /* we cant win if we are stopped */
+       if (rec->node_flags & NODE_FLAGS_STOPPED) {
+               return false;
+       }       
+
        /* we will automatically win if the other node is banned */
        if (em->node_flags & NODE_FLAGS_BANNED) {
                return true;
        }
 
+       /* we will automatically win if the other node is banned */
+       if (em->node_flags & NODE_FLAGS_STOPPED) {
+               return true;
+       }
+
        /* try to use the most connected node */
        if (cmp == 0) {
                cmp = (int)myem.num_connected - (int)em->num_connected;
@@ -1679,6 +1759,7 @@ static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool u
 
 
        /* send an election message to all active nodes */
+       DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
        ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
 
 
@@ -1787,6 +1868,172 @@ DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));
        talloc_free(tmp_ctx);
 }
 
+/*
+  handler for reload_nodes
+*/
+static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
+                            TDB_DATA data, void *private_data)
+{
+       struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
+
+       DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
+
+       reload_nodes_file(rec->ctdb);
+}
+
+
+static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
+                             struct timeval yt, void *p)
+{
+       struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
+
+       talloc_free(rec->ip_check_disable_ctx);
+       rec->ip_check_disable_ctx = NULL;
+}
+
+
+static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
+                            TDB_DATA data, void *private_data)
+{
+       struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
+       struct ctdb_public_ip *ip;
+
+       if (rec->recmaster != rec->ctdb->pnn) {
+               DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
+               return;
+       }
+
+       if (data.dsize != sizeof(struct ctdb_public_ip)) {
+               DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
+               return;
+       }
+
+       ip = (struct ctdb_public_ip *)data.dptr;
+
+       update_ip_assignment_tree(rec->ctdb, ip);
+}
+
+
+static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
+                            TDB_DATA data, void *private_data)
+{
+       struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
+       uint32_t timeout;
+
+       if (rec->ip_check_disable_ctx != NULL) {
+               talloc_free(rec->ip_check_disable_ctx);
+               rec->ip_check_disable_ctx = NULL;
+       }
+
+       if (data.dsize != sizeof(uint32_t)) {
+               DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
+                                "expexting %lu\n", (long unsigned)data.dsize,
+                                (long unsigned)sizeof(uint32_t)));
+               return;
+       }
+       if (data.dptr == NULL) {
+               DEBUG(DEBUG_ERR,(__location__ " No data recaived\n"));
+               return;
+       }
+
+       timeout = *((uint32_t *)data.dptr);
+       DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
+
+       rec->ip_check_disable_ctx = talloc_new(rec);
+       CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
+
+       event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
+}
+
+
+/*
+  handler for ip reallocate, just add it to the list of callers and 
+  handle this later in the monitor_cluster loop so we do not recurse
+  with other callers to takeover_run()
+*/
+static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
+                            TDB_DATA data, void *private_data)
+{
+       struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
+       struct ip_reallocate_list *caller;
+
+       if (data.dsize != sizeof(struct rd_memdump_reply)) {
+               DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
+               return;
+       }
+
+       if (rec->ip_reallocate_ctx == NULL) {
+               rec->ip_reallocate_ctx = talloc_new(rec);
+               CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
+       }
+
+       caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
+       CTDB_NO_MEMORY_FATAL(ctdb, caller);
+
+       caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
+       caller->next = rec->reallocate_callers;
+       rec->reallocate_callers = caller;
+
+       return;
+}
+
+static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
+{
+       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
+       TDB_DATA result;
+       int32_t ret;
+       struct ip_reallocate_list *callers;
+       uint32_t culprit;
+
+       DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
+
+       /* update the list of public ips that a node can handle for
+          all connected nodes
+       */
+       ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
+                                culprit));
+               rec->need_takeover_run = true;
+       }
+       if (ret == 0) {
+               ret = ctdb_takeover_run(ctdb, rec->nodemap);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
+                                        culprit));
+                       rec->need_takeover_run = true;
+               }
+       }
+
+       result.dsize = sizeof(int32_t);
+       result.dptr  = (uint8_t *)&ret;
+
+       for (callers=rec->reallocate_callers; callers; callers=callers->next) {
+
+               /* Someone that sent srvid==0 does not want a reply */
+               if (callers->rd->srvid == 0) {
+                       continue;
+               }
+               DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
+                                 "%u:%llu\n", (unsigned)callers->rd->pnn,
+                                 (unsigned long long)callers->rd->srvid));
+               ret = ctdb_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
+                                        "message to %u:%llu\n",
+                                        (unsigned)callers->rd->pnn,
+                                        (unsigned long long)callers->rd->srvid));
+               }
+       }
+
+       talloc_free(tmp_ctx);
+       talloc_free(rec->ip_reallocate_ctx);
+       rec->ip_reallocate_ctx = NULL;
+       rec->reallocate_callers = NULL;
+       
+}
+
+
 /*
   handler for recovery master elections
 */
@@ -1825,12 +2072,14 @@ static void election_handler(struct ctdb_context *ctdb, uint64_t srvid,
        talloc_free(rec->send_election_te);
        rec->send_election_te = NULL;
 
-       /* release the recmaster lock */
-       if (em->pnn != ctdb->pnn &&
-           ctdb->recovery_lock_fd != -1) {
-               close(ctdb->recovery_lock_fd);
-               ctdb->recovery_lock_fd = -1;
-               unban_all_nodes(ctdb);
+        if (ctdb->tunable.verify_recovery_lock != 0) {
+               /* release the recmaster lock */
+               if (em->pnn != ctdb->pnn &&
+                   ctdb->recovery_lock_fd != -1) {
+                       close(ctdb->recovery_lock_fd);
+                       ctdb->recovery_lock_fd = -1;
+                       unban_all_nodes(ctdb);
+               }
        }
 
        /* ok, let that guy become recmaster then */
@@ -1841,12 +2090,6 @@ static void election_handler(struct ctdb_context *ctdb, uint64_t srvid,
                return;
        }
 
-       /* release any bans */
-       rec->last_culprit = (uint32_t)-1;
-       talloc_free(rec->banned_nodes);
-       rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
-       CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
-
        talloc_free(mem_ctx);
        return;
 }
@@ -1861,8 +2104,10 @@ static void force_election(struct ctdb_recoverd *rec, uint32_t pnn,
        int ret;
        struct ctdb_context *ctdb = rec->ctdb;
 
+       DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
+
        /* set all nodes to recovery mode to stop all internode traffic */
-       ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
+       ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
                return;
@@ -1898,6 +2143,7 @@ static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid,
        uint32_t changed_flags;
        int i;
        struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
+       int disabled_flag_changed;
 
        if (data.dsize != sizeof(*c)) {
                DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
@@ -1931,6 +2177,8 @@ static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid,
                DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
        }
 
+       disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
+
        nodemap->nodes[i].flags = c->new_flags;
 
        ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
@@ -1951,7 +2199,7 @@ static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid,
                   lead to an ip address failover but that is handled 
                   during recovery
                */
-               if (changed_flags & NODE_FLAGS_DISABLED) {
+               if (disabled_flag_changed) {
                        rec->need_takeover_run = true;
                }
        }
@@ -1967,11 +2215,47 @@ static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid,
 {
        int ret;
        struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
+       struct ctdb_node_map *nodemap=NULL;
+       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
+       uint32_t recmaster;
+       uint32_t *nodes;
 
-       ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), c->pnn, c->new_flags, ~c->new_flags);
+       /* find the recovery master */
+       ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
        if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
+               DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
+               talloc_free(tmp_ctx);
+               return;
        }
+
+       /* read the node flags from the recmaster */
+       ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
+               talloc_free(tmp_ctx);
+               return;
+       }
+       if (c->pnn >= nodemap->num) {
+               DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
+               talloc_free(tmp_ctx);
+               return;
+       }
+
+       /* send the flags update to all connected nodes */
+       nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
+
+       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
+                                     nodes, 0, CONTROL_TIMEOUT(),
+                                     false, data,
+                                     NULL, NULL,
+                                     NULL) != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
+
+               talloc_free(tmp_ctx);
+               return;
+       }
+
+       talloc_free(tmp_ctx);
 }
 
 
@@ -2159,15 +2443,18 @@ static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ct
 }
 
 
-/* called to check that the allocation of public ip addresses is ok.
+/* called to check that the local allocation of public ip addresses is ok.
 */
-static int verify_ip_allocation(struct ctdb_context *ctdb, uint32_t pnn)
+static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn)
 {
        TALLOC_CTX *mem_ctx = talloc_new(NULL);
+       struct ctdb_control_get_ifaces *ifaces = NULL;
        struct ctdb_all_public_ips *ips = NULL;
        struct ctdb_uptime *uptime1 = NULL;
        struct ctdb_uptime *uptime2 = NULL;
        int ret, j;
+       bool need_iface_check = false;
+       bool need_takeover_run = false;
 
        ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
                                CTDB_CURRENT_NODE, &uptime1);
@@ -2177,6 +2464,30 @@ static int verify_ip_allocation(struct ctdb_context *ctdb, uint32_t pnn)
                return -1;
        }
 
+
+       /* read the interfaces from the local node */
+       ret = ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ifaces);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", pnn));
+               talloc_free(mem_ctx);
+               return -1;
+       }
+
+       if (!rec->ifaces) {
+               need_iface_check = true;
+       } else if (rec->ifaces->num != ifaces->num) {
+               need_iface_check = true;
+       } else if (memcmp(rec->ifaces, ifaces, talloc_get_size(ifaces)) != 0) {
+               need_iface_check = true;
+       }
+
+       if (need_iface_check) {
+               DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
+                                    "local node %u - force takeover run\n",
+                                    pnn));
+               need_takeover_run = true;
+       }
+
        /* read the ip allocation from the local node */
        ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
        if (ret != 0) {
@@ -2212,12 +2523,15 @@ static int verify_ip_allocation(struct ctdb_context *ctdb, uint32_t pnn)
        /* skip the check if we have started but not finished recovery */
        if (timeval_compare(&uptime1->last_recovery_finished,
                            &uptime1->last_recovery_started) != 1) {
-               DEBUG(DEBUG_NOTICE, (__location__ " in the middle of recovery. skipping public ip address check\n"));
+               DEBUG(DEBUG_NOTICE, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
                talloc_free(mem_ctx);
 
                return 0;
        }
 
+       talloc_free(rec->ifaces);
+       rec->ifaces = talloc_steal(rec, ifaces);
+
        /* verify that we have the ip addresses we should have
           and we dont have ones we shouldnt have.
           if we find an inconsistency we set recmode to
@@ -2229,48 +2543,299 @@ static int verify_ip_allocation(struct ctdb_context *ctdb, uint32_t pnn)
                        if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
                                DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
                                        ctdb_addr_to_str(&ips->ips[j].addr)));
-                               ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
-                               if (ret != 0) {
-                                       DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
-
-                                       talloc_free(mem_ctx);
-                                       return -1;
-                               }
-                               ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
-                               if (ret != 0) {
-                                       DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
-
-                                       talloc_free(mem_ctx);
-                                       return -1;
-                               }
+                               need_takeover_run = true;
                        }
                } else {
                        if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
                                DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
                                        ctdb_addr_to_str(&ips->ips[j].addr)));
+                               need_takeover_run = true;
+                       }
+               }
+       }
 
-                               ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
-                               if (ret != 0) {
-                                       DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
+       if (need_takeover_run) {
+               struct takeover_run_reply rd;
+               TDB_DATA data;
 
-                                       talloc_free(mem_ctx);
-                                       return -1;
-                               }
-                               ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
-                               if (ret != 0) {
-                                       DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
+               DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
 
-                                       talloc_free(mem_ctx);
-                                       return -1;
-                               }
-                       }
+               rd.pnn = ctdb->pnn;
+               rd.srvid = 0;
+               data.dptr = (uint8_t *)&rd;
+               data.dsize = sizeof(rd);
+
+               ret = ctdb_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
                }
        }
-
        talloc_free(mem_ctx);
        return 0;
 }
 
+
+static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
+{
+       struct ctdb_node_map **remote_nodemaps = callback_data;
+
+       if (node_pnn >= ctdb->num_nodes) {
+               DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
+               return;
+       }
+
+       remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
+
+}
+
+static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
+       struct ctdb_node_map *nodemap,
+       struct ctdb_node_map **remote_nodemaps)
+{
+       uint32_t *nodes;
+
+       nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
+       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
+                                       nodes, 0,
+                                       CONTROL_TIMEOUT(), false, tdb_null,
+                                       async_getnodemap_callback,
+                                       NULL,
+                                       remote_nodemaps) != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
+
+               return -1;
+       }
+
+       return 0;
+}
+
+enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
+struct ctdb_check_reclock_state {
+       struct ctdb_context *ctdb;
+       struct timeval start_time;
+       int fd[2];
+       pid_t child;
+       struct timed_event *te;
+       struct fd_event *fde;
+       enum reclock_child_status status;
+};
+
+/* when we free the reclock state we must kill any child process.
+*/
+static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
+{
+       struct ctdb_context *ctdb = state->ctdb;
+
+       ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
+
+       if (state->fd[0] != -1) {
+               close(state->fd[0]);
+               state->fd[0] = -1;
+       }
+       if (state->fd[1] != -1) {
+               close(state->fd[1]);
+               state->fd[1] = -1;
+       }
+       kill(state->child, SIGKILL);
+       return 0;
+}
+
+/*
+  called if our check_reclock child times out. this would happen if
+  i/o to the reclock file blocks.
+ */
+static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
+                                        struct timeval t, void *private_data)
+{
+       struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
+                                          struct ctdb_check_reclock_state);
+
+       DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timedout CFS slow to grant locks?\n"));
+       state->status = RECLOCK_TIMEOUT;
+}
+
+/* this is called when the child process has completed checking the reclock
+   file and has written data back to us through the pipe.
+*/
+static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
+                            uint16_t flags, void *private_data)
+{
+       struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
+                                            struct ctdb_check_reclock_state);
+       char c = 0;
+       int ret;
+
+       /* we got a response from our child process so we can abort the
+          timeout.
+       */
+       talloc_free(state->te);
+       state->te = NULL;
+
+       ret = read(state->fd[0], &c, 1);
+       if (ret != 1 || c != RECLOCK_OK) {
+               DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
+               state->status = RECLOCK_FAILED;
+
+               return;
+       }
+
+       state->status = RECLOCK_OK;
+       return;
+}
+
+static int check_recovery_lock(struct ctdb_context *ctdb)
+{
+       int ret;
+       struct ctdb_check_reclock_state *state;
+       pid_t parent = getpid();
+
+       if (ctdb->recovery_lock_fd == -1) {
+               DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
+               return -1;
+       }
+
+       state = talloc(ctdb, struct ctdb_check_reclock_state);
+       CTDB_NO_MEMORY(ctdb, state);
+
+       state->ctdb = ctdb;
+       state->start_time = timeval_current();
+       state->status = RECLOCK_CHECKING;
+       state->fd[0] = -1;
+       state->fd[1] = -1;
+
+       ret = pipe(state->fd);
+       if (ret != 0) {
+               talloc_free(state);
+               DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
+               return -1;
+       }
+
+       state->child = fork();
+       if (state->child == (pid_t)-1) {
+               DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
+               close(state->fd[0]);
+               state->fd[0] = -1;
+               close(state->fd[1]);
+               state->fd[1] = -1;
+               talloc_free(state);
+               return -1;
+       }
+
+       if (state->child == 0) {
+               char cc = RECLOCK_OK;
+               close(state->fd[0]);
+               state->fd[0] = -1;
+
+               if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
+                       DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
+                       cc = RECLOCK_FAILED;
+               }
+
+               write(state->fd[1], &cc, 1);
+               /* make sure we die when our parent dies */
+               while (kill(parent, 0) == 0 || errno != ESRCH) {
+                       sleep(5);
+                       write(state->fd[1], &cc, 1);
+               }
+               _exit(0);
+       }
+       close(state->fd[1]);
+       state->fd[1] = -1;
+       set_close_on_exec(state->fd[0]);
+
+       DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
+
+       talloc_set_destructor(state, check_reclock_destructor);
+
+       state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
+                                   ctdb_check_reclock_timeout, state);
+       if (state->te == NULL) {
+               DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
+               talloc_free(state);
+               return -1;
+       }
+
+       state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
+                               EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
+                               reclock_child_handler,
+                               (void *)state);
+
+       if (state->fde == NULL) {
+               DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
+               talloc_free(state);
+               return -1;
+       }
+
+       while (state->status == RECLOCK_CHECKING) {
+               event_loop_once(ctdb->ev);
+       }
+
+       if (state->status == RECLOCK_FAILED) {
+               DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
+               close(ctdb->recovery_lock_fd);
+               ctdb->recovery_lock_fd = -1;
+               talloc_free(state);
+               return -1;
+       }
+
+       talloc_free(state);
+       return 0;
+}
+
+static int update_recovery_lock_file(struct ctdb_context *ctdb)
+{
+       TALLOC_CTX *tmp_ctx = talloc_new(NULL);
+       const char *reclockfile;
+
+       if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
+               DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
+               talloc_free(tmp_ctx);
+               return -1;      
+       }
+
+       if (reclockfile == NULL) {
+               if (ctdb->recovery_lock_file != NULL) {
+                       DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
+                       talloc_free(ctdb->recovery_lock_file);
+                       ctdb->recovery_lock_file = NULL;
+                       if (ctdb->recovery_lock_fd != -1) {
+                               close(ctdb->recovery_lock_fd);
+                               ctdb->recovery_lock_fd = -1;
+                       }
+               }
+               ctdb->tunable.verify_recovery_lock = 0;
+               talloc_free(tmp_ctx);
+               return 0;
+       }
+
+       if (ctdb->recovery_lock_file == NULL) {
+               ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
+               if (ctdb->recovery_lock_fd != -1) {
+                       close(ctdb->recovery_lock_fd);
+                       ctdb->recovery_lock_fd = -1;
+               }
+               talloc_free(tmp_ctx);
+               return 0;
+       }
+
+
+       if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
+               talloc_free(tmp_ctx);
+               return 0;
+       }
+
+       talloc_free(ctdb->recovery_lock_file);
+       ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
+       ctdb->tunable.verify_recovery_lock = 0;
+       if (ctdb->recovery_lock_fd != -1) {
+               close(ctdb->recovery_lock_fd);
+               ctdb->recovery_lock_fd = -1;
+       }
+
+       talloc_free(tmp_ctx);
+       return 0;
+}
+               
 /*
   the main monitoring loop
  */
@@ -2279,13 +2844,13 @@ static void monitor_cluster(struct ctdb_context *ctdb)
        uint32_t pnn;
        TALLOC_CTX *mem_ctx=NULL;
        struct ctdb_node_map *nodemap=NULL;
-       struct ctdb_node_map *remote_nodemap=NULL;
+       struct ctdb_node_map *recmaster_nodemap=NULL;
+       struct ctdb_node_map **remote_nodemaps=NULL;
        struct ctdb_vnn_map *vnnmap=NULL;
        struct ctdb_vnn_map *remote_vnnmap=NULL;
        int32_t debug_level;
        int i, j, ret;
        struct ctdb_recoverd *rec;
-       char c;
 
        DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
 
@@ -2293,8 +2858,6 @@ static void monitor_cluster(struct ctdb_context *ctdb)
        CTDB_NO_MEMORY_FATAL(ctdb, rec);
 
        rec->ctdb = ctdb;
-       rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
-       CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
 
        rec->priority_time = timeval_current();
 
@@ -2310,15 +2873,21 @@ static void monitor_cluster(struct ctdb_context *ctdb)
        /* when we are asked to puch out a flag change */
        ctdb_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
 
-       /* when nodes are banned */
-       ctdb_set_message_handler(ctdb, CTDB_SRVID_BAN_NODE, ban_handler, rec);
-
-       /* and one for when nodes are unbanned */
-       ctdb_set_message_handler(ctdb, CTDB_SRVID_UNBAN_NODE, unban_handler, rec);
-
        /* register a message port for vacuum fetch */
        ctdb_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
 
+       /* register a message port for reloadnodes  */
+       ctdb_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
+
+       /* register a message port for performing a takeover run */
+       ctdb_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
+
+       /* register a message port for disabling the ip check for a short while */
+       ctdb_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
+
+       /* register a message port for updating the recovery daemons node assignment for an ip */
+       ctdb_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
+
 again:
        if (mem_ctx) {
                talloc_free(mem_ctx);
@@ -2360,11 +2929,21 @@ again:
           as early as possible so we dont wait until we have pulled the node
           map from the local node. thats why we have the hardcoded value 20
        */
-       if (rec->culprit_counter > 20) {
-               DEBUG(DEBUG_NOTICE,("Node %u has caused %u failures in %.0f seconds - banning it for %u seconds\n",
-                        rec->last_culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
-                        ctdb->tunable.recovery_ban_period));
-               ctdb_ban_node(rec, rec->last_culprit, ctdb->tunable.recovery_ban_period);
+       for (i=0; i<ctdb->num_nodes; i++) {
+               struct ctdb_banning_state *ban_state;
+
+               if (ctdb->nodes[i]->ban_state == NULL) {
+                       continue;
+               }
+               ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
+               if (ban_state->count < 20) {
+                       continue;
+               }
+               DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
+                       ctdb->nodes[i]->pnn, ban_state->count,
+                       ctdb->tunable.recovery_ban_period));
+               ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
+               ban_state->count = 0;
        }
 
        /* get relevant tunables */
@@ -2374,6 +2953,22 @@ again:
                goto again;
        }
 
+       /* get the current recovery lock file from the server */
+       if (update_recovery_lock_file(ctdb) != 0) {
+               DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
+               goto again;
+       }
+
+       /* Make sure that if recovery lock verification becomes disabled when
+          we close the file
+       */
+        if (ctdb->tunable.verify_recovery_lock == 0) {
+               if (ctdb->recovery_lock_fd != -1) {
+                       close(ctdb->recovery_lock_fd);
+                       ctdb->recovery_lock_fd = -1;
+               }
+       }
+
        pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
        if (pnn == (uint32_t)-1) {
                DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
@@ -2408,43 +3003,64 @@ again:
                goto again;
        }
 
+       /* if we are not the recmaster we can safely ignore any ip reallocate requests */
+       if (rec->recmaster != pnn) {
+               if (rec->ip_reallocate_ctx != NULL) {
+                       talloc_free(rec->ip_reallocate_ctx);
+                       rec->ip_reallocate_ctx = NULL;
+                       rec->reallocate_callers = NULL;
+               }
+       }
+       /* if there are takeovers requested, perform it and notify the waiters */
+       if (rec->reallocate_callers) {
+               process_ipreallocate_requests(ctdb, rec);
+       }
+
        if (rec->recmaster == (uint32_t)-1) {
                DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
                force_election(rec, pnn, nodemap);
                goto again;
        }
-       
-       /* check that we (recovery daemon) and the local ctdb daemon
-          agrees on whether we are banned or not
+
+
+       /* if the local daemon is STOPPED, we verify that the databases are
+          also frozen and thet the recmode is set to active 
        */
-       if (nodemap->nodes[pnn].flags & NODE_FLAGS_BANNED) {
-               if (rec->banned_nodes[pnn] == NULL) {
-                       if (rec->recmaster == pnn) {
-                               DEBUG(DEBUG_NOTICE,("Local ctdb daemon on recmaster thinks this node is BANNED but the recovery master disagrees. Unbanning the node\n"));
-
-                               ctdb_unban_node(rec, pnn);
-                       } else {
-                               DEBUG(DEBUG_NOTICE,("Local ctdb daemon on non-recmaster thinks this node is BANNED but the recovery master disagrees. Re-banning the node\n"));
-                               ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
-                               ctdb_set_culprit(rec, pnn);
-                       }
-                       goto again;
+       if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
+               ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
                }
-       } else {
-               if (rec->banned_nodes[pnn] != NULL) {
-                       if (rec->recmaster == pnn) {
-                               DEBUG(DEBUG_NOTICE,("Local ctdb daemon on recmaster does not think this node is BANNED but the recovery master disagrees. Unbanning the node\n"));
+               if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
+                       DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
 
-                               ctdb_unban_node(rec, pnn);
-                       } else {
-                               DEBUG(DEBUG_NOTICE,("Local ctdb daemon on non-recmaster does not think this node is BANNED but the recovery master disagrees. Re-banning the node\n"));
+                       ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
+                       if (ret != 0) {
+                               DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
+                               goto again;
+                       }
+                       ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
+                       if (ret != 0) {
+                               DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
 
-                               ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
-                               ctdb_set_culprit(rec, pnn);
+                               goto again;
                        }
                        goto again;
                }
        }
+       /* If the local node is stopped, verify we are not the recmaster 
+          and yield this role if so
+       */
+       if ((nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) && (rec->recmaster == pnn)) {
+               DEBUG(DEBUG_ERR,("Local node is STOPPED. Yielding recmaster role\n"));
+               force_election(rec, pnn, nodemap);
+               goto again;
+       }
+       
+       /* check that we (recovery daemon) and the local ctdb daemon
+          agrees on whether we are banned or not
+       */
+//qqq
 
        /* remember our own node flags */
        rec->node_flags = nodemap->nodes[pnn].flags;
@@ -2484,7 +3100,7 @@ again:
 
        /* grap the nodemap from the recovery master to check if it is banned */
        ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
-                                  mem_ctx, &remote_nodemap);
+                                  mem_ctx, &recmaster_nodemap);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
                          nodemap->nodes[j].pnn));
@@ -2492,28 +3108,21 @@ again:
        }
 
 
-       if (remote_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
+       if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
                force_election(rec, pnn, nodemap);
                goto again;
        }
 
 
-       /* verify that we and the recmaster agrees on our flags */
-       if (nodemap->nodes[pnn].flags != remote_nodemap->nodes[pnn].flags) {
-               DEBUG(DEBUG_ERR, (__location__ " Recmaster disagrees on our flags flags:0x%x recmaster_flags:0x%x  Broadcasting out flags.\n", nodemap->nodes[pnn].flags, remote_nodemap->nodes[pnn].flags));
-
-               update_our_flags_on_all_nodes(ctdb, pnn, nodemap);
-       }
-
-
        /* verify that we have all ip addresses we should have and we dont
         * have addresses we shouldnt have.
         */ 
        if (ctdb->do_checkpublicip) {
-               if (verify_ip_allocation(ctdb, pnn) != 0) {
-                       DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
-                       goto again;
+               if (rec->ip_check_disable_ctx == NULL) {
+                       if (verify_local_ip_allocation(ctdb, rec, pnn) != 0) {
+                               DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
+                       }
                }
        }
 
@@ -2538,34 +3147,11 @@ again:
                goto again;
        }
 
-       /* update the list of public ips that a node can handle for
-          all connected nodes
-       */
        if (ctdb->num_nodes != nodemap->num) {
                DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
                reload_nodes_file(ctdb);
                goto again;
        }
-       for (j=0; j<nodemap->num; j++) {
-               if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
-                       continue;
-               }
-               /* release any existing data */
-               if (ctdb->nodes[j]->public_ips) {
-                       talloc_free(ctdb->nodes[j]->public_ips);
-                       ctdb->nodes[j]->public_ips = NULL;
-               }
-               /* grab a new shiny list of public ips from the node */
-               if (ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(),
-                       ctdb->nodes[j]->pnn, 
-                       ctdb->nodes,
-                       &ctdb->nodes[j]->public_ips)) {
-                       DEBUG(DEBUG_ERR,("Failed to read public ips from node : %u\n", 
-                               ctdb->nodes[j]->pnn));
-                       goto again;
-               }
-       }
-
 
        /* verify that all active nodes agree that we are the recmaster */
        switch (verify_recmaster(rec, nodemap, pnn)) {
@@ -2584,16 +3170,16 @@ again:
 
        if (rec->need_recovery) {
                /* a previous recovery didn't finish */
-               do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, -1);
+               do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
                goto again;             
        }
 
        /* verify that all active nodes are in normal mode 
           and not in recovery mode 
-        */
+       */
        switch (verify_recmode(ctdb, nodemap)) {
        case MONITOR_RECOVERY_NEEDED:
-               do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
+               do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
                goto again;
        case MONITOR_FAILED:
                goto again;
@@ -2604,47 +3190,54 @@ again:
        }
 
 
-       /* we should have the reclock - check its not stale */
-       if (ctdb->recovery_lock_fd == -1) {
-               DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
-               do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
-               goto again;
+        if (ctdb->tunable.verify_recovery_lock != 0) {
+               /* we should have the reclock - check its not stale */
+               ret = check_recovery_lock(ctdb);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
+                       ctdb_set_culprit(rec, ctdb->pnn);
+                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
+                       goto again;
+               }
        }
 
-       if (pread(ctdb->recovery_lock_fd, &c, 1, 0) == -1) {
-               DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
-               close(ctdb->recovery_lock_fd);
-               ctdb->recovery_lock_fd = -1;
-               do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
+       /* get the nodemap for all active remote nodes
+        */
+       remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
+       if (remote_nodemaps == NULL) {
+               DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
                goto again;
        }
+       for(i=0; i<nodemap->num; i++) {
+               remote_nodemaps[i] = NULL;
+       }
+       if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
+               DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
+               goto again;
+       } 
 
-       /* get the nodemap for all active remote nodes and verify
-          they are the same as for this node
-        */
+       /* verify that all other nodes have the same nodemap as we have
+       */
        for (j=0; j<nodemap->num; j++) {
                if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }
-               if (nodemap->nodes[j].pnn == pnn) {
-                       continue;
-               }
 
-               ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
-                                          mem_ctx, &remote_nodemap);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
-                                 nodemap->nodes[j].pnn));
+               if (remote_nodemaps[j] == NULL) {
+                       DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
+                       ctdb_set_culprit(rec, j);
+
                        goto again;
                }
 
-               /* if the nodes disagree on how many nodes there are
+               /* if the nodes disagree on how many nodes there are
                   then this is a good reason to try recovery
                 */
-               if (remote_nodemap->num != nodemap->num) {
+               if (remote_nodemaps[j]->num != nodemap->num) {
                        DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
-                                 nodemap->nodes[j].pnn, remote_nodemap->num, nodemap->num));
-                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
+                                 nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
+                       ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
+                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
                        goto again;
                }
 
@@ -2652,25 +3245,47 @@ again:
                   active, then that is also a good reason to do recovery
                 */
                for (i=0;i<nodemap->num;i++) {
-                       if (remote_nodemap->nodes[i].pnn != nodemap->nodes[i].pnn) {
+                       if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
                                DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
                                          nodemap->nodes[j].pnn, i, 
-                                         remote_nodemap->nodes[i].pnn, nodemap->nodes[i].pnn));
-                               do_recovery(rec, mem_ctx, pnn, nodemap, 
-                                           vnnmap, nodemap->nodes[j].pnn);
-                               goto again;
-                       }
-                       if ((remote_nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) != 
-                           (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
-                               DEBUG(DEBUG_WARNING, (__location__ " Remote node:%u has different nodemap flag for %d (0x%x vs 0x%x)\n", 
-                                         nodemap->nodes[j].pnn, i,
-                                         remote_nodemap->nodes[i].flags, nodemap->nodes[i].flags));
+                                         remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
+                               ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
                                do_recovery(rec, mem_ctx, pnn, nodemap, 
-                                           vnnmap, nodemap->nodes[j].pnn);
+                                           vnnmap);
                                goto again;
                        }
                }
 
+               /* verify the flags are consistent
+               */
+               for (i=0; i<nodemap->num; i++) {
+                       if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
+                               continue;
+                       }
+                       
+                       if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
+                               DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
+                                 nodemap->nodes[j].pnn, 
+                                 nodemap->nodes[i].pnn, 
+                                 remote_nodemaps[j]->nodes[i].flags,
+                                 nodemap->nodes[j].flags));
+                               if (i == j) {
+                                       DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
+                                       update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
+                                       ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
+                                       do_recovery(rec, mem_ctx, pnn, nodemap, 
+                                                   vnnmap);
+                                       goto again;
+                               } else {
+                                       DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
+                                       update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
+                                       ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
+                                       do_recovery(rec, mem_ctx, pnn, nodemap, 
+                                                   vnnmap);
+                                       goto again;
+                               }
+                       }
+               }
        }
 
 
@@ -2680,7 +3295,8 @@ again:
        if (vnnmap->size != rec->num_active) {
                DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
                          vnnmap->size, rec->num_active));
-               do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
+               ctdb_set_culprit(rec, ctdb->pnn);
+               do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
                goto again;
        }
 
@@ -2703,7 +3319,8 @@ again:
                if (i == vnnmap->size) {
                        DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
                                  nodemap->nodes[j].pnn));
-                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
+                       ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
+                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
                        goto again;
                }
        }
@@ -2732,7 +3349,8 @@ again:
                if (vnnmap->generation != remote_vnnmap->generation) {
                        DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
                                  nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
-                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
+                       ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
+                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
                        goto again;
                }
 
@@ -2740,7 +3358,8 @@ again:
                if (vnnmap->size != remote_vnnmap->size) {
                        DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
                                  nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
-                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
+                       ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
+                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
                        goto again;
                }
 
@@ -2749,8 +3368,9 @@ again:
                        if (remote_vnnmap->map[i] != vnnmap->map[i]) {
                                DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
                                          nodemap->nodes[j].pnn));
+                               ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
                                do_recovery(rec, mem_ctx, pnn, nodemap, 
-                                           vnnmap, nodemap->nodes[j].pnn);
+                                           vnnmap);
                                goto again;
                        }
                }
@@ -2758,21 +3378,37 @@ again:
 
        /* we might need to change who has what IP assigned */
        if (rec->need_takeover_run) {
+               uint32_t culprit = (uint32_t)-1;
+
                rec->need_takeover_run = false;
 
+               /* update the list of public ips that a node can handle for
+                  all connected nodes
+               */
+               ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
+                                        culprit));
+                       ctdb_set_culprit(rec, culprit);
+                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
+                       goto again;
+               }
+
                /* execute the "startrecovery" event script on all nodes */
                ret = run_startrecovery_eventscript(rec, nodemap);
                if (ret!=0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
-                       do_recovery(rec, mem_ctx, pnn, nodemap, 
-                                   vnnmap, ctdb->pnn);
+                       ctdb_set_culprit(rec, ctdb->pnn);
+                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
+                       goto again;
                }
 
                ret = ctdb_takeover_run(ctdb, nodemap);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
-                       do_recovery(rec, mem_ctx, pnn, nodemap, 
-                                   vnnmap, ctdb->pnn);
+                       ctdb_set_culprit(rec, ctdb->pnn);
+                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
+                       goto again;
                }
 
                /* execute the "recovered" event script on all nodes */
@@ -2784,8 +3420,8 @@ again:
 // cascading recovery.
                if (ret!=0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
-                       do_recovery(rec, mem_ctx, pnn, nodemap, 
-                                   vnnmap, ctdb->pnn);
+                       ctdb_set_culprit(rec, ctdb->pnn);
+                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
                }
 #endif
        }
@@ -2823,7 +3459,7 @@ static void ctdb_check_recd(struct event_context *ev, struct timed_event *te,
                if (ctdb->methods != NULL) {
                        ctdb->methods->shutdown(ctdb);
                }
-               ctdb_event_script(ctdb, "shutdown");
+               ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
 
                exit(10);       
        }
@@ -2845,7 +3481,9 @@ static void recd_sig_child_handler(struct event_context *ev,
        while (pid != 0) {
                pid = waitpid(-1, &status, WNOHANG);
                if (pid == -1) {
-                       DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
+                       if (errno != ECHILD) {
+                               DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
+                       }
                        return;
                }
                if (pid > 0) {
@@ -2859,7 +3497,6 @@ static void recd_sig_child_handler(struct event_context *ev,
  */
 int ctdb_start_recoverd(struct ctdb_context *ctdb)
 {
-       int ret;
        int fd[2];
        struct signal_event *se;
 
@@ -2884,35 +3521,18 @@ int ctdb_start_recoverd(struct ctdb_context *ctdb)
 
        close(fd[1]);
 
-       /* shutdown the transport */
-       if (ctdb->methods) {
-               ctdb->methods->shutdown(ctdb);
+       srandom(getpid() ^ time(NULL));
+
+       if (switch_from_server_to_client(ctdb) != 0) {
+               DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
+               exit(1);
        }
 
-       /* get a new event context */
-       talloc_free(ctdb->ev);
-       ctdb->ev = event_context_init(ctdb);
+       DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
 
        event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
                     ctdb_recoverd_parent, &fd[0]);     
 
-       close(ctdb->daemon.sd);
-       ctdb->daemon.sd = -1;
-
-       srandom(getpid() ^ time(NULL));
-
-       /* the recovery daemon does not need to be realtime */
-       if (ctdb->do_setsched) {
-               ctdb_restore_scheduler(ctdb);
-       }
-
-       /* initialise ctdb */
-       ret = ctdb_socket_connect(ctdb);
-       if (ret != 0) {
-               DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb\n"));
-               exit(1);
-       }
-
        /* set up a handler to pick up sigchld */
        se = event_add_signal(ctdb->ev, ctdb,
                                     SIGCHLD, 0,