server/recoverd: if we can't get the recovery lock, ban ourself
[metze/ctdb/wip.git] / server / ctdb_recoverd.c
index 3ab44a739548ceb1a60c325a99cb83ec69a32ae5..30c34b3a64dd7089fb6be94b2c2a7f1e67e72532 100644 (file)
 */
 
 #include "includes.h"
-#include "lib/events/events.h"
+#include "lib/tevent/tevent.h"
 #include "system/filesys.h"
 #include "system/time.h"
 #include "system/network.h"
 #include "system/wait.h"
 #include "popt.h"
 #include "cmdline.h"
-#include "../include/ctdb.h"
+#include "../include/ctdb_client.h"
 #include "../include/ctdb_private.h"
 #include "db_wrap.h"
 #include "dlinklist.h"
 
 
-struct ban_state {
-       struct ctdb_recoverd *rec;
-       uint32_t banned_node;
+/* list of "ctdb ipreallocate" processes to call back when we have
+   finished the takeover run.
+*/
+struct ip_reallocate_list {
+       struct ip_reallocate_list *next;
+       struct rd_memdump_reply *rd;
+};
+
+struct ctdb_banning_state {
+       uint32_t count;
+       struct timeval last_reported_time;
 };
 
 /*
@@ -44,11 +52,8 @@ struct ctdb_recoverd {
        uint32_t recmaster;
        uint32_t num_active;
        uint32_t num_connected;
+       uint32_t last_culprit_node;
        struct ctdb_node_map *nodemap;
-       uint32_t last_culprit;
-       uint32_t culprit_counter;
-       struct timeval first_recover_time;
-       struct ban_state **banned_nodes;
        struct timeval priority_time;
        bool need_takeover_run;
        bool need_recovery;
@@ -56,82 +61,25 @@ struct ctdb_recoverd {
        struct timed_event *send_election_te;
        struct timed_event *election_timeout;
        struct vacuum_info *vacuum_info;
+       TALLOC_CTX *ip_reallocate_ctx;
+       struct ip_reallocate_list *reallocate_callers;
+       TALLOC_CTX *ip_check_disable_ctx;
+       struct ctdb_control_get_ifaces *ifaces;
 };
 
 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
 
 
-/*
-  unban a node
- */
-static void ctdb_unban_node(struct ctdb_recoverd *rec, uint32_t pnn)
-{
-       struct ctdb_context *ctdb = rec->ctdb;
-
-       DEBUG(DEBUG_NOTICE,("Unbanning node %u\n", pnn));
-
-       if (!ctdb_validate_pnn(ctdb, pnn)) {
-               DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_unban_node\n", pnn));
-               return;
-       }
-
-       /* If we are unbanning a different node then just pass the ban info on */
-       if (pnn != ctdb->pnn) {
-               TDB_DATA data;
-               int ret;
-               
-               DEBUG(DEBUG_NOTICE,("Unanning remote node %u. Passing the ban request on to the remote node.\n", pnn));
-
-               data.dptr = (uint8_t *)&pnn;
-               data.dsize = sizeof(uint32_t);
-
-               ret = ctdb_send_message(ctdb, pnn, CTDB_SRVID_UNBAN_NODE, data);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR,("Failed to unban node %u\n", pnn));
-                       return;
-               }
-
-               return;
-       }
-
-       /* make sure we remember we are no longer banned in case 
-          there is an election */
-       rec->node_flags &= ~NODE_FLAGS_BANNED;
-
-       DEBUG(DEBUG_INFO,("Clearing ban flag on node %u\n", pnn));
-       ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, 0, NODE_FLAGS_BANNED);
-
-       if (rec->banned_nodes[pnn] == NULL) {
-               DEBUG(DEBUG_INFO,("No ban recorded for this node. ctdb_unban_node() request ignored\n"));
-               return;
-       }
-
-       talloc_free(rec->banned_nodes[pnn]);
-       rec->banned_nodes[pnn] = NULL;
-}
-
-
-/*
-  called when a ban has timed out
- */
-static void ctdb_ban_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
-{
-       struct ban_state *state = talloc_get_type(p, struct ban_state);
-       struct ctdb_recoverd *rec = state->rec;
-       uint32_t pnn = state->banned_node;
-
-       DEBUG(DEBUG_NOTICE,("Ban timeout. Node %u is now unbanned\n", pnn));
-       ctdb_unban_node(rec, pnn);
-}
-
 /*
   ban a node for a period of time
  */
 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
 {
+       int ret;
        struct ctdb_context *ctdb = rec->ctdb;
-
+       struct ctdb_ban_time bantime;
+       
        DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
 
        if (!ctdb_validate_pnn(ctdb, pnn)) {
@@ -139,61 +87,15 @@ static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_
                return;
        }
 
-       if (0 == ctdb->tunable.enable_bans) {
-               DEBUG(DEBUG_INFO,("Bans are disabled - ignoring ban of node %u\n", pnn));
-               return;
-       }
-
-       /* If we are banning a different node then just pass the ban info on */
-       if (pnn != ctdb->pnn) {
-               struct ctdb_ban_info b;
-               TDB_DATA data;
-               int ret;
-               
-               DEBUG(DEBUG_NOTICE,("Banning remote node %u for %u seconds. Passing the ban request on to the remote node.\n", pnn, ban_time));
-
-               b.pnn = pnn;
-               b.ban_time = ban_time;
-
-               data.dptr = (uint8_t *)&b;
-               data.dsize = sizeof(b);
-
-               ret = ctdb_send_message(ctdb, pnn, CTDB_SRVID_BAN_NODE, data);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR,("Failed to ban node %u\n", pnn));
-                       return;
-               }
+       bantime.pnn  = pnn;
+       bantime.time = ban_time;
 
+       ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
                return;
        }
 
-       DEBUG(DEBUG_NOTICE,("self ban - lowering our election priority\n"));
-       ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, NODE_FLAGS_BANNED, 0);
-
-       /* banning ourselves - lower our election priority */
-       rec->priority_time = timeval_current();
-
-       /* make sure we remember we are banned in case there is an 
-          election */
-       rec->node_flags |= NODE_FLAGS_BANNED;
-
-       if (rec->banned_nodes[pnn] != NULL) {
-               DEBUG(DEBUG_NOTICE,("Re-banning an already banned node. Remove previous ban and set a new ban.\n"));            
-               talloc_free(rec->banned_nodes[pnn]);
-               rec->banned_nodes[pnn] = NULL;
-       }
-
-       rec->banned_nodes[pnn] = talloc(rec->banned_nodes, struct ban_state);
-       CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes[pnn]);
-
-       rec->banned_nodes[pnn]->rec = rec;
-       rec->banned_nodes[pnn]->banned_node = pnn;
-
-       if (ban_time != 0) {
-               event_add_timed(ctdb->ev, rec->banned_nodes[pnn], 
-                               timeval_current_ofs(ban_time, 0),
-                               ctdb_ban_timeout, rec->banned_nodes[pnn]);
-       }
 }
 
 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
@@ -212,7 +114,7 @@ static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node
 
        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
