ctdb-daemon: Rename struct ctdb_reply_call to ctdb_reply_call_old
[obnox/samba/samba-obnox.git] / ctdb / server / ctdb_call.c
index fe83bc158afdeafa433bd1f5a081212dec2ad00f..62381b85fe550a7743111000eadf2fc843b6ff29 100644 (file)
   see http://wiki.samba.org/index.php/Samba_%26_Clustering for
   protocol design and packet details
 */
-#include "includes.h"
-#include "tdb.h"
-#include "lib/util/dlinklist.h"
+#include "replace.h"
 #include "system/network.h"
 #include "system/filesys.h"
-#include "../include/ctdb_private.h"
-#include "../common/rb_tree.h"
+
+#include <talloc.h>
+#include <tevent.h>
+
+#include "lib/util/dlinklist.h"
+#include "lib/util/debug.h"
+#include "lib/util/samba_util.h"
+
+#include "ctdb_private.h"
+#include "ctdb_client.h"
+#include "ctdb_logging.h"
+
+#include "common/rb_tree.h"
+#include "common/reqid.h"
+#include "common/system.h"
+#include "common/common.h"
 
 struct ctdb_sticky_record {
        struct ctdb_context *ctdb;
@@ -120,7 +132,7 @@ static void ctdb_send_error(struct ctdb_context *ctdb,
 static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
                                    struct ctdb_db_context *ctdb_db,
                                    TDB_DATA key,
-                                   struct ctdb_req_call *c, 
+                                   struct ctdb_req_call_old *c, 
                                    struct ctdb_ltdb_header *header)
 {
        uint32_t lmaster = ctdb_lmaster(ctdb, &key);
@@ -131,12 +143,12 @@ static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
        }
        c->hopcount++;
 
-       if (c->hopcount%100 == 99) {
-               DEBUG(DEBUG_WARNING,("High hopcount %d dbid:0x%08x "
-                       "key:0x%08x pnn:%d src:%d lmaster:%d "
+       if (c->hopcount%100 > 95) {
+               DEBUG(DEBUG_WARNING,("High hopcount %d dbid:%s "
+                       "key:0x%08x reqid=%08x pnn:%d src:%d lmaster:%d "
                        "header->dmaster:%d dst:%d\n",
-                       c->hopcount, ctdb_db->db_id, ctdb_hash(&key),
-                       ctdb->pnn, c->hdr.srcnode, lmaster,
+                       c->hopcount, ctdb_db->db_name, ctdb_hash(&key),
+                       c->hdr.reqid, ctdb->pnn, c->hdr.srcnode, lmaster,
                        header->dmaster, c->hdr.destnode));
        }
 
@@ -190,6 +202,7 @@ static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
 
        r->hdr.destnode  = new_dmaster;
        r->hdr.reqid     = reqid;
+       r->hdr.generation = ctdb_db->generation;
        r->rsn           = header->rsn;
        r->keylen        = key.dsize;
        r->datalen       = data.dsize;
@@ -211,7 +224,7 @@ static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
   CTDB_REPLY_DMASTER to the new dmaster
 */
 static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db, 
-                                  struct ctdb_req_call *c, 
+                                  struct ctdb_req_call_old *c, 
                                   struct ctdb_ltdb_header *header,
                                   TDB_DATA *key, TDB_DATA *data)
 {
@@ -242,6 +255,7 @@ static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
        CTDB_NO_MEMORY_FATAL(ctdb, r);
        r->hdr.destnode  = lmaster;
        r->hdr.reqid     = c->hdr.reqid;
+       r->hdr.generation = ctdb_db->generation;
        r->db_id         = c->db_id;
        r->rsn           = header->rsn;
        r->dmaster       = c->hdr.srcnode;
@@ -261,8 +275,9 @@ static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
        talloc_free(r);
 }
 
-static void ctdb_sticky_pindown_timeout(struct event_context *ev, struct timed_event *te, 
-                                      struct timeval t, void *private_data)
+static void ctdb_sticky_pindown_timeout(struct tevent_context *ev,
+                                       struct tevent_timer *te,
+                                       struct timeval t, void *private_data)
 {
        struct ctdb_sticky_record *sr = talloc_get_type(private_data, 
                                                       struct ctdb_sticky_record);
@@ -281,16 +296,13 @@ ctdb_set_sticky_pindown(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_
        uint32_t *k;
        struct ctdb_sticky_record *sr;
 
-       k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
+       k = ctdb_key_to_idkey(tmp_ctx, key);
        if (k == NULL) {
                DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
                talloc_free(tmp_ctx);
                return -1;
        }
 
-       k[0] = (key.dsize + 3) / 4 + 1;
-       memcpy(&k[1], key.dptr, key.dsize);
-
        sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
        if (sr == NULL) {
                talloc_free(tmp_ctx);
@@ -306,7 +318,10 @@ ctdb_set_sticky_pindown(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_
                        DEBUG(DEBUG_ERR,("Failed to allocate pindown context for sticky record\n"));
                        return -1;
                }
-               event_add_timed(ctdb->ev, sr->pindown, timeval_current_ofs(ctdb->tunable.sticky_pindown / 1000, (ctdb->tunable.sticky_pindown * 1000) % 1000000), ctdb_sticky_pindown_timeout, sr);
+               tevent_add_timer(ctdb->ev, sr->pindown,
+                                timeval_current_ofs(ctdb->tunable.sticky_pindown / 1000,
+                                                    (ctdb->tunable.sticky_pindown * 1000) % 1000000),
+                                ctdb_sticky_pindown_timeout, sr);
        }
 
        return 0;
@@ -335,7 +350,7 @@ static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
        header.dmaster = ctdb->pnn;
        header.flags = record_flags;
 
-       state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
+       state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
 
        if (state) {
                if (state->call->flags & CTDB_CALL_FLAG_VACUUM_MIGRATION) {
@@ -399,7 +414,7 @@ static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
                return;
        }
 
-       ctdb_call_local(ctdb_db, state->call, &header, state, &data, true, ctdb->pnn);
+       ctdb_call_local(ctdb_db, state->call, &header, state, &data, true);
 
        ret = ctdb_ltdb_unlock(ctdb_db, state->call->key);
        if (ret != 0) {
@@ -412,7 +427,147 @@ static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
        }
 }
 
+struct dmaster_defer_call {
+       struct dmaster_defer_call *next, *prev;
+       struct ctdb_context *ctdb;
+       struct ctdb_req_header *hdr;
+};
+
+struct dmaster_defer_queue {
+       struct ctdb_db_context *ctdb_db;
+       uint32_t generation;
+       struct dmaster_defer_call *deferred_calls;
+};
+
+static void dmaster_defer_reprocess(struct tevent_context *ev,
+                                   struct tevent_timer *te,
+                                   struct timeval t,
+                                   void *private_data)
+{
+       struct dmaster_defer_call *call = talloc_get_type(
+               private_data, struct dmaster_defer_call);
+
+       ctdb_input_pkt(call->ctdb, call->hdr);
+       talloc_free(call);
+}
+
+static int dmaster_defer_queue_destructor(struct dmaster_defer_queue *ddq)
+{
+       /* Ignore requests, if database recovery happens in-between. */
+       if (ddq->generation != ddq->ctdb_db->generation) {
+               return 0;
+       }
+
+       while (ddq->deferred_calls != NULL) {
+               struct dmaster_defer_call *call = ddq->deferred_calls;
+
+               DLIST_REMOVE(ddq->deferred_calls, call);
+
+               talloc_steal(call->ctdb, call);
+               tevent_add_timer(call->ctdb->ev, call, timeval_zero(),
+                                dmaster_defer_reprocess, call);
+       }
+       return 0;
+}
+
+static void *insert_ddq_callback(void *parm, void *data)
+{
+       if (data) {
+               talloc_free(data);
+       }
+       return parm;
+}
+
+/**
+ * This function is used to register a key in database that needs to be updated.
+ * Any requests for that key should get deferred till this is completed.
+ */
+static int dmaster_defer_setup(struct ctdb_db_context *ctdb_db,
+                              struct ctdb_req_header *hdr,
+                              TDB_DATA key)
+{
+       uint32_t *k;
+       struct dmaster_defer_queue *ddq;
+
+       k = ctdb_key_to_idkey(hdr, key);
+       if (k == NULL) {
+               DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer setup\n"));
+               return -1;
+       }
+
+       /* Already exists */
+       ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
+       if (ddq != NULL) {
+               if (ddq->generation == ctdb_db->generation) {
+                       talloc_free(k);
+                       return 0;
+               }
+
+               /* Recovery occurred - get rid of old queue. All the deferred
+                * requests will be resent anyway from ctdb_call_resend_db.
+                */
+               talloc_free(ddq);
+       }
+
+       ddq = talloc(hdr, struct dmaster_defer_queue);
+       if (ddq == NULL) {
+               DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer queue\n"));
+               talloc_free(k);
+               return -1;
+       }
+       ddq->ctdb_db = ctdb_db;
+       ddq->generation = hdr->generation;
+       ddq->deferred_calls = NULL;
+
+       trbt_insertarray32_callback(ctdb_db->defer_dmaster, k[0], k,
+                                   insert_ddq_callback, ddq);
+       talloc_set_destructor(ddq, dmaster_defer_queue_destructor);
+
+       talloc_free(k);
+       return 0;
+}
+
+static int dmaster_defer_add(struct ctdb_db_context *ctdb_db,
+                            struct ctdb_req_header *hdr,
+                            TDB_DATA key)
+{
+       struct dmaster_defer_queue *ddq;
+       struct dmaster_defer_call *call;
+       uint32_t *k;
+
+       k = ctdb_key_to_idkey(hdr, key);
+       if (k == NULL) {
+               DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer add\n"));
+               return -1;
+       }
+
+       ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
+       if (ddq == NULL) {
+               talloc_free(k);
+               return -1;
+       }
+
+       talloc_free(k);
+
+       if (ddq->generation != hdr->generation) {
+               talloc_set_destructor(ddq, NULL);
+               talloc_free(ddq);
+               return -1;
+       }
+
+       call = talloc(ddq, struct dmaster_defer_call);
+       if (call == NULL) {
+               DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer call\n"));
+               return -1;
+       }
+
+       call->ctdb = ctdb_db->ctdb;
+       call->hdr = talloc_steal(call, hdr);
+
+       DLIST_ADD_END(ddq->deferred_calls, call, NULL);
 
+       return 0;
+}
 
 /*
   called when a CTDB_REQ_DMASTER packet comes in
@@ -430,6 +585,23 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
        size_t len;
        int ret;
 
+       ctdb_db = find_ctdb_db(ctdb, c->db_id);
+       if (!ctdb_db) {
+               ctdb_send_error(ctdb, hdr, -1,
+                               "Unknown database in request. db_id==0x%08x",
+                               c->db_id);
+               return;
+       }
+
+       if (hdr->generation != ctdb_db->generation) {
+               DEBUG(DEBUG_DEBUG,
+                     ("ctdb operation %u request %u from node %u to %u had an"
+                      " invalid generation:%u while our generation is:%u\n",
+                      hdr->operation, hdr->reqid, hdr->srcnode, hdr->destnode,
+                      hdr->generation, ctdb_db->generation));
+               return;
+       }
+
        key.dptr = c->data;
        key.dsize = c->keylen;
        data.dptr = c->data + c->keylen;
@@ -437,17 +609,12 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
        len = offsetof(struct ctdb_req_dmaster, data) + key.dsize + data.dsize
                        + sizeof(uint32_t);
        if (len <= c->hdr.length) {
-               record_flags = *(uint32_t *)&c->data[c->keylen + c->datalen];
+               memcpy(&record_flags, &c->data[c->keylen + c->datalen],
+                      sizeof(record_flags));
        }
 
-       ctdb_db = find_ctdb_db(ctdb, c->db_id);
-       if (!ctdb_db) {
-               ctdb_send_error(ctdb, hdr, -1,
-                               "Unknown database in request. db_id==0x%08x",
-                               c->db_id);
-               return;
-       }
-       
+       dmaster_defer_setup(ctdb_db, hdr, key);
+
        /* fetch the current record */
        ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
                                           ctdb_call_input_pkt, ctdb, false);
@@ -474,7 +641,7 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
        if (header.dmaster != hdr->srcnode) {
                DEBUG(DEBUG_ALERT,("pnn %u dmaster request for new-dmaster %u from non-master %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u keyval=0x%08x\n",
                         ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
-                        ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
+                        ctdb_db->db_id, hdr->generation, ctdb_db->generation,
                         (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid,
                         (key.dsize >= 4)?(*(uint32_t *)key.dptr):0));
                if (header.rsn != 0 || header.dmaster != ctdb->pnn) {
@@ -489,7 +656,7 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
        if (header.rsn > c->rsn) {
                DEBUG(DEBUG_ALERT,("pnn %u dmaster request with older RSN new-dmaster %u from %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u\n",
                         ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
-                        ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
+                        ctdb_db->db_id, hdr->generation, ctdb_db->generation,
                         (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid));
        }
 
@@ -513,7 +680,8 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
        }
 }
 
-static void ctdb_sticky_record_timeout(struct event_context *ev, struct timed_event *te, 
+static void ctdb_sticky_record_timeout(struct tevent_context *ev,
+                                      struct tevent_timer *te,
                                       struct timeval t, void *private_data)
 {
        struct ctdb_sticky_record *sr = talloc_get_type(private_data, 
@@ -537,16 +705,13 @@ ctdb_make_record_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_
        uint32_t *k;
        struct ctdb_sticky_record *sr;
 
-       k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
+       k = ctdb_key_to_idkey(tmp_ctx, key);
        if (k == NULL) {
                DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
                talloc_free(tmp_ctx);
                return -1;
        }
 
-       k[0] = (key.dsize + 3) / 4 + 1;
-       memcpy(&k[1], key.dptr, key.dsize);
-
        sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
        if (sr != NULL) {
                talloc_free(tmp_ctx);
@@ -564,11 +729,15 @@ ctdb_make_record_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_
        sr->ctdb_db = ctdb_db;
        sr->pindown = NULL;
 
-       DEBUG(DEBUG_ERR,("Make record sticky in db %s\n", ctdb_db->db_name));
+       DEBUG(DEBUG_ERR,("Make record sticky for %d seconds in db %s key:0x%08x.\n",
+                        ctdb->tunable.sticky_duration,
+                        ctdb_db->db_name, ctdb_hash(&key)));
 
        trbt_insertarray32_callback(ctdb_db->sticky_records, k[0], &k[0], ctdb_make_sticky_record_callback, sr);
 
-       event_add_timed(ctdb->ev, sr, timeval_current_ofs(ctdb->tunable.sticky_duration, 0), ctdb_sticky_record_timeout, sr);
+       tevent_add_timer(ctdb->ev, sr,
+                        timeval_current_ofs(ctdb->tunable.sticky_duration, 0),
+                        ctdb_sticky_record_timeout, sr);
 
        talloc_free(tmp_ctx);
        return 0;
@@ -584,8 +753,9 @@ struct pinned_down_deferred_call {
        struct ctdb_req_header *hdr;
 };
 
-static void pinned_down_requeue(struct event_context *ev, struct timed_event *te, 
-                      struct timeval t, void *private_data)
+static void pinned_down_requeue(struct tevent_context *ev,
+                               struct tevent_timer *te,
+                               struct timeval t, void *private_data)
 {
        struct pinned_down_requeue_handle *handle = talloc_get_type(private_data, struct pinned_down_requeue_handle);
        struct ctdb_context *ctdb = handle->ctdb;
@@ -605,7 +775,8 @@ static int pinned_down_destructor(struct pinned_down_deferred_call *pinned_down)
        handle->hdr  = pinned_down->hdr;
        talloc_steal(handle, handle->hdr);
 
-       event_add_timed(ctdb->ev, handle, timeval_zero(), pinned_down_requeue, handle);
+       tevent_add_timer(ctdb->ev, handle, timeval_zero(),
+                        pinned_down_requeue, handle);
 
        return 0;
 }
@@ -618,16 +789,13 @@ ctdb_defer_pinned_down_request(struct ctdb_context *ctdb, struct ctdb_db_context
        struct ctdb_sticky_record *sr;
        struct pinned_down_deferred_call *pinned_down;
 
-       k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
+       k = ctdb_key_to_idkey(tmp_ctx, key);
        if (k == NULL) {
                DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
                talloc_free(tmp_ctx);
                return -1;
        }
 
-       k[0] = (key.dsize + 3) / 4 + 1;
-       memcpy(&k[1], key.dptr, key.dsize);
-
        sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
        if (sr == NULL) {
                talloc_free(tmp_ctx);
@@ -658,7 +826,7 @@ ctdb_defer_pinned_down_request(struct ctdb_context *ctdb, struct ctdb_db_context
 static void
 ctdb_update_db_stat_hot_keys(struct ctdb_db_context *ctdb_db, TDB_DATA key, int hopcount)
 {
-       int i;
+       int i, id;
 
        /* smallest value is always at index 0 */
        if (hopcount <= ctdb_db->statistics.hot_keys[0].count) {
@@ -681,16 +849,27 @@ ctdb_update_db_stat_hot_keys(struct ctdb_db_context *ctdb_db, TDB_DATA key, int
                goto sort_keys;
        }
 
-       if (ctdb_db->statistics.hot_keys[0].key.dptr != NULL) {
-               talloc_free(ctdb_db->statistics.hot_keys[0].key.dptr);
+       if (ctdb_db->statistics.num_hot_keys < MAX_HOT_KEYS) {
+               id = ctdb_db->statistics.num_hot_keys;
+               ctdb_db->statistics.num_hot_keys++;
+       } else {
+               id = 0;
        }
-       ctdb_db->statistics.hot_keys[0].key.dsize = key.dsize;
-       ctdb_db->statistics.hot_keys[0].key.dptr  = talloc_memdup(ctdb_db, key.dptr, key.dsize);
-       ctdb_db->statistics.hot_keys[0].count = hopcount;
 
+       if (ctdb_db->statistics.hot_keys[id].key.dptr != NULL) {
+               talloc_free(ctdb_db->statistics.hot_keys[id].key.dptr);
+       }
+       ctdb_db->statistics.hot_keys[id].key.dsize = key.dsize;
+       ctdb_db->statistics.hot_keys[id].key.dptr  = talloc_memdup(ctdb_db, key.dptr, key.dsize);
+       ctdb_db->statistics.hot_keys[id].count = hopcount;
+       DEBUG(DEBUG_NOTICE,("Updated hot key database=%s key=0x%08x id=%d hop_count=%d\n",
+                           ctdb_db->db_name, ctdb_hash(&key), id, hopcount));
 
 sort_keys:
-       for (i = 2; i < MAX_HOT_KEYS; i++) {
+       for (i = 1; i < MAX_HOT_KEYS; i++) {
+               if (ctdb_db->statistics.hot_keys[i].count == 0) {
+                       continue;
+               }
                if (ctdb_db->statistics.hot_keys[i].count < ctdb_db->statistics.hot_keys[0].count) {
                        hopcount = ctdb_db->statistics.hot_keys[i].count;
                        ctdb_db->statistics.hot_keys[i].count = ctdb_db->statistics.hot_keys[0].count;
@@ -708,9 +887,9 @@ sort_keys:
 */
 void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
 {
-       struct ctdb_req_call *c = (struct ctdb_req_call *)hdr;
+       struct ctdb_req_call_old *c = (struct ctdb_req_call_old *)hdr;
        TDB_DATA data;
-       struct ctdb_reply_call *r;
+       struct ctdb_reply_call_old *r;
        int ret, len;
        struct ctdb_ltdb_header header;
        struct ctdb_call *call;
@@ -722,7 +901,6 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
                return;
        }
 
-
        ctdb_db = find_ctdb_db(ctdb, c->db_id);
        if (!ctdb_db) {
                ctdb_send_error(ctdb, hdr, -1,
@@ -731,6 +909,15 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
                return;
        }
 
+       if (hdr->generation != ctdb_db->generation) {
+               DEBUG(DEBUG_DEBUG,
+                     ("ctdb operation %u request %u from node %u to %u had an"
+                      " invalid generation:%u while our generation is:%u\n",
+                      hdr->operation, hdr->reqid, hdr->srcnode, hdr->destnode,
+                      hdr->generation, ctdb_db->generation));
+               return;
+       }
+
        call = talloc(hdr, struct ctdb_call);
        CTDB_NO_MEMORY_FATAL(ctdb, call);
 
@@ -748,11 +935,17 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
        */
        if (ctdb_db->sticky) {
                if (ctdb_defer_pinned_down_request(ctdb, ctdb_db, call->key, hdr) == 0) {
-                 DEBUG(DEBUG_WARNING,("Defer request for pinned down record in %s\n", ctdb_db->db_name));
+                       DEBUG(DEBUG_WARNING,
+                             ("Defer request for pinned down record in %s\n", ctdb_db->db_name));
+                       talloc_free(call);
                        return;
                }
        }
 
+       if (dmaster_defer_add(ctdb_db, hdr, call->key) == 0) {
+               talloc_free(call);
+               return;
+       }
 
        /* determine if we are the dmaster for this key. This also
           fetches the record data (if any), thus avoiding a 2nd fetch of the data 
@@ -762,10 +955,12 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
                                           ctdb_call_input_pkt, ctdb, false);
        if (ret == -1) {
                ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
+               talloc_free(call);
                return;
        }
        if (ret == -2) {
                DEBUG(DEBUG_INFO,(__location__ " deferred ctdb_request_call\n"));
+               talloc_free(call);
                return;
        }
 
@@ -815,6 +1010,7 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
                if (ret != 0) {
                        DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
                }
+               talloc_free(call);
                return;
        }
 
@@ -869,12 +1065,13 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
                        DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
                }
 
-               len = offsetof(struct ctdb_reply_call, data) + data.dsize + sizeof(struct ctdb_ltdb_header);
+               len = offsetof(struct ctdb_reply_call_old, data) + data.dsize + sizeof(struct ctdb_ltdb_header);
                r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len, 
-                                           struct ctdb_reply_call);
+                                           struct ctdb_reply_call_old);
                CTDB_NO_MEMORY_FATAL(ctdb, r);
                r->hdr.destnode  = c->hdr.srcnode;
                r->hdr.reqid     = c->hdr.reqid;
+               r->hdr.generation = ctdb_db->generation;
                r->status        = 0;
                r->datalen       = data.dsize + sizeof(struct ctdb_ltdb_header);
                header.rsn      -= 2;
@@ -891,6 +1088,7 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
                CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_delegations);
 
                talloc_free(r);
+               talloc_free(call);
                return;
        }
 
@@ -913,21 +1111,16 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
           should make it sticky.
        */
        if (ctdb_db->sticky && c->hopcount >= ctdb->tunable.hopcount_make_sticky) {
-               DEBUG(DEBUG_ERR, ("Hot record in database %s. Hopcount is %d. Make record sticky for %d seconds\n", ctdb_db->db_name, c->hopcount, ctdb->tunable.sticky_duration));
                ctdb_make_record_sticky(ctdb, ctdb_db, call->key);
        }
 
 
-       /* if this nodes has done enough consecutive calls on the same record
-          then give them the record
-          or if the node requested an immediate migration
-       */
-       if ( c->hdr.srcnode != ctdb->pnn &&
-            ((header.laccessor == c->hdr.srcnode
-              && header.lacount >= ctdb->tunable.max_lacount
-              && ctdb->tunable.max_lacount != 0)
-             || (c->flags & CTDB_IMMEDIATE_MIGRATION)) ) {
-               if (ctdb_db->transaction_active) {
+       /* If possible, try to migrate the record off to the caller node.
+        * From the client's perspective a fetch of the data is just as
+        * expensive as a migration.
+        */
+       if (c->hdr.srcnode != ctdb->pnn) {
+               if (ctdb_db->persistent_state) {
                        DEBUG(DEBUG_INFO, (__location__ " refusing migration"
                              " of key %s while transaction is active\n",
                              (char *)call->key.dptr));
@@ -941,11 +1134,12 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
                        }
-                       return;
                }
+               talloc_free(call);
+               return;
        }
 
-       ret = ctdb_call_local(ctdb_db, call, &header, hdr, &data, true, c->hdr.srcnode);
+       ret = ctdb_call_local(ctdb_db, call, &header, hdr, &data, true);
        if (ret != 0) {
                DEBUG(DEBUG_ERR,(__location__ " ctdb_call_local failed\n"));
                call->status = -1;
@@ -956,12 +1150,13 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
                DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
        }
 
-       len = offsetof(struct ctdb_reply_call, data) + call->reply_data.dsize;
+       len = offsetof(struct ctdb_reply_call_old, data) + call->reply_data.dsize;
        r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len, 
-                                   struct ctdb_reply_call);
+                                   struct ctdb_reply_call_old);
        CTDB_NO_MEMORY_FATAL(ctdb, r);
        r->hdr.destnode  = hdr->srcnode;
        r->hdr.reqid     = hdr->reqid;
+       r->hdr.generation = ctdb_db->generation;
        r->status        = call->status;
        r->datalen       = call->reply_data.dsize;
        if (call->reply_data.dsize) {
@@ -971,20 +1166,21 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
        ctdb_queue_packet(ctdb, &r->hdr);
 
        talloc_free(r);
+       talloc_free(call);
 }
 
-/*
-  called when a CTDB_REPLY_CALL packet comes in
-
-  This packet comes in response to a CTDB_REQ_CALL request packet. It
-  contains any reply data from the call
-*/
+/**
+ * called when a CTDB_REPLY_CALL packet comes in
+ *
+ * This packet comes in response to a CTDB_REQ_CALL request packet. It
+ * contains any reply data from the call
+ */
 void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
 {
-       struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
+       struct ctdb_reply_call_old *c = (struct ctdb_reply_call_old *)hdr;
        struct ctdb_call_state *state;
 
-       state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
+       state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
        if (state == NULL) {
                DEBUG(DEBUG_ERR, (__location__ " reqid %u not found\n", hdr->reqid));
                return;
@@ -996,6 +1192,15 @@ void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
                return;
        }
 
+       if (hdr->generation != state->generation) {
+               DEBUG(DEBUG_DEBUG,
+                     ("ctdb operation %u request %u from node %u to %u had an"
+                      " invalid generation:%u while our generation is:%u\n",
+                      hdr->operation, hdr->reqid, hdr->srcnode, hdr->destnode,
+                      hdr->generation, state->generation));
+               return;
+       }
+
 
        /* read only delegation processing */
        /* If we got a FETCH_WITH_HEADER we should check if this is a ro
@@ -1069,13 +1274,13 @@ finished_ro:
 }
 
 
-/*
-  called when a CTDB_REPLY_DMASTER packet comes in
-
-  This packet comes in from the lmaster response to a CTDB_REQ_CALL
-  request packet. It means that the current dmaster wants to give us
-  the dmaster role
-*/
+/**
+ * called when a CTDB_REPLY_DMASTER packet comes in
+ *
+ * This packet comes in from the lmaster in response to a CTDB_REQ_CALL
+ * request packet. It means that the current dmaster wants to give us
+ * the dmaster role.
+ */
 void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
 {
        struct ctdb_reply_dmaster *c = (struct ctdb_reply_dmaster *)hdr;
@@ -1090,7 +1295,16 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
                DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_reply_dmaster\n", c->db_id));
                return;
        }
-       
+
+       if (hdr->generation != ctdb_db->generation) {
+               DEBUG(DEBUG_DEBUG,
+                     ("ctdb operation %u request %u from node %u to %u had an"
+                      " invalid generation:%u while our generation is:%u\n",
+                      hdr->operation, hdr->reqid, hdr->srcnode, hdr->destnode,
+                      hdr->generation, ctdb_db->generation));
+               return;
+       }
+
        key.dptr = c->data;
        key.dsize = c->keylen;
        data.dptr = &c->data[key.dsize];
@@ -1098,9 +1312,12 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
        len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize
                + sizeof(uint32_t);
        if (len <= c->hdr.length) {
-               record_flags = *(uint32_t *)&c->data[c->keylen + c->datalen];
+               memcpy(&record_flags, &c->data[c->keylen + c->datalen],
+                      sizeof(record_flags));
        }
 
+       dmaster_defer_setup(ctdb_db, hdr, key);
+
        ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
                                     ctdb_call_input_pkt, ctdb, false);
        if (ret == -2) {
@@ -1123,7 +1340,7 @@ void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
        struct ctdb_reply_error *c = (struct ctdb_reply_error *)hdr;
        struct ctdb_call_state *state;
 
-       state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
+       state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
        if (state == NULL) {
                DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_reply_error\n",
                         ctdb->pnn, hdr->reqid));
@@ -1151,8 +1368,8 @@ void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
 */
 static int ctdb_call_destructor(struct ctdb_call_state *state)
 {
-       DLIST_REMOVE(state->ctdb_db->ctdb->pending_calls, state);
-       ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
+       DLIST_REMOVE(state->ctdb_db->pending_calls, state);
+       reqid_remove(state->ctdb_db->ctdb->idr, state->reqid);
        return 0;
 }
 
@@ -1164,11 +1381,11 @@ static void ctdb_call_resend(struct ctdb_call_state *state)
 {
        struct ctdb_context *ctdb = state->ctdb_db->ctdb;
 
-       state->generation = ctdb->vnn_map->generation;
+       state->generation = state->ctdb_db->generation;
 
        /* use a new reqid, in case the old reply does eventually come in */
-       ctdb_reqid_remove(ctdb, state->reqid);
-       state->reqid = ctdb_reqid_new(ctdb, state);
+       reqid_remove(ctdb->idr, state->reqid);
+       state->reqid = reqid_new(ctdb->idr, state);
        state->c->hdr.reqid = state->reqid;
 
        /* update the generation count for this request, so its valid with the new vnn_map */
@@ -1178,26 +1395,38 @@ static void ctdb_call_resend(struct ctdb_call_state *state)
        state->c->hdr.destnode = ctdb->pnn;
 
        ctdb_queue_packet(ctdb, &state->c->hdr);
-       DEBUG(DEBUG_NOTICE,("resent ctdb_call\n"));
+       DEBUG(DEBUG_NOTICE,("resent ctdb_call for db %s reqid %u generation %u\n",
+                           state->ctdb_db->db_name, state->reqid, state->generation));
 }
 
 /*
   resend all pending calls on recovery
  */
-void ctdb_call_resend_all(struct ctdb_context *ctdb)
+void ctdb_call_resend_db(struct ctdb_db_context *ctdb_db)
 {
        struct ctdb_call_state *state, *next;
-       for (state=ctdb->pending_calls;state;state=next) {
+
+       for (state = ctdb_db->pending_calls; state; state = next) {
                next = state->next;
                ctdb_call_resend(state);
        }
 }
 
+void ctdb_call_resend_all(struct ctdb_context *ctdb)
+{
+       struct ctdb_db_context *ctdb_db;
+
+       for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
+               ctdb_call_resend_db(ctdb_db);
+       }
+}
+
 /*
   this allows the caller to setup a async.fn 
 */
-static void call_local_trigger(struct event_context *ev, struct timed_event *te, 
-                      struct timeval t, void *private_data)
+static void call_local_trigger(struct tevent_context *ev,
+                              struct tevent_timer *te,
+                              struct timeval t, void *private_data)
 {
        struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state);
        if (state->async.fn) {
@@ -1232,12 +1461,13 @@ struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db,
        *(state->call) = *call;
        state->ctdb_db = ctdb_db;
 
-       ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true, ctdb->pnn);
+       ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
        if (ret != 0) {
                DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
        }
 
-       event_add_timed(ctdb->ev, state, timeval_zero(), call_local_trigger, state);
+       tevent_add_timer(ctdb->ev, state, timeval_zero(),
+                        call_local_trigger, state);
 
        return state;
 }
@@ -1267,18 +1497,19 @@ struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctd
        state->call = talloc(state, struct ctdb_call);
        CTDB_NO_MEMORY_NULL(ctdb, state->call);
 
-       state->reqid = ctdb_reqid_new(ctdb, state);
+       state->reqid = reqid_new(ctdb->idr, state);
        state->ctdb_db = ctdb_db;
        talloc_set_destructor(state, ctdb_call_destructor);
 
-       len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
+       len = offsetof(struct ctdb_req_call_old, data) + call->key.dsize + call->call_data.dsize;
        state->c = ctdb_transport_allocate(ctdb, state, CTDB_REQ_CALL, len, 
-                                          struct ctdb_req_call);
+                                          struct ctdb_req_call_old);
        CTDB_NO_MEMORY_NULL(ctdb, state->c);
        state->c->hdr.destnode  = header->dmaster;
 
        /* this limits us to 16k outstanding messages - not unreasonable */
        state->c->hdr.reqid     = state->reqid;
+       state->c->hdr.generation = ctdb_db->generation;
        state->c->flags         = call->flags;
        state->c->db_id         = ctdb_db->db_id;
        state->c->callid        = call->call_id;
@@ -1293,9 +1524,9 @@ struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctd
        state->call->key.dptr       = &state->c->data[0];
 
        state->state  = CTDB_CALL_WAIT;
-       state->generation = ctdb->vnn_map->generation;
+       state->generation = ctdb_db->generation;
 
-       DLIST_ADD(ctdb->pending_calls, state);
+       DLIST_ADD(ctdb_db->pending_calls, state);
 
        ctdb_queue_packet(ctdb, &state->c->hdr);
 
@@ -1311,7 +1542,7 @@ struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctd
 int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
 {
        while (state->state < CTDB_CALL_DONE) {
-               event_loop_once(state->ctdb_db->ctdb->ev);
+               tevent_loop_once(state->ctdb_db->ctdb->ev);
        }
        if (state->state != CTDB_CALL_DONE) {
                ctdb_set_error(state->ctdb_db->ctdb, "%s", state->errmsg);
@@ -1373,7 +1604,7 @@ struct revokechild_handle {
        struct revokechild_handle *next, *prev;
        struct ctdb_context *ctdb;
        struct ctdb_db_context *ctdb_db;
-       struct fd_event *fde;
+       struct tevent_fd *fde;
        int status;
        int fd[2];
        pid_t child;
@@ -1387,8 +1618,9 @@ struct revokechild_requeue_handle {
        void *ctx;
 };
 
-static void deferred_call_requeue(struct event_context *ev, struct timed_event *te, 
-                      struct timeval t, void *private_data)
+static void deferred_call_requeue(struct tevent_context *ev,
+                                 struct tevent_timer *te,
+                                 struct timeval t, void *private_data)
 {
        struct revokechild_requeue_handle *requeue_handle = talloc_get_type(private_data, struct revokechild_requeue_handle);
 
@@ -1400,7 +1632,7 @@ static int deferred_call_destructor(struct revokechild_deferred_call *deferred_c
 {
        struct ctdb_context *ctdb = deferred_call->ctdb;
        struct revokechild_requeue_handle *requeue_handle = talloc(ctdb, struct revokechild_requeue_handle);
-       struct ctdb_req_call *c = (struct ctdb_req_call *)deferred_call->hdr;
+       struct ctdb_req_call_old *c = (struct ctdb_req_call_old *)deferred_call->hdr;
 
        requeue_handle->ctdb = ctdb;
        requeue_handle->hdr  = deferred_call->hdr;
@@ -1409,7 +1641,9 @@ static int deferred_call_destructor(struct revokechild_deferred_call *deferred_c
        talloc_steal(requeue_handle, requeue_handle->hdr);
 
        /* when revoking, any READONLY requests have 1 second grace to let read/write finish first */
-       event_add_timed(ctdb->ev, requeue_handle, timeval_current_ofs(c->flags & CTDB_WANT_READONLY ? 1 : 0, 0), deferred_call_requeue, requeue_handle);
+       tevent_add_timer(ctdb->ev, requeue_handle,
+                        timeval_current_ofs(c->flags & CTDB_WANT_READONLY ? 1 : 0, 0),
+                        deferred_call_requeue, requeue_handle);
 
        return 0;
 }
@@ -1433,15 +1667,16 @@ static int revokechild_destructor(struct revokechild_handle *rc)
        return 0;
 }
 
-static void revokechild_handler(struct event_context *ev, struct fd_event *fde, 
-                            uint16_t flags, void *private_data)
+static void revokechild_handler(struct tevent_context *ev,
+                               struct tevent_fd *fde,
+                               uint16_t flags, void *private_data)
 {
        struct revokechild_handle *rc = talloc_get_type(private_data, 
                                                     struct revokechild_handle);
        int ret;
        char c;
 
-       ret = read(rc->fd[0], &c, 1);
+       ret = sys_read(rc->fd[0], &c, 1);
        if (ret != 1) {
                DEBUG(DEBUG_ERR,("Failed to read status from revokechild. errno:%d\n", errno));
                rc->status = -1;
@@ -1497,7 +1732,7 @@ static void revoke_send_cb(struct ctdb_context *ctdb, uint32_t pnn, void *privat
        struct ctdb_revoke_state *revoke_state = private_data;
        struct ctdb_client_control_state *state;
 
-       state = ctdb_ctrl_updaterecord_send(ctdb, revoke_state, timeval_current_ofs(5,0), pnn, revoke_state->ctdb_db, revoke_state->key, revoke_state->header, revoke_state->data);
+       state = ctdb_ctrl_updaterecord_send(ctdb, revoke_state, timeval_current_ofs(ctdb->tunable.control_timeout,0), pnn, revoke_state->ctdb_db, revoke_state->key, revoke_state->header, revoke_state->data);
        if (state == NULL) {
                DEBUG(DEBUG_ERR,("Failure to send update record to revoke readonly delegation\n"));
                revoke_state->status = -1;
@@ -1510,8 +1745,9 @@ static void revoke_send_cb(struct ctdb_context *ctdb, uint32_t pnn, void *privat
 
 }
 
-static void ctdb_revoke_timeout_handler(struct event_context *ev, struct timed_event *te, 
-                             struct timeval yt, void *private_data)
+static void ctdb_revoke_timeout_handler(struct tevent_context *ev,
+                                       struct tevent_timer *te,
+                                       struct timeval yt, void *private_data)
 {
        struct ctdb_revoke_state *state = private_data;
 
@@ -1523,7 +1759,8 @@ static void ctdb_revoke_timeout_handler(struct event_context *ev, struct timed_e
 static int ctdb_revoke_all_delegations(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA tdata, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
 {
        struct ctdb_revoke_state *state = talloc_zero(ctdb, struct ctdb_revoke_state);
-       int status;
+       struct ctdb_ltdb_header new_header;
+       TDB_DATA new_data;
 
        state->ctdb_db = ctdb_db;
        state->key     = key;
@@ -1532,55 +1769,60 @@ static int ctdb_revoke_all_delegations(struct ctdb_context *ctdb, struct ctdb_db
  
        ctdb_trackingdb_traverse(ctdb, tdata, revoke_send_cb, state);
 
-       event_add_timed(ctdb->ev, state, timeval_current_ofs(5, 0), ctdb_revoke_timeout_handler, state);
+       tevent_add_timer(ctdb->ev, state,
+                        timeval_current_ofs(ctdb->tunable.control_timeout, 0),
+                        ctdb_revoke_timeout_handler, state);
 
        while (state->finished == 0) {
-               event_loop_once(ctdb->ev);
+               tevent_loop_once(ctdb->ev);
        }
 
-       status = state->status;
-
-       if (status == 0) {
-               struct ctdb_ltdb_header new_header;
-               TDB_DATA new_data;
+       if (ctdb_ltdb_lock(ctdb_db, key) != 0) {
+               DEBUG(DEBUG_ERR,("Failed to chainlock the database in revokechild\n"));
+               talloc_free(state);
+               return -1;
+       }
+       if (ctdb_ltdb_fetch(ctdb_db, key, &new_header, state, &new_data) != 0) {
+               ctdb_ltdb_unlock(ctdb_db, key);
+               DEBUG(DEBUG_ERR,("Failed for fetch tdb record in revokechild\n"));
+               talloc_free(state);
+               return -1;
+       }
+       header->rsn++;
+       if (new_header.rsn > header->rsn) {
+               ctdb_ltdb_unlock(ctdb_db, key);
+               DEBUG(DEBUG_ERR,("RSN too high in tdb record in revokechild\n"));
+               talloc_free(state);
+               return -1;
+       }
+       if ( (new_header.flags & (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS)) != (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS) ) {
+               ctdb_ltdb_unlock(ctdb_db, key);
+               DEBUG(DEBUG_ERR,("Flags are wrong in tdb record in revokechild\n"));
+               talloc_free(state);
+               return -1;
+       }
 
-               if (ctdb_ltdb_lock(ctdb_db, key) != 0) {
-                       DEBUG(DEBUG_ERR,("Failed to chainlock the database in revokechild\n"));
-                       talloc_free(state);
-                       return -1;
-               }
-               if (ctdb_ltdb_fetch(ctdb_db, key, &new_header, state, &new_data) != 0) {
-                       ctdb_ltdb_unlock(ctdb_db, key);
-                       DEBUG(DEBUG_ERR,("Failed for fetch tdb record in revokechild\n"));
-                       talloc_free(state);
-                       return -1;
-               }
-               header->rsn++;
-               if (new_header.rsn > header->rsn) {
-                       ctdb_ltdb_unlock(ctdb_db, key);
-                       DEBUG(DEBUG_ERR,("RSN too high in tdb record in revokechild\n"));
-                       talloc_free(state);
-                       return -1;
-               }
-               if ( (new_header.flags & (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS)) != (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS) ) {
-                       ctdb_ltdb_unlock(ctdb_db, key);
-                       DEBUG(DEBUG_ERR,("Flags are wrong in tdb record in revokechild\n"));
-                       talloc_free(state);
-                       return -1;
-               }
+       /*
+        * If the revoke succeeds on all nodes, the revoke is complete.
+        * Otherwise, remove the CTDB_REC_RO_REVOKING_READONLY flag and retry.
+        */
+       if (state->status == 0) {
                new_header.rsn++;
                new_header.flags |= CTDB_REC_RO_REVOKE_COMPLETE;
-               if (ctdb_ltdb_store(ctdb_db, key, &new_header, new_data) != 0) {
-                       ctdb_ltdb_unlock(ctdb_db, key);
-                       DEBUG(DEBUG_ERR,("Failed to write new record in revokechild\n"));
-                       talloc_free(state);
-                       return -1;
-               }
+       } else {
+               DEBUG(DEBUG_NOTICE, ("Revoke all delegations failed, retrying.\n"));
+               new_header.flags &= ~CTDB_REC_RO_REVOKING_READONLY;
+       }
+       if (ctdb_ltdb_store(ctdb_db, key, &new_header, new_data) != 0) {
                ctdb_ltdb_unlock(ctdb_db, key);
+               DEBUG(DEBUG_ERR,("Failed to write new record in revokechild\n"));
+               talloc_free(state);
+               return -1;
        }
+       ctdb_ltdb_unlock(ctdb_db, key);
 
        talloc_free(state);
-       return status;
+       return 0;
 }
 
 
@@ -1645,6 +1887,7 @@ int ctdb_start_revoke_ro_record(struct ctdb_context *ctdb, struct ctdb_db_contex
                close(rc->fd[0]);
                debug_extra = talloc_asprintf(NULL, "revokechild-%s:", ctdb_db->db_name);
 
+               ctdb_set_process_name("ctdb_revokechild");
                if (switch_from_server_to_client(ctdb, "revokechild-%s", ctdb_db->db_name) != 0) {
                        DEBUG(DEBUG_ERR,("Failed to switch from server to client for revokechild process\n"));
                        c = 1;
@@ -1654,7 +1897,7 @@ int ctdb_start_revoke_ro_record(struct ctdb_context *ctdb, struct ctdb_db_contex
                c = ctdb_revoke_all_delegations(ctdb, ctdb_db, tdata, key, header, data);
 
 child_finished:
-               write(rc->fd[1], &c, 1);
+               sys_write(rc->fd[1], &c, 1);
                /* make sure we die when our parent dies */
                while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
                        sleep(5);
@@ -1669,9 +1912,8 @@ child_finished:
        /* This is an active revokechild child process */
        DLIST_ADD_END(ctdb_db->revokechild_active, rc, NULL);
 
-       rc->fde = event_add_fd(ctdb->ev, rc, rc->fd[0],
-                                  EVENT_FD_READ, revokechild_handler,
-                                  (void *)rc);
+       rc->fde = tevent_add_fd(ctdb->ev, rc, rc->fd[0], TEVENT_FD_READ,
+                               revokechild_handler, (void *)rc);
        if (rc->fde == NULL) {
                DEBUG(DEBUG_ERR,("Failed to set up fd event for revokechild process\n"));
                talloc_free(rc);