ctdb: hold transaction locks during freeze, mark during recover.
[sahlberg/ctdb.git] / server / ctdb_recoverd.c
index d601ca60a82f0e63eb66e9efc5d806206833e327..b82f0e77f5418e8b36d63c64631df41c406dffec 100644 (file)
 */
 
 #include "includes.h"
-#include "lib/events/events.h"
+#include "lib/tevent/tevent.h"
 #include "system/filesys.h"
 #include "system/time.h"
 #include "system/network.h"
 #include "system/wait.h"
 #include "popt.h"
 #include "cmdline.h"
-#include "../include/ctdb.h"
+#include "../include/ctdb_client.h"
 #include "../include/ctdb_private.h"
 #include "db_wrap.h"
 #include "dlinklist.h"
 
 
-struct ban_state {
-       struct ctdb_recoverd *rec;
-       uint32_t banned_node;
-};
-
 /* list of "ctdb ipreallocate" processes to call back when we have
    finished the takeover run.
 */
@@ -44,6 +39,11 @@ struct ip_reallocate_list {
        struct rd_memdump_reply *rd;
 };
 
+struct ctdb_banning_state {
+       uint32_t count;
+       struct timeval last_reported_time;
+};
+
 /*
   private state of recovery daemon
  */
@@ -52,11 +52,8 @@ struct ctdb_recoverd {
        uint32_t recmaster;
        uint32_t num_active;
        uint32_t num_connected;
+       uint32_t last_culprit_node;
        struct ctdb_node_map *nodemap;
-       uint32_t last_culprit;
-       uint32_t culprit_counter;
-       struct timeval first_recover_time;
-       struct ban_state **banned_nodes;
        struct timeval priority_time;
        bool need_takeover_run;
        bool need_recovery;
@@ -66,82 +63,23 @@ struct ctdb_recoverd {
        struct vacuum_info *vacuum_info;
        TALLOC_CTX *ip_reallocate_ctx;
        struct ip_reallocate_list *reallocate_callers;
+       TALLOC_CTX *ip_check_disable_ctx;
+       struct ctdb_control_get_ifaces *ifaces;
 };
 
 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
 
 
-/*
-  unban a node
- */
-static void ctdb_unban_node(struct ctdb_recoverd *rec, uint32_t pnn)
-{
-       struct ctdb_context *ctdb = rec->ctdb;
-
-       DEBUG(DEBUG_NOTICE,("Unbanning node %u\n", pnn));
-
-       if (!ctdb_validate_pnn(ctdb, pnn)) {
-               DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_unban_node\n", pnn));
-               return;
-       }
-
-       /* If we are unbanning a different node then just pass the ban info on */
-       if (pnn != ctdb->pnn) {
-               TDB_DATA data;
-               int ret;
-               
-               DEBUG(DEBUG_NOTICE,("Unanning remote node %u. Passing the ban request on to the remote node.\n", pnn));
-
-               data.dptr = (uint8_t *)&pnn;
-               data.dsize = sizeof(uint32_t);
-
-               ret = ctdb_send_message(ctdb, pnn, CTDB_SRVID_UNBAN_NODE, data);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR,("Failed to unban node %u\n", pnn));
-                       return;
-               }
-
-               return;
-       }
-
-       /* make sure we remember we are no longer banned in case 
-          there is an election */
-       rec->node_flags &= ~NODE_FLAGS_BANNED;
-
-       DEBUG(DEBUG_INFO,("Clearing ban flag on node %u\n", pnn));
-       ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, 0, NODE_FLAGS_BANNED);
-
-       if (rec->banned_nodes[pnn] == NULL) {
-               DEBUG(DEBUG_INFO,("No ban recorded for this node. ctdb_unban_node() request ignored\n"));
-               return;
-       }
-
-       talloc_free(rec->banned_nodes[pnn]);
-       rec->banned_nodes[pnn] = NULL;
-}
-
-
-/*
-  called when a ban has timed out
- */
-static void ctdb_ban_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
-{
-       struct ban_state *state = talloc_get_type(p, struct ban_state);
-       struct ctdb_recoverd *rec = state->rec;
-       uint32_t pnn = state->banned_node;
-
-       DEBUG(DEBUG_NOTICE,("Ban timeout. Node %u is now unbanned\n", pnn));
-       ctdb_unban_node(rec, pnn);
-}
-
 /*
   ban a node for a period of time
  */
 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
 {
+       int ret;
        struct ctdb_context *ctdb = rec->ctdb;
-
+       struct ctdb_ban_time bantime;
+       
        DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
 
        if (!ctdb_validate_pnn(ctdb, pnn)) {
@@ -149,61 +87,15 @@ static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_
                return;
        }
 
-       if (0 == ctdb->tunable.enable_bans) {
-               DEBUG(DEBUG_INFO,("Bans are disabled - ignoring ban of node %u\n", pnn));
-               return;
-       }
-
-       /* If we are banning a different node then just pass the ban info on */
-       if (pnn != ctdb->pnn) {
-               struct ctdb_ban_info b;
-               TDB_DATA data;
-               int ret;
-               
-               DEBUG(DEBUG_NOTICE,("Banning remote node %u for %u seconds. Passing the ban request on to the remote node.\n", pnn, ban_time));
-
-               b.pnn = pnn;
-               b.ban_time = ban_time;
-
-               data.dptr = (uint8_t *)&b;
-               data.dsize = sizeof(b);
-
-               ret = ctdb_send_message(ctdb, pnn, CTDB_SRVID_BAN_NODE, data);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR,("Failed to ban node %u\n", pnn));
-                       return;
-               }
+       bantime.pnn  = pnn;
+       bantime.time = ban_time;
 
+       ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
                return;
        }
 
-       DEBUG(DEBUG_NOTICE,("self ban - lowering our election priority\n"));
-       ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, NODE_FLAGS_BANNED, 0);
-
-       /* banning ourselves - lower our election priority */
-       rec->priority_time = timeval_current();
-
-       /* make sure we remember we are banned in case there is an 
-          election */
-       rec->node_flags |= NODE_FLAGS_BANNED;
-
-       if (rec->banned_nodes[pnn] != NULL) {
-               DEBUG(DEBUG_NOTICE,("Re-banning an already banned node. Remove previous ban and set a new ban.\n"));            
-               talloc_free(rec->banned_nodes[pnn]);
-               rec->banned_nodes[pnn] = NULL;
-       }
-
-       rec->banned_nodes[pnn] = talloc(rec->banned_nodes, struct ban_state);
-       CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes[pnn]);
-
-       rec->banned_nodes[pnn]->rec = rec;
-       rec->banned_nodes[pnn]->banned_node = pnn;
-
-       if (ban_time != 0) {
-               event_add_timed(ctdb->ev, rec->banned_nodes[pnn], 
-                               timeval_current_ofs(ban_time, 0),
-                               ctdb_ban_timeout, rec->banned_nodes[pnn]);
-       }
 }
 
 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
@@ -222,7 +114,7 @@ static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node
 
        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