-                                       nodes,
+                                       nodes, 0,
                                        CONTROL_TIMEOUT(), false, tdb_null,
                                        NULL, NULL,
                                        NULL) != 0) {
@@ -229,39 +131,44 @@ static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node
 /*
   remember the trouble maker
  */
-static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
+static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
 {
-       struct ctdb_context *ctdb = rec->ctdb;
+       struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
+       struct ctdb_banning_state *ban_state;
+
+       if (culprit > ctdb->num_nodes) {
+               DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
+               return;
+       }
 
-       if (rec->last_culprit != culprit ||
-           timeval_elapsed(&rec->first_recover_time) > ctdb->tunable.recovery_grace_period) {
-               DEBUG(DEBUG_NOTICE,("New recovery culprit %u\n", culprit));
-               /* either a new node is the culprit, or we've decided to forgive them */
-               rec->last_culprit = culprit;
-               rec->first_recover_time = timeval_current();
-               rec->culprit_counter = 0;
+       if (ctdb->nodes[culprit]->ban_state == NULL) {
+               ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
+               CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
+
+               
+       }
+       ban_state = ctdb->nodes[culprit]->ban_state;
+       if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
+               /* this was the first time in a long while this node
+                  misbehaved so we will forgive any old transgressions.
+               */
+               ban_state->count = 0;
        }
-       rec->culprit_counter++;
+
+       ban_state->count += count;
+       ban_state->last_reported_time = timeval_current();
+       rec->last_culprit_node = culprit;
 }
 
 /*
   remember the trouble maker
  */
-static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
+static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
 {
-       struct ctdb_context *ctdb = rec->ctdb;
-
-       if (rec->last_culprit != culprit ||
-           timeval_elapsed(&rec->first_recover_time) > ctdb->tunable.recovery_grace_period) {
-               DEBUG(DEBUG_NOTICE,("New recovery culprit %u\n", culprit));
-               /* either a new node is the culprit, or we've decided to forgive them */
-               rec->last_culprit = culprit;
-               rec->first_recover_time = timeval_current();
-               rec->culprit_counter = 0;
-       }
-       rec->culprit_counter += count;
+       ctdb_set_culprit_count(rec, culprit, 1);
 }
 
+
 /* this callback is called for every node that failed to execute the
    start recovery event
 */
@@ -288,7 +195,7 @@ static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_
 
        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
-                                       nodes,
+                                       nodes, 0,
                                        CONTROL_TIMEOUT(), false, tdb_null,
                                        NULL,
                                        startrecovery_fail_callback,
@@ -305,7 +212,7 @@ static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_
 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
 {
        if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
-               DEBUG(DEBUG_ERR, (__location__ " Invalid lenght/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
+               DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
                return;
        }
        if (node_pnn < ctdb->num_nodes) {
@@ -326,7 +233,8 @@ static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *
 
        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
-                                       nodes, CONTROL_TIMEOUT(),
+                                       nodes, 0,
+                                       CONTROL_TIMEOUT(),
                                        false, tdb_null,
                                        async_getcap_callback, NULL,
                                        NULL) != 0) {
@@ -339,10 +247,26 @@ static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *
        return 0;
 }
 
+static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
+{
+       struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
+
+       DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
+       ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
+}
+
+static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
+{
+       struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
+
+       DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
+       ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
+}
+
 /*
   change recovery mode on all nodes
  */
-static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t rec_mode)
+static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
 {
        TDB_DATA data;
        uint32_t *nodes;
@@ -354,14 +278,20 @@ static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *no
        /* freeze all nodes */
        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (rec_mode == CTDB_RECOVERY_ACTIVE) {
-               if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
-                                               nodes, CONTROL_TIMEOUT(),
+               int i;
+
+               for (i=1; i<=NUM_DB_PRIORITIES; i++) {
+                       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
+                                               nodes, i,
+                                               CONTROL_TIMEOUT(),
                                                false, tdb_null,
-                                               NULL, NULL,
-                                               NULL) != 0) {
-                       DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
-                       talloc_free(tmp_ctx);
-                       return -1;
+                                               NULL,
+                                               set_recmode_fail_callback,
+                                               rec) != 0) {
+                               DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
+                               talloc_free(tmp_ctx);
+                               return -1;
+                       }
                }
        }
 
@@ -370,7 +300,8 @@ static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *no
        data.dptr = (unsigned char *)&rec_mode;
 
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
-                                       nodes, CONTROL_TIMEOUT(),
+                                       nodes, 0,
+                                       CONTROL_TIMEOUT(),
                                        false, data,
                                        NULL, NULL,
                                        NULL) != 0) {
@@ -400,7 +331,7 @@ static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *
 
        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
-                                       nodes,
+                                       nodes, 0,
                                        CONTROL_TIMEOUT(), false, data,
                                        NULL, NULL,
                                        NULL) != 0) {
@@ -413,6 +344,50 @@ static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *
        return 0;
 }
 
+/* update all remote nodes to use the same db priority that we have
+   this can fail if the remove node has not yet been upgraded to 
+   support this function, so we always return success and never fail
+   a recovery if this call fails.
+*/
+static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
+       struct ctdb_node_map *nodemap, 
+       uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
+{
+       int db;
+       uint32_t *nodes;
+
+       nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
+
+       /* step through all local databases */
+       for (db=0; db<dbmap->num;db++) {
+               TDB_DATA data;
+               struct ctdb_db_priority db_prio;
+               int ret;
+
+               db_prio.db_id     = dbmap->dbs[db].dbid;
+               ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
+                       continue;
+               }
+
+               DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
+
+               data.dptr  = (uint8_t *)&db_prio;
+               data.dsize = sizeof(db_prio);
+
+               if (ctdb_client_async_control(ctdb,
+                                       CTDB_CONTROL_SET_DB_PRIORITY,
+                                       nodes, 0,
+                                       CONTROL_TIMEOUT(), false, data,
+                                       NULL, NULL,
+                                       NULL) != 0) {
+                       DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
+               }
+       }
+
+       return 0;
+}                      
 
 /*
   ensure all other nodes have attached to any databases that we have
@@ -547,7 +522,8 @@ static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb
   pull the remote database contents from one node into the recdb
  */
 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
-                                   struct tdb_wrap *recdb, uint32_t dbid)
+                                   struct tdb_wrap *recdb, uint32_t dbid,
+                                   bool persistent)
 {
        int ret;
        TDB_DATA outdata;
@@ -632,7 +608,8 @@ static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode,
 static int pull_remote_database(struct ctdb_context *ctdb,
                                struct ctdb_recoverd *rec, 
                                struct ctdb_node_map *nodemap, 
-                               struct tdb_wrap *recdb, uint32_t dbid)
+                               struct tdb_wrap *recdb, uint32_t dbid,
+                               bool persistent)
 {
        int j;
 
@@ -644,7 +621,7 @@ static int pull_remote_database(struct ctdb_context *ctdb,
                if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }
-               if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
+               if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid, persistent) != 0) {
                        DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
                                 nodemap->nodes[j].pnn));
                        ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
@@ -698,62 +675,6 @@ static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_nod
 }
 
 
-/*
-  handler for when the admin bans a node
-*/
-static void ban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
-                       TDB_DATA data, void *private_data)
-{
-       struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
-       struct ctdb_ban_info *b = (struct ctdb_ban_info *)data.dptr;
-       TALLOC_CTX *mem_ctx = talloc_new(ctdb);
-
-       if (data.dsize != sizeof(*b)) {
-               DEBUG(DEBUG_ERR,("Bad data in ban_handler\n"));
-               talloc_free(mem_ctx);
-               return;
-       }
-
-       if (b->pnn != ctdb->pnn) {
-               DEBUG(DEBUG_ERR,("Got a ban request for pnn:%u but our pnn is %u. Ignoring ban request\n", b->pnn, ctdb->pnn));
-               return;
-       }
-
-       DEBUG(DEBUG_NOTICE,("Node %u has been banned for %u seconds\n", 
-                b->pnn, b->ban_time));
-
-       ctdb_ban_node(rec, b->pnn, b->ban_time);
-       talloc_free(mem_ctx);
-}
-
-/*
-  handler for when the admin unbans a node
-*/
-static void unban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
-                         TDB_DATA data, void *private_data)
-{
-       struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
-       TALLOC_CTX *mem_ctx = talloc_new(ctdb);
-       uint32_t pnn;
-
-       if (data.dsize != sizeof(uint32_t)) {
-               DEBUG(DEBUG_ERR,("Bad data in unban_handler\n"));
-               talloc_free(mem_ctx);
-               return;
-       }
-       pnn = *(uint32_t *)data.dptr;
-
-       if (pnn != ctdb->pnn) {
-               DEBUG(DEBUG_ERR,("Got an unban request for pnn:%u but our pnn is %u. Ignoring unban request\n", pnn, ctdb->pnn));
-               return;
-       }
-
-       DEBUG(DEBUG_NOTICE,("Node %u has been unbanned.\n", pnn));
-       ctdb_unban_node(rec, pnn);
-       talloc_free(mem_ctx);
-}
-
-
 struct vacuum_info {
        struct vacuum_info *next, *prev;
        struct ctdb_recoverd *rec;
@@ -966,10 +887,11 @@ static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te,
 /*
   wait for a given number of seconds
  */
-static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
+static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
 {
        uint32_t timed_out = 0;
-       event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
+       time_t usecs = (secs - (time_t)secs) * 1000000;
+       event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
        while (!timed_out) {
                event_loop_once(ctdb->ev);
        }
@@ -983,6 +905,9 @@ static void ctdb_election_timeout(struct event_context *ev, struct timed_event *
 {
        struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
        rec->election_timeout = NULL;
+       fast_start = false;
+
+       DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
 }
 
 
@@ -1032,15 +957,6 @@ static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *n
                        return MONITOR_FAILED;
                }
                if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
-                       int ban_changed = (nodemap->nodes[j].flags ^ remote_nodemap->nodes[j].flags) & NODE_FLAGS_BANNED;
-
-                       if (ban_changed) {
-                               DEBUG(DEBUG_NOTICE,("Remote node %u had different BANNED flags 0x%x, local had 0x%x - trigger a re-election\n",
-                               nodemap->nodes[j].pnn,
-                               remote_nodemap->nodes[j].flags,
-                               nodemap->nodes[j].flags));
-                       }
-
                        /* We should tell our daemon about this so it
                           updates its flags or else we will log the same 
                           message again in the next iteration of recovery.
@@ -1060,15 +976,6 @@ static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *n
                                 nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
                                 nodemap->nodes[j].flags));
                        nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
-
-                       /* If the BANNED flag has changed for the node
-                          this is a good reason to do a new election.
-                        */
-                       if (ban_changed) {
-                               talloc_free(mem_ctx);
-                               return MONITOR_ELECTION_NEEDED;
-                       }
-
                }
                talloc_free(remote_nodemap);
        }
