ctdb-daemon: Check packet generation against database generation
[ambi/samba-autobuild/.git] / ctdb / server / ctdb_call.c
1 /* 
2    ctdb_call protocol code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19 /*
20   see http://wiki.samba.org/index.php/Samba_%26_Clustering for
21   protocol design and packet details
22 */
23 #include "includes.h"
24 #include "tdb.h"
25 #include "lib/util/dlinklist.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "../include/ctdb_private.h"
29 #include "../common/rb_tree.h"
30
31 struct ctdb_sticky_record {
32         struct ctdb_context *ctdb;
33         struct ctdb_db_context *ctdb_db;
34         TDB_CONTEXT *pindown;
35 };
36
37 /*
38   find the ctdb_db from a db index
39  */
40  struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id)
41 {
42         struct ctdb_db_context *ctdb_db;
43
44         for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
45                 if (ctdb_db->db_id == id) {
46                         break;
47                 }
48         }
49         return ctdb_db;
50 }
51
52 /*
53   a varient of input packet that can be used in lock requeue
54 */
55 static void ctdb_call_input_pkt(void *p, struct ctdb_req_header *hdr)
56 {
57         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
58         ctdb_input_pkt(ctdb, hdr);
59 }
60
61
62 /*
63   send an error reply
64 */
65 static void ctdb_send_error(struct ctdb_context *ctdb, 
66                             struct ctdb_req_header *hdr, uint32_t status,
67                             const char *fmt, ...) PRINTF_ATTRIBUTE(4,5);
68 static void ctdb_send_error(struct ctdb_context *ctdb, 
69                             struct ctdb_req_header *hdr, uint32_t status,
70                             const char *fmt, ...)
71 {
72         va_list ap;
73         struct ctdb_reply_error *r;
74         char *msg;
75         int msglen, len;
76
77         if (ctdb->methods == NULL) {
78                 DEBUG(DEBUG_INFO,(__location__ " Failed to send error. Transport is DOWN\n"));
79                 return;
80         }
81
82         va_start(ap, fmt);
83         msg = talloc_vasprintf(ctdb, fmt, ap);
84         if (msg == NULL) {
85                 ctdb_fatal(ctdb, "Unable to allocate error in ctdb_send_error\n");
86         }
87         va_end(ap);
88
89         msglen = strlen(msg)+1;
90         len = offsetof(struct ctdb_reply_error, msg);
91         r = ctdb_transport_allocate(ctdb, msg, CTDB_REPLY_ERROR, len + msglen, 
92                                     struct ctdb_reply_error);
93         CTDB_NO_MEMORY_FATAL(ctdb, r);
94
95         r->hdr.destnode  = hdr->srcnode;
96         r->hdr.reqid     = hdr->reqid;
97         r->status        = status;
98         r->msglen        = msglen;
99         memcpy(&r->msg[0], msg, msglen);
100
101         ctdb_queue_packet(ctdb, &r->hdr);
102
103         talloc_free(msg);
104 }
105
106
107 /**
108  * send a redirect reply
109  *
110  * The logic behind this function is this:
111  *
112  * A client wants to grab a record and sends a CTDB_REQ_CALL packet
113  * to its local ctdb (ctdb_request_call). If the node is not itself
114  * the record's DMASTER, it first redirects the packet to  the
115  * record's LMASTER. The LMASTER then redirects the call packet to
116  * the current DMASTER. Note that this works because of this: When
117  * a record is migrated off a node, then the new DMASTER is stored
118  * in the record's copy on the former DMASTER.
119  */
120 static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
121                                     struct ctdb_db_context *ctdb_db,
122                                     TDB_DATA key,
123                                     struct ctdb_req_call *c, 
124                                     struct ctdb_ltdb_header *header)
125 {
126         uint32_t lmaster = ctdb_lmaster(ctdb, &key);
127
128         c->hdr.destnode = lmaster;
129         if (ctdb->pnn == lmaster) {
130                 c->hdr.destnode = header->dmaster;
131         }
132         c->hopcount++;
133
134         if (c->hopcount%100 > 95) {
135                 DEBUG(DEBUG_WARNING,("High hopcount %d dbid:%s "
136                         "key:0x%08x reqid=%08x pnn:%d src:%d lmaster:%d "
137                         "header->dmaster:%d dst:%d\n",
138                         c->hopcount, ctdb_db->db_name, ctdb_hash(&key),
139                         c->hdr.reqid, ctdb->pnn, c->hdr.srcnode, lmaster,
140                         header->dmaster, c->hdr.destnode));
141         }
142
143         ctdb_queue_packet(ctdb, &c->hdr);
144 }
145
146
147 /*
148   send a dmaster reply
149
150   caller must have the chainlock before calling this routine. Caller must be
151   the lmaster
152 */
153 static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
154                                     struct ctdb_ltdb_header *header,
155                                     TDB_DATA key, TDB_DATA data,
156                                     uint32_t new_dmaster,
157                                     uint32_t reqid)
158 {
159         struct ctdb_context *ctdb = ctdb_db->ctdb;
160         struct ctdb_reply_dmaster *r;
161         int ret, len;
162         TALLOC_CTX *tmp_ctx;
163
164         if (ctdb->pnn != ctdb_lmaster(ctdb, &key)) {
165                 DEBUG(DEBUG_ALERT,(__location__ " Caller is not lmaster!\n"));
166                 return;
167         }
168
169         header->dmaster = new_dmaster;
170         ret = ctdb_ltdb_store(ctdb_db, key, header, data);
171         if (ret != 0) {
172                 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply unable to update dmaster");
173                 return;
174         }
175
176         if (ctdb->methods == NULL) {
177                 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply cant update dmaster since transport is down");
178                 return;
179         }
180
181         /* put the packet on a temporary context, allowing us to safely free
182            it below even if ctdb_reply_dmaster() has freed it already */
183         tmp_ctx = talloc_new(ctdb);
184
185         /* send the CTDB_REPLY_DMASTER */
186         len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize + sizeof(uint32_t);
187         r = ctdb_transport_allocate(ctdb, tmp_ctx, CTDB_REPLY_DMASTER, len,
188                                     struct ctdb_reply_dmaster);
189         CTDB_NO_MEMORY_FATAL(ctdb, r);
190
191         r->hdr.destnode  = new_dmaster;
192         r->hdr.reqid     = reqid;
193         r->hdr.generation = ctdb_db->generation;
194         r->rsn           = header->rsn;
195         r->keylen        = key.dsize;
196         r->datalen       = data.dsize;
197         r->db_id         = ctdb_db->db_id;
198         memcpy(&r->data[0], key.dptr, key.dsize);
199         memcpy(&r->data[key.dsize], data.dptr, data.dsize);
200         memcpy(&r->data[key.dsize+data.dsize], &header->flags, sizeof(uint32_t));
201
202         ctdb_queue_packet(ctdb, &r->hdr);
203
204         talloc_free(tmp_ctx);
205 }
206
207 /*
208   send a dmaster request (give another node the dmaster for a record)
209
210   This is always sent to the lmaster, which ensures that the lmaster
211   always knows who the dmaster is. The lmaster will then send a
212   CTDB_REPLY_DMASTER to the new dmaster
213 */
214 static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db, 
215                                    struct ctdb_req_call *c, 
216                                    struct ctdb_ltdb_header *header,
217                                    TDB_DATA *key, TDB_DATA *data)
218 {
219         struct ctdb_req_dmaster *r;
220         struct ctdb_context *ctdb = ctdb_db->ctdb;
221         int len;
222         uint32_t lmaster = ctdb_lmaster(ctdb, key);
223
224         if (ctdb->methods == NULL) {
225                 ctdb_fatal(ctdb, "Failed ctdb_call_send_dmaster since transport is down");
226                 return;
227         }
228
229         if (data->dsize != 0) {
230                 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
231         }
232
233         if (lmaster == ctdb->pnn) {
234                 ctdb_send_dmaster_reply(ctdb_db, header, *key, *data, 
235                                         c->hdr.srcnode, c->hdr.reqid);
236                 return;
237         }
238         
239         len = offsetof(struct ctdb_req_dmaster, data) + key->dsize + data->dsize
240                         + sizeof(uint32_t);
241         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_DMASTER, len, 
242                                     struct ctdb_req_dmaster);
243         CTDB_NO_MEMORY_FATAL(ctdb, r);
244         r->hdr.destnode  = lmaster;
245         r->hdr.reqid     = c->hdr.reqid;
246         r->hdr.generation = ctdb_db->generation;
247         r->db_id         = c->db_id;
248         r->rsn           = header->rsn;
249         r->dmaster       = c->hdr.srcnode;
250         r->keylen        = key->dsize;
251         r->datalen       = data->dsize;
252         memcpy(&r->data[0], key->dptr, key->dsize);
253         memcpy(&r->data[key->dsize], data->dptr, data->dsize);
254         memcpy(&r->data[key->dsize + data->dsize], &header->flags, sizeof(uint32_t));
255
256         header->dmaster = c->hdr.srcnode;
257         if (ctdb_ltdb_store(ctdb_db, *key, header, *data) != 0) {
258                 ctdb_fatal(ctdb, "Failed to store record in ctdb_call_send_dmaster");
259         }
260         
261         ctdb_queue_packet(ctdb, &r->hdr);
262
263         talloc_free(r);
264 }
265
266 static void ctdb_sticky_pindown_timeout(struct event_context *ev, struct timed_event *te, 
267                                        struct timeval t, void *private_data)
268 {
269         struct ctdb_sticky_record *sr = talloc_get_type(private_data, 
270                                                        struct ctdb_sticky_record);
271
272         DEBUG(DEBUG_ERR,("Pindown timeout db:%s  unstick record\n", sr->ctdb_db->db_name));
273         if (sr->pindown != NULL) {
274                 talloc_free(sr->pindown);
275                 sr->pindown = NULL;
276         }
277 }
278
279 static int
280 ctdb_set_sticky_pindown(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
281 {
282         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
283         uint32_t *k;
284         struct ctdb_sticky_record *sr;
285
286         k = ctdb_key_to_idkey(tmp_ctx, key);
287         if (k == NULL) {
288                 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
289                 talloc_free(tmp_ctx);
290                 return -1;
291         }
292
293         sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
294         if (sr == NULL) {
295                 talloc_free(tmp_ctx);
296                 return 0;
297         }
298
299         talloc_free(tmp_ctx);
300
301         if (sr->pindown == NULL) {
302                 DEBUG(DEBUG_ERR,("Pinning down record in %s for %d ms\n", ctdb_db->db_name, ctdb->tunable.sticky_pindown));
303                 sr->pindown = talloc_new(sr);
304                 if (sr->pindown == NULL) {
305                         DEBUG(DEBUG_ERR,("Failed to allocate pindown context for sticky record\n"));
306                         return -1;
307                 }
308                 event_add_timed(ctdb->ev, sr->pindown, timeval_current_ofs(ctdb->tunable.sticky_pindown / 1000, (ctdb->tunable.sticky_pindown * 1000) % 1000000), ctdb_sticky_pindown_timeout, sr);
309         }
310
311         return 0;
312 }
313
314 /*
315   called when a CTDB_REPLY_DMASTER packet comes in, or when the lmaster
316   gets a CTDB_REQUEST_DMASTER for itself. We become the dmaster.
317
318   must be called with the chainlock held. This function releases the chainlock
319 */
320 static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
321                                 struct ctdb_req_header *hdr,
322                                 TDB_DATA key, TDB_DATA data,
323                                 uint64_t rsn, uint32_t record_flags)
324 {
325         struct ctdb_call_state *state;
326         struct ctdb_context *ctdb = ctdb_db->ctdb;
327         struct ctdb_ltdb_header header;
328         int ret;
329
330         DEBUG(DEBUG_DEBUG,("pnn %u dmaster response %08x\n", ctdb->pnn, ctdb_hash(&key)));
331
332         ZERO_STRUCT(header);
333         header.rsn = rsn;
334         header.dmaster = ctdb->pnn;
335         header.flags = record_flags;
336
337         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
338
339         if (state) {
340                 if (state->call->flags & CTDB_CALL_FLAG_VACUUM_MIGRATION) {
341                         /*
342                          * We temporarily add the VACUUM_MIGRATED flag to
343                          * the record flags, so that ctdb_ltdb_store can
344                          * decide whether the record should be stored or
345                          * deleted.
346                          */
347                         header.flags |= CTDB_REC_FLAG_VACUUM_MIGRATED;
348                 }
349         }
350
351         if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
352                 ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
353
354                 ret = ctdb_ltdb_unlock(ctdb_db, key);
355                 if (ret != 0) {
356                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
357                 }
358                 return;
359         }
360
361         /* we just became DMASTER and this database is "sticky",
362            see if the record is flagged as "hot" and set up a pin-down
363            context to stop migrations for a little while if so
364         */
365         if (ctdb_db->sticky) {
366                 ctdb_set_sticky_pindown(ctdb, ctdb_db, key);
367         }
368
369         if (state == NULL) {
370                 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_become_dmaster from node %u\n",
371                          ctdb->pnn, hdr->reqid, hdr->srcnode));
372
373                 ret = ctdb_ltdb_unlock(ctdb_db, key);
374                 if (ret != 0) {
375                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
376                 }
377                 return;
378         }
379
380         if (key.dsize != state->call->key.dsize || memcmp(key.dptr, state->call->key.dptr, key.dsize)) {
381                 DEBUG(DEBUG_ERR, ("Got bogus DMASTER packet reqid:%u from node %u. Key does not match key held in matching idr.\n", hdr->reqid, hdr->srcnode));
382
383                 ret = ctdb_ltdb_unlock(ctdb_db, key);
384                 if (ret != 0) {
385                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
386                 }
387                 return;
388         }
389
390         if (hdr->reqid != state->reqid) {
391                 /* we found a record  but it was the wrong one */
392                 DEBUG(DEBUG_ERR, ("Dropped orphan in ctdb_become_dmaster with reqid:%u\n from node %u", hdr->reqid, hdr->srcnode));
393
394                 ret = ctdb_ltdb_unlock(ctdb_db, key);
395                 if (ret != 0) {
396                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
397                 }
398                 return;
399         }
400
401         ctdb_call_local(ctdb_db, state->call, &header, state, &data, true);
402
403         ret = ctdb_ltdb_unlock(ctdb_db, state->call->key);
404         if (ret != 0) {
405                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
406         }
407
408         state->state = CTDB_CALL_DONE;
409         if (state->async.fn) {
410                 state->async.fn(state);
411         }
412 }
413
414 struct dmaster_defer_call {
415         struct dmaster_defer_call *next, *prev;
416         struct ctdb_context *ctdb;
417         struct ctdb_req_header *hdr;
418 };
419
420 struct dmaster_defer_queue {
421         struct ctdb_db_context *ctdb_db;
422         uint32_t generation;
423         struct dmaster_defer_call *deferred_calls;
424 };
425
426 static void dmaster_defer_reprocess(struct tevent_context *ev,
427                                     struct tevent_timer *te,
428                                     struct timeval t,
429                                     void *private_data)
430 {
431         struct dmaster_defer_call *call = talloc_get_type(
432                 private_data, struct dmaster_defer_call);
433
434         ctdb_input_pkt(call->ctdb, call->hdr);
435         talloc_free(call);
436 }
437
438 static int dmaster_defer_queue_destructor(struct dmaster_defer_queue *ddq)
439 {
440         /* Ignore requests, if database recovery happens in-between. */
441         if (ddq->generation != ddq->ctdb_db->generation) {
442                 return 0;
443         }
444
445         while (ddq->deferred_calls != NULL) {
446                 struct dmaster_defer_call *call = ddq->deferred_calls;
447
448                 DLIST_REMOVE(ddq->deferred_calls, call);
449
450                 talloc_steal(call->ctdb, call);
451                 tevent_add_timer(call->ctdb->ev, call, timeval_zero(),
452                                  dmaster_defer_reprocess, call);
453         }
454         return 0;
455 }
456
457 static void *insert_ddq_callback(void *parm, void *data)
458 {
459         if (data) {
460                 talloc_free(data);
461         }
462         return parm;
463 }
464
465 /**
466  * This function is used to reigster a key in database that needs to be updated.
467  * Any requests for that key should get deferred till this is completed.
468  */
469 static int dmaster_defer_setup(struct ctdb_db_context *ctdb_db,
470                                struct ctdb_req_header *hdr,
471                                TDB_DATA key)
472 {
473         uint32_t *k;
474         struct dmaster_defer_queue *ddq;
475
476         k = ctdb_key_to_idkey(hdr, key);
477         if (k == NULL) {
478                 DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer setup\n"));
479                 return -1;
480         }
481
482         /* Already exists */
483         ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
484         if (ddq != NULL) {
485                 if (ddq->generation == ctdb_db->generation) {
486                         talloc_free(k);
487                         return 0;
488                 }
489
490                 /* Recovery ocurred - get rid of old queue. All the deferred
491                  * requests will be resent anyway from ctdb_call_resend_db.
492                  */
493                 talloc_free(ddq);
494         }
495
496         ddq = talloc(hdr, struct dmaster_defer_queue);
497         if (ddq == NULL) {
498                 DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer queue\n"));
499                 talloc_free(k);
500                 return -1;
501         }
502         ddq->ctdb_db = ctdb_db;
503         ddq->generation = hdr->generation;
504         ddq->deferred_calls = NULL;
505
506         trbt_insertarray32_callback(ctdb_db->defer_dmaster, k[0], k,
507                                     insert_ddq_callback, ddq);
508         talloc_set_destructor(ddq, dmaster_defer_queue_destructor);
509
510         talloc_free(k);
511         return 0;
512 }
513
514 static int dmaster_defer_add(struct ctdb_db_context *ctdb_db,
515                              struct ctdb_req_header *hdr,
516                              TDB_DATA key)
517 {
518         struct dmaster_defer_queue *ddq;
519         struct dmaster_defer_call *call;
520         uint32_t *k;
521
522         k = ctdb_key_to_idkey(hdr, key);
523         if (k == NULL) {
524                 DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer add\n"));
525                 return -1;
526         }
527
528         ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
529         if (ddq == NULL) {
530                 talloc_free(k);
531                 return -1;
532         }
533
534         talloc_free(k);
535
536         if (ddq->generation != hdr->generation) {
537                 talloc_set_destructor(ddq, NULL);
538                 talloc_free(ddq);
539                 return -1;
540         }
541
542         call = talloc(ddq, struct dmaster_defer_call);
543         if (call == NULL) {
544                 DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer call\n"));
545                 return -1;
546         }
547
548         call->ctdb = ctdb_db->ctdb;
549         call->hdr = talloc_steal(call, hdr);
550
551         DLIST_ADD_END(ddq->deferred_calls, call, NULL);
552
553         return 0;
554 }
555
556 /*
557   called when a CTDB_REQ_DMASTER packet comes in
558
559   this comes into the lmaster for a record when the current dmaster
560   wants to give up the dmaster role and give it to someone else
561 */
562 void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
563 {
564         struct ctdb_req_dmaster *c = (struct ctdb_req_dmaster *)hdr;
565         TDB_DATA key, data, data2;
566         struct ctdb_ltdb_header header;
567         struct ctdb_db_context *ctdb_db;
568         uint32_t record_flags = 0;
569         size_t len;
570         int ret;
571
572         ctdb_db = find_ctdb_db(ctdb, c->db_id);
573         if (!ctdb_db) {
574                 ctdb_send_error(ctdb, hdr, -1,
575                                 "Unknown database in request. db_id==0x%08x",
576                                 c->db_id);
577                 return;
578         }
579
580         if (hdr->generation != ctdb_db->generation) {
581                 DEBUG(DEBUG_DEBUG,
582                       ("ctdb operation %u request %u from node %u to %u had an"
583                        " invalid generation:%u while our generation is:%u\n",
584                        hdr->operation, hdr->reqid, hdr->srcnode, hdr->destnode,
585                        hdr->generation, ctdb_db->generation));
586                 return;
587         }
588
589         key.dptr = c->data;
590         key.dsize = c->keylen;
591         data.dptr = c->data + c->keylen;
592         data.dsize = c->datalen;
593         len = offsetof(struct ctdb_req_dmaster, data) + key.dsize + data.dsize
594                         + sizeof(uint32_t);
595         if (len <= c->hdr.length) {
596                 memcpy(&record_flags, &c->data[c->keylen + c->datalen],
597                        sizeof(record_flags));
598         }
599
600         dmaster_defer_setup(ctdb_db, hdr, key);
601
602         /* fetch the current record */
603         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
604                                            ctdb_call_input_pkt, ctdb, false);
605         if (ret == -1) {
606                 ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
607                 return;
608         }
609         if (ret == -2) {
610                 DEBUG(DEBUG_INFO,(__location__ " deferring ctdb_request_dmaster\n"));
611                 return;
612         }
613
614         if (ctdb_lmaster(ctdb, &key) != ctdb->pnn) {
615                 DEBUG(DEBUG_ALERT,("pnn %u dmaster request to non-lmaster lmaster=%u gen=%u curgen=%u\n",
616                          ctdb->pnn, ctdb_lmaster(ctdb, &key), 
617                          hdr->generation, ctdb->vnn_map->generation));
618                 ctdb_fatal(ctdb, "ctdb_req_dmaster to non-lmaster");
619         }
620
621         DEBUG(DEBUG_DEBUG,("pnn %u dmaster request on %08x for %u from %u\n", 
622                  ctdb->pnn, ctdb_hash(&key), c->dmaster, c->hdr.srcnode));
623
624         /* its a protocol error if the sending node is not the current dmaster */
625         if (header.dmaster != hdr->srcnode) {
626                 DEBUG(DEBUG_ALERT,("pnn %u dmaster request for new-dmaster %u from non-master %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u keyval=0x%08x\n",
627                          ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
628                          ctdb_db->db_id, hdr->generation, ctdb_db->generation,
629                          (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid,
630                          (key.dsize >= 4)?(*(uint32_t *)key.dptr):0));
631                 if (header.rsn != 0 || header.dmaster != ctdb->pnn) {
632                         DEBUG(DEBUG_ERR,("ctdb_req_dmaster from non-master. Force a recovery.\n"));
633
634                         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
635                         ctdb_ltdb_unlock(ctdb_db, key);
636                         return;
637                 }
638         }
639
640         if (header.rsn > c->rsn) {
641                 DEBUG(DEBUG_ALERT,("pnn %u dmaster request with older RSN new-dmaster %u from %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u\n",
642                          ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
643                          ctdb_db->db_id, hdr->generation, ctdb_db->generation,
644                          (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid));
645         }
646
647         /* use the rsn from the sending node */
648         header.rsn = c->rsn;
649
650         /* store the record flags from the sending node */
651         header.flags = record_flags;
652
653         /* check if the new dmaster is the lmaster, in which case we
654            skip the dmaster reply */
655         if (c->dmaster == ctdb->pnn) {
656                 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
657         } else {
658                 ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid);
659
660                 ret = ctdb_ltdb_unlock(ctdb_db, key);
661                 if (ret != 0) {
662                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
663                 }
664         }
665 }
666
667 static void ctdb_sticky_record_timeout(struct event_context *ev, struct timed_event *te, 
668                                        struct timeval t, void *private_data)
669 {
670         struct ctdb_sticky_record *sr = talloc_get_type(private_data, 
671                                                        struct ctdb_sticky_record);
672         talloc_free(sr);
673 }
674
675 static void *ctdb_make_sticky_record_callback(void *parm, void *data)
676 {
677         if (data) {
678                 DEBUG(DEBUG_ERR,("Already have sticky record registered. Free old %p and create new %p\n", data, parm));
679                 talloc_free(data);
680         }
681         return parm;
682 }
683
684 static int
685 ctdb_make_record_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
686 {
687         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
688         uint32_t *k;
689         struct ctdb_sticky_record *sr;
690
691         k = ctdb_key_to_idkey(tmp_ctx, key);
692         if (k == NULL) {
693                 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
694                 talloc_free(tmp_ctx);
695                 return -1;
696         }
697
698         sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
699         if (sr != NULL) {
700                 talloc_free(tmp_ctx);
701                 return 0;
702         }
703
704         sr = talloc(ctdb_db->sticky_records, struct ctdb_sticky_record);
705         if (sr == NULL) {
706                 talloc_free(tmp_ctx);
707                 DEBUG(DEBUG_ERR,("Failed to allocate sticky record structure\n"));
708                 return -1;
709         }
710
711         sr->ctdb    = ctdb;
712         sr->ctdb_db = ctdb_db;
713         sr->pindown = NULL;
714
715         DEBUG(DEBUG_ERR,("Make record sticky for %d seconds in db %s key:0x%08x.\n",
716                          ctdb->tunable.sticky_duration,
717                          ctdb_db->db_name, ctdb_hash(&key)));
718
719         trbt_insertarray32_callback(ctdb_db->sticky_records, k[0], &k[0], ctdb_make_sticky_record_callback, sr);
720
721         event_add_timed(ctdb->ev, sr, timeval_current_ofs(ctdb->tunable.sticky_duration, 0), ctdb_sticky_record_timeout, sr);
722
723         talloc_free(tmp_ctx);
724         return 0;
725 }
726
727 struct pinned_down_requeue_handle {
728         struct ctdb_context *ctdb;
729         struct ctdb_req_header *hdr;
730 };
731
732 struct pinned_down_deferred_call {
733         struct ctdb_context *ctdb;
734         struct ctdb_req_header *hdr;
735 };
736
737 static void pinned_down_requeue(struct event_context *ev, struct timed_event *te, 
738                        struct timeval t, void *private_data)
739 {
740         struct pinned_down_requeue_handle *handle = talloc_get_type(private_data, struct pinned_down_requeue_handle);
741         struct ctdb_context *ctdb = handle->ctdb;
742
743         talloc_steal(ctdb, handle->hdr);
744         ctdb_call_input_pkt(ctdb, handle->hdr);
745
746         talloc_free(handle);
747 }
748
749 static int pinned_down_destructor(struct pinned_down_deferred_call *pinned_down)
750 {
751         struct ctdb_context *ctdb = pinned_down->ctdb;
752         struct pinned_down_requeue_handle *handle = talloc(ctdb, struct pinned_down_requeue_handle);
753
754         handle->ctdb = pinned_down->ctdb;
755         handle->hdr  = pinned_down->hdr;
756         talloc_steal(handle, handle->hdr);
757
758         event_add_timed(ctdb->ev, handle, timeval_zero(), pinned_down_requeue, handle);
759
760         return 0;
761 }
762
763 static int
764 ctdb_defer_pinned_down_request(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr)
765 {
766         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
767         uint32_t *k;
768         struct ctdb_sticky_record *sr;
769         struct pinned_down_deferred_call *pinned_down;
770
771         k = ctdb_key_to_idkey(tmp_ctx, key);
772         if (k == NULL) {
773                 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
774                 talloc_free(tmp_ctx);
775                 return -1;
776         }
777
778         sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
779         if (sr == NULL) {
780                 talloc_free(tmp_ctx);
781                 return -1;
782         }
783
784         talloc_free(tmp_ctx);
785
786         if (sr->pindown == NULL) {
787                 return -1;
788         }
789         
790         pinned_down = talloc(sr->pindown, struct pinned_down_deferred_call);
791         if (pinned_down == NULL) {
792                 DEBUG(DEBUG_ERR,("Failed to allocate structure for deferred pinned down request\n"));
793                 return -1;
794         }
795
796         pinned_down->ctdb = ctdb;
797         pinned_down->hdr  = hdr;
798
799         talloc_set_destructor(pinned_down, pinned_down_destructor);
800         talloc_steal(pinned_down, hdr);
801
802         return 0;
803 }
804
805 static void
806 ctdb_update_db_stat_hot_keys(struct ctdb_db_context *ctdb_db, TDB_DATA key, int hopcount)
807 {
808         int i, id;
809
810         /* smallest value is always at index 0 */
811         if (hopcount <= ctdb_db->statistics.hot_keys[0].count) {
812                 return;
813         }
814
815         /* see if we already know this key */
816         for (i = 0; i < MAX_HOT_KEYS; i++) {
817                 if (key.dsize != ctdb_db->statistics.hot_keys[i].key.dsize) {
818                         continue;
819                 }
820                 if (memcmp(key.dptr, ctdb_db->statistics.hot_keys[i].key.dptr, key.dsize)) {
821                         continue;
822                 }
823                 /* found an entry for this key */
824                 if (hopcount <= ctdb_db->statistics.hot_keys[i].count) {
825                         return;
826                 }
827                 ctdb_db->statistics.hot_keys[i].count = hopcount;
828                 goto sort_keys;
829         }
830
831         if (ctdb_db->statistics.num_hot_keys < MAX_HOT_KEYS) {
832                 id = ctdb_db->statistics.num_hot_keys;
833                 ctdb_db->statistics.num_hot_keys++;
834         } else {
835                 id = 0;
836         }
837
838         if (ctdb_db->statistics.hot_keys[id].key.dptr != NULL) {
839                 talloc_free(ctdb_db->statistics.hot_keys[id].key.dptr);
840         }
841         ctdb_db->statistics.hot_keys[id].key.dsize = key.dsize;
842         ctdb_db->statistics.hot_keys[id].key.dptr  = talloc_memdup(ctdb_db, key.dptr, key.dsize);
843         ctdb_db->statistics.hot_keys[id].count = hopcount;
844         DEBUG(DEBUG_NOTICE,("Updated hot key database=%s key=0x%08x id=%d hop_count=%d\n",
845                             ctdb_db->db_name, ctdb_hash(&key), id, hopcount));
846
847 sort_keys:
848         for (i = 1; i < MAX_HOT_KEYS; i++) {
849                 if (ctdb_db->statistics.hot_keys[i].count == 0) {
850                         continue;
851                 }
852                 if (ctdb_db->statistics.hot_keys[i].count < ctdb_db->statistics.hot_keys[0].count) {
853                         hopcount = ctdb_db->statistics.hot_keys[i].count;
854                         ctdb_db->statistics.hot_keys[i].count = ctdb_db->statistics.hot_keys[0].count;
855                         ctdb_db->statistics.hot_keys[0].count = hopcount;
856
857                         key = ctdb_db->statistics.hot_keys[i].key;
858                         ctdb_db->statistics.hot_keys[i].key = ctdb_db->statistics.hot_keys[0].key;
859                         ctdb_db->statistics.hot_keys[0].key = key;
860                 }
861         }
862 }
863
864 /*
865   called when a CTDB_REQ_CALL packet comes in
866 */
867 void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
868 {
869         struct ctdb_req_call *c = (struct ctdb_req_call *)hdr;
870         TDB_DATA data;
871         struct ctdb_reply_call *r;
872         int ret, len;
873         struct ctdb_ltdb_header header;
874         struct ctdb_call *call;
875         struct ctdb_db_context *ctdb_db;
876         int tmp_count, bucket;
877
878         if (ctdb->methods == NULL) {
879                 DEBUG(DEBUG_INFO,(__location__ " Failed ctdb_request_call. Transport is DOWN\n"));
880                 return;
881         }
882
883         ctdb_db = find_ctdb_db(ctdb, c->db_id);
884         if (!ctdb_db) {
885                 ctdb_send_error(ctdb, hdr, -1,
886                                 "Unknown database in request. db_id==0x%08x",
887                                 c->db_id);
888                 return;
889         }
890
891         if (hdr->generation != ctdb_db->generation) {
892                 DEBUG(DEBUG_DEBUG,
893                       ("ctdb operation %u request %u from node %u to %u had an"
894                        " invalid generation:%u while our generation is:%u\n",
895                        hdr->operation, hdr->reqid, hdr->srcnode, hdr->destnode,
896                        hdr->generation, ctdb_db->generation));
897                 return;
898         }
899
900         call = talloc(hdr, struct ctdb_call);
901         CTDB_NO_MEMORY_FATAL(ctdb, call);
902
903         call->call_id  = c->callid;
904         call->key.dptr = c->data;
905         call->key.dsize = c->keylen;
906         call->call_data.dptr = c->data + c->keylen;
907         call->call_data.dsize = c->calldatalen;
908         call->reply_data.dptr  = NULL;
909         call->reply_data.dsize = 0;
910
911
912         /* If this record is pinned down we should defer the
913            request until the pindown times out
914         */
915         if (ctdb_db->sticky) {
916                 if (ctdb_defer_pinned_down_request(ctdb, ctdb_db, call->key, hdr) == 0) {
917                         DEBUG(DEBUG_WARNING,
918                               ("Defer request for pinned down record in %s\n", ctdb_db->db_name));
919                         talloc_free(call);
920                         return;
921                 }
922         }
923
924         if (dmaster_defer_add(ctdb_db, hdr, call->key) == 0) {
925                 talloc_free(call);
926                 return;
927         }
928
929         /* determine if we are the dmaster for this key. This also
930            fetches the record data (if any), thus avoiding a 2nd fetch of the data 
931            if the call will be answered locally */
932
933         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call->key, &header, hdr, &data,
934                                            ctdb_call_input_pkt, ctdb, false);
935         if (ret == -1) {
936                 ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
937                 talloc_free(call);
938                 return;
939         }
940         if (ret == -2) {
941                 DEBUG(DEBUG_INFO,(__location__ " deferred ctdb_request_call\n"));
942                 talloc_free(call);
943                 return;
944         }
945
946         /* Dont do READONLY if we dont have a tracking database */
947         if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
948                 c->flags &= ~CTDB_WANT_READONLY;
949         }
950
951         if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
952                 header.flags &= ~CTDB_REC_RO_FLAGS;
953                 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
954                 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
955                 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
956                         ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
957                 }
958                 /* and clear out the tracking data */
959                 if (tdb_delete(ctdb_db->rottdb, call->key) != 0) {
960                         DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
961                 }
962         }
963
964         /* if we are revoking, we must defer all other calls until the revoke
965          * had completed.
966          */
967         if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
968                 talloc_free(data.dptr);
969                 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
970
971                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
972                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
973                 }
974                 talloc_free(call);
975                 return;
976         }
977
978         /*
979          * If we are not the dmaster and are not hosting any delegations,
980          * then we redirect the request to the node than can answer it
981          * (the lmaster or the dmaster).
982          */
983         if ((header.dmaster != ctdb->pnn) 
984             && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) ) {
985                 talloc_free(data.dptr);
986                 ctdb_call_send_redirect(ctdb, ctdb_db, call->key, c, &header);
987
988                 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
989                 if (ret != 0) {
990                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
991                 }
992                 talloc_free(call);
993                 return;
994         }
995
996         if ( (!(c->flags & CTDB_WANT_READONLY))
997         && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
998                 header.flags   |= CTDB_REC_RO_REVOKING_READONLY;
999                 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
1000                         ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
1001                 }
1002                 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1003
1004                 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, call->key, &header, data) != 0) {
1005                         ctdb_fatal(ctdb, "Failed to start record revoke");
1006                 }
1007                 talloc_free(data.dptr);
1008
1009                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
1010                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
1011                 }
1012                 talloc_free(call);
1013
1014                 return;
1015         }               
1016
1017         /* If this is the first request for delegation. bump rsn and set
1018          * the delegations flag
1019          */
1020         if ((c->flags & CTDB_WANT_READONLY)
1021         &&  (c->callid == CTDB_FETCH_WITH_HEADER_FUNC)
1022         &&  (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS))) {
1023                 header.rsn     += 3;
1024                 header.flags   |= CTDB_REC_RO_HAVE_DELEGATIONS;
1025                 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
1026                         ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
1027                 }
1028         }
1029         if ((c->flags & CTDB_WANT_READONLY) 
1030         &&  (call->call_id == CTDB_FETCH_WITH_HEADER_FUNC)) {
1031                 TDB_DATA tdata;
1032
1033                 tdata = tdb_fetch(ctdb_db->rottdb, call->key);
1034                 if (ctdb_trackingdb_add_pnn(ctdb, &tdata, c->hdr.srcnode) != 0) {
1035                         ctdb_fatal(ctdb, "Failed to add node to trackingdb");
1036                 }
1037                 if (tdb_store(ctdb_db->rottdb, call->key, tdata, TDB_REPLACE) != 0) {
1038                         ctdb_fatal(ctdb, "Failed to store trackingdb data");
1039                 }
1040                 free(tdata.dptr);
1041
1042                 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1043                 if (ret != 0) {
1044                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1045                 }
1046
1047                 len = offsetof(struct ctdb_reply_call, data) + data.dsize + sizeof(struct ctdb_ltdb_header);
1048                 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len, 
1049                                             struct ctdb_reply_call);
1050                 CTDB_NO_MEMORY_FATAL(ctdb, r);
1051                 r->hdr.destnode  = c->hdr.srcnode;
1052                 r->hdr.reqid     = c->hdr.reqid;
1053                 r->hdr.generation = ctdb_db->generation;
1054                 r->status        = 0;
1055                 r->datalen       = data.dsize + sizeof(struct ctdb_ltdb_header);
1056                 header.rsn      -= 2;
1057                 header.flags   |= CTDB_REC_RO_HAVE_READONLY;
1058                 header.flags   &= ~CTDB_REC_RO_HAVE_DELEGATIONS;
1059                 memcpy(&r->data[0], &header, sizeof(struct ctdb_ltdb_header));
1060
1061                 if (data.dsize) {
1062                         memcpy(&r->data[sizeof(struct ctdb_ltdb_header)], data.dptr, data.dsize);
1063                 }
1064
1065                 ctdb_queue_packet(ctdb, &r->hdr);
1066                 CTDB_INCREMENT_STAT(ctdb, total_ro_delegations);
1067                 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_delegations);
1068
1069                 talloc_free(r);
1070                 talloc_free(call);
1071                 return;
1072         }
1073
1074         CTDB_UPDATE_STAT(ctdb, max_hop_count, c->hopcount);
1075         tmp_count = c->hopcount;
1076         bucket = 0;
1077         while (tmp_count) {
1078                 tmp_count >>= 2;
1079                 bucket++;
1080         }
1081         if (bucket >= MAX_COUNT_BUCKETS) {
1082                 bucket = MAX_COUNT_BUCKETS - 1;
1083         }
1084         CTDB_INCREMENT_STAT(ctdb, hop_count_bucket[bucket]);
1085         CTDB_INCREMENT_DB_STAT(ctdb_db, hop_count_bucket[bucket]);
1086         ctdb_update_db_stat_hot_keys(ctdb_db, call->key, c->hopcount);
1087
1088         /* If this database supports sticky records, then check if the
1089            hopcount is big. If it is it means the record is hot and we
1090            should make it sticky.
1091         */
1092         if (ctdb_db->sticky && c->hopcount >= ctdb->tunable.hopcount_make_sticky) {
1093                 ctdb_make_record_sticky(ctdb, ctdb_db, call->key);
1094         }
1095
1096
1097         /* Try if possible to migrate the record off to the caller node.
1098          * From the clients perspective a fetch of the data is just as 
1099          * expensive as a migration.
1100          */
1101         if (c->hdr.srcnode != ctdb->pnn) {
1102                 if (ctdb_db->persistent_state) {
1103                         DEBUG(DEBUG_INFO, (__location__ " refusing migration"
1104                               " of key %s while transaction is active\n",
1105                               (char *)call->key.dptr));
1106                 } else {
1107                         DEBUG(DEBUG_DEBUG,("pnn %u starting migration of %08x to %u\n",
1108                                  ctdb->pnn, ctdb_hash(&(call->key)), c->hdr.srcnode));
1109                         ctdb_call_send_dmaster(ctdb_db, c, &header, &(call->key), &data);
1110                         talloc_free(data.dptr);
1111
1112                         ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1113                         if (ret != 0) {
1114                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1115                         }
1116                 }
1117                 talloc_free(call);
1118                 return;
1119         }
1120
1121         ret = ctdb_call_local(ctdb_db, call, &header, hdr, &data, true);
1122         if (ret != 0) {
1123                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_local failed\n"));
1124                 call->status = -1;
1125         }
1126
1127         ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1128         if (ret != 0) {
1129                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1130         }
1131
1132         len = offsetof(struct ctdb_reply_call, data) + call->reply_data.dsize;
1133         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len, 
1134                                     struct ctdb_reply_call);
1135         CTDB_NO_MEMORY_FATAL(ctdb, r);
1136         r->hdr.destnode  = hdr->srcnode;
1137         r->hdr.reqid     = hdr->reqid;
1138         r->hdr.generation = ctdb_db->generation;
1139         r->status        = call->status;
1140         r->datalen       = call->reply_data.dsize;
1141         if (call->reply_data.dsize) {
1142                 memcpy(&r->data[0], call->reply_data.dptr, call->reply_data.dsize);
1143         }
1144
1145         ctdb_queue_packet(ctdb, &r->hdr);
1146
1147         talloc_free(r);
1148         talloc_free(call);
1149 }
1150
1151 /**
1152  * called when a CTDB_REPLY_CALL packet comes in
1153  *
1154  * This packet comes in response to a CTDB_REQ_CALL request packet. It
1155  * contains any reply data from the call
1156  */
1157 void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1158 {
1159         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
1160         struct ctdb_call_state *state;
1161
1162         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
1163         if (state == NULL) {
1164                 DEBUG(DEBUG_ERR, (__location__ " reqid %u not found\n", hdr->reqid));
1165                 return;
1166         }
1167
1168         if (hdr->reqid != state->reqid) {
1169                 /* we found a record  but it was the wrong one */
1170                 DEBUG(DEBUG_ERR, ("Dropped orphaned call reply with reqid:%u\n",hdr->reqid));
1171                 return;
1172         }
1173
1174         if (hdr->generation != state->generation) {
1175                 DEBUG(DEBUG_DEBUG,
1176                       ("ctdb operation %u request %u from node %u to %u had an"
1177                        " invalid generation:%u while our generation is:%u\n",
1178                        hdr->operation, hdr->reqid, hdr->srcnode, hdr->destnode,
1179                        hdr->generation, state->generation));
1180                 return;
1181         }
1182
1183
1184         /* read only delegation processing */
1185         /* If we got a FETCH_WITH_HEADER we should check if this is a ro
1186          * delegation since we may need to update the record header
1187          */
1188         if (state->c->callid == CTDB_FETCH_WITH_HEADER_FUNC) {
1189                 struct ctdb_db_context *ctdb_db = state->ctdb_db;
1190                 struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)&c->data[0];
1191                 struct ctdb_ltdb_header oldheader;
1192                 TDB_DATA key, data, olddata;
1193                 int ret;
1194
1195                 if (!(header->flags & CTDB_REC_RO_HAVE_READONLY)) {
1196                         goto finished_ro;
1197                         return;
1198                 }
1199
1200                 key.dsize = state->c->keylen;
1201                 key.dptr  = state->c->data;
1202                 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1203                                      ctdb_call_input_pkt, ctdb, false);
1204                 if (ret == -2) {
1205                         return;
1206                 }
1207                 if (ret != 0) {
1208                         DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_call\n"));
1209                         return;
1210                 }
1211
1212                 ret = ctdb_ltdb_fetch(ctdb_db, key, &oldheader, state, &olddata);
1213                 if (ret != 0) {
1214                         DEBUG(DEBUG_ERR, ("Failed to fetch old record in ctdb_reply_call\n"));
1215                         ctdb_ltdb_unlock(ctdb_db, key);
1216                         goto finished_ro;
1217                 }                       
1218
1219                 if (header->rsn <= oldheader.rsn) {
1220                         ctdb_ltdb_unlock(ctdb_db, key);
1221                         goto finished_ro;
1222                 }
1223
1224                 if (c->datalen < sizeof(struct ctdb_ltdb_header)) {
1225                         DEBUG(DEBUG_ERR,(__location__ " Got FETCH_WITH_HEADER reply with too little data: %d bytes\n", c->datalen));
1226                         ctdb_ltdb_unlock(ctdb_db, key);
1227                         goto finished_ro;
1228                 }
1229
1230                 data.dsize = c->datalen - sizeof(struct ctdb_ltdb_header);
1231                 data.dptr  = &c->data[sizeof(struct ctdb_ltdb_header)];
1232                 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
1233                 if (ret != 0) {
1234                         DEBUG(DEBUG_ERR, ("Failed to store new record in ctdb_reply_call\n"));
1235                         ctdb_ltdb_unlock(ctdb_db, key);
1236                         goto finished_ro;
1237                 }                       
1238
1239                 ctdb_ltdb_unlock(ctdb_db, key);
1240         }
1241 finished_ro:
1242
1243         state->call->reply_data.dptr = c->data;
1244         state->call->reply_data.dsize = c->datalen;
1245         state->call->status = c->status;
1246
1247         talloc_steal(state, c);
1248
1249         state->state = CTDB_CALL_DONE;
1250         if (state->async.fn) {
1251                 state->async.fn(state);
1252         }
1253 }
1254
1255
1256 /**
1257  * called when a CTDB_REPLY_DMASTER packet comes in
1258  *
1259  * This packet comes in from the lmaster in response to a CTDB_REQ_CALL
1260  * request packet. It means that the current dmaster wants to give us
1261  * the dmaster role.
1262  */
1263 void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1264 {
1265         struct ctdb_reply_dmaster *c = (struct ctdb_reply_dmaster *)hdr;
1266         struct ctdb_db_context *ctdb_db;
1267         TDB_DATA key, data;
1268         uint32_t record_flags = 0;
1269         size_t len;
1270         int ret;
1271
1272         ctdb_db = find_ctdb_db(ctdb, c->db_id);
1273         if (ctdb_db == NULL) {
1274                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_reply_dmaster\n", c->db_id));
1275                 return;
1276         }
1277
1278         if (hdr->generation != ctdb_db->generation) {
1279                 DEBUG(DEBUG_DEBUG,
1280                       ("ctdb operation %u request %u from node %u to %u had an"
1281                        " invalid generation:%u while our generation is:%u\n",
1282                        hdr->operation, hdr->reqid, hdr->srcnode, hdr->destnode,
1283                        hdr->generation, ctdb_db->generation));
1284                 return;
1285         }
1286
1287         key.dptr = c->data;
1288         key.dsize = c->keylen;
1289         data.dptr = &c->data[key.dsize];
1290         data.dsize = c->datalen;
1291         len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize
1292                 + sizeof(uint32_t);
1293         if (len <= c->hdr.length) {
1294                 memcpy(&record_flags, &c->data[c->keylen + c->datalen],
1295                        sizeof(record_flags));
1296         }
1297
1298         dmaster_defer_setup(ctdb_db, hdr, key);
1299
1300         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1301                                      ctdb_call_input_pkt, ctdb, false);
1302         if (ret == -2) {
1303                 return;
1304         }
1305         if (ret != 0) {
1306                 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_dmaster\n"));
1307                 return;
1308         }
1309
1310         ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
1311 }
1312
1313
1314 /*
1315   called when a CTDB_REPLY_ERROR packet comes in
1316 */
1317 void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1318 {
1319         struct ctdb_reply_error *c = (struct ctdb_reply_error *)hdr;
1320         struct ctdb_call_state *state;
1321
1322         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
1323         if (state == NULL) {
1324                 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_reply_error\n",
1325                          ctdb->pnn, hdr->reqid));
1326                 return;
1327         }
1328
1329         if (hdr->reqid != state->reqid) {
1330                 /* we found a record  but it was the wrong one */
1331                 DEBUG(DEBUG_ERR, ("Dropped orphaned error reply with reqid:%u\n",hdr->reqid));
1332                 return;
1333         }
1334
1335         talloc_steal(state, c);
1336
1337         state->state  = CTDB_CALL_ERROR;
1338         state->errmsg = (char *)c->msg;
1339         if (state->async.fn) {
1340                 state->async.fn(state);
1341         }
1342 }
1343
1344
1345 /*
1346   destroy a ctdb_call
1347 */
1348 static int ctdb_call_destructor(struct ctdb_call_state *state)
1349 {
1350         DLIST_REMOVE(state->ctdb_db->pending_calls, state);
1351         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
1352         return 0;
1353 }
1354
1355
1356 /*
1357   called when a ctdb_call needs to be resent after a reconfigure event
1358 */
1359 static void ctdb_call_resend(struct ctdb_call_state *state)
1360 {
1361         struct ctdb_context *ctdb = state->ctdb_db->ctdb;
1362
1363         state->generation = state->ctdb_db->generation;
1364
1365         /* use a new reqid, in case the old reply does eventually come in */
1366         ctdb_reqid_remove(ctdb, state->reqid);
1367         state->reqid = ctdb_reqid_new(ctdb, state);
1368         state->c->hdr.reqid = state->reqid;
1369
1370         /* update the generation count for this request, so its valid with the new vnn_map */
1371         state->c->hdr.generation = state->generation;
1372
1373         /* send the packet to ourselves, it will be redirected appropriately */
1374         state->c->hdr.destnode = ctdb->pnn;
1375
1376         ctdb_queue_packet(ctdb, &state->c->hdr);
1377         DEBUG(DEBUG_NOTICE,("resent ctdb_call\n"));
1378 }
1379
1380 /*
1381   resend all pending calls on recovery
1382  */
1383 void ctdb_call_resend_db(struct ctdb_db_context *ctdb_db)
1384 {
1385         struct ctdb_call_state *state, *next;
1386
1387         for (state = ctdb_db->pending_calls; state; state = next) {
1388                 next = state->next;
1389                 ctdb_call_resend(state);
1390         }
1391 }
1392
1393 void ctdb_call_resend_all(struct ctdb_context *ctdb)
1394 {
1395         struct ctdb_db_context *ctdb_db;
1396
1397         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
1398                 ctdb_call_resend_db(ctdb_db);
1399         }
1400 }
1401
1402 /*
1403   this allows the caller to setup a async.fn 
1404 */
1405 static void call_local_trigger(struct event_context *ev, struct timed_event *te, 
1406                        struct timeval t, void *private_data)
1407 {
1408         struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state);
1409         if (state->async.fn) {
1410                 state->async.fn(state);
1411         }
1412 }       
1413
1414
1415 /*
1416   construct an event driven local ctdb_call
1417
1418   this is used so that locally processed ctdb_call requests are processed
1419   in an event driven manner
1420 */
1421 struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db, 
1422                                              struct ctdb_call *call,
1423                                              struct ctdb_ltdb_header *header,
1424                                              TDB_DATA *data)
1425 {
1426         struct ctdb_call_state *state;
1427         struct ctdb_context *ctdb = ctdb_db->ctdb;
1428         int ret;
1429
1430         state = talloc_zero(ctdb_db, struct ctdb_call_state);
1431         CTDB_NO_MEMORY_NULL(ctdb, state);
1432
1433         talloc_steal(state, data->dptr);
1434
1435         state->state = CTDB_CALL_DONE;
1436         state->call  = talloc(state, struct ctdb_call);
1437         CTDB_NO_MEMORY_NULL(ctdb, state->call);
1438         *(state->call) = *call;
1439         state->ctdb_db = ctdb_db;
1440
1441         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
1442         if (ret != 0) {
1443                 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
1444         }
1445
1446         event_add_timed(ctdb->ev, state, timeval_zero(), call_local_trigger, state);
1447
1448         return state;
1449 }
1450
1451
1452 /*
1453   make a remote ctdb call - async send. Called in daemon context.
1454
1455   This constructs a ctdb_call request and queues it for processing. 
1456   This call never blocks.
1457 */
1458 struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctdb_db, 
1459                                                      struct ctdb_call *call, 
1460                                                      struct ctdb_ltdb_header *header)
1461 {
1462         uint32_t len;
1463         struct ctdb_call_state *state;
1464         struct ctdb_context *ctdb = ctdb_db->ctdb;
1465
1466         if (ctdb->methods == NULL) {
1467                 DEBUG(DEBUG_INFO,(__location__ " Failed send packet. Transport is down\n"));
1468                 return NULL;
1469         }
1470
1471         state = talloc_zero(ctdb_db, struct ctdb_call_state);
1472         CTDB_NO_MEMORY_NULL(ctdb, state);
1473         state->call = talloc(state, struct ctdb_call);
1474         CTDB_NO_MEMORY_NULL(ctdb, state->call);
1475
1476         state->reqid = ctdb_reqid_new(ctdb, state);
1477         state->ctdb_db = ctdb_db;
1478         talloc_set_destructor(state, ctdb_call_destructor);
1479
1480         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
1481         state->c = ctdb_transport_allocate(ctdb, state, CTDB_REQ_CALL, len, 
1482                                            struct ctdb_req_call);
1483         CTDB_NO_MEMORY_NULL(ctdb, state->c);
1484         state->c->hdr.destnode  = header->dmaster;
1485
1486         /* this limits us to 16k outstanding messages - not unreasonable */
1487         state->c->hdr.reqid     = state->reqid;
1488         state->c->hdr.generation = ctdb_db->generation;
1489         state->c->flags         = call->flags;
1490         state->c->db_id         = ctdb_db->db_id;
1491         state->c->callid        = call->call_id;
1492         state->c->hopcount      = 0;
1493         state->c->keylen        = call->key.dsize;
1494         state->c->calldatalen   = call->call_data.dsize;
1495         memcpy(&state->c->data[0], call->key.dptr, call->key.dsize);
1496         memcpy(&state->c->data[call->key.dsize], 
1497                call->call_data.dptr, call->call_data.dsize);
1498         *(state->call)              = *call;
1499         state->call->call_data.dptr = &state->c->data[call->key.dsize];
1500         state->call->key.dptr       = &state->c->data[0];
1501
1502         state->state  = CTDB_CALL_WAIT;
1503         state->generation = ctdb_db->generation;
1504
1505         DLIST_ADD(ctdb_db->pending_calls, state);
1506
1507         ctdb_queue_packet(ctdb, &state->c->hdr);
1508
1509         return state;
1510 }
1511
1512 /*
1513   make a remote ctdb call - async recv - called in daemon context
1514
1515   This is called when the program wants to wait for a ctdb_call to complete and get the 
1516   results. This call will block unless the call has already completed.
1517 */
1518 int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
1519 {
1520         while (state->state < CTDB_CALL_DONE) {
1521                 event_loop_once(state->ctdb_db->ctdb->ev);
1522         }
1523         if (state->state != CTDB_CALL_DONE) {
1524                 ctdb_set_error(state->ctdb_db->ctdb, "%s", state->errmsg);
1525                 talloc_free(state);
1526                 return -1;
1527         }
1528
1529         if (state->call->reply_data.dsize) {
1530                 call->reply_data.dptr = talloc_memdup(call,
1531                                                       state->call->reply_data.dptr,
1532                                                       state->call->reply_data.dsize);
1533                 call->reply_data.dsize = state->call->reply_data.dsize;
1534         } else {
1535                 call->reply_data.dptr = NULL;
1536                 call->reply_data.dsize = 0;
1537         }
1538         call->status = state->call->status;
1539         talloc_free(state);
1540         return 0;
1541 }
1542
1543
1544 /* 
1545    send a keepalive packet to the other node
1546 */
1547 void ctdb_send_keepalive(struct ctdb_context *ctdb, uint32_t destnode)
1548 {
1549         struct ctdb_req_keepalive *r;
1550         
1551         if (ctdb->methods == NULL) {
1552                 DEBUG(DEBUG_INFO,(__location__ " Failed to send keepalive. Transport is DOWN\n"));
1553                 return;
1554         }
1555
1556         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_KEEPALIVE,
1557                                     sizeof(struct ctdb_req_keepalive), 
1558                                     struct ctdb_req_keepalive);
1559         CTDB_NO_MEMORY_FATAL(ctdb, r);
1560         r->hdr.destnode  = destnode;
1561         r->hdr.reqid     = 0;
1562         
1563         CTDB_INCREMENT_STAT(ctdb, keepalive_packets_sent);
1564
1565         ctdb_queue_packet(ctdb, &r->hdr);
1566
1567         talloc_free(r);
1568 }
1569
1570
1571
1572 struct revokechild_deferred_call {
1573         struct ctdb_context *ctdb;
1574         struct ctdb_req_header *hdr;
1575         deferred_requeue_fn fn;
1576         void *ctx;
1577 };
1578
1579 struct revokechild_handle {
1580         struct revokechild_handle *next, *prev;
1581         struct ctdb_context *ctdb;
1582         struct ctdb_db_context *ctdb_db;
1583         struct fd_event *fde;
1584         int status;
1585         int fd[2];
1586         pid_t child;
1587         TDB_DATA key;
1588 };
1589
1590 struct revokechild_requeue_handle {
1591         struct ctdb_context *ctdb;
1592         struct ctdb_req_header *hdr;
1593         deferred_requeue_fn fn;
1594         void *ctx;
1595 };
1596
1597 static void deferred_call_requeue(struct event_context *ev, struct timed_event *te, 
1598                        struct timeval t, void *private_data)
1599 {
1600         struct revokechild_requeue_handle *requeue_handle = talloc_get_type(private_data, struct revokechild_requeue_handle);
1601
1602         requeue_handle->fn(requeue_handle->ctx, requeue_handle->hdr);
1603         talloc_free(requeue_handle);
1604 }
1605
1606 static int deferred_call_destructor(struct revokechild_deferred_call *deferred_call)
1607 {
1608         struct ctdb_context *ctdb = deferred_call->ctdb;
1609         struct revokechild_requeue_handle *requeue_handle = talloc(ctdb, struct revokechild_requeue_handle);
1610         struct ctdb_req_call *c = (struct ctdb_req_call *)deferred_call->hdr;
1611
1612         requeue_handle->ctdb = ctdb;
1613         requeue_handle->hdr  = deferred_call->hdr;
1614         requeue_handle->fn   = deferred_call->fn;
1615         requeue_handle->ctx  = deferred_call->ctx;
1616         talloc_steal(requeue_handle, requeue_handle->hdr);
1617
1618         /* when revoking, any READONLY requests have 1 second grace to let read/write finish first */
1619         event_add_timed(ctdb->ev, requeue_handle, timeval_current_ofs(c->flags & CTDB_WANT_READONLY ? 1 : 0, 0), deferred_call_requeue, requeue_handle);
1620
1621         return 0;
1622 }
1623
1624
1625 static int revokechild_destructor(struct revokechild_handle *rc)
1626 {
1627         if (rc->fde != NULL) {
1628                 talloc_free(rc->fde);
1629         }
1630
1631         if (rc->fd[0] != -1) {
1632                 close(rc->fd[0]);
1633         }
1634         if (rc->fd[1] != -1) {
1635                 close(rc->fd[1]);
1636         }
1637         ctdb_kill(rc->ctdb, rc->child, SIGKILL);
1638
1639         DLIST_REMOVE(rc->ctdb_db->revokechild_active, rc);
1640         return 0;
1641 }
1642
1643 static void revokechild_handler(struct event_context *ev, struct fd_event *fde, 
1644                              uint16_t flags, void *private_data)
1645 {
1646         struct revokechild_handle *rc = talloc_get_type(private_data, 
1647                                                      struct revokechild_handle);
1648         int ret;
1649         char c;
1650
1651         ret = sys_read(rc->fd[0], &c, 1);
1652         if (ret != 1) {
1653                 DEBUG(DEBUG_ERR,("Failed to read status from revokechild. errno:%d\n", errno));
1654                 rc->status = -1;
1655                 talloc_free(rc);
1656                 return;
1657         }
1658         if (c != 0) {
1659                 DEBUG(DEBUG_ERR,("revokechild returned failure. status:%d\n", c));
1660                 rc->status = -1;
1661                 talloc_free(rc);
1662                 return;
1663         }
1664
1665         talloc_free(rc);
1666 }
1667
1668 struct ctdb_revoke_state {
1669         struct ctdb_db_context *ctdb_db;
1670         TDB_DATA key;
1671         struct ctdb_ltdb_header *header;
1672         TDB_DATA data;
1673         int count;
1674         int status;
1675         int finished;
1676 };
1677
1678 static void update_record_cb(struct ctdb_client_control_state *state)
1679 {
1680         struct ctdb_revoke_state *revoke_state;
1681         int ret;
1682         int32_t res;
1683
1684         if (state == NULL) {
1685                 return;
1686         }
1687         revoke_state = state->async.private_data;
1688
1689         state->async.fn = NULL;
1690         ret = ctdb_control_recv(state->ctdb, state, state, NULL, &res, NULL);
1691         if ((ret != 0) || (res != 0)) {
1692                 DEBUG(DEBUG_ERR,("Recv for revoke update record failed ret:%d res:%d\n", ret, res));
1693                 revoke_state->status = -1;
1694         }
1695
1696         revoke_state->count--;
1697         if (revoke_state->count <= 0) {
1698                 revoke_state->finished = 1;
1699         }
1700 }
1701
1702 static void revoke_send_cb(struct ctdb_context *ctdb, uint32_t pnn, void *private_data)
1703 {
1704         struct ctdb_revoke_state *revoke_state = private_data;
1705         struct ctdb_client_control_state *state;
1706
1707         state = ctdb_ctrl_updaterecord_send(ctdb, revoke_state, timeval_current_ofs(ctdb->tunable.control_timeout,0), pnn, revoke_state->ctdb_db, revoke_state->key, revoke_state->header, revoke_state->data);
1708         if (state == NULL) {
1709                 DEBUG(DEBUG_ERR,("Failure to send update record to revoke readonly delegation\n"));
1710                 revoke_state->status = -1;
1711                 return;
1712         }
1713         state->async.fn           = update_record_cb;
1714         state->async.private_data = revoke_state;
1715
1716         revoke_state->count++;
1717
1718 }
1719
1720 static void ctdb_revoke_timeout_handler(struct event_context *ev, struct timed_event *te, 
1721                               struct timeval yt, void *private_data)
1722 {
1723         struct ctdb_revoke_state *state = private_data;
1724
1725         DEBUG(DEBUG_ERR,("Timed out waiting for revoke to finish\n"));
1726         state->finished = 1;
1727         state->status   = -1;
1728 }
1729
1730 static int ctdb_revoke_all_delegations(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA tdata, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1731 {
1732         struct ctdb_revoke_state *state = talloc_zero(ctdb, struct ctdb_revoke_state);
1733         struct ctdb_ltdb_header new_header;
1734         TDB_DATA new_data;
1735
1736         state->ctdb_db = ctdb_db;
1737         state->key     = key;
1738         state->header  = header;
1739         state->data    = data;
1740  
1741         ctdb_trackingdb_traverse(ctdb, tdata, revoke_send_cb, state);
1742
1743         event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0), ctdb_revoke_timeout_handler, state);
1744
1745         while (state->finished == 0) {
1746                 event_loop_once(ctdb->ev);
1747         }
1748
1749         if (ctdb_ltdb_lock(ctdb_db, key) != 0) {
1750                 DEBUG(DEBUG_ERR,("Failed to chainlock the database in revokechild\n"));
1751                 talloc_free(state);
1752                 return -1;
1753         }
1754         if (ctdb_ltdb_fetch(ctdb_db, key, &new_header, state, &new_data) != 0) {
1755                 ctdb_ltdb_unlock(ctdb_db, key);
1756                 DEBUG(DEBUG_ERR,("Failed for fetch tdb record in revokechild\n"));
1757                 talloc_free(state);
1758                 return -1;
1759         }
1760         header->rsn++;
1761         if (new_header.rsn > header->rsn) {
1762                 ctdb_ltdb_unlock(ctdb_db, key);
1763                 DEBUG(DEBUG_ERR,("RSN too high in tdb record in revokechild\n"));
1764                 talloc_free(state);
1765                 return -1;
1766         }
1767         if ( (new_header.flags & (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS)) != (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS) ) {
1768                 ctdb_ltdb_unlock(ctdb_db, key);
1769                 DEBUG(DEBUG_ERR,("Flags are wrong in tdb record in revokechild\n"));
1770                 talloc_free(state);
1771                 return -1;
1772         }
1773
1774         /*
1775          * If revoke on all nodes succeed, revoke is complete.  Otherwise,
1776          * remove CTDB_REC_RO_REVOKING_READONLY flag and retry.
1777          */
1778         if (state->status == 0) {
1779                 new_header.rsn++;
1780                 new_header.flags |= CTDB_REC_RO_REVOKE_COMPLETE;
1781         } else {
1782                 DEBUG(DEBUG_NOTICE, ("Revoke all delegations failed, retrying.\n"));
1783                 new_header.flags &= ~CTDB_REC_RO_REVOKING_READONLY;
1784         }
1785         if (ctdb_ltdb_store(ctdb_db, key, &new_header, new_data) != 0) {
1786                 ctdb_ltdb_unlock(ctdb_db, key);
1787                 DEBUG(DEBUG_ERR,("Failed to write new record in revokechild\n"));
1788                 talloc_free(state);
1789                 return -1;
1790         }
1791         ctdb_ltdb_unlock(ctdb_db, key);
1792
1793         talloc_free(state);
1794         return 0;
1795 }
1796
1797
1798 int ctdb_start_revoke_ro_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1799 {
1800         TDB_DATA tdata;
1801         struct revokechild_handle *rc;
1802         pid_t parent = getpid();
1803         int ret;
1804
1805         header->flags &= ~(CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY);
1806         header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1807         header->rsn   -= 1;
1808
1809         if ((rc = talloc_zero(ctdb_db, struct revokechild_handle)) == NULL) {
1810                 DEBUG(DEBUG_ERR,("Failed to allocate revokechild_handle\n"));
1811                 return -1;
1812         }
1813
1814         tdata = tdb_fetch(ctdb_db->rottdb, key);
1815         if (tdata.dsize > 0) {
1816                 uint8_t *tmp;
1817
1818                 tmp = tdata.dptr;
1819                 tdata.dptr = talloc_memdup(rc, tdata.dptr, tdata.dsize);
1820                 free(tmp);
1821         }
1822
1823         rc->status    = 0;
1824         rc->ctdb      = ctdb;
1825         rc->ctdb_db   = ctdb_db;
1826         rc->fd[0]     = -1;
1827         rc->fd[1]     = -1;
1828
1829         talloc_set_destructor(rc, revokechild_destructor);
1830
1831         rc->key.dsize = key.dsize;
1832         rc->key.dptr  = talloc_memdup(rc, key.dptr, key.dsize);
1833         if (rc->key.dptr == NULL) {
1834                 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1835                 talloc_free(rc);
1836                 return -1;
1837         }
1838
1839         ret = pipe(rc->fd);
1840         if (ret != 0) {
1841                 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1842                 talloc_free(rc);
1843                 return -1;
1844         }
1845
1846
1847         rc->child = ctdb_fork(ctdb);
1848         if (rc->child == (pid_t)-1) {
1849                 DEBUG(DEBUG_ERR,("Failed to fork child for revokechild\n"));
1850                 talloc_free(rc);
1851                 return -1;
1852         }
1853
1854         if (rc->child == 0) {
1855                 char c = 0;
1856                 close(rc->fd[0]);
1857                 debug_extra = talloc_asprintf(NULL, "revokechild-%s:", ctdb_db->db_name);
1858
1859                 ctdb_set_process_name("ctdb_revokechild");
1860                 if (switch_from_server_to_client(ctdb, "revokechild-%s", ctdb_db->db_name) != 0) {
1861                         DEBUG(DEBUG_ERR,("Failed to switch from server to client for revokechild process\n"));
1862                         c = 1;
1863                         goto child_finished;
1864                 }
1865
1866                 c = ctdb_revoke_all_delegations(ctdb, ctdb_db, tdata, key, header, data);
1867
1868 child_finished:
1869                 sys_write(rc->fd[1], &c, 1);
1870                 /* make sure we die when our parent dies */
1871                 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
1872                         sleep(5);
1873                 }
1874                 _exit(0);
1875         }
1876
1877         close(rc->fd[1]);
1878         rc->fd[1] = -1;
1879         set_close_on_exec(rc->fd[0]);
1880
1881         /* This is an active revokechild child process */
1882         DLIST_ADD_END(ctdb_db->revokechild_active, rc, NULL);
1883
1884         rc->fde = event_add_fd(ctdb->ev, rc, rc->fd[0],
1885                                    EVENT_FD_READ, revokechild_handler,
1886                                    (void *)rc);
1887         if (rc->fde == NULL) {
1888                 DEBUG(DEBUG_ERR,("Failed to set up fd event for revokechild process\n"));
1889                 talloc_free(rc);
1890         }
1891         tevent_fd_set_auto_close(rc->fde);
1892
1893         return 0;
1894 }
1895
1896 int ctdb_add_revoke_deferred_call(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr, deferred_requeue_fn fn, void *call_context)
1897 {
1898         struct revokechild_handle *rc;
1899         struct revokechild_deferred_call *deferred_call;
1900
1901         for (rc = ctdb_db->revokechild_active; rc; rc = rc->next) {
1902                 if (rc->key.dsize == 0) {
1903                         continue;
1904                 }
1905                 if (rc->key.dsize != key.dsize) {
1906                         continue;
1907                 }
1908                 if (!memcmp(rc->key.dptr, key.dptr, key.dsize)) {
1909                         break;
1910                 }
1911         }
1912
1913         if (rc == NULL) {
1914                 DEBUG(DEBUG_ERR,("Failed to add deferred call to revoke list. revoke structure not found\n"));
1915                 return -1;
1916         }
1917
1918         deferred_call = talloc(rc, struct revokechild_deferred_call);
1919         if (deferred_call == NULL) {
1920                 DEBUG(DEBUG_ERR,("Failed to allocate deferred call structure for revoking record\n"));
1921                 return -1;
1922         }
1923
1924         deferred_call->ctdb = ctdb;
1925         deferred_call->hdr  = hdr;
1926         deferred_call->fn   = fn;
1927         deferred_call->ctx  = call_context;
1928
1929         talloc_set_destructor(deferred_call, deferred_call_destructor);
1930         talloc_steal(deferred_call, hdr);
1931
1932         return 0;
1933 }