-                                       nodes,
+                                       nodes, 0,
                                        CONTROL_TIMEOUT(), false, tdb_null,
                                        NULL, NULL,
                                        NULL) != 0) {
@@ -239,39 +131,44 @@ static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node
 /*
   remember the trouble maker
  */
-static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
+static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
 {
-       struct ctdb_context *ctdb = rec->ctdb;
+       struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
+       struct ctdb_banning_state *ban_state;
+
+       if (culprit > ctdb->num_nodes) {
+               DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
+               return;
+       }
+
+       if (ctdb->nodes[culprit]->ban_state == NULL) {
+               ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
+               CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
 
-       if (rec->last_culprit != culprit ||
-           timeval_elapsed(&rec->first_recover_time) > ctdb->tunable.recovery_grace_period) {
-               DEBUG(DEBUG_NOTICE,("New recovery culprit %u\n", culprit));
-               /* either a new node is the culprit, or we've decided to forgive them */
-               rec->last_culprit = culprit;
-               rec->first_recover_time = timeval_current();
-               rec->culprit_counter = 0;
+               
+       }
+       ban_state = ctdb->nodes[culprit]->ban_state;
+       if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
+               /* this was the first time in a long while this node
+                  misbehaved so we will forgive any old transgressions.
+               */
+               ban_state->count = 0;
        }
-       rec->culprit_counter++;
+
+       ban_state->count += count;
+       ban_state->last_reported_time = timeval_current();
+       rec->last_culprit_node = culprit;
 }
 
 /*
   remember the trouble maker
  */
-static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
+static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
 {
-       struct ctdb_context *ctdb = rec->ctdb;
-
-       if (rec->last_culprit != culprit ||
-           timeval_elapsed(&rec->first_recover_time) > ctdb->tunable.recovery_grace_period) {
-               DEBUG(DEBUG_NOTICE,("New recovery culprit %u\n", culprit));
-               /* either a new node is the culprit, or we've decided to forgive them */
-               rec->last_culprit = culprit;
-               rec->first_recover_time = timeval_current();
-               rec->culprit_counter = 0;
-       }
-       rec->culprit_counter += count;
+       ctdb_set_culprit_count(rec, culprit, 1);
 }
 
+
 /* this callback is called for every node that failed to execute the
    start recovery event
 */
@@ -298,7 +195,7 @@ static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_
 
        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
-                                       nodes,
+                                       nodes, 0,
                                        CONTROL_TIMEOUT(), false, tdb_null,
                                        NULL,
                                        startrecovery_fail_callback,
@@ -315,7 +212,7 @@ static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_
 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
 {
        if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
-               DEBUG(DEBUG_ERR, (__location__ " Invalid lenght/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
+               DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
                return;
        }
        if (node_pnn < ctdb->num_nodes) {
@@ -336,7 +233,8 @@ static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *
 
        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
-                                       nodes, CONTROL_TIMEOUT(),
+                                       nodes, 0,
+                                       CONTROL_TIMEOUT(),
                                        false, tdb_null,
                                        async_getcap_callback, NULL,
                                        NULL) != 0) {
@@ -349,10 +247,26 @@ static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *
        return 0;
 }
 
+static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
+{
+       struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
+
+       DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
+       ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
+}
+
+static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
+{
+       struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
+
+       DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
+       ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
+}
+
 /*
   change recovery mode on all nodes
  */
-static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t rec_mode)
+static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
 {
        TDB_DATA data;
        uint32_t *nodes;
@@ -364,14 +278,20 @@ static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *no
        /* freeze all nodes */
        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (rec_mode == CTDB_RECOVERY_ACTIVE) {
-               if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
-                                               nodes, CONTROL_TIMEOUT(),
+               int i;
+
+               for (i=1; i<=NUM_DB_PRIORITIES; i++) {
+                       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
+                                               nodes, i,
+                                               CONTROL_TIMEOUT(),
                                                false, tdb_null,
-                                               NULL, NULL,
-                                               NULL) != 0) {
-                       DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
-                       talloc_free(tmp_ctx);
-                       return -1;
+                                               NULL,
+                                               set_recmode_fail_callback,
+                                               rec) != 0) {
+                               DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
+                               talloc_free(tmp_ctx);
+                               return -1;
+                       }
                }
        }
 
@@ -380,7 +300,8 @@ static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *no
        data.dptr = (unsigned char *)&rec_mode;
 
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
-                                       nodes, CONTROL_TIMEOUT(),
+                                       nodes, 0,
+                                       CONTROL_TIMEOUT(),
                                        false, data,
                                        NULL, NULL,
                                        NULL) != 0) {
@@ -410,7 +331,7 @@ static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *
 
        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
-                                       nodes,
+                                       nodes, 0,
                                        CONTROL_TIMEOUT(), false, data,
                                        NULL, NULL,
                                        NULL) != 0) {
@@ -423,6 +344,50 @@ static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *
        return 0;
 }
 
+/* update all remote nodes to use the same db priority that we have
+   this can fail if the remove node has not yet been upgraded to 
+   support this function, so we always return success and never fail
+   a recovery if this call fails.
+*/
+static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
+       struct ctdb_node_map *nodemap, 
+       uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
+{
+       int db;
+       uint32_t *nodes;
+
+       nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
+
+       /* step through all local databases */
+       for (db=0; db<dbmap->num;db++) {
+               TDB_DATA data;
+               struct ctdb_db_priority db_prio;
+               int ret;
+
+               db_prio.db_id     = dbmap->dbs[db].dbid;
+               ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
+                       continue;
+               }
+
+               DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
+
+               data.dptr  = (uint8_t *)&db_prio;
+               data.dsize = sizeof(db_prio);
+
+               if (ctdb_client_async_control(ctdb,
+                                       CTDB_CONTROL_SET_DB_PRIORITY,
+                                       nodes, 0,
+                                       CONTROL_TIMEOUT(), false, data,
+                                       NULL, NULL,
+                                       NULL) != 0) {
+                       DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
+               }
+       }
+
+       return 0;
+}                      
 
 /*
   ensure all other nodes have attached to any databases that we have
@@ -557,7 +522,8 @@ static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb
   pull the remote database contents from one node into the recdb
  */
 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
-                                   struct tdb_wrap *recdb, uint32_t dbid)
+                                   struct tdb_wrap *recdb, uint32_t dbid,
+                                   bool persistent)
 {
        int ret;
        TDB_DATA outdata;
@@ -642,7 +608,8 @@ static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode,
 static int pull_remote_database(struct ctdb_context *ctdb,
                                struct ctdb_recoverd *rec, 
                                struct ctdb_node_map *nodemap, 
-                               struct tdb_wrap *recdb, uint32_t dbid)
+                               struct tdb_wrap *recdb, uint32_t dbid,
+                               bool persistent)
 {
        int j;
 
@@ -654,7 +621,7 @@ static int pull_remote_database(struct ctdb_context *ctdb,
                if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }
-               if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
+               if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid, persistent) != 0) {
                        DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
                                 nodemap->nodes[j].pnn));
                        ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
@@ -708,62 +675,6 @@ static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_nod
 }
 
 
-/*
-  handler for when the admin bans a node
-*/
-static void ban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
-                       TDB_DATA data, void *private_data)
-{
-       struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
-       struct ctdb_ban_info *b = (struct ctdb_ban_info *)data.dptr;
-       TALLOC_CTX *mem_ctx = talloc_new(ctdb);
-
-       if (data.dsize != sizeof(*b)) {
-               DEBUG(DEBUG_ERR,("Bad data in ban_handler\n"));
-               talloc_free(mem_ctx);
-               return;
-       }
-
-       if (b->pnn != ctdb->pnn) {
-               DEBUG(DEBUG_ERR,("Got a ban request for pnn:%u but our pnn is %u. Ignoring ban request\n", b->pnn, ctdb->pnn));
-               return;
-       }
-
-       DEBUG(DEBUG_NOTICE,("Node %u has been banned for %u seconds\n", 
-                b->pnn, b->ban_time));
-
-       ctdb_ban_node(rec, b->pnn, b->ban_time);
-       talloc_free(mem_ctx);
-}
-
-/*
-  handler for when the admin unbans a node
-*/
-static void unban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
-                         TDB_DATA data, void *private_data)
-{
-       struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
-       TALLOC_CTX *mem_ctx = talloc_new(ctdb);
-       uint32_t pnn;
-
-       if (data.dsize != sizeof(uint32_t)) {
-               DEBUG(DEBUG_ERR,("Bad data in unban_handler\n"));
-               talloc_free(mem_ctx);
-               return;
-       }
-       pnn = *(uint32_t *)data.dptr;
-
-       if (pnn != ctdb->pnn) {
-               DEBUG(DEBUG_ERR,("Got an unban request for pnn:%u but our pnn is %u. Ignoring unban request\n", pnn, ctdb->pnn));
-               return;
-       }
-
-       DEBUG(DEBUG_NOTICE,("Node %u has been unbanned.\n", pnn));
-       ctdb_unban_node(rec, pnn);
-       talloc_free(mem_ctx);
-}
-
-
 struct vacuum_info {
        struct vacuum_info *next, *prev;
        struct ctdb_recoverd *rec;
@@ -976,10 +887,11 @@ static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te,
 /*
   wait for a given number of seconds
  */
-static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
+static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
 {
        uint32_t timed_out = 0;
-       event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
+       time_t usecs = (secs - (time_t)secs) * 1000000;
+       event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
        while (!timed_out) {
                event_loop_once(ctdb->ev);
        }
@@ -993,6 +905,9 @@ static void ctdb_election_timeout(struct event_context *ev, struct timed_event *
 {
        struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
        rec->election_timeout = NULL;
+       fast_start = false;
+
+       DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
 }
 
 
@@ -1042,15 +957,6 @@ static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *n
                        return MONITOR_FAILED;
                }
                if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
-                       int ban_changed = (nodemap->nodes[j].flags ^ remote_nodemap->nodes[j].flags) & NODE_FLAGS_BANNED;
-
-                       if (ban_changed) {
-                               DEBUG(DEBUG_NOTICE,("Remote node %u had different BANNED flags 0x%x, local had 0x%x - trigger a re-election\n",
-                               nodemap->nodes[j].pnn,
-                               remote_nodemap->nodes[j].flags,
-                               nodemap->nodes[j].flags));
-                       }
-
                        /* We should tell our daemon about this so it
                           updates its flags or else we will log the same 
                           message again in the next iteration of recovery.
@@ -1070,15 +976,6 @@ static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *n
                                 nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
                                 nodemap->nodes[j].flags));
                        nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
-
-                       /* If the BANNED flag has changed for the node
-                          this is a good reason to do a new election.
-                        */
-                       if (ban_changed) {
-                               talloc_free(mem_ctx);
-                               return MONITOR_ELECTION_NEEDED;
-                       }
-
                }
                talloc_free(remote_nodemap);
        }