@@ -1106,16 +1013,19 @@ static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_
        unsigned tdb_flags;
 
        /* open up the temporary recovery database */
-       name = talloc_asprintf(mem_ctx, "%s/recdb.tdb", ctdb->db_directory);
+       name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
+                              ctdb->db_directory_state,
+                              ctdb->pnn);
        if (name == NULL) {
                return NULL;
        }
        unlink(name);
 
        tdb_flags = TDB_NOLOCK;
-       if (!ctdb->do_setsched) {
+       if (ctdb->valgrinding) {
                tdb_flags |= TDB_NOMMAP;
        }
+       tdb_flags |= TDB_DISALLOW_NESTING;
 
        recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
                              tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
@@ -1137,6 +1047,7 @@ struct recdb_data {
        struct ctdb_marshall_buffer *recdata;
        uint32_t len;
        bool failed;
+       bool persistent;
 };
 
 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
@@ -1152,7 +1063,9 @@ static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data,
 
        /* update the dmaster field to point to us */
        hdr = (struct ctdb_ltdb_header *)data.dptr;
-       hdr->dmaster = params->ctdb->pnn;
+       if (!params->persistent) {
+               hdr->dmaster = params->ctdb->pnn;
+       }
 
        /* add the record to the blob ready to send to the nodes */
        rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
@@ -1179,6 +1092,7 @@ static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data,
   push the recdb database out to all nodes
  */
 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
+                              bool persistent,
                               struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
 {
        struct recdb_data params;
@@ -1199,6 +1113,7 @@ static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
        params.recdata = recdata;
        params.len = offsetof(struct ctdb_marshall_buffer, data);
        params.failed = false;
+       params.persistent = persistent;
 
        if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
@@ -1221,7 +1136,7 @@ static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
 
        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
-                                       nodes,
+                                       nodes, 0,
                                        CONTROL_TIMEOUT(), false, outdata,
                                        NULL, NULL,
                                        NULL) != 0) {
@@ -1247,6 +1162,7 @@ static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
 static int recover_database(struct ctdb_recoverd *rec, 
                            TALLOC_CTX *mem_ctx,
                            uint32_t dbid,
+                           bool persistent,
                            uint32_t pnn, 
                            struct ctdb_node_map *nodemap,
                            uint32_t transaction_id)
@@ -1264,7 +1180,7 @@ static int recover_database(struct ctdb_recoverd *rec,
        }
 
        /* pull all remote databases onto the recdb */
-       ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid);
+       ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
                return -1;
@@ -1281,7 +1197,7 @@ static int recover_database(struct ctdb_recoverd *rec,
 
        nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
-                                       nodes,
+                                       nodes, 0,
                                        CONTROL_TIMEOUT(), false, data,
                                        NULL, NULL,
                                        NULL) != 0) {
@@ -1292,7 +1208,7 @@ static int recover_database(struct ctdb_recoverd *rec,
        
        /* push out the correct database. This sets the dmaster and skips 
           the empty records */
-       ret = push_recdb_database(ctdb, dbid, recdb, nodemap);
+       ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
        if (ret != 0) {
                talloc_free(recdb);
                return -1;
@@ -1313,14 +1229,122 @@ static void reload_nodes_file(struct ctdb_context *ctdb)
        ctdb_load_nodes_file(ctdb);
 }
 
-       
+static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
+                                        struct ctdb_recoverd *rec,
+                                        struct ctdb_node_map *nodemap,
+                                        uint32_t *culprit)
+{
+       int j;
+       int ret;
+
+       if (ctdb->num_nodes != nodemap->num) {
+               DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
+                                 ctdb->num_nodes, nodemap->num));
+               if (culprit) {
+                       *culprit = ctdb->pnn;
+               }
+               return -1;
+       }
+
+       for (j=0; j<nodemap->num; j++) {
+               /* release any existing data */
+               if (ctdb->nodes[j]->known_public_ips) {
+                       talloc_free(ctdb->nodes[j]->known_public_ips);
+                       ctdb->nodes[j]->known_public_ips = NULL;
+               }
+               if (ctdb->nodes[j]->available_public_ips) {
+                       talloc_free(ctdb->nodes[j]->available_public_ips);
+                       ctdb->nodes[j]->available_public_ips = NULL;
+               }
+
+               if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
+                       continue;
+               }
+
+               /* grab a new shiny list of public ips from the node */
+               ret = ctdb_ctrl_get_public_ips_flags(ctdb,
+                                       CONTROL_TIMEOUT(),
+                                       ctdb->nodes[j]->pnn,
+                                       ctdb->nodes,
+                                       0,
+                                       &ctdb->nodes[j]->known_public_ips);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,("Failed to read known public ips from node : %u\n",
+                               ctdb->nodes[j]->pnn));
+                       if (culprit) {
+                               *culprit = ctdb->nodes[j]->pnn;
+                       }
+                       return -1;
+               }
+
+               if (rec->ip_check_disable_ctx == NULL) {
+                       if (verify_remote_ip_allocation(ctdb, ctdb->nodes[j]->known_public_ips)) {
+                               DEBUG(DEBUG_ERR,("Node %d has inconsistent public ip allocation and needs update.\n", ctdb->nodes[j]->pnn));
+                               rec->need_takeover_run = true;
+                       }
+               }
+
+               /* grab a new shiny list of public ips from the node */
+               ret = ctdb_ctrl_get_public_ips_flags(ctdb,
+                                       CONTROL_TIMEOUT(),
+                                       ctdb->nodes[j]->pnn,
+                                       ctdb->nodes,
+                                       CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
+                                       &ctdb->nodes[j]->available_public_ips);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,("Failed to read available public ips from node : %u\n",
+                               ctdb->nodes[j]->pnn));
+                       if (culprit) {
+                               *culprit = ctdb->nodes[j]->pnn;
+                       }
+                       return -1;
+               }
+       }
+
+       return 0;
+}
+
+/* when we start a recovery, make sure all nodes use the same reclock file
+   setting
+*/
+static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
+{
+       struct ctdb_context *ctdb = rec->ctdb;
+       TALLOC_CTX *tmp_ctx = talloc_new(NULL);
+       TDB_DATA data;
+       uint32_t *nodes;
+
+       if (ctdb->recovery_lock_file == NULL) {
+               data.dptr  = NULL;
+               data.dsize = 0;
+       } else {
+               data.dsize = strlen(ctdb->recovery_lock_file) + 1;
+               data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
+       }
+
+       nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
+       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
+                                       nodes, 0,
+                                       CONTROL_TIMEOUT(),
+                                       false, data,
+                                       NULL, NULL,
+                                       rec) != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       talloc_free(tmp_ctx);
+       return 0;
+}
+
+
 /*
   we are the recmaster, and recovery is needed - start a recovery run
  */
 static int do_recovery(struct ctdb_recoverd *rec, 
                       TALLOC_CTX *mem_ctx, uint32_t pnn,
-                      struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap,
-                      int32_t culprit)
+                      struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
 {
        struct ctdb_context *ctdb = rec->ctdb;
        int i, j, ret;
@@ -1329,34 +1353,46 @@ static int do_recovery(struct ctdb_recoverd *rec,
        TDB_DATA data;
        uint32_t *nodes;
        struct timeval start_time;
+       uint32_t culprit = (uint32_t)-1;
 
        DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
 
        /* if recovery fails, force it again */
        rec->need_recovery = true;
 
-       if (culprit != -1) {
-               ctdb_set_culprit(rec, culprit);
-       }
+       for (i=0; i<ctdb->num_nodes; i++) {
+               struct ctdb_banning_state *ban_state;
 
-       if (rec->culprit_counter > 2*nodemap->num) {
-               DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries in %.0f seconds - banning it for %u seconds\n",
-                        rec->last_culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
-                        ctdb->tunable.recovery_ban_period));
-               ctdb_ban_node(rec, rec->last_culprit, ctdb->tunable.recovery_ban_period);
+               if (ctdb->nodes[i]->ban_state == NULL) {
+                       continue;
+               }
+               ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
+               if (ban_state->count < 2*ctdb->num_nodes) {
+                       continue;
+               }
+               DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
+                       ctdb->nodes[i]->pnn, ban_state->count,
+                       ctdb->tunable.recovery_ban_period));
+               ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
+               ban_state->count = 0;
        }
 
