ctdb-daemon: Rename struct ctdb_reply_call to ctdb_reply_call_old
[obnox/samba/samba-obnox.git] / ctdb / server / ctdb_call.c
index bd88ce9617001129595a2b6e2a924632fe00ec29..62381b85fe550a7743111000eadf2fc843b6ff29 100644 (file)
   see http://wiki.samba.org/index.php/Samba_%26_Clustering for
   protocol design and packet details
 */
-#include "includes.h"
-#include "tdb.h"
-#include "lib/util/dlinklist.h"
+#include "replace.h"
 #include "system/network.h"
 #include "system/filesys.h"
-#include "../include/ctdb_private.h"
-#include "../common/rb_tree.h"
+
+#include <talloc.h>
+#include <tevent.h>
+
+#include "lib/util/dlinklist.h"
+#include "lib/util/debug.h"
+#include "lib/util/samba_util.h"
+
+#include "ctdb_private.h"
+#include "ctdb_client.h"
+#include "ctdb_logging.h"
+
+#include "common/rb_tree.h"
+#include "common/reqid.h"
+#include "common/system.h"
+#include "common/common.h"
 
 struct ctdb_sticky_record {
        struct ctdb_context *ctdb;
@@ -120,7 +132,7 @@ static void ctdb_send_error(struct ctdb_context *ctdb,
 static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
                                    struct ctdb_db_context *ctdb_db,
                                    TDB_DATA key,
-                                   struct ctdb_req_call *c, 
+                                   struct ctdb_req_call_old *c, 
                                    struct ctdb_ltdb_header *header)
 {
        uint32_t lmaster = ctdb_lmaster(ctdb, &key);
@@ -190,6 +202,7 @@ static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
 
        r->hdr.destnode  = new_dmaster;
        r->hdr.reqid     = reqid;
+       r->hdr.generation = ctdb_db->generation;
        r->rsn           = header->rsn;
        r->keylen        = key.dsize;
        r->datalen       = data.dsize;
@@ -211,7 +224,7 @@ static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
   CTDB_REPLY_DMASTER to the new dmaster
 */
 static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db, 
-                                  struct ctdb_req_call *c, 
+                                  struct ctdb_req_call_old *c, 
                                   struct ctdb_ltdb_header *header,
                                   TDB_DATA *key, TDB_DATA *data)
 {
@@ -242,6 +255,7 @@ static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
        CTDB_NO_MEMORY_FATAL(ctdb, r);
        r->hdr.destnode  = lmaster;
        r->hdr.reqid     = c->hdr.reqid;
+       r->hdr.generation = ctdb_db->generation;
        r->db_id         = c->db_id;
        r->rsn           = header->rsn;
        r->dmaster       = c->hdr.srcnode;
@@ -261,8 +275,9 @@ static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
        talloc_free(r);
 }
 
-static void ctdb_sticky_pindown_timeout(struct event_context *ev, struct timed_event *te, 
-                                      struct timeval t, void *private_data)
+static void ctdb_sticky_pindown_timeout(struct tevent_context *ev,
+                                       struct tevent_timer *te,
+                                       struct timeval t, void *private_data)
 {
        struct ctdb_sticky_record *sr = talloc_get_type(private_data, 
                                                       struct ctdb_sticky_record);
@@ -281,16 +296,13 @@ ctdb_set_sticky_pindown(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_
        uint32_t *k;
        struct ctdb_sticky_record *sr;
 
-       k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
+       k = ctdb_key_to_idkey(tmp_ctx, key);
        if (k == NULL) {
                DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
                talloc_free(tmp_ctx);
                return -1;
        }
 
-       k[0] = (key.dsize + 3) / 4 + 1;
-       memcpy(&k[1], key.dptr, key.dsize);
-
        sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
        if (sr == NULL) {
                talloc_free(tmp_ctx);
@@ -306,7 +318,10 @@ ctdb_set_sticky_pindown(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_
                        DEBUG(DEBUG_ERR,("Failed to allocate pindown context for sticky record\n"));
                        return -1;
                }
-               event_add_timed(ctdb->ev, sr->pindown, timeval_current_ofs(ctdb->tunable.sticky_pindown / 1000, (ctdb->tunable.sticky_pindown * 1000) % 1000000), ctdb_sticky_pindown_timeout, sr);
+               tevent_add_timer(ctdb->ev, sr->pindown,
+                                timeval_current_ofs(ctdb->tunable.sticky_pindown / 1000,
+                                                    (ctdb->tunable.sticky_pindown * 1000) % 1000000),
+                                ctdb_sticky_pindown_timeout, sr);
        }
 
        return 0;
@@ -335,7 +350,7 @@ static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
        header.dmaster = ctdb->pnn;
        header.flags = record_flags;
 
-       state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
+       state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
 
        if (state) {
                if (state->call->flags & CTDB_CALL_FLAG_VACUUM_MIGRATION) {
@@ -412,7 +427,147 @@ static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
        }
 }
 
+struct dmaster_defer_call {
+       struct dmaster_defer_call *next, *prev;
+       struct ctdb_context *ctdb;
+       struct ctdb_req_header *hdr;
+};
+
+struct dmaster_defer_queue {
+       struct ctdb_db_context *ctdb_db;
+       uint32_t generation;
+       struct dmaster_defer_call *deferred_calls;
+};
+
+static void dmaster_defer_reprocess(struct tevent_context *ev,
+                                   struct tevent_timer *te,
+                                   struct timeval t,
+                                   void *private_data)
+{
+       struct dmaster_defer_call *call = talloc_get_type(
+               private_data, struct dmaster_defer_call);
+
+       ctdb_input_pkt(call->ctdb, call->hdr);
+       talloc_free(call);
+}
+
+static int dmaster_defer_queue_destructor(struct dmaster_defer_queue *ddq)
+{
+       /* Ignore the deferred requests if a database recovery happened in between. */
+       if (ddq->generation != ddq->ctdb_db->generation) {
+               return 0;
+       }
+
+       while (ddq->deferred_calls != NULL) {
+               struct dmaster_defer_call *call = ddq->deferred_calls;
+
+               DLIST_REMOVE(ddq->deferred_calls, call);
+
+               talloc_steal(call->ctdb, call);
+               tevent_add_timer(call->ctdb->ev, call, timeval_zero(),
+                                dmaster_defer_reprocess, call);
+       }
+       return 0;
+}
+
+static void *insert_ddq_callback(void *parm, void *data)
+{
+       if (data) {
+               talloc_free(data);
+       }
+       return parm;
+}
+
+/**
+ * This function is used to register a key in the database that needs to be updated.
+ * Any requests for that key should get deferred till this is completed.
+ */
+static int dmaster_defer_setup(struct ctdb_db_context *ctdb_db,
+                              struct ctdb_req_header *hdr,
+                              TDB_DATA key)
+{
+       uint32_t *k;
+       struct dmaster_defer_queue *ddq;
+
+       k = ctdb_key_to_idkey(hdr, key);
+       if (k == NULL) {
+               DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer setup\n"));
+               return -1;
+       }
+
+       /* Already exists */
+       ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
+       if (ddq != NULL) {
+               if (ddq->generation == ctdb_db->generation) {
+                       talloc_free(k);
+                       return 0;
+               }
+
+               /* Recovery occurred - get rid of the old queue. All the deferred
+                * requests will be resent anyway from ctdb_call_resend_db.
+                */
+               talloc_free(ddq);
+       }
+
+       ddq = talloc(hdr, struct dmaster_defer_queue);
+       if (ddq == NULL) {
+               DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer queue\n"));
+               talloc_free(k);
+               return -1;
+       }
+       ddq->ctdb_db = ctdb_db;
+       ddq->generation = hdr->generation;
+       ddq->deferred_calls = NULL;
+
+       trbt_insertarray32_callback(ctdb_db->defer_dmaster, k[0], k,
+                                   insert_ddq_callback, ddq);
+       talloc_set_destructor(ddq, dmaster_defer_queue_destructor);
+
+       talloc_free(k);
+       return 0;
+}
+
+static int dmaster_defer_add(struct ctdb_db_context *ctdb_db,
+                            struct ctdb_req_header *hdr,
+                            TDB_DATA key)
+{
+       struct dmaster_defer_queue *ddq;
+       struct dmaster_defer_call *call;
+       uint32_t *k;
+
+       k = ctdb_key_to_idkey(hdr, key);
+       if (k == NULL) {
+               DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer add\n"));
+               return -1;
+       }
+
+       ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
+       if (ddq == NULL) {
+               talloc_free(k);
+               return -1;
+       }
+
+       talloc_free(k);
+
+       if (ddq->generation != hdr->generation) {
+               talloc_set_destructor(ddq, NULL);
+               talloc_free(ddq);
+               return -1;
+       }
+
+       call = talloc(ddq, struct dmaster_defer_call);
+       if (call == NULL) {
+               DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer call\n"));
+               return -1;
+       }
+
+       call->ctdb = ctdb_db->ctdb;
+       call->hdr = talloc_steal(call, hdr);
 
+       DLIST_ADD_END(ddq->deferred_calls, call, NULL);
+
+       return 0;
+}
 
 /*
   called when a CTDB_REQ_DMASTER packet comes in
@@ -430,6 +585,23 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
        size_t len;
        int ret;
 
+       ctdb_db = find_ctdb_db(ctdb, c->db_id);
+       if (!ctdb_db) {
+               ctdb_send_error(ctdb, hdr, -1,
+                               "Unknown database in request. db_id==0x%08x",
+                               c->db_id);
+               return;
+       }
+
+       if (hdr->generation != ctdb_db->generation) {
+               DEBUG(DEBUG_DEBUG,
+                     ("ctdb operation %u request %u from node %u to %u had an"
+                      " invalid generation:%u while our generation is:%u\n",
+                      hdr->operation, hdr->reqid, hdr->srcnode, hdr->destnode,
+                      hdr->generation, ctdb_db->generation));
+               return;
+       }
+
        key.dptr = c->data;
        key.dsize = c->keylen;
        data.dptr = c->data + c->keylen;
@@ -437,17 +609,12 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
        len = offsetof(struct ctdb_req_dmaster, data) + key.dsize + data.dsize
                        + sizeof(uint32_t);
        if (len <= c->hdr.length) {
-               record_flags = *(uint32_t *)&c->data[c->keylen + c->datalen];
+               memcpy(&record_flags, &c->data[c->keylen + c->datalen],
+                      sizeof(record_flags));
        }
 
-       ctdb_db = find_ctdb_db(ctdb, c->db_id);
-       if (!ctdb_db) {
-               ctdb_send_error(ctdb, hdr, -1,
-                               "Unknown database in request. db_id==0x%08x",
-                               c->db_id);
-               return;
-       }
-       
+       dmaster_defer_setup(ctdb_db, hdr, key);
+
        /* fetch the current record */
        ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
                                           ctdb_call_input_pkt, ctdb, false);
@@ -474,7 +641,7 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
        if (header.dmaster != hdr->srcnode) {
                DEBUG(DEBUG_ALERT,("pnn %u dmaster request for new-dmaster %u from non-master %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u keyval=0x%08x\n",
                         ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
-                        ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
+                        ctdb_db->db_id, hdr->generation, ctdb_db->generation,
                         (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid,
                         (key.dsize >= 4)?(*(uint32_t *)key.dptr):0));
                if (header.rsn != 0 || header.dmaster != ctdb->pnn) {
@@ -489,7 +656,7 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
        if (header.rsn > c->rsn) {
                DEBUG(DEBUG_ALERT,("pnn %u dmaster request with older RSN new-dmaster %u from %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u\n",
                         ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
-                        ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
+                        ctdb_db->db_id, hdr->generation, ctdb_db->generation,
                         (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid));
        }
 
@@ -513,7 +680,8 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
        }
 }
 
-static void ctdb_sticky_record_timeout(struct event_context *ev, struct timed_event *te, 
+static void ctdb_sticky_record_timeout(struct tevent_context *ev,
+                                      struct tevent_timer *te,
                                       struct timeval t, void *private_data)
 {
        struct ctdb_sticky_record *sr = talloc_get_type(private_data, 
@@ -537,16 +705,13 @@ ctdb_make_record_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_
        uint32_t *k;
        struct ctdb_sticky_record *sr;
 
-       k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
+       k = ctdb_key_to_idkey(tmp_ctx, key);
        if (k == NULL) {
                DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
                talloc_free(tmp_ctx);
                return -1;
        }
 
-       k[0] = (key.dsize + 3) / 4 + 1;
-       memcpy(&k[1], key.dptr, key.dsize);
-
        sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
        if (sr != NULL) {
                talloc_free(tmp_ctx);
@@ -570,7 +735,9 @@ ctdb_make_record_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_
 
        trbt_insertarray32_callback(ctdb_db->sticky_records, k[0], &k[0], ctdb_make_sticky_record_callback, sr);
 
-       event_add_timed(ctdb->ev, sr, timeval_current_ofs(ctdb->tunable.sticky_duration, 0), ctdb_sticky_record_timeout, sr);
+       tevent_add_timer(ctdb->ev, sr,
+                        timeval_current_ofs(ctdb->tunable.sticky_duration, 0),
+                        ctdb_sticky_record_timeout, sr);
 
        talloc_free(tmp_ctx);
        return 0;
@@ -586,8 +753,9 @@ struct pinned_down_deferred_call {
        struct ctdb_req_header *hdr;
 };
 
-static void pinned_down_requeue(struct event_context *ev, struct timed_event *te, 
-                      struct timeval t, void *private_data)
+static void pinned_down_requeue(struct tevent_context *ev,
+                               struct tevent_timer *te,
+                               struct timeval t, void *private_data)
 {
        struct pinned_down_requeue_handle *handle = talloc_get_type(private_data, struct pinned_down_requeue_handle);
        struct ctdb_context *ctdb = handle->ctdb;
@@ -607,7 +775,8 @@ static int pinned_down_destructor(struct pinned_down_deferred_call *pinned_down)
        handle->hdr  = pinned_down->hdr;
        talloc_steal(handle, handle->hdr);
 
-       event_add_timed(ctdb->ev, handle, timeval_zero(), pinned_down_requeue, handle);
+       tevent_add_timer(ctdb->ev, handle, timeval_zero(),
+                        pinned_down_requeue, handle);
 
        return 0;
 }
@@ -620,16 +789,13 @@ ctdb_defer_pinned_down_request(struct ctdb_context *ctdb, struct ctdb_db_context
        struct ctdb_sticky_record *sr;
        struct pinned_down_deferred_call *pinned_down;
 
-       k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
+       k = ctdb_key_to_idkey(tmp_ctx, key);
        if (k == NULL) {
                DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
                talloc_free(tmp_ctx);
                return -1;
        }
 
-       k[0] = (key.dsize + 3) / 4 + 1;
-       memcpy(&k[1], key.dptr, key.dsize);
-
        sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
        if (sr == NULL) {
                talloc_free(tmp_ctx);
@@ -721,9 +887,9 @@ sort_keys:
 */
 void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
 {
-       struct ctdb_req_call *c = (struct ctdb_req_call *)hdr;
+       struct ctdb_req_call_old *c = (struct ctdb_req_call_old *)hdr;
        TDB_DATA data;
-       struct ctdb_reply_call *r;
+       struct ctdb_reply_call_old *r;
        int ret, len;
        struct ctdb_ltdb_header header;
        struct ctdb_call *call;
@@ -735,7 +901,6 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
                return;
        }
 
-
        ctdb_db = find_ctdb_db(ctdb, c->db_id);
        if (!ctdb_db) {
                ctdb_send_error(ctdb, hdr, -1,
@@ -744,6 +909,15 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
                return;
        }
 
+       if (hdr->generation != ctdb_db->generation) {
+               DEBUG(DEBUG_DEBUG,
+                     ("ctdb operation %u request %u from node %u to %u had an"
+                      " invalid generation:%u while our generation is:%u\n",
+                      hdr->operation, hdr->reqid, hdr->srcnode, hdr->destnode,
+                      hdr->generation, ctdb_db->generation));
+               return;
+       }
+
        call = talloc(hdr, struct ctdb_call);
        CTDB_NO_MEMORY_FATAL(ctdb, call);
 
@@ -768,6 +942,10 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
                }
        }
 
+       if (dmaster_defer_add(ctdb_db, hdr, call->key) == 0) {
+               talloc_free(call);
+               return;
+       }
 
        /* determine if we are the dmaster for this key. This also
           fetches the record data (if any), thus avoiding a 2nd fetch of the data 
@@ -887,12 +1065,13 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
                        DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
                }
 
-               len = offsetof(struct ctdb_reply_call, data) + data.dsize + sizeof(struct ctdb_ltdb_header);
+               len = offsetof(struct ctdb_reply_call_old, data) + data.dsize + sizeof(struct ctdb_ltdb_header);
                r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len, 
-                                           struct ctdb_reply_call);
+                                           struct ctdb_reply_call_old);
                CTDB_NO_MEMORY_FATAL(ctdb, r);
                r->hdr.destnode  = c->hdr.srcnode;
                r->hdr.reqid     = c->hdr.reqid;
+               r->hdr.generation = ctdb_db->generation;
                r->status        = 0;
                r->datalen       = data.dsize + sizeof(struct ctdb_ltdb_header);
                header.rsn      -= 2;
@@ -941,7 +1120,7 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
         * expensive as a migration.
         */
        if (c->hdr.srcnode != ctdb->pnn) {
-               if (ctdb_db->transaction_active) {
+               if (ctdb_db->persistent_state) {
                        DEBUG(DEBUG_INFO, (__location__ " refusing migration"
                              " of key %s while transaction is active\n",
                              (char *)call->key.dptr));
@@ -971,12 +1150,13 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
                DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
        }
 
-       len = offsetof(struct ctdb_reply_call, data) + call->reply_data.dsize;
+       len = offsetof(struct ctdb_reply_call_old, data) + call->reply_data.dsize;
        r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len, 
-                                   struct ctdb_reply_call);
+                                   struct ctdb_reply_call_old);
        CTDB_NO_MEMORY_FATAL(ctdb, r);
        r->hdr.destnode  = hdr->srcnode;
        r->hdr.reqid     = hdr->reqid;
+       r->hdr.generation = ctdb_db->generation;
        r->status        = call->status;
        r->datalen       = call->reply_data.dsize;
        if (call->reply_data.dsize) {
@@ -997,10 +1177,10 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
  */
 void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
 {
-       struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
+       struct ctdb_reply_call_old *c = (struct ctdb_reply_call_old *)hdr;
        struct ctdb_call_state *state;
 
-       state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
+       state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
        if (state == NULL) {
                DEBUG(DEBUG_ERR, (__location__ " reqid %u not found\n", hdr->reqid));
                return;
@@ -1012,6 +1192,15 @@ void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
                return;
        }
 
+       if (hdr->generation != state->generation) {
+               DEBUG(DEBUG_DEBUG,
+                     ("ctdb operation %u request %u from node %u to %u had an"
+                      " invalid generation:%u while our generation is:%u\n",
+                      hdr->operation, hdr->reqid, hdr->srcnode, hdr->destnode,
+                      hdr->generation, state->generation));
+               return;
+       }
+
 
        /* read only delegation processing */
        /* If we got a FETCH_WITH_HEADER we should check if this is a ro
@@ -1106,7 +1295,16 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
                DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_reply_dmaster\n", c->db_id));
                return;
        }
-       
+
+       if (hdr->generation != ctdb_db->generation) {
+               DEBUG(DEBUG_DEBUG,
+                     ("ctdb operation %u request %u from node %u to %u had an"
+                      " invalid generation:%u while our generation is:%u\n",
+                      hdr->operation, hdr->reqid, hdr->srcnode, hdr->destnode,
+                      hdr->generation, ctdb_db->generation));
+               return;
+       }
+
        key.dptr = c->data;
        key.dsize = c->keylen;
        data.dptr = &c->data[key.dsize];
@@ -1114,9 +1312,12 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
        len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize
                + sizeof(uint32_t);
        if (len <= c->hdr.length) {
-               record_flags = *(uint32_t *)&c->data[c->keylen + c->datalen];
+               memcpy(&record_flags, &c->data[c->keylen + c->datalen],
+                      sizeof(record_flags));
        }
 
+       dmaster_defer_setup(ctdb_db, hdr, key);
+
        ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
                                     ctdb_call_input_pkt, ctdb, false);
        if (ret == -2) {
@@ -1139,7 +1340,7 @@ void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
        struct ctdb_reply_error *c = (struct ctdb_reply_error *)hdr;
        struct ctdb_call_state *state;
 
-       state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
+       state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
        if (state == NULL) {
                DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_reply_error\n",
                         ctdb->pnn, hdr->reqid));
@@ -1167,8 +1368,8 @@ void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
 */
 static int ctdb_call_destructor(struct ctdb_call_state *state)
 {
-       DLIST_REMOVE(state->ctdb_db->ctdb->pending_calls, state);
-       ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
+       DLIST_REMOVE(state->ctdb_db->pending_calls, state);
+       reqid_remove(state->ctdb_db->ctdb->idr, state->reqid);
        return 0;
 }
 
@@ -1180,11 +1381,11 @@ static void ctdb_call_resend(struct ctdb_call_state *state)
 {
        struct ctdb_context *ctdb = state->ctdb_db->ctdb;
 
-       state->generation = ctdb->vnn_map->generation;
+       state->generation = state->ctdb_db->generation;
 
        /* use a new reqid, in case the old reply does eventually come in */
-       ctdb_reqid_remove(ctdb, state->reqid);
-       state->reqid = ctdb_reqid_new(ctdb, state);
+       reqid_remove(ctdb->idr, state->reqid);
+       state->reqid = reqid_new(ctdb->idr, state);
        state->c->hdr.reqid = state->reqid;
 
        /* update the generation count for this request, so its valid with the new vnn_map */
@@ -1194,26 +1395,38 @@ static void ctdb_call_resend(struct ctdb_call_state *state)
        state->c->hdr.destnode = ctdb->pnn;
 
        ctdb_queue_packet(ctdb, &state->c->hdr);
-       DEBUG(DEBUG_NOTICE,("resent ctdb_call\n"));
+       DEBUG(DEBUG_NOTICE,("resent ctdb_call for db %s reqid %u generation %u\n",
+                           state->ctdb_db->db_name, state->reqid, state->generation));
 }
 
 /*
   resend all pending calls on recovery
  */
-void ctdb_call_resend_all(struct ctdb_context *ctdb)
+void ctdb_call_resend_db(struct ctdb_db_context *ctdb_db)
 {
        struct ctdb_call_state *state, *next;
-       for (state=ctdb->pending_calls;state;state=next) {
+
+       for (state = ctdb_db->pending_calls; state; state = next) {
                next = state->next;
                ctdb_call_resend(state);
        }
 }
 
+void ctdb_call_resend_all(struct ctdb_context *ctdb)
+{
+       struct ctdb_db_context *ctdb_db;
+
+       for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
+               ctdb_call_resend_db(ctdb_db);
+       }
+}
+
 /*
   this allows the caller to setup a async.fn 
 */
-static void call_local_trigger(struct event_context *ev, struct timed_event *te, 
-                      struct timeval t, void *private_data)
+static void call_local_trigger(struct tevent_context *ev,
+                              struct tevent_timer *te,
+                              struct timeval t, void *private_data)
 {
        struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state);
        if (state->async.fn) {
@@ -1253,7 +1466,8 @@ struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db,
                DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
        }
 
-       event_add_timed(ctdb->ev, state, timeval_zero(), call_local_trigger, state);
+       tevent_add_timer(ctdb->ev, state, timeval_zero(),
+                        call_local_trigger, state);
 
        return state;
 }
@@ -1283,18 +1497,19 @@ struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctd
        state->call = talloc(state, struct ctdb_call);
        CTDB_NO_MEMORY_NULL(ctdb, state->call);
 
-       state->reqid = ctdb_reqid_new(ctdb, state);
+       state->reqid = reqid_new(ctdb->idr, state);
        state->ctdb_db = ctdb_db;
        talloc_set_destructor(state, ctdb_call_destructor);
 
-       len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
+       len = offsetof(struct ctdb_req_call_old, data) + call->key.dsize + call->call_data.dsize;
        state->c = ctdb_transport_allocate(ctdb, state, CTDB_REQ_CALL, len, 
-                                          struct ctdb_req_call);
+                                          struct ctdb_req_call_old);
        CTDB_NO_MEMORY_NULL(ctdb, state->c);
        state->c->hdr.destnode  = header->dmaster;
 
        /* this limits us to 16k outstanding messages - not unreasonable */
        state->c->hdr.reqid     = state->reqid;
+       state->c->hdr.generation = ctdb_db->generation;
        state->c->flags         = call->flags;
        state->c->db_id         = ctdb_db->db_id;
        state->c->callid        = call->call_id;
@@ -1309,9 +1524,9 @@ struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctd
        state->call->key.dptr       = &state->c->data[0];
 
        state->state  = CTDB_CALL_WAIT;
-       state->generation = ctdb->vnn_map->generation;
+       state->generation = ctdb_db->generation;
 
-       DLIST_ADD(ctdb->pending_calls, state);
+       DLIST_ADD(ctdb_db->pending_calls, state);
 
        ctdb_queue_packet(ctdb, &state->c->hdr);
 
@@ -1327,7 +1542,7 @@ struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctd
 int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
 {
        while (state->state < CTDB_CALL_DONE) {
-               event_loop_once(state->ctdb_db->ctdb->ev);
+               tevent_loop_once(state->ctdb_db->ctdb->ev);
        }
        if (state->state != CTDB_CALL_DONE) {
                ctdb_set_error(state->ctdb_db->ctdb, "%s", state->errmsg);
@@ -1389,7 +1604,7 @@ struct revokechild_handle {
        struct revokechild_handle *next, *prev;
        struct ctdb_context *ctdb;
        struct ctdb_db_context *ctdb_db;
-       struct fd_event *fde;
+       struct tevent_fd *fde;
        int status;
        int fd[2];
        pid_t child;
@@ -1403,8 +1618,9 @@ struct revokechild_requeue_handle {
        void *ctx;
 };
 
-static void deferred_call_requeue(struct event_context *ev, struct timed_event *te, 
-                      struct timeval t, void *private_data)
+static void deferred_call_requeue(struct tevent_context *ev,
+                                 struct tevent_timer *te,
+                                 struct timeval t, void *private_data)
 {
        struct revokechild_requeue_handle *requeue_handle = talloc_get_type(private_data, struct revokechild_requeue_handle);
 
@@ -1416,7 +1632,7 @@ static int deferred_call_destructor(struct revokechild_deferred_call *deferred_c
 {
        struct ctdb_context *ctdb = deferred_call->ctdb;
        struct revokechild_requeue_handle *requeue_handle = talloc(ctdb, struct revokechild_requeue_handle);
-       struct ctdb_req_call *c = (struct ctdb_req_call *)deferred_call->hdr;
+       struct ctdb_req_call_old *c = (struct ctdb_req_call_old *)deferred_call->hdr;
 
        requeue_handle->ctdb = ctdb;
        requeue_handle->hdr  = deferred_call->hdr;
@@ -1425,7 +1641,9 @@ static int deferred_call_destructor(struct revokechild_deferred_call *deferred_c
        talloc_steal(requeue_handle, requeue_handle->hdr);
 
        /* when revoking, any READONLY requests have 1 second grace to let read/write finish first */
-       event_add_timed(ctdb->ev, requeue_handle, timeval_current_ofs(c->flags & CTDB_WANT_READONLY ? 1 : 0, 0), deferred_call_requeue, requeue_handle);
+       tevent_add_timer(ctdb->ev, requeue_handle,
+                        timeval_current_ofs(c->flags & CTDB_WANT_READONLY ? 1 : 0, 0),
+                        deferred_call_requeue, requeue_handle);
 
        return 0;
 }
@@ -1449,15 +1667,16 @@ static int revokechild_destructor(struct revokechild_handle *rc)
        return 0;
 }
 
-static void revokechild_handler(struct event_context *ev, struct fd_event *fde, 
-                            uint16_t flags, void *private_data)
+static void revokechild_handler(struct tevent_context *ev,
+                               struct tevent_fd *fde,
+                               uint16_t flags, void *private_data)
 {
        struct revokechild_handle *rc = talloc_get_type(private_data, 
                                                     struct revokechild_handle);
        int ret;
        char c;
 
-       ret = read(rc->fd[0], &c, 1);
+       ret = sys_read(rc->fd[0], &c, 1);
        if (ret != 1) {
                DEBUG(DEBUG_ERR,("Failed to read status from revokechild. errno:%d\n", errno));
                rc->status = -1;
@@ -1513,7 +1732,7 @@ static void revoke_send_cb(struct ctdb_context *ctdb, uint32_t pnn, void *privat
        struct ctdb_revoke_state *revoke_state = private_data;
        struct ctdb_client_control_state *state;
 
-       state = ctdb_ctrl_updaterecord_send(ctdb, revoke_state, timeval_current_ofs(5,0), pnn, revoke_state->ctdb_db, revoke_state->key, revoke_state->header, revoke_state->data);
+       state = ctdb_ctrl_updaterecord_send(ctdb, revoke_state, timeval_current_ofs(ctdb->tunable.control_timeout,0), pnn, revoke_state->ctdb_db, revoke_state->key, revoke_state->header, revoke_state->data);
        if (state == NULL) {
                DEBUG(DEBUG_ERR,("Failure to send update record to revoke readonly delegation\n"));
                revoke_state->status = -1;
@@ -1526,8 +1745,9 @@ static void revoke_send_cb(struct ctdb_context *ctdb, uint32_t pnn, void *privat
 
 }
 
-static void ctdb_revoke_timeout_handler(struct event_context *ev, struct timed_event *te, 
-                             struct timeval yt, void *private_data)
+static void ctdb_revoke_timeout_handler(struct tevent_context *ev,
+                                       struct tevent_timer *te,
+                                       struct timeval yt, void *private_data)
 {
        struct ctdb_revoke_state *state = private_data;
 
@@ -1539,7 +1759,8 @@ static void ctdb_revoke_timeout_handler(struct event_context *ev, struct timed_e
 static int ctdb_revoke_all_delegations(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA tdata, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
 {
        struct ctdb_revoke_state *state = talloc_zero(ctdb, struct ctdb_revoke_state);
-       int status;
+       struct ctdb_ltdb_header new_header;
+       TDB_DATA new_data;
 
        state->ctdb_db = ctdb_db;
        state->key     = key;
@@ -1548,55 +1769,60 @@ static int ctdb_revoke_all_delegations(struct ctdb_context *ctdb, struct ctdb_db
  
        ctdb_trackingdb_traverse(ctdb, tdata, revoke_send_cb, state);
 
-       event_add_timed(ctdb->ev, state, timeval_current_ofs(5, 0), ctdb_revoke_timeout_handler, state);
+       tevent_add_timer(ctdb->ev, state,
+                        timeval_current_ofs(ctdb->tunable.control_timeout, 0),
+                        ctdb_revoke_timeout_handler, state);
 
        while (state->finished == 0) {
-               event_loop_once(ctdb->ev);
+               tevent_loop_once(ctdb->ev);
        }
 
-       status = state->status;
-
-       if (status == 0) {
-               struct ctdb_ltdb_header new_header;
-               TDB_DATA new_data;
+       if (ctdb_ltdb_lock(ctdb_db, key) != 0) {
+               DEBUG(DEBUG_ERR,("Failed to chainlock the database in revokechild\n"));
+               talloc_free(state);
+               return -1;
+       }
+       if (ctdb_ltdb_fetch(ctdb_db, key, &new_header, state, &new_data) != 0) {
+               ctdb_ltdb_unlock(ctdb_db, key);
+               DEBUG(DEBUG_ERR,("Failed for fetch tdb record in revokechild\n"));
+               talloc_free(state);
+               return -1;
+       }
+       header->rsn++;
+       if (new_header.rsn > header->rsn) {
+               ctdb_ltdb_unlock(ctdb_db, key);
+               DEBUG(DEBUG_ERR,("RSN too high in tdb record in revokechild\n"));
+               talloc_free(state);
+               return -1;
+       }
+       if ( (new_header.flags & (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS)) != (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS) ) {
+               ctdb_ltdb_unlock(ctdb_db, key);
+               DEBUG(DEBUG_ERR,("Flags are wrong in tdb record in revokechild\n"));
+               talloc_free(state);
+               return -1;
+       }
 
-               if (ctdb_ltdb_lock(ctdb_db, key) != 0) {
-                       DEBUG(DEBUG_ERR,("Failed to chainlock the database in revokechild\n"));
-                       talloc_free(state);
-                       return -1;
-               }
-               if (ctdb_ltdb_fetch(ctdb_db, key, &new_header, state, &new_data) != 0) {
-                       ctdb_ltdb_unlock(ctdb_db, key);
-                       DEBUG(DEBUG_ERR,("Failed for fetch tdb record in revokechild\n"));
-                       talloc_free(state);
-                       return -1;
-               }
-               header->rsn++;
-               if (new_header.rsn > header->rsn) {
-                       ctdb_ltdb_unlock(ctdb_db, key);
-                       DEBUG(DEBUG_ERR,("RSN too high in tdb record in revokechild\n"));
-                       talloc_free(state);
-                       return -1;
-               }
-               if ( (new_header.flags & (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS)) != (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS) ) {
-                       ctdb_ltdb_unlock(ctdb_db, key);
-                       DEBUG(DEBUG_ERR,("Flags are wrong in tdb record in revokechild\n"));
-                       talloc_free(state);
-                       return -1;
-               }
+       /*
+        * If revoke on all nodes succeeds, revoke is complete.  Otherwise,
+        * remove the CTDB_REC_RO_REVOKING_READONLY flag and retry.
+        */
+       if (state->status == 0) {
                new_header.rsn++;
                new_header.flags |= CTDB_REC_RO_REVOKE_COMPLETE;
-               if (ctdb_ltdb_store(ctdb_db, key, &new_header, new_data) != 0) {
-                       ctdb_ltdb_unlock(ctdb_db, key);
-                       DEBUG(DEBUG_ERR,("Failed to write new record in revokechild\n"));
-                       talloc_free(state);
-                       return -1;
-               }
+       } else {
+               DEBUG(DEBUG_NOTICE, ("Revoke all delegations failed, retrying.\n"));
+               new_header.flags &= ~CTDB_REC_RO_REVOKING_READONLY;
+       }
+       if (ctdb_ltdb_store(ctdb_db, key, &new_header, new_data) != 0) {
                ctdb_ltdb_unlock(ctdb_db, key);
+               DEBUG(DEBUG_ERR,("Failed to write new record in revokechild\n"));
+               talloc_free(state);
+               return -1;
        }
+       ctdb_ltdb_unlock(ctdb_db, key);
 
        talloc_free(state);
-       return status;
+       return 0;
 }
 
 
@@ -1671,7 +1897,7 @@ int ctdb_start_revoke_ro_record(struct ctdb_context *ctdb, struct ctdb_db_contex
                c = ctdb_revoke_all_delegations(ctdb, ctdb_db, tdata, key, header, data);
 
 child_finished:
-               write(rc->fd[1], &c, 1);
+               sys_write(rc->fd[1], &c, 1);
                /* make sure we die when our parent dies */
                while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
                        sleep(5);
@@ -1686,9 +1912,8 @@ child_finished:
        /* This is an active revokechild child process */
        DLIST_ADD_END(ctdb_db->revokechild_active, rc, NULL);
 
-       rc->fde = event_add_fd(ctdb->ev, rc, rc->fd[0],
-                                  EVENT_FD_READ, revokechild_handler,
-                                  (void *)rc);
+       rc->fde = tevent_add_fd(ctdb->ev, rc, rc->fd[0], TEVENT_FD_READ,
+                               revokechild_handler, (void *)rc);
        if (rc->fde == NULL) {
                DEBUG(DEBUG_ERR,("Failed to set up fd event for revokechild process\n"));
                talloc_free(rc);