@@ -1116,16 +1013,19 @@ static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_
        unsigned tdb_flags;
 
        /* open up the temporary recovery database */
-       name = talloc_asprintf(mem_ctx, "%s/recdb.tdb", ctdb->db_directory);
+       name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
+                              ctdb->db_directory_state,
+                              ctdb->pnn);
        if (name == NULL) {
                return NULL;
        }
        unlink(name);
 
        tdb_flags = TDB_NOLOCK;
-       if (!ctdb->do_setsched) {
+       if (ctdb->valgrinding) {
                tdb_flags |= TDB_NOMMAP;
        }
+       tdb_flags |= TDB_DISALLOW_NESTING;
 
        recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
                              tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
@@ -1147,6 +1047,7 @@ struct recdb_data {
        struct ctdb_marshall_buffer *recdata;
        uint32_t len;
        bool failed;
+       bool persistent;
 };
 
 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
@@ -1162,7 +1063,9 @@ static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data,
 
        /* update the dmaster field to point to us */
        hdr = (struct ctdb_ltdb_header *)data.dptr;
-       hdr->dmaster = params->ctdb->pnn;
+       if (!params->persistent) {
+               hdr->dmaster = params->ctdb->pnn;
+       }
 
        /* add the record to the blob ready to send to the nodes */
        rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
@@ -1189,6 +1092,7 @@ static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data,
   push the recdb database out to all nodes
  */
 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
+                              bool persistent,
                               struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
 {
        struct recdb_data params;
@@ -1209,6 +1113,7 @@ static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
        params.recdata = recdata;
        params.len = offsetof(struct ctdb_marshall_buffer, data);
        params.failed = false;
+       params.persistent = persistent;
 
        if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
@@ -1231,7 +1136,7 @@ static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
 
        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
-                                       nodes,
+                                       nodes, 0,
                                        CONTROL_TIMEOUT(), false, outdata,
                                        NULL, NULL,
                                        NULL) != 0) {
@@ -1257,6 +1162,7 @@ static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
 static int recover_database(struct ctdb_recoverd *rec, 
                            TALLOC_CTX *mem_ctx,
                            uint32_t dbid,
+                           bool persistent,
                            uint32_t pnn, 
                            struct ctdb_node_map *nodemap,
                            uint32_t transaction_id)
@@ -1274,7 +1180,7 @@ static int recover_database(struct ctdb_recoverd *rec,
        }
 
        /* pull all remote databases onto the recdb */
-       ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid);
+       ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
                return -1;
@@ -1291,7 +1197,7 @@ static int recover_database(struct ctdb_recoverd *rec,
 
        nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
-                                       nodes,
+                                       nodes, 0,
                                        CONTROL_TIMEOUT(), false, data,
                                        NULL, NULL,
                                        NULL) != 0) {
@@ -1302,7 +1208,7 @@ static int recover_database(struct ctdb_recoverd *rec,
        
        /* push out the correct database. This sets the dmaster and skips 
           the empty records */
-       ret = push_recdb_database(ctdb, dbid, recdb, nodemap);
+       ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
        if (ret != 0) {
                talloc_free(recdb);
                return -1;
@@ -1323,14 +1229,124 @@ static void reload_nodes_file(struct ctdb_context *ctdb)
        ctdb_load_nodes_file(ctdb);
 }
 
-       
+static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
+                                        struct ctdb_recoverd *rec,
+                                        struct ctdb_node_map *nodemap,
+                                        uint32_t *culprit)
+{
+       int j;
+       int ret;
+
+       if (ctdb->num_nodes != nodemap->num) {
+               DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
+                                 ctdb->num_nodes, nodemap->num));
+               if (culprit) {
+                       *culprit = ctdb->pnn;
+               }
+               return -1;
+       }
+
+       for (j=0; j<nodemap->num; j++) {
+               /* release any existing data */
+               if (ctdb->nodes[j]->known_public_ips) {
+                       talloc_free(ctdb->nodes[j]->known_public_ips);
+                       ctdb->nodes[j]->known_public_ips = NULL;
+               }
+               if (ctdb->nodes[j]->available_public_ips) {
+                       talloc_free(ctdb->nodes[j]->available_public_ips);
+                       ctdb->nodes[j]->available_public_ips = NULL;
+               }
+
+               if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
+                       continue;
+               }
+
+               /* grab a new shiny list of public ips from the node */
+               ret = ctdb_ctrl_get_public_ips_flags(ctdb,
+                                       CONTROL_TIMEOUT(),
+                                       ctdb->nodes[j]->pnn,
+                                       ctdb->nodes,
+                                       0,
+                                       &ctdb->nodes[j]->known_public_ips);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,("Failed to read known public ips from node : %u\n",
+                               ctdb->nodes[j]->pnn));
+                       if (culprit) {
+                               *culprit = ctdb->nodes[j]->pnn;
+                       }
+                       return -1;
+               }
+
+               if (ctdb->tunable.disable_ip_failover == 0) {
+                       if (rec->ip_check_disable_ctx == NULL) {
+                               if (verify_remote_ip_allocation(ctdb, ctdb->nodes[j]->known_public_ips)) {
+                                       DEBUG(DEBUG_ERR,("Node %d has inconsistent public ip allocation and needs update.\n", ctdb->nodes[j]->pnn));
+                                       rec->need_takeover_run = true;
+                               }
+                       }
+               }
+
+               /* grab a new shiny list of public ips from the node */
+               ret = ctdb_ctrl_get_public_ips_flags(ctdb,
+                                       CONTROL_TIMEOUT(),
+                                       ctdb->nodes[j]->pnn,
+                                       ctdb->nodes,
+                                       CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
+                                       &ctdb->nodes[j]->available_public_ips);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,("Failed to read available public ips from node : %u\n",
+                               ctdb->nodes[j]->pnn));
+                       if (culprit) {
+                               *culprit = ctdb->nodes[j]->pnn;
+                       }
+                       return -1;
+               }
+       }
+
+       return 0;
+}
+
+/* when we start a recovery, make sure all nodes use the same reclock file
+   setting
+*/
+static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
+{
+       struct ctdb_context *ctdb = rec->ctdb;
+       TALLOC_CTX *tmp_ctx = talloc_new(NULL);
+       TDB_DATA data;
+       uint32_t *nodes;
+
+       if (ctdb->recovery_lock_file == NULL) {
+               data.dptr  = NULL;
+               data.dsize = 0;
+       } else {
+               data.dsize = strlen(ctdb->recovery_lock_file) + 1;
+               data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
+       }
+
+       nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
+       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
+                                       nodes, 0,
+                                       CONTROL_TIMEOUT(),
+                                       false, data,
+                                       NULL, NULL,
+                                       rec) != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       talloc_free(tmp_ctx);
+       return 0;
+}
+
+
 /*
   we are the recmaster, and recovery is needed - start a recovery run
  */
 static int do_recovery(struct ctdb_recoverd *rec, 
                       TALLOC_CTX *mem_ctx, uint32_t pnn,
-                      struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap,
-                      int32_t culprit)
+                      struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
 {
        struct ctdb_context *ctdb = rec->ctdb;
        int i, j, ret;
@@ -1339,21 +1355,28 @@ static int do_recovery(struct ctdb_recoverd *rec,
        TDB_DATA data;
        uint32_t *nodes;
        struct timeval start_time;
+       uint32_t culprit = (uint32_t)-1;
 
        DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
 
        /* if recovery fails, force it again */
        rec->need_recovery = true;
 
-       if (culprit != -1) {
-               ctdb_set_culprit(rec, culprit);
-       }
+       for (i=0; i<ctdb->num_nodes; i++) {
+               struct ctdb_banning_state *ban_state;
 
-       if (rec->culprit_counter > 2*nodemap->num) {
-               DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries in %.0f seconds - banning it for %u seconds\n",
-                        rec->last_culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
-                        ctdb->tunable.recovery_ban_period));
-               ctdb_ban_node(rec, rec->last_culprit, ctdb->tunable.recovery_ban_period);
+               if (ctdb->nodes[i]->ban_state == NULL) {
+                       continue;
+               }
+               ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
+               if (ban_state->count < 2*ctdb->num_nodes) {
+                       continue;
+               }
+               DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
+                       ctdb->nodes[i]->pnn, ban_state->count,
+                       ctdb->tunable.recovery_ban_period));
+               ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
+               ban_state->count = 0;
        }
 
 