-       DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
-       start_time = timeval_current();
-       if (!ctdb_recovery_lock(ctdb, true)) {
-               ctdb_set_culprit(rec, pnn);
-               DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery\n"));
-               return -1;
+
+        if (ctdb->tunable.verify_recovery_lock != 0) {
+               DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
+               start_time = timeval_current();
+               if (!ctdb_recovery_lock(ctdb, true)) {
+                       DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
+                                        "and ban ourself for %u seconds\n",
+                                        ctdb->tunable.recovery_ban_period));
+                       ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
+                       return -1;
+               }
+               ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
+               DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
        }
-       ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
-       DEBUG(DEBUG_ERR,("Recovery lock taken successfully by recovery daemon\n"));
 
-       DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", culprit));
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
 
        /* get a list of all databases */
        ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
@@ -1381,12 +1417,23 @@ static int do_recovery(struct ctdb_recoverd *rec,
                DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
                return -1;
        }
-
        DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
 
+       /* update the database priority for all remote databases */
+       ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
+       }
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
+
+
+       /* update all other nodes to use the same setting for reclock files
+          as the local recovery master.
+       */
+       sync_recovery_lock_file_across_cluster(rec);
 
        /* set recovery mode to active on all nodes */
-       ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
+       ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
                return -1;
@@ -1399,8 +1446,25 @@ static int do_recovery(struct ctdb_recoverd *rec,
                return -1;
        }
 
-       /* pick a new generation number */
-       generation = new_generation();
+       /*
+         update all nodes to have the same flags that we have
+        */
+       for (i=0;i<nodemap->num;i++) {
+               if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
+                       continue;
+               }
+
+               ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
+                       return -1;
+               }
+       }
+
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
+
+       /* pick a new generation number */
+       generation = new_generation();
 
        /* change the vnnmap on this node to use the new generation 
           number but not on any other nodes.
@@ -1424,18 +1488,31 @@ static int do_recovery(struct ctdb_recoverd *rec,
 
        nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
-                                       nodes,
+                                       nodes, 0,
                                        CONTROL_TIMEOUT(), false, data,
-                                       NULL, NULL,
-                                       NULL) != 0) {
+                                       NULL,
+                                       transaction_start_fail_callback,
+                                       rec) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
+               if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
+                                       nodes, 0,
+                                       CONTROL_TIMEOUT(), false, tdb_null,
+                                       NULL,
+                                       NULL,
+                                       NULL) != 0) {
+                       DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
+               }
                return -1;
        }
 
        DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
 
        for (i=0;i<dbmap->num;i++) {
-               if (recover_database(rec, mem_ctx, dbmap->dbs[i].dbid, pnn, nodemap, generation) != 0) {
+               ret = recover_database(rec, mem_ctx,
+                                      dbmap->dbs[i].dbid,
+                                      dbmap->dbs[i].persistent,
+                                      pnn, nodemap, generation);
+               if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
                        return -1;
                }
@@ -1445,7 +1522,7 @@ static int do_recovery(struct ctdb_recoverd *rec,
 
        /* commit all the changes */
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
-                                       nodes,
+                                       nodes, 0,
                                        CONTROL_TIMEOUT(), false, data,
                                        NULL, NULL,
                                        NULL) != 0) {
@@ -1532,7 +1609,7 @@ static int do_recovery(struct ctdb_recoverd *rec,
        DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
 
        /* disable recovery mode */
-       ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_NORMAL);
+       ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
                return -1;
@@ -1543,6 +1620,12 @@ static int do_recovery(struct ctdb_recoverd *rec,
        /*
          tell nodes to takeover their public IPs
         */
+       ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
+                                culprit));
+               return -1;
+       }
        rec->need_takeover_run = false;
        ret = ctdb_takeover_run(ctdb, nodemap);
        if (ret != 0) {
@@ -1562,12 +1645,33 @@ static int do_recovery(struct ctdb_recoverd *rec,
 
        /* send a message to all clients telling them that the cluster 
           has been reconfigured */
-       ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
+       ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
 
        DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
 
        rec->need_recovery = false;
 
+       /* we managed to complete a full recovery, make sure to forgive
+          any past sins by the nodes that could now participate in the
+          recovery.
+       */
+       DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
+       for (i=0;i<nodemap->num;i++) {
+               struct ctdb_banning_state *ban_state;
+
+               if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
+                       continue;
+               }
+
+               ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
+               if (ban_state == NULL) {
+                       continue;
+               }
+
+               ban_state->count = 0;
+       }
+
+
        /* We just finished a recovery successfully. 
           We now wait for rerecovery_timeout before we allow 
           another recovery to take place.
@@ -1604,7 +1708,6 @@ static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_messag
 
        em->pnn = rec->ctdb->pnn;
        em->priority_time = rec->priority_time;
-       em->node_flags = rec->node_flags;
 
        ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
        if (ret != 0) {
@@ -1612,6 +1715,9 @@ static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_messag
                return;
        }
 
+       rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
+       em->node_flags = rec->node_flags;
+
        for (i=0;i<nodemap->num;i++) {
                if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
                        em->num_connected++;
@@ -1647,11 +1753,21 @@ static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message
                return false;
        }       
 
+       /* we cant win if we are stopped */
+       if (rec->node_flags & NODE_FLAGS_STOPPED) {
+               return false;
+       }       
+
        /* we will automatically win if the other node is banned */
        if (em->node_flags & NODE_FLAGS_BANNED) {
                return true;
        }
 
+       /* we will automatically win if the other node is banned */
+       if (em->node_flags & NODE_FLAGS_STOPPED) {
+               return true;
+       }
+
        /* try to use the most connected node */
        if (cmp == 0) {
                cmp = (int)myem.num_connected - (int)em->num_connected;
@@ -1689,7 +1805,8 @@ static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool u
 
 
        /* send an election message to all active nodes */
-       ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
+       DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
+       ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
 
 
        /* A new node that is already frozen has entered the cluster.
@@ -1787,7 +1904,7 @@ static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid,
 
 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));           
 
-       ret = ctdb_send_message(ctdb, rd->pnn, rd->srvid, *dump);
+       ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
        if (ret != 0) {
                DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
                talloc_free(tmp_ctx);
@@ -1811,6 +1928,157 @@ static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid,
 }
 
 
+static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
+                             struct timeval yt, void *p)
+{
+       struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
+
+       talloc_free(rec->ip_check_disable_ctx);
+       rec->ip_check_disable_ctx = NULL;
+}
+
+
+static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
+                            TDB_DATA data, void *private_data)
+{
+       struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
+       struct ctdb_public_ip *ip;
+
+       if (rec->recmaster != rec->ctdb->pnn) {
+               DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
+               return;
+       }
+
+       if (data.dsize != sizeof(struct ctdb_public_ip)) {
+               DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
+               return;
+       }
+
+       ip = (struct ctdb_public_ip *)data.dptr;
+
+       update_ip_assignment_tree(rec->ctdb, ip);
+}
+
+
+static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
+                            TDB_DATA data, void *private_data)
+{
+       struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
+       uint32_t timeout;
+
+       if (rec->ip_check_disable_ctx != NULL) {
+               talloc_free(rec->ip_check_disable_ctx);
+               rec->ip_check_disable_ctx = NULL;
+       }
+
+       if (data.dsize != sizeof(uint32_t)) {
+               DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
+                                "expexting %lu\n", (long unsigned)data.dsize,
+                                (long unsigned)sizeof(uint32_t)));
+               return;
+       }
+       if (data.dptr == NULL) {
+               DEBUG(DEBUG_ERR,(__location__ " No data recaived\n"));
+               return;
+       }
+
+       timeout = *((uint32_t *)data.dptr);
+       DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
+
+       rec->ip_check_disable_ctx = talloc_new(rec);
+       CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
+
+       event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
+}
+
+
+/*
+  handler for ip reallocate, just add it to the list of callers and 
+  handle this later in the monitor_cluster loop so we do not recurse
+  with other callers to takeover_run()
+*/
+static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
+                            TDB_DATA data, void *private_data)
+{
+       struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
+       struct ip_reallocate_list *caller;
+
+       if (data.dsize != sizeof(struct rd_memdump_reply)) {
+               DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
+               return;
+       }
+
+       if (rec->ip_reallocate_ctx == NULL) {
+               rec->ip_reallocate_ctx = talloc_new(rec);
+               CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
+       }
+
+       caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
+       CTDB_NO_MEMORY_FATAL(ctdb, caller);
+
+       caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
+       caller->next = rec->reallocate_callers;
+       rec->reallocate_callers = caller;
+
+       return;
+}
+
+static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
+{
+       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
+       TDB_DATA result;
+       int32_t ret;
+       struct ip_reallocate_list *callers;
+       uint32_t culprit;
+
+       DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
+
+       /* update the list of public ips that a node can handle for
+          all connected nodes
+       */
+       ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
+                                culprit));
+               rec->need_takeover_run = true;
+       }
+       if (ret == 0) {
+               ret = ctdb_takeover_run(ctdb, rec->nodemap);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
+                                        culprit));
+                       rec->need_takeover_run = true;
+               }
+       }
+
+       result.dsize = sizeof(int32_t);
+       result.dptr  = (uint8_t *)&ret;
+
+       for (callers=rec->reallocate_callers; callers; callers=callers->next) {
+
+               /* Someone that sent srvid==0 does not want a reply */
+               if (callers->rd->srvid == 0) {
+                       continue;
+               }
+               DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
+                                 "%u:%llu\n", (unsigned)callers->rd->pnn,
+                                 (unsigned long long)callers->rd->srvid));
+               ret = ctdb_client_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
+                                        "message to %u:%llu\n",
+                                        (unsigned)callers->rd->pnn,
+                                        (unsigned long long)callers->rd->srvid));
+               }
+       }
+
+       talloc_free(tmp_ctx);
+       talloc_free(rec->ip_reallocate_ctx);
+       rec->ip_reallocate_ctx = NULL;
+       rec->reallocate_callers = NULL;
+       
+}
+
 
 /*
   handler for recovery master elections
@@ -1826,6 +2094,8 @@ static void election_handler(struct ctdb_context *ctdb, uint64_t srvid,
        /* we got an election packet - update the timeout for the election */
        talloc_free(rec->election_timeout);
        rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