@@ -1366,10 +1389,10 @@ static int do_recovery(struct ctdb_recoverd *rec,
                        return -1;
                }
                ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
-               DEBUG(DEBUG_ERR,("Recovery lock taken successfully by recovery daemon\n"));
+               DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
        }
 
-       DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", culprit));
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
 
        /* get a list of all databases */
        ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
@@ -1394,12 +1417,23 @@ static int do_recovery(struct ctdb_recoverd *rec,
                DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
                return -1;
        }
-
        DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
 
+       /* update the database priority for all remote databases */
+       ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
+       }
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
+
+
+       /* update all other nodes to use the same setting for reclock files
+          as the local recovery master.
+       */
+       sync_recovery_lock_file_across_cluster(rec);
 
        /* set recovery mode to active on all nodes */
-       ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
+       ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
                return -1;
@@ -1412,6 +1446,23 @@ static int do_recovery(struct ctdb_recoverd *rec,
                return -1;
        }
 
+       /*
+         update all nodes to have the same flags that we have
+        */
+       for (i=0;i<nodemap->num;i++) {
+               if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
+                       continue;
+               }
+
+               ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
+                       return -1;
+               }
+       }
+
+       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
+
        /* pick a new generation number */
        generation = new_generation();
 
@@ -1437,18 +1488,31 @@ static int do_recovery(struct ctdb_recoverd *rec,
 
        nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
-                                       nodes,
+                                       nodes, 0,
                                        CONTROL_TIMEOUT(), false, data,
-                                       NULL, NULL,
-                                       NULL) != 0) {
+                                       NULL,
+                                       transaction_start_fail_callback,
+                                       rec) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
+               if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
+                                       nodes, 0,
+                                       CONTROL_TIMEOUT(), false, tdb_null,
+                                       NULL,
+                                       NULL,
+                                       NULL) != 0) {
+                       DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
+               }
                return -1;
        }
 
        DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
 
        for (i=0;i<dbmap->num;i++) {
-               if (recover_database(rec, mem_ctx, dbmap->dbs[i].dbid, pnn, nodemap, generation) != 0) {
+               ret = recover_database(rec, mem_ctx,
+                                      dbmap->dbs[i].dbid,
+                                      dbmap->dbs[i].persistent,
+                                      pnn, nodemap, generation);
+               if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
                        return -1;
                }
@@ -1458,7 +1522,7 @@ static int do_recovery(struct ctdb_recoverd *rec,
 
        /* commit all the changes */
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
-                                       nodes,
+                                       nodes, 0,
                                        CONTROL_TIMEOUT(), false, data,
                                        NULL, NULL,
                                        NULL) != 0) {
@@ -1545,7 +1609,7 @@ static int do_recovery(struct ctdb_recoverd *rec,
        DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
 
        /* disable recovery mode */
-       ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_NORMAL);
+       ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
                return -1;
@@ -1556,11 +1620,18 @@ static int do_recovery(struct ctdb_recoverd *rec,
        /*
          tell nodes to takeover their public IPs
         */
+       ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
+                                culprit));
+               rec->need_takeover_run = true;
+               return -1;
+       }
        rec->need_takeover_run = false;
        ret = ctdb_takeover_run(ctdb, nodemap);
        if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses\n"));
-               return -1;
+               DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. ctdb_takeover_run() failed.\n"));
+               rec->need_takeover_run = true;
        }
        DEBUG(DEBUG_NOTICE, (__location__ " Recovery - takeip finished\n"));
 
@@ -1575,19 +1646,40 @@ static int do_recovery(struct ctdb_recoverd *rec,
 
        /* send a message to all clients telling them that the cluster 
           has been reconfigured */
-       ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
+       ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
 
        DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
 
        rec->need_recovery = false;
 
+       /* we managed to complete a full recovery, make sure to forgive
+          any past sins by the nodes that could now participate in the
+          recovery.
+       */
+       DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
+       for (i=0;i<nodemap->num;i++) {
+               struct ctdb_banning_state *ban_state;
+
+               if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
+                       continue;
+               }
+
+               ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
+               if (ban_state == NULL) {
+                       continue;
+               }
+
+               ban_state->count = 0;
+       }
+
+
        /* We just finished a recovery successfully. 
           We now wait for rerecovery_timeout before we allow 
           another recovery to take place.
        */
-       DEBUG(DEBUG_NOTICE, (__location__ " New recoveries supressed for the rerecovery timeout\n"));
+       DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be supressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
        ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
-       DEBUG(DEBUG_NOTICE, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
+       DEBUG(DEBUG_NOTICE, ("The rerecovery timeout has elapsed. We now allow recoveries to trigger again.\n"));
 
        return 0;
 }
@@ -1617,7 +1709,6 @@ static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_messag
 
        em->pnn = rec->ctdb->pnn;
        em->priority_time = rec->priority_time;
-       em->node_flags = rec->node_flags;
 
        ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
        if (ret != 0) {
@@ -1625,6 +1716,9 @@ static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_messag
                return;
        }
 
+       rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
+       em->node_flags = rec->node_flags;
+
        for (i=0;i<nodemap->num;i++) {
                if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
                        em->num_connected++;
@@ -1712,7 +1806,8 @@ static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool u
 
 
        /* send an election message to all active nodes */
-       ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
+       DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
+       ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
 
 
        /* A new node that is already frozen has entered the cluster.
@@ -1808,31 +1903,96 @@ static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid,
                return;
        }
 
-DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));           
-
-       ret = ctdb_send_message(ctdb, rd->pnn, rd->srvid, *dump);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
-               talloc_free(tmp_ctx);
+DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));           
+
+       ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
+               talloc_free(tmp_ctx);
+               return;
+       }
+
+       talloc_free(tmp_ctx);
+}
+
+/*
+  handler for reload_nodes
+*/
+static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
+                            TDB_DATA data, void *private_data)
+{
+       struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
+
+       DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
+
+       reload_nodes_file(rec->ctdb);
+}
+
+
+static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
+                             struct timeval yt, void *p)
+{
+       struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
+
+       talloc_free(rec->ip_check_disable_ctx);
+       rec->ip_check_disable_ctx = NULL;
+}
+
+
+static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
+                            TDB_DATA data, void *private_data)
+{
+       struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
+       struct ctdb_public_ip *ip;
+
+       if (rec->recmaster != rec->ctdb->pnn) {
+               DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
+               return;
+       }
+
+       if (data.dsize != sizeof(struct ctdb_public_ip)) {
+               DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
                return;
        }
 
-       talloc_free(tmp_ctx);
+       ip = (struct ctdb_public_ip *)data.dptr;
+
+       update_ip_assignment_tree(rec->ctdb, ip);
 }
 
-/*
-  handler for reload_nodes
-*/
-static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
+
+static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
                             TDB_DATA data, void *private_data)
 {
        struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
+       uint32_t timeout;
 
-       DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
+       if (rec->ip_check_disable_ctx != NULL) {
+               talloc_free(rec->ip_check_disable_ctx);
+               rec->ip_check_disable_ctx = NULL;
+       }
 
-       reload_nodes_file(rec->ctdb);
+       if (data.dsize != sizeof(uint32_t)) {
+               DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
+                                "expexting %lu\n", (long unsigned)data.dsize,
+                                (long unsigned)sizeof(uint32_t)));
+               return;
+       }
+       if (data.dptr == NULL) {
+               DEBUG(DEBUG_ERR,(__location__ " No data recaived\n"));
+               return;
+       }
+
+       timeout = *((uint32_t *)data.dptr);
+       DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
+
+       rec->ip_check_disable_ctx = talloc_new(rec);
+       CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
+
+       event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
 }
 
+
 /*
   handler for ip reallocate, just add it to the list of callers and 
   handle this later in the monitor_cluster loop so we do not recurse
@@ -1851,7 +2011,7 @@ static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid,
 
        if (rec->ip_reallocate_ctx == NULL) {
                rec->ip_reallocate_ctx = talloc_new(rec);
-               CTDB_NO_MEMORY_FATAL(ctdb, caller);
+               CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
        }
 
        caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
@@ -1870,17 +2030,45 @@ static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb
        TDB_DATA result;
        int32_t ret;
        struct ip_reallocate_list *callers;
+       uint32_t culprit;
 
        DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
-       ret = ctdb_takeover_run(ctdb, rec->nodemap);
+
+       /* update the list of public ips that a node can handle for
+          all connected nodes
+       */
+       ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
+                                culprit));
+               rec->need_takeover_run = true;
+       }
+       if (ret == 0) {
+               ret = ctdb_takeover_run(ctdb, rec->nodemap);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,("Failed to reallocate addresses: ctdb_takeover_run() failed.\n"));
+                       rec->need_takeover_run = true;
+               }
+       }
+
        result.dsize = sizeof(int32_t);
        result.dptr  = (uint8_t *)&ret;
 
        for (callers=rec->reallocate_callers; callers; callers=callers->next) {
-               DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to %u:%lu\n", callers->rd->pnn, callers->rd->srvid));
-               ret = ctdb_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
+
+               /* Someone that sent srvid==0 does not want a reply */
+               if (callers->rd->srvid == 0) {
+                       continue;
+               }
+               DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
+                                 "%u:%llu\n", (unsigned)callers->rd->pnn,
+                                 (unsigned long long)callers->rd->srvid));
+               ret = ctdb_client_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
                if (ret != 0) {
-                       DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply message to %u:%lu\n", callers->rd->pnn, callers->rd->srvid));
+                       DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
+                                        "message to %u:%llu\n",
+                                        (unsigned)callers->rd->pnn,
+                                        (unsigned long long)callers->rd->srvid));
                }
        }
 
@@ -1906,6 +2094,8 @@ static void election_handler(struct ctdb_context *ctdb, uint64_t srvid,
        /* we got an election packet - update the timeout for the election */
        talloc_free(rec->election_timeout);
        rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
+                                               fast_start ?
+                                               timeval_current_ofs(0, 500000) :
                                                timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
                                                ctdb_election_timeout, rec);
 
@@ -1948,12 +2138,6 @@ static void election_handler(struct ctdb_context *ctdb, uint64_t srvid,
                return;
        }
 
-       /* release any bans */
-       rec->last_culprit = (uint32_t)-1;
-       talloc_free(rec->banned_nodes);
-       rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
-       CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
-
        talloc_free(mem_ctx);
        return;
 }
@@ -1968,8 +2152,10 @@ static void force_election(struct ctdb_recoverd *rec, uint32_t pnn,
        int ret;
        struct ctdb_context *ctdb = rec->ctdb;
 
+       DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
+
        /* set all nodes to recovery mode to stop all internode traffic */
-       ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
+       ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
                return;
@@ -1977,6 +2163,8 @@ static void force_election(struct ctdb_recoverd *rec, uint32_t pnn,
 
        talloc_free(rec->election_timeout);
        rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
+                                               fast_start ?
+                                               timeval_current_ofs(0, 500000) :
                                                timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
                                                ctdb_election_timeout, rec);
 
@@ -2005,6 +2193,7 @@ static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid,
        uint32_t changed_flags;
        int i;
        struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
+       int disabled_flag_changed;
 
        if (data.dsize != sizeof(*c)) {
                DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
@@ -2038,6 +2227,8 @@ static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid,
                DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
        }
 
+       disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
+
        nodemap->nodes[i].flags = c->new_flags;
 
        ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
@@ -2058,7 +2249,7 @@ static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid,
                   lead to an ip address failover but that is handled 
                   during recovery
                */
-               if (changed_flags & NODE_FLAGS_DISABLED) {
+               if (disabled_flag_changed) {
                        rec->need_takeover_run = true;
                }
        }
@@ -2074,11 +2265,47 @@ static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid,
 {
        int ret;
        struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
+       struct ctdb_node_map *nodemap=NULL;
+       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
+       uint32_t recmaster;
+       uint32_t *nodes;
 
-       ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), c->pnn, c->new_flags, ~c->new_flags);
+       /* find the recovery master */
+       ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
        if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
+               DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
+               talloc_free(tmp_ctx);
+               return;
+       }
+
+       /* read the node flags from the recmaster */
+       ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
+               talloc_free(tmp_ctx);
+               return;
+       }
+       if (c->pnn >= nodemap->num) {
+               DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
+               talloc_free(tmp_ctx);
+               return;
+       }
+
+       /* send the flags update to all connected nodes */
+       nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
+
+       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
+                                     nodes, 0, CONTROL_TIMEOUT(),
+                                     false, data,
+                                     NULL, NULL,
+                                     NULL) != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
+
+               talloc_free(tmp_ctx);
+               return;
        }
+
+       talloc_free(tmp_ctx);
 }
 
 
@@ -2266,15 +2493,18 @@ static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ct
 }
 
 
-/* called to check that the allocation of public ip addresses is ok.
+/* called to check that the local allocation of public ip addresses is ok.
 */
-static int verify_ip_allocation(struct ctdb_context *ctdb, uint32_t pnn)
+static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map *nodemap)
 {
        TALLOC_CTX *mem_ctx = talloc_new(NULL);
+       struct ctdb_control_get_ifaces *ifaces = NULL;
        struct ctdb_all_public_ips *ips = NULL;
        struct ctdb_uptime *uptime1 = NULL;
        struct ctdb_uptime *uptime2 = NULL;
        int ret, j;
+       bool need_iface_check = false;
+       bool need_takeover_run = false;
 
        ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
                                CTDB_CURRENT_NODE, &uptime1);
@@ -2284,6 +2514,30 @@ static int verify_ip_allocation(struct ctdb_context *ctdb, uint32_t pnn)
                return -1;
        }
 