+                                               fast_start ?
+                                               timeval_current_ofs(0, 500000) :
                                                timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
                                                ctdb_election_timeout, rec);
 
@@ -1850,12 +2120,14 @@ static void election_handler(struct ctdb_context *ctdb, uint64_t srvid,
        talloc_free(rec->send_election_te);
        rec->send_election_te = NULL;
 
-       /* release the recmaster lock */
-       if (em->pnn != ctdb->pnn &&
-           ctdb->recovery_lock_fd != -1) {
-               close(ctdb->recovery_lock_fd);
-               ctdb->recovery_lock_fd = -1;
-               unban_all_nodes(ctdb);
+        if (ctdb->tunable.verify_recovery_lock != 0) {
+               /* release the recmaster lock */
+               if (em->pnn != ctdb->pnn &&
+                   ctdb->recovery_lock_fd != -1) {
+                       close(ctdb->recovery_lock_fd);
+                       ctdb->recovery_lock_fd = -1;
+                       unban_all_nodes(ctdb);
+               }
        }
 
        /* ok, let that guy become recmaster then */
@@ -1866,12 +2138,6 @@ static void election_handler(struct ctdb_context *ctdb, uint64_t srvid,
                return;
        }
 
-       /* release any bans */
-       rec->last_culprit = (uint32_t)-1;
-       talloc_free(rec->banned_nodes);
-       rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
-       CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
-
        talloc_free(mem_ctx);
        return;
 }
@@ -1886,8 +2152,10 @@ static void force_election(struct ctdb_recoverd *rec, uint32_t pnn,
        int ret;
        struct ctdb_context *ctdb = rec->ctdb;
 
+       DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
+
        /* set all nodes to recovery mode to stop all internode traffic */
-       ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
+       ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
                return;
@@ -1895,6 +2163,8 @@ static void force_election(struct ctdb_recoverd *rec, uint32_t pnn,
 
        talloc_free(rec->election_timeout);
        rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
+                                               fast_start ?
+                                               timeval_current_ofs(0, 500000) :
                                                timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
                                                ctdb_election_timeout, rec);
 
@@ -1923,6 +2193,7 @@ static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid,
        uint32_t changed_flags;
        int i;
        struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
+       int disabled_flag_changed;
 
        if (data.dsize != sizeof(*c)) {
                DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
@@ -1956,6 +2227,8 @@ static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid,
                DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
        }
 
+       disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
+
        nodemap->nodes[i].flags = c->new_flags;
 
        ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
@@ -1976,7 +2249,7 @@ static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid,
                   lead to an ip address failover but that is handled 
                   during recovery
                */
-               if (changed_flags & NODE_FLAGS_DISABLED) {
+               if (disabled_flag_changed) {
                        rec->need_takeover_run = true;
                }
        }
@@ -1992,11 +2265,47 @@ static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid,
 {
        int ret;
        struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
+       struct ctdb_node_map *nodemap=NULL;
+       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
+       uint32_t recmaster;
+       uint32_t *nodes;
 
-       ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), c->pnn, c->new_flags, ~c->new_flags);
+       /* find the recovery master */
+       ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
        if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
+               DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
+               talloc_free(tmp_ctx);
+               return;
        }
+
+       /* read the node flags from the recmaster */
+       ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
+               talloc_free(tmp_ctx);
+               return;
+       }
+       if (c->pnn >= nodemap->num) {
+               DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
+               talloc_free(tmp_ctx);
+               return;
+       }
+
+       /* send the flags update to all connected nodes */
+       nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
+
+       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
+                                     nodes, 0, CONTROL_TIMEOUT(),
+                                     false, data,
+                                     NULL, NULL,
+                                     NULL) != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
+
+               talloc_free(tmp_ctx);
+               return;
+       }
+
+       talloc_free(tmp_ctx);
 }
 
 
@@ -2184,15 +2493,18 @@ static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ct
 }
 
 
-/* called to check that the allocation of public ip addresses is ok.
+/* called to check that the local allocation of public ip addresses is ok.
 */
-static int verify_ip_allocation(struct ctdb_context *ctdb, uint32_t pnn)
+static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn)
 {
        TALLOC_CTX *mem_ctx = talloc_new(NULL);
+       struct ctdb_control_get_ifaces *ifaces = NULL;
        struct ctdb_all_public_ips *ips = NULL;
        struct ctdb_uptime *uptime1 = NULL;
        struct ctdb_uptime *uptime2 = NULL;
        int ret, j;
+       bool need_iface_check = false;
+       bool need_takeover_run = false;
 
        ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
                                CTDB_CURRENT_NODE, &uptime1);
@@ -2202,6 +2514,30 @@ static int verify_ip_allocation(struct ctdb_context *ctdb, uint32_t pnn)
                return -1;
        }
 