+
+       /* read the interfaces from the local node */
+       ret = ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ifaces);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", pnn));
+               talloc_free(mem_ctx);
+               return -1;
+       }
+
+       if (!rec->ifaces) {
+               need_iface_check = true;
+       } else if (rec->ifaces->num != ifaces->num) {
+               need_iface_check = true;
+       } else if (memcmp(rec->ifaces, ifaces, talloc_get_size(ifaces)) != 0) {
+               need_iface_check = true;
+       }
+
+       if (need_iface_check) {
+               DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
+                                    "local node %u - force takeover run\n",
+                                    pnn));
+               need_takeover_run = true;
+       }
+
        /* read the ip allocation from the local node */
        ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
        if (ret != 0) {
@@ -2319,61 +2573,61 @@ static int verify_ip_allocation(struct ctdb_context *ctdb, uint32_t pnn)
        /* skip the check if we have started but not finished recovery */
        if (timeval_compare(&uptime1->last_recovery_finished,
                            &uptime1->last_recovery_started) != 1) {
-               DEBUG(DEBUG_NOTICE, (__location__ " in the middle of recovery. skipping public ip address check\n"));
+               DEBUG(DEBUG_NOTICE, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
                talloc_free(mem_ctx);
 
                return 0;
        }
 
+       talloc_free(rec->ifaces);
+       rec->ifaces = talloc_steal(rec, ifaces);
+
        /* verify that we have the ip addresses we should have
           and we dont have ones we shouldnt have.
           if we find an inconsistency we set recmode to
           active on the local node and wait for the recmaster
-          to do a full blown recovery
+          to do a full blown recovery.
+          also if the pnn is -1 and we are healthy and can host the ip
+          we also request a ip reallocation.
        */
-       for (j=0; j<ips->num; j++) {
-               if (ips->ips[j].pnn == pnn) {
-                       if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
-                               DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
-                                       ctdb_addr_to_str(&ips->ips[j].addr)));
-                               ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
-                               if (ret != 0) {
-                                       DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
-
-                                       talloc_free(mem_ctx);
-                                       return -1;
+       if (ctdb->tunable.disable_ip_failover == 0) {
+               for (j=0; j<ips->num; j++) {
+                       if (ips->ips[j].pnn == -1 && nodemap->nodes[pnn].flags == 0) {
+                               DEBUG(DEBUG_CRIT,("Public address '%s' is not assigned and we could serve this ip\n",
+                                               ctdb_addr_to_str(&ips->ips[j].addr)));
+                               need_takeover_run = true;
+                       } else if (ips->ips[j].pnn == pnn) {
+                               if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
+                                       DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
+                                               ctdb_addr_to_str(&ips->ips[j].addr)));
+                                       need_takeover_run = true;
                                }
-                               ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
-                               if (ret != 0) {
-                                       DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
-
-                                       talloc_free(mem_ctx);
-                                       return -1;
+                       } else {
+                               if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
+                                       DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
+                                               ctdb_addr_to_str(&ips->ips[j].addr)));
+                                       need_takeover_run = true;
                                }
                        }
-               } else {
-                       if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
-                               DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
-                                       ctdb_addr_to_str(&ips->ips[j].addr)));
+               }
+       }
 
-                               ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
-                               if (ret != 0) {
-                                       DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
+       if (need_takeover_run) {
+               struct takeover_run_reply rd;
+               TDB_DATA data;
 
-                                       talloc_free(mem_ctx);
-                                       return -1;
-                               }
-                               ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
-                               if (ret != 0) {
-                                       DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
+               DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
 
-                                       talloc_free(mem_ctx);
-                                       return -1;
-                               }
-                       }
+               rd.pnn = ctdb->pnn;
+               rd.srvid = 0;
+               data.dptr = (uint8_t *)&rd;
+               data.dsize = sizeof(rd);
+
+               ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
                }
        }
-
        talloc_free(mem_ctx);
        return 0;
 }
@@ -2400,7 +2654,7 @@ static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
 
        nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
-                                       nodes,
+                                       nodes, 0,
                                        CONTROL_TIMEOUT(), false, tdb_null,
                                        async_getnodemap_callback,
                                        NULL,
@@ -2514,7 +2768,7 @@ static int check_recovery_lock(struct ctdb_context *ctdb)
                return -1;
        }
 
-       state->child = fork();
+       state->child = ctdb_fork(ctdb);
        if (state->child == (pid_t)-1) {
                DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
                close(state->fd[0]);
@@ -2530,6 +2784,7 @@ static int check_recovery_lock(struct ctdb_context *ctdb)
                close(state->fd[0]);
                state->fd[0] = -1;
 
+               debug_extra = talloc_asprintf(NULL, "recovery-lock:");
                if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
                        DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
                        cc = RECLOCK_FAILED;
@@ -2545,6 +2800,9 @@ static int check_recovery_lock(struct ctdb_context *ctdb)
        }
        close(state->fd[1]);
        state->fd[1] = -1;
+       set_close_on_exec(state->fd[0]);
+
+       DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
 
        talloc_set_destructor(state, check_reclock_destructor);
 
@@ -2557,7 +2815,7 @@ static int check_recovery_lock(struct ctdb_context *ctdb)
        }
 
        state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
-                               EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
+                               EVENT_FD_READ,
                                reclock_child_handler,
                                (void *)state);
 
@@ -2566,6 +2824,7 @@ static int check_recovery_lock(struct ctdb_context *ctdb)
                talloc_free(state);
                return -1;
        }
+       tevent_fd_set_auto_close(state->fde);
 
        while (state->status == RECLOCK_CHECKING) {
                event_loop_once(ctdb->ev);
@@ -2636,14 +2895,11 @@ static int update_recovery_lock_file(struct ctdb_context *ctdb)
        talloc_free(tmp_ctx);
        return 0;
 }
-               
-/*
-  the main monitoring loop
- */
-static void monitor_cluster(struct ctdb_context *ctdb)
+
+static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
+                     TALLOC_CTX *mem_ctx)
 {
        uint32_t pnn;
-       TALLOC_CTX *mem_ctx=NULL;
        struct ctdb_node_map *nodemap=NULL;
        struct ctdb_node_map *recmaster_nodemap=NULL;
        struct ctdb_node_map **remote_nodemaps=NULL;
@@ -2651,59 +2907,8 @@ static void monitor_cluster(struct ctdb_context *ctdb)
        struct ctdb_vnn_map *remote_vnnmap=NULL;
        int32_t debug_level;
        int i, j, ret;
-       struct ctdb_recoverd *rec;
-
-       DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
-
-       rec = talloc_zero(ctdb, struct ctdb_recoverd);
-       CTDB_NO_MEMORY_FATAL(ctdb, rec);
-
-       rec->ctdb = ctdb;
-       rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
-       CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
-
-       rec->priority_time = timeval_current();
-
-       /* register a message port for sending memory dumps */
-       ctdb_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
-
-       /* register a message port for recovery elections */
-       ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
-
-       /* when nodes are disabled/enabled */
-       ctdb_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
-
-       /* when we are asked to puch out a flag change */
-       ctdb_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
-
-       /* when nodes are banned */
-       ctdb_set_message_handler(ctdb, CTDB_SRVID_BAN_NODE, ban_handler, rec);
-
-       /* and one for when nodes are unbanned */
-       ctdb_set_message_handler(ctdb, CTDB_SRVID_UNBAN_NODE, unban_handler, rec);
-
-       /* register a message port for vacuum fetch */
-       ctdb_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
-
-       /* register a message port for reloadnodes  */
-       ctdb_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
-
-       /* register a message port for performing a takeover run */
-       ctdb_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
 
-again:
-       if (mem_ctx) {
-               talloc_free(mem_ctx);
-               mem_ctx = NULL;
-       }
-       mem_ctx = talloc_new(ctdb);
-       if (!mem_ctx) {
-               DEBUG(DEBUG_CRIT,(__location__ " Failed to create temporary context\n"));
-               exit(-1);
-       }
 
-       /* we only check for recovery once every second */
-       ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
 
        /* verify that the main daemon is still running */
        if (kill(ctdb->ctdbd_pid, 0) != 0) {
@@ -2716,14 +2921,14 @@ again:
 
        if (rec->election_timeout) {
                /* an election is in progress */
-               goto again;
+               return;
        }
 
        /* read the debug level from the parent and update locally */
        ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
        if (ret !=0) {
                DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
-               goto again;
+               return;
        }
        LogLevel = debug_level;
 
@@ -2732,24 +2937,34 @@ again:
           as early as possible so we dont wait until we have pulled the node
           map from the local node. thats why we have the hardcoded value 20
        */
-       if (rec->culprit_counter > 20) {
-               DEBUG(DEBUG_NOTICE,("Node %u has caused %u failures in %.0f seconds - banning it for %u seconds\n",
-                        rec->last_culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
-                        ctdb->tunable.recovery_ban_period));
-               ctdb_ban_node(rec, rec->last_culprit, ctdb->tunable.recovery_ban_period);
+       for (i=0; i<ctdb->num_nodes; i++) {
+               struct ctdb_banning_state *ban_state;
+
+               if (ctdb->nodes[i]->ban_state == NULL) {
+                       continue;
+               }
+               ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
+               if (ban_state->count < 20) {
+                       continue;
+               }
+               DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
+                       ctdb->nodes[i]->pnn, ban_state->count,
+                       ctdb->tunable.recovery_ban_period));
+               ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
+               ban_state->count = 0;
        }
 
        /* get relevant tunables */
        ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
        if (ret != 0) {
                DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
-               goto again;
+               return;
        }
 
        /* get the current recovery lock file from the server */
        if (update_recovery_lock_file(ctdb) != 0) {
                DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
-               goto again;
+               return;
        }
 
        /* Make sure that if recovery lock verification becomes disabled when
@@ -2765,14 +2980,14 @@ again:
        pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
        if (pnn == (uint32_t)-1) {
                DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
-               goto again;
+               return;
        }
 
        /* get the vnnmap */
        ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
-               goto again;
+               return;
        }
 
 
@@ -2785,7 +3000,7 @@ again:
        ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
-               goto again;
+               return;
        }
        nodemap = rec->nodemap;
 
@@ -2793,7 +3008,7 @@ again:
        ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
-               goto again;
+               return;
        }
 
        /* if we are not the recmaster we can safely ignore any ip reallocate requests */
@@ -2812,7 +3027,7 @@ again:
        if (rec->recmaster == (uint32_t)-1) {
                DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
                force_election(rec, pnn, nodemap);
-               goto again;
+               return;
        }
 
 
@@ -2827,18 +3042,18 @@ again:
                if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
                        DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
 
-                       ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
+                       ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
-                               goto again;
+                               return;
                        }
                        ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
 
-                               goto again;
+                               return;
                        }
-                       goto again;
+                       return;
                }
        }
        /* If the local node is stopped, verify we are not the recmaster 
@@ -2847,40 +3062,13 @@ again:
        if ((nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) && (rec->recmaster == pnn)) {
                DEBUG(DEBUG_ERR,("Local node is STOPPED. Yielding recmaster role\n"));
                force_election(rec, pnn, nodemap);
-               goto again;
+               return;
        }
        
        /* check that we (recovery daemon) and the local ctdb daemon
           agrees on whether we are banned or not
        */
-       if (nodemap->nodes[pnn].flags & NODE_FLAGS_BANNED) {
-               if (rec->banned_nodes[pnn] == NULL) {
-                       if (rec->recmaster == pnn) {
-                               DEBUG(DEBUG_NOTICE,("Local ctdb daemon on recmaster thinks this node is BANNED but the recovery master disagrees. Unbanning the node\n"));
-
-                               ctdb_unban_node(rec, pnn);
-                       } else {
-                               DEBUG(DEBUG_NOTICE,("Local ctdb daemon on non-recmaster thinks this node is BANNED but the recovery master disagrees. Re-banning the node\n"));
-                               ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
-                               ctdb_set_culprit(rec, pnn);
-                       }
-                       goto again;
-               }
-       } else {
-               if (rec->banned_nodes[pnn] != NULL) {
-                       if (rec->recmaster == pnn) {
-                               DEBUG(DEBUG_NOTICE,("Local ctdb daemon on recmaster does not think this node is BANNED but the recovery master disagrees. Unbanning the node\n"));
-
-                               ctdb_unban_node(rec, pnn);
-                       } else {
-                               DEBUG(DEBUG_NOTICE,("Local ctdb daemon on non-recmaster does not think this node is BANNED but the recovery master disagrees. Re-banning the node\n"));
-
-                               ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
-                               ctdb_set_culprit(rec, pnn);
-                       }
-                       goto again;
-               }
-       }
+//qqq
 
        /* remember our own node flags */
        rec->node_flags = nodemap->nodes[pnn].flags;
@@ -2908,14 +3096,14 @@ again:
        if (j == nodemap->num) {
                DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
                force_election(rec, pnn, nodemap);
-               goto again;
+               return;
        }
 
        /* if recovery master is disconnected we must elect a new recmaster */
        if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
                DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
                force_election(rec, pnn, nodemap);
-               goto again;
+               return;
        }
 
        /* grap the nodemap from the recovery master to check if it is banned */
@@ -2924,24 +3112,25 @@ again:
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
                          nodemap->nodes[j].pnn));
-               goto again;
+               return;
        }
 
 
        if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
                force_election(rec, pnn, nodemap);
-               goto again;
+               return;
        }
 
 
        /* verify that we have all ip addresses we should have and we dont
         * have addresses we shouldnt have.
         */ 
-       if (ctdb->do_checkpublicip) {
-               if (verify_ip_allocation(ctdb, pnn) != 0) {
-                       DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
-                       goto again;
+       if (ctdb->tunable.disable_ip_failover == 0) {
+               if (rec->ip_check_disable_ctx == NULL) {
+                       if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
+                               DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
+                       }
                }
        }
 
@@ -2950,7 +3139,7 @@ again:
           if recovery is needed
         */
        if (pnn != rec->recmaster) {
-               goto again;
+               return;
        }
 
 
@@ -2959,74 +3148,49 @@ again:
        if (ret == MONITOR_ELECTION_NEEDED) {
                DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
                force_election(rec, pnn, nodemap);
-               goto again;
+               return;
        }
        if (ret != MONITOR_OK) {
                DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
-               goto again;
+               return;
        }
 
-       /* update the list of public ips that a node can handle for
-          all connected nodes
-       */
        if (ctdb->num_nodes != nodemap->num) {
                DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
                reload_nodes_file(ctdb);
-               goto again;
-       }
-       for (j=0; j<nodemap->num; j++) {
-               /* release any existing data */
-               if (ctdb->nodes[j]->public_ips) {
-                       talloc_free(ctdb->nodes[j]->public_ips);
-                       ctdb->nodes[j]->public_ips = NULL;
-               }
-
-               if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
-                       continue;
-               }
-
-               /* grab a new shiny list of public ips from the node */
-               if (ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(),
-                       ctdb->nodes[j]->pnn, 
-                       ctdb->nodes,
-                       &ctdb->nodes[j]->public_ips)) {
-                       DEBUG(DEBUG_ERR,("Failed to read public ips from node : %u\n", 
-                               ctdb->nodes[j]->pnn));
-                       goto again;
-               }
+               return;
        }
 
-
        /* verify that all active nodes agree that we are the recmaster */
        switch (verify_recmaster(rec, nodemap, pnn)) {
        case MONITOR_RECOVERY_NEEDED:
                /* can not happen */
-               goto again;
+               return;
        case MONITOR_ELECTION_NEEDED:
                force_election(rec, pnn, nodemap);
-               goto again;
+               return;
        case MONITOR_OK:
                break;
        case MONITOR_FAILED:
-               goto again;
+               return;
        }
 
 
        if (rec->need_recovery) {
                /* a previous recovery didn't finish */
-               do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, -1);
-               goto again;             
+               do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
+               return;
        }
 
        /* verify that all active nodes are in normal mode 
           and not in recovery mode 
-        */
+       */
        switch (verify_recmode(ctdb, nodemap)) {
        case MONITOR_RECOVERY_NEEDED:
-               do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
-               goto again;
+               do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
+               return;
        case MONITOR_FAILED:
-               goto again;
+               return;
        case MONITOR_ELECTION_NEEDED:
                /* can not happen */
        case MONITOR_OK:
@@ -3039,8 +3203,9 @@ again:
                ret = check_recovery_lock(ctdb);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
-                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
-                       goto again;
+                       ctdb_set_culprit(rec, ctdb->pnn);
+                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
+                       return;
                }
        }
 