+
+       /* read the interfaces from the local node */
+       ret = ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ifaces);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", pnn));
+               talloc_free(mem_ctx);
+               return -1;
+       }
+
+       if (!rec->ifaces) {
+               need_iface_check = true;
+       } else if (rec->ifaces->num != ifaces->num) {
+               need_iface_check = true;
+       } else if (memcmp(rec->ifaces, ifaces, talloc_get_size(ifaces)) != 0) {
+               need_iface_check = true;
+       }
+
+       if (need_iface_check) {
+               DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
+                                    "local node %u - force takeover run\n",
+                                    pnn));
+               need_takeover_run = true;
+       }
+
        /* read the ip allocation from the local node */
        ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
        if (ret != 0) {
@@ -2237,12 +2573,15 @@ static int verify_ip_allocation(struct ctdb_context *ctdb, uint32_t pnn)
        /* skip the check if we have started but not finished recovery */
        if (timeval_compare(&uptime1->last_recovery_finished,
                            &uptime1->last_recovery_started) != 1) {
-               DEBUG(DEBUG_NOTICE, (__location__ " in the middle of recovery. skipping public ip address check\n"));
+               DEBUG(DEBUG_NOTICE, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
                talloc_free(mem_ctx);
 
                return 0;
        }
 
+       talloc_free(rec->ifaces);
+       rec->ifaces = talloc_steal(rec, ifaces);
+
        /* verify that we have the ip addresses we should have
           and we dont have ones we shouldnt have.
           if we find an inconsistency we set recmode to
@@ -2254,44 +2593,33 @@ static int verify_ip_allocation(struct ctdb_context *ctdb, uint32_t pnn)
                        if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
                                DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
                                        ctdb_addr_to_str(&ips->ips[j].addr)));
-                               ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
-                               if (ret != 0) {
-                                       DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
-
-                                       talloc_free(mem_ctx);
-                                       return -1;
-                               }
-                               ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
-                               if (ret != 0) {
-                                       DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
-
-                                       talloc_free(mem_ctx);
-                                       return -1;
-                               }
+                               need_takeover_run = true;
                        }
                } else {
                        if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
                                DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
                                        ctdb_addr_to_str(&ips->ips[j].addr)));
+                               need_takeover_run = true;
+                       }
+               }
+       }
 
-                               ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
-                               if (ret != 0) {
-                                       DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
+       if (need_takeover_run) {
+               struct takeover_run_reply rd;
+               TDB_DATA data;
 
-                                       talloc_free(mem_ctx);
-                                       return -1;
-                               }
-                               ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
-                               if (ret != 0) {
-                                       DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
+               DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
 
-                                       talloc_free(mem_ctx);
-                                       return -1;
-                               }
-                       }
+               rd.pnn = ctdb->pnn;
+               rd.srvid = 0;
+               data.dptr = (uint8_t *)&rd;
+               data.dsize = sizeof(rd);
+
+               ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
                }
        }
-
        talloc_free(mem_ctx);
        return 0;
 }
@@ -2318,7 +2646,7 @@ static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
 
        nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
-                                       nodes,
+                                       nodes, 0,
                                        CONTROL_TIMEOUT(), false, tdb_null,
                                        async_getnodemap_callback,
                                        NULL,
@@ -2448,6 +2776,7 @@ static int check_recovery_lock(struct ctdb_context *ctdb)
                close(state->fd[0]);
                state->fd[0] = -1;
 
+               debug_extra = talloc_asprintf(NULL, "recovery-lock:");
                if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
                        DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
                        cc = RECLOCK_FAILED;
@@ -2463,6 +2792,9 @@ static int check_recovery_lock(struct ctdb_context *ctdb)
        }
        close(state->fd[1]);
        state->fd[1] = -1;
+       set_close_on_exec(state->fd[0]);
+
+       DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
 
        talloc_set_destructor(state, check_reclock_destructor);
 
@@ -2475,7 +2807,7 @@ static int check_recovery_lock(struct ctdb_context *ctdb)
        }
 
        state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
-                               EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
+                               EVENT_FD_READ,
                                reclock_child_handler,
                                (void *)state);
 
@@ -2484,6 +2816,7 @@ static int check_recovery_lock(struct ctdb_context *ctdb)
                talloc_free(state);
                return -1;
        }
+       tevent_fd_set_auto_close(state->fde);
 
        while (state->status == RECLOCK_CHECKING) {
                event_loop_once(ctdb->ev);
@@ -2501,70 +2834,73 @@ static int check_recovery_lock(struct ctdb_context *ctdb)
        return 0;
 }
 
-/*
-  the main monitoring loop
- */
-static void monitor_cluster(struct ctdb_context *ctdb)
+static int update_recovery_lock_file(struct ctdb_context *ctdb)
 {
-       uint32_t pnn;
-       TALLOC_CTX *mem_ctx=NULL;
-       struct ctdb_node_map *nodemap=NULL;
-       struct ctdb_node_map *recmaster_nodemap=NULL;
-       struct ctdb_node_map **remote_nodemaps=NULL;
-       struct ctdb_vnn_map *vnnmap=NULL;
-       struct ctdb_vnn_map *remote_vnnmap=NULL;
-       int32_t debug_level;
-       int i, j, ret;
-       struct ctdb_recoverd *rec;
+       TALLOC_CTX *tmp_ctx = talloc_new(NULL);
+       const char *reclockfile;
 
-       DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
-
-       rec = talloc_zero(ctdb, struct ctdb_recoverd);
-       CTDB_NO_MEMORY_FATAL(ctdb, rec);
-
-       rec->ctdb = ctdb;
-       rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
-       CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
+       if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
+               DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
+               talloc_free(tmp_ctx);
+               return -1;      
+       }
 
-       rec->priority_time = timeval_current();
+       if (reclockfile == NULL) {
+               if (ctdb->recovery_lock_file != NULL) {
+                       DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
+                       talloc_free(ctdb->recovery_lock_file);
+                       ctdb->recovery_lock_file = NULL;
+                       if (ctdb->recovery_lock_fd != -1) {
+                               close(ctdb->recovery_lock_fd);
+                               ctdb->recovery_lock_fd = -1;
+                       }
+               }
+               ctdb->tunable.verify_recovery_lock = 0;
+               talloc_free(tmp_ctx);
+               return 0;
+       }
 
-       /* register a message port for sending memory dumps */
-       ctdb_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
+       if (ctdb->recovery_lock_file == NULL) {
+               ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
+               if (ctdb->recovery_lock_fd != -1) {
+                       close(ctdb->recovery_lock_fd);
+                       ctdb->recovery_lock_fd = -1;
+               }
+               talloc_free(tmp_ctx);
+               return 0;
+       }
 
-       /* register a message port for recovery elections */
-       ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
 
-       /* when nodes are disabled/enabled */
-       ctdb_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
+       if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
+               talloc_free(tmp_ctx);
+               return 0;
+       }
 
-       /* when we are asked to puch out a flag change */
-       ctdb_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
+       talloc_free(ctdb->recovery_lock_file);
+       ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
+       ctdb->tunable.verify_recovery_lock = 0;
+       if (ctdb->recovery_lock_fd != -1) {
+               close(ctdb->recovery_lock_fd);
+               ctdb->recovery_lock_fd = -1;
+       }
 
-       /* when nodes are banned */
-       ctdb_set_message_handler(ctdb, CTDB_SRVID_BAN_NODE, ban_handler, rec);
+       talloc_free(tmp_ctx);
+       return 0;
+}
 
-       /* and one for when nodes are unbanned */
-       ctdb_set_message_handler(ctdb, CTDB_SRVID_UNBAN_NODE, unban_handler, rec);
+static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
+                     TALLOC_CTX *mem_ctx)
+{
+       uint32_t pnn;
+       struct ctdb_node_map *nodemap=NULL;
+       struct ctdb_node_map *recmaster_nodemap=NULL;
+       struct ctdb_node_map **remote_nodemaps=NULL;
+       struct ctdb_vnn_map *vnnmap=NULL;
+       struct ctdb_vnn_map *remote_vnnmap=NULL;
+       int32_t debug_level;
+       int i, j, ret;
 
-       /* register a message port for vacuum fetch */
-       ctdb_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
 
-       /* register a message port for reloadnodes  */
-       ctdb_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
-
-again:
-       if (mem_ctx) {
-               talloc_free(mem_ctx);
-               mem_ctx = NULL;
-       }
-       mem_ctx = talloc_new(ctdb);
-       if (!mem_ctx) {
-               DEBUG(DEBUG_CRIT,(__location__ " Failed to create temporary context\n"));
-               exit(-1);
-       }
-
-       /* we only check for recovery once every second */
-       ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
 
        /* verify that the main daemon is still running */
        if (kill(ctdb->ctdbd_pid, 0) != 0) {
@@ -2577,14 +2913,14 @@ again:
 
        if (rec->election_timeout) {
                /* an election is in progress */
-               goto again;
+               return;
        }
 
        /* read the debug level from the parent and update locally */
        ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
        if (ret !=0) {
                DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
-               goto again;
+               return;
        }
        LogLevel = debug_level;
 
@@ -2593,31 +2929,57 @@ again:
           as early as possible so we dont wait until we have pulled the node
           map from the local node. thats why we have the hardcoded value 20
        */
-       if (rec->culprit_counter > 20) {
-               DEBUG(DEBUG_NOTICE,("Node %u has caused %u failures in %.0f seconds - banning it for %u seconds\n",
-                        rec->last_culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
-                        ctdb->tunable.recovery_ban_period));
-               ctdb_ban_node(rec, rec->last_culprit, ctdb->tunable.recovery_ban_period);
+       for (i=0; i<ctdb->num_nodes; i++) {
+               struct ctdb_banning_state *ban_state;
+
+               if (ctdb->nodes[i]->ban_state == NULL) {
+                       continue;
+               }
+               ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
+               if (ban_state->count < 20) {
+                       continue;
+               }
+               DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
+                       ctdb->nodes[i]->pnn, ban_state->count,
+                       ctdb->tunable.recovery_ban_period));
+               ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
+               ban_state->count = 0;
        }
 
        /* get relevant tunables */
        ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
        if (ret != 0) {
                DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
-               goto again;
+               return;
+       }
+
+       /* get the current recovery lock file from the server */
+       if (update_recovery_lock_file(ctdb) != 0) {
+               DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
+               return;
+       }
+
+       /* Make sure that if recovery lock verification becomes disabled when
+          we close the file
+       */
+        if (ctdb->tunable.verify_recovery_lock == 0) {
+               if (ctdb->recovery_lock_fd != -1) {
+                       close(ctdb->recovery_lock_fd);
+                       ctdb->recovery_lock_fd = -1;
+               }
        }
 
        pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
        if (pnn == (uint32_t)-1) {
                DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
-               goto again;
+               return;
        }
 
        /* get the vnnmap */
        ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
-               goto again;
+               return;
        }
 
 