@@ -3049,14 +3214,14 @@ again:
        remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
        if (remote_nodemaps == NULL) {
                DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
-               goto again;
+               return;
        }
        for(i=0; i<nodemap->num; i++) {
                remote_nodemaps[i] = NULL;
        }
        if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
-               goto again;
+               return;
        } 
 
        /* verify that all other nodes have the same nodemap as we have
@@ -3070,7 +3235,7 @@ again:
                        DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
                        ctdb_set_culprit(rec, j);
 
-                       goto again;
+                       return;
                }
 
                /* if the nodes disagree on how many nodes there are
@@ -3079,8 +3244,9 @@ again:
                if (remote_nodemaps[j]->num != nodemap->num) {
                        DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
                                  nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
-                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
-                       goto again;
+                       ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
+                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
+                       return;
                }
 
                /* if the nodes disagree on which nodes exist and are
@@ -3091,9 +3257,10 @@ again:
                                DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
                                          nodemap->nodes[j].pnn, i, 
                                          remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
+                               ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
                                do_recovery(rec, mem_ctx, pnn, nodemap, 
-                                           vnnmap, nodemap->nodes[j].pnn);
-                               goto again;
+                                           vnnmap);
+                               return;
                        }
                }
 
@@ -3113,15 +3280,17 @@ again:
                                if (i == j) {
                                        DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
                                        update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
+                                       ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
                                        do_recovery(rec, mem_ctx, pnn, nodemap, 
-                                                   vnnmap, nodemap->nodes[j].pnn);
-                                       goto again;
+                                                   vnnmap);
+                                       return;
                                } else {
                                        DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
                                        update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
+                                       ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
                                        do_recovery(rec, mem_ctx, pnn, nodemap, 
-                                                   vnnmap, nodemap->nodes[j].pnn);
-                                       goto again;
+                                                   vnnmap);
+                                       return;
                                }
                        }
                }
@@ -3134,8 +3303,9 @@ again:
        if (vnnmap->size != rec->num_active) {
                DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
                          vnnmap->size, rec->num_active));
-               do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
-               goto again;
+               ctdb_set_culprit(rec, ctdb->pnn);
+               do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
+               return;
        }
 
        /* verify that all active nodes in the nodemap also exist in 
@@ -3157,8 +3327,9 @@ again:
                if (i == vnnmap->size) {
                        DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
                                  nodemap->nodes[j].pnn));
-                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
-                       goto again;
+                       ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
+                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
+                       return;
                }
        }
 
@@ -3179,23 +3350,25 @@ again:
                if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
                                  nodemap->nodes[j].pnn));
-                       goto again;
+                       return;
                }
 
                /* verify the vnnmap generation is the same */
                if (vnnmap->generation != remote_vnnmap->generation) {
                        DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
                                  nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
-                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
-                       goto again;
+                       ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
+                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
+                       return;
                }
 
                /* verify the vnnmap size is the same */
                if (vnnmap->size != remote_vnnmap->size) {
                        DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
                                  nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
-                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
-                       goto again;
+                       ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
+                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
+                       return;
                }
 
                /* verify the vnnmap is the same */
@@ -3203,30 +3376,44 @@ again:
                        if (remote_vnnmap->map[i] != vnnmap->map[i]) {
                                DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
                                          nodemap->nodes[j].pnn));
+                               ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
                                do_recovery(rec, mem_ctx, pnn, nodemap, 
-                                           vnnmap, nodemap->nodes[j].pnn);
-                               goto again;
+                                           vnnmap);
+                               return;
                        }
                }
        }
 
        /* we might need to change who has what IP assigned */
        if (rec->need_takeover_run) {
+               uint32_t culprit = (uint32_t)-1;
+
                rec->need_takeover_run = false;
 
+               /* update the list of public ips that a node can handle for
+                  all connected nodes
+               */
+               ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
+                                        culprit));
+                       rec->need_takeover_run = true;
+                       return;
+               }
+
                /* execute the "startrecovery" event script on all nodes */
                ret = run_startrecovery_eventscript(rec, nodemap);
                if (ret!=0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
-                       do_recovery(rec, mem_ctx, pnn, nodemap, 
-                                   vnnmap, ctdb->pnn);
+                       ctdb_set_culprit(rec, ctdb->pnn);
+                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
+                       return;
                }
 
                ret = ctdb_takeover_run(ctdb, nodemap);
                if (ret != 0) {
-                       DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
-                       do_recovery(rec, mem_ctx, pnn, nodemap, 
-                                   vnnmap, ctdb->pnn);
+                       DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. Try again later\n"));
+                       return;
                }
 
                /* execute the "recovered" event script on all nodes */
@@ -3238,15 +3425,78 @@ again:
 // cascading recovery.
                if (ret!=0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
-                       do_recovery(rec, mem_ctx, pnn, nodemap, 
-                                   vnnmap, ctdb->pnn);
+                       ctdb_set_culprit(rec, ctdb->pnn);
+                       do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
                }
 #endif
        }
+}
+
+/*
+  the main monitoring loop
+ */
+static void monitor_cluster(struct ctdb_context *ctdb)
+{
+       struct ctdb_recoverd *rec;
+
+       DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
+
+       rec = talloc_zero(ctdb, struct ctdb_recoverd);
+       CTDB_NO_MEMORY_FATAL(ctdb, rec);
+
+       rec->ctdb = ctdb;
+
+       rec->priority_time = timeval_current();
+
+       /* register a message port for sending memory dumps */
+       ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
+
+       /* register a message port for recovery elections */
+       ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
+
+       /* when nodes are disabled/enabled */
+       ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
 
+       /* when we are asked to puch out a flag change */
+       ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
+
+       /* register a message port for vacuum fetch */
+       ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
+
+       /* register a message port for reloadnodes  */
+       ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
+
+       /* register a message port for performing a takeover run */
+       ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
+
+       /* register a message port for disabling the ip check for a short while */
+       ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
 
-       goto again;
+       /* register a message port for updating the recovery daemons node assignment for an ip */
+       ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
 
+       for (;;) {
+               TALLOC_CTX *mem_ctx = talloc_new(ctdb);
+               struct timeval start;
+               double elapsed;
+
+               if (!mem_ctx) {
+                       DEBUG(DEBUG_CRIT,(__location__
+                                         " Failed to create temp context\n"));
+                       exit(-1);
+               }
+
+               start = timeval_current();
+               main_loop(ctdb, rec, mem_ctx);
+               talloc_free(mem_ctx);
+
+               /* we only check for recovery once every second */
+               elapsed = timeval_elapsed(&start);
+               if (elapsed < ctdb->tunable.recover_interval) {
+                       ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
+                                         - elapsed);
+               }
+       }
 }
 
 /*
@@ -3277,7 +3527,7 @@ static void ctdb_check_recd(struct event_context *ev, struct timed_event *te,
                if (ctdb->methods != NULL) {
                        ctdb->methods->shutdown(ctdb);
                }
-               ctdb_event_script(ctdb, "shutdown");
+               ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
 
                exit(10);       
        }
@@ -3317,6 +3567,7 @@ int ctdb_start_recoverd(struct ctdb_context *ctdb)
 {
        int fd[2];
        struct signal_event *se;
+       struct tevent_fd *fde;
 
        if (pipe(fd) != 0) {
                return -1;
@@ -3341,13 +3592,16 @@ int ctdb_start_recoverd(struct ctdb_context *ctdb)
 
        srandom(getpid() ^ time(NULL));
 
-       if (switch_from_server_to_client(ctdb) != 0) {
+       if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
                DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
                exit(1);
        }
 
-       event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
+       DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
+
+       fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ,
                     ctdb_recoverd_parent, &fd[0]);     
+       tevent_fd_set_auto_close(fde);
 
        /* set up a handler to pick up sigchld */
        se = event_add_signal(ctdb->ev, ctdb,