@@ -2630,7 +2992,7 @@ again:
        ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
-               goto again;
+               return;
        }
        nodemap = rec->nodemap;
 
@@ -2638,46 +3000,63 @@ again:
        ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
-               goto again;
+               return;
+       }
+
+       /* if we are not the recmaster we can safely ignore any ip reallocate requests */
+       if (rec->recmaster != pnn) {
+               if (rec->ip_reallocate_ctx != NULL) {
+                       talloc_free(rec->ip_reallocate_ctx);
+                       rec->ip_reallocate_ctx = NULL;
+                       rec->reallocate_callers = NULL;
+               }
        }
 
        if (rec->recmaster == (uint32_t)-1) {
                DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
                force_election(rec, pnn, nodemap);
-               goto again;
+               return;
        }
-       
-       /* check that we (recovery daemon) and the local ctdb daemon
-          agrees on whether we are banned or not
+
+
+       /* if the local daemon is STOPPED, we verify that the databases are
+          also frozen and thet the recmode is set to active 
        */
-       if (nodemap->nodes[pnn].flags & NODE_FLAGS_BANNED) {
-               if (rec->banned_nodes[pnn] == NULL) {
-                       if (rec->recmaster == pnn) {
-                               DEBUG(DEBUG_NOTICE,("Local ctdb daemon on recmaster thinks this node is BANNED but the recovery master disagrees. Unbanning the node\n"));
-
-                               ctdb_unban_node(rec, pnn);
-                       } else {
-                               DEBUG(DEBUG_NOTICE,("Local ctdb daemon on non-recmaster thinks this node is BANNED but the recovery master disagrees. Re-banning the node\n"));
-                               ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
-                               ctdb_set_culprit(rec, pnn);
-                       }
-                       goto again;
+       if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
+               ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
                }
-       } else {
-               if (rec->banned_nodes[pnn] != NULL) {
-                       if (rec->recmaster == pnn) {
-                               DEBUG(DEBUG_NOTICE,("Local ctdb daemon on recmaster does not think this node is BANNED but the recovery master disagrees. Unbanning the node\n"));
+               if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
+                       DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
 
-                               ctdb_unban_node(rec, pnn);
-                       } else {
-                               DEBUG(DEBUG_NOTICE,("Local ctdb daemon on non-recmaster does not think this node is BANNED but the recovery master disagrees. Re-banning the node\n"));
+                       ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
+                       if (ret != 0) {
+                               DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
+                               return;
+                       }
+                       ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
+                       if (ret != 0) {
+                               DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
 
-                               ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
-                               ctdb_set_culprit(rec, pnn);
+                               return;
                        }
-                       goto again;
+                       return;
                }
        }
+       /* If the local node is stopped, verify we are not the recmaster 
+          and yield this role if so
+       */
+       if ((nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) && (rec->recmaster == pnn)) {
+               DEBUG(DEBUG_ERR,("Local node is STOPPED. Yielding recmaster role\n"));
+               force_election(rec, pnn, nodemap);
+               return;
+       }
+       
+       /* check that we (recovery daemon) and the local ctdb daemon
+          agrees on whether we are banned or not
+       */
+//qqq
 
        /* remember our own node flags */
        rec->node_flags = nodemap->nodes[pnn].flags;
@@ -2705,14 +3084,14 @@ again:
        if (j == nodemap->num) {
                DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
                force_election(rec, pnn, nodemap);
-               goto again;
+               return;
        }
 
        /* if recovery master is disconnected we must elect a new recmaster */
        if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
                DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
                force_election(rec, pnn, nodemap);
-               goto again;
+               return;
        }
 
        /* grap the nodemap from the recovery master to check if it is banned */
@@ -2721,14 +3100,14 @@ again:
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
                          nodemap->nodes[j].pnn));
-               goto again;
+               return;
        }
 
 
        if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
                force_election(rec, pnn, nodemap);
-               goto again;
+               return;
        }
 
 
@@ -2736,9 +3115,10 @@ again:
         * have addresses we shouldnt have.
         */ 
        if (ctdb->do_checkpublicip) {
-               if (verify_ip_allocation(ctdb, pnn) != 0) {
-                       DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
-                       goto again;
+               if (rec->ip_check_disable_ctx == NULL) {
+                       if (verify_local_ip_allocation(ctdb, rec, pnn) != 0) {
+                               DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
+                       }
                }
        }
 
@@ -2747,7 +3127,7 @@ again:
           if recovery is needed
         */
        if (pnn != rec->recmaster) {
-               goto again;
+               return;
        }
 
 
@@ -2756,74 +3136,49 @@ again:
        if (ret == MONITOR_ELECTION_NEEDED) {
                DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
                force_election(rec, pnn, nodemap);
-               goto again;
+               return;
        }
        if (ret != MONITOR_OK) {
                DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
-               goto again;
+               return;
        }
 
-       /* update the list of public ips that a node can handle for
-          all connected nodes
-       */
        if (ctdb->num_nodes != nodemap->num) {
                DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
                reload_nodes_file(ctdb);
-               goto again;
-       }
-       for (j=0; j<nodemap->num; j++) {
-               /* release any existing data */
-               if (ctdb->nodes[j]->public_ips) {
-                       talloc_free(ctdb->nodes[j]->public_ips);
-                       ctdb->nodes[j]->public_ips = NULL;
-               }
-
-               if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
-                       continue;
-               }
-
-               /* grab a new shiny list of public ips from the node */
-               if (ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(),
-                       ctdb->nodes[j]->pnn, 
-                       ctdb->nodes,
-                       &ctdb->nodes[j]->public_ips)) {
-                       DEBUG(DEBUG_ERR,("Failed to read public ips from node : %u\n", 
-                               ctdb->nodes[j]->pnn));
-                       goto again;
-               }
+               return;
        }
 
-
        /* verify that all active nodes agree that we are the recmaster */
        switch (verify_recmaster(rec, nodemap, pnn)) {
        case MONITOR_RECOVERY_NEEDED:
                /* can not happen */
-               goto again;
+               return;
        case MONITOR_ELECTION_NEEDED:
                force_election(rec, pnn, nodemap);
-               goto again;
+               return;
        case MONITOR_OK:
                break;
        case MONITOR_FAILED:
-               goto again;
+               return;
        }
 
 
        if (rec->need_recovery) {
                /* a previous recovery didn't finish */
-               do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, -1);
-               goto again;             
+               do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
+               return;
        }
 
        /* verify that all active nodes are in normal mode 
           and not in recovery mode 
-        */
+       */
        switch (verify_recmode(ctdb, nodemap)) {
        case MONITOR_RECOVERY_NEEDED:
-               do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
-               goto again;
+               do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
+               return;
        case MONITOR_FAILED:
-               goto again;
+               return;
        case MONITOR_ELECTION_NEEDED:
                /* can not happen */
        case MONITOR_OK:
@@ -2831,12 +3186,20 @@ again:
        }
 
 
-       /* we should have the reclock - check its not stale */
-       ret = check_recovery_lock(ctdb);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
-               do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
-               goto again;
+        if (ctdb->tunable.verify_recovery_lock != 0) {
+               /* we should have the reclock - check its not stale */
+               ret = check_recovery_lock(ctdb);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
+                       ctdb_set_culprit(rec, ctdb->pnn);
+                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
+                       return;
+               }
+       }
+
+       /* if there are takeovers requested, perform it and notify the waiters */
+       if (rec->reallocate_callers) {
+               process_ipreallocate_requests(ctdb, rec);
        }
 
        /* get the nodemap for all active remote nodes
@@ -2844,14 +3207,14 @@ again:
        remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
        if (remote_nodemaps == NULL) {
                DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
-               goto again;
+               return;
        }
        for(i=0; i<nodemap->num; i++) {
                remote_nodemaps[i] = NULL;
        }
        if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
-               goto again;
+               return;
        } 
 
        /* verify that all other nodes have the same nodemap as we have
@@ -2865,7 +3228,7 @@ again:
                        DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
                        ctdb_set_culprit(rec, j);
 
-                       goto again;
+                       return;
                }
 
                /* if the nodes disagree on how many nodes there are
@@ -2874,8 +3237,9 @@ again:
                if (remote_nodemaps[j]->num != nodemap->num) {
                        DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
                                  nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
-                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
-                       goto again;
+                       ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
+                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
+                       return;
                }
 
                /* if the nodes disagree on which nodes exist and are
@@ -2886,9 +3250,10 @@ again:
                                DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
                                          nodemap->nodes[j].pnn, i, 
                                          remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
+                               ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
                                do_recovery(rec, mem_ctx, pnn, nodemap, 
-                                           vnnmap, nodemap->nodes[j].pnn);
-                               goto again;
+                                           vnnmap);
+                               return;
                        }
                }
 
@@ -2908,15 +3273,17 @@ again:
                                if (i == j) {
                                        DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
                                        update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
+                                       ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
                                        do_recovery(rec, mem_ctx, pnn, nodemap, 
-                                                   vnnmap, nodemap->nodes[j].pnn);
-                                       goto again;
+                                                   vnnmap);
+                                       return;
                                } else {
                                        DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
                                        update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
+                                       ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
                                        do_recovery(rec, mem_ctx, pnn, nodemap, 
-                                                   vnnmap, nodemap->nodes[j].pnn);
-                                       goto again;
+                                                   vnnmap);
+                                       return;
                                }
                        }
                }
@@ -2929,8 +3296,9 @@ again:
        if (vnnmap->size != rec->num_active) {
                DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
                          vnnmap->size, rec->num_active));
-               do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
-               goto again;
+               ctdb_set_culprit(rec, ctdb->pnn);
+               do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
+               return;
        }
 
        /* verify that all active nodes in the nodemap also exist in 
@@ -2952,8 +3320,9 @@ again:
                if (i == vnnmap->size) {
                        DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
                                  nodemap->nodes[j].pnn));
-                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
-                       goto again;
+                       ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
+                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
+                       return;
                }
        }
 
@@ -2974,23 +3343,25 @@ again:
                if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
                                  nodemap->nodes[j].pnn));
-                       goto again;
+                       return;
                }
 
                /* verify the vnnmap generation is the same */
                if (vnnmap->generation != remote_vnnmap->generation) {
                        DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
                                  nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
-                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
-                       goto again;
+                       ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
+                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
+                       return;
                }
 
                /* verify the vnnmap size is the same */
                if (vnnmap->size != remote_vnnmap->size) {
                        DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
                                  nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
-                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
-                       goto again;
+                       ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
+                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
+                       return;
                }
 
                /* verify the vnnmap is the same */
@@ -2998,30 +3369,47 @@ again:
                        if (remote_vnnmap->map[i] != vnnmap->map[i]) {
                                DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
                                          nodemap->nodes[j].pnn));
+                               ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
                                do_recovery(rec, mem_ctx, pnn, nodemap, 
-                                           vnnmap, nodemap->nodes[j].pnn);
-                               goto again;
+                                           vnnmap);
+                               return;
                        }
                }
        }
 
        /* we might need to change who has what IP assigned */
        if (rec->need_takeover_run) {
+               uint32_t culprit = (uint32_t)-1;
+
                rec->need_takeover_run = false;
 
+               /* update the list of public ips that a node can handle for
+                  all connected nodes
+               */
+               ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
+                                        culprit));
+                       ctdb_set_culprit(rec, culprit);
+                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
+                       return;
+               }
+
                /* execute the "startrecovery" event script on all nodes */
                ret = run_startrecovery_eventscript(rec, nodemap);
                if (ret!=0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
-                       do_recovery(rec, mem_ctx, pnn, nodemap, 
-                                   vnnmap, ctdb->pnn);
+                       ctdb_set_culprit(rec, ctdb->pnn);
+                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
+                       return;
                }
 
                ret = ctdb_takeover_run(ctdb, nodemap);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
-                       do_recovery(rec, mem_ctx, pnn, nodemap, 
-                                   vnnmap, ctdb->pnn);
+                       ctdb_set_culprit(rec, ctdb->pnn);
+                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
+                       return;
                }
 
                /* execute the "recovered" event script on all nodes */
@@ -3033,15 +3421,78 @@ again:
 // cascading recovery.
                if (ret!=0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
-                       do_recovery(rec, mem_ctx, pnn, nodemap, 
-                                   vnnmap, ctdb->pnn);
+                       ctdb_set_culprit(rec, ctdb->pnn);
+                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
                }
 #endif
        }
+}
+
+/*
+  the main monitoring loop
+ */
+static void monitor_cluster(struct ctdb_context *ctdb)
+{
+       struct ctdb_recoverd *rec;
+
+       DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
+
+       rec = talloc_zero(ctdb, struct ctdb_recoverd);
+       CTDB_NO_MEMORY_FATAL(ctdb, rec);
+
+       rec->ctdb = ctdb;
+
+       rec->priority_time = timeval_current();
+
+       /* register a message port for sending memory dumps */
+       ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
+
+       /* register a message port for recovery elections */
+       ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
+
+       /* when nodes are disabled/enabled */
+       ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
+
+       /* when we are asked to puch out a flag change */
+       ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
+
+       /* register a message port for vacuum fetch */
+       ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
+
+       /* register a message port for reloadnodes  */
+       ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
+
+       /* register a message port for performing a takeover run */
+       ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
+
+       /* register a message port for disabling the ip check for a short while */
+       ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
+
+       /* register a message port for updating the recovery daemons node assignment for an ip */
+       ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
 
+       for (;;) {
+               TALLOC_CTX *mem_ctx = talloc_new(ctdb);
+               struct timeval start;
+               double elapsed;
 
-       goto again;
+               if (!mem_ctx) {
+                       DEBUG(DEBUG_CRIT,(__location__
+                                         " Failed to create temp context\n"));
+                       exit(-1);
+               }
+
+               start = timeval_current();
+               main_loop(ctdb, rec, mem_ctx);
+               talloc_free(mem_ctx);
 
+               /* we only check for recovery once every second */
+               elapsed = timeval_elapsed(&start);
+               if (elapsed < ctdb->tunable.recover_interval) {
+                       ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
+                                         - elapsed);
+               }
+       }
 }
 
 /*
@@ -3072,7 +3523,7 @@ static void ctdb_check_recd(struct event_context *ev, struct timed_event *te,
                if (ctdb->methods != NULL) {
                        ctdb->methods->shutdown(ctdb);
                }
-               ctdb_event_script(ctdb, "shutdown");
+               ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
 
                exit(10);       
        }
@@ -3112,6 +3563,7 @@ int ctdb_start_recoverd(struct ctdb_context *ctdb)
 {
        int fd[2];
        struct signal_event *se;
+       struct tevent_fd *fde;
 
        if (pipe(fd) != 0) {
                return -1;
@@ -3136,13 +3588,16 @@ int ctdb_start_recoverd(struct ctdb_context *ctdb)
 
        srandom(getpid() ^ time(NULL));
 
-       if (switch_from_server_to_client(ctdb) != 0) {
+       if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
                DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
                exit(1);
        }
 
-       event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
+       DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
+
+       fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ,
                     ctdb_recoverd_parent, &fd[0]);     
+       tevent_fd_set_auto_close(fde);
 
        /* set up a handler to pick up sigchld */
        se = event_add_signal(ctdb->ev, ctdb,