ctdb-daemon: Use reqid abstraction
[samba.git] / ctdb / server / ctdb_call.c
1 /* 
2    ctdb_call protocol code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19 /*
20   see http://wiki.samba.org/index.php/Samba_%26_Clustering for
21   protocol design and packet details
22 */
23 #include "includes.h"
24 #include "tdb.h"
25 #include "lib/util/dlinklist.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "../include/ctdb_private.h"
29 #include "../common/rb_tree.h"
30 #include "common/reqid.h"
31
32 struct ctdb_sticky_record {
33         struct ctdb_context *ctdb;
34         struct ctdb_db_context *ctdb_db;
35         TDB_CONTEXT *pindown;
36 };
37
38 /*
39   find the ctdb_db from a db index
40  */
41  struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id)
42 {
43         struct ctdb_db_context *ctdb_db;
44
45         for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
46                 if (ctdb_db->db_id == id) {
47                         break;
48                 }
49         }
50         return ctdb_db;
51 }
52
53 /*
54   a varient of input packet that can be used in lock requeue
55 */
56 static void ctdb_call_input_pkt(void *p, struct ctdb_req_header *hdr)
57 {
58         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
59         ctdb_input_pkt(ctdb, hdr);
60 }
61
62
63 /*
64   send an error reply
65 */
66 static void ctdb_send_error(struct ctdb_context *ctdb, 
67                             struct ctdb_req_header *hdr, uint32_t status,
68                             const char *fmt, ...) PRINTF_ATTRIBUTE(4,5);
69 static void ctdb_send_error(struct ctdb_context *ctdb, 
70                             struct ctdb_req_header *hdr, uint32_t status,
71                             const char *fmt, ...)
72 {
73         va_list ap;
74         struct ctdb_reply_error *r;
75         char *msg;
76         int msglen, len;
77
78         if (ctdb->methods == NULL) {
79                 DEBUG(DEBUG_INFO,(__location__ " Failed to send error. Transport is DOWN\n"));
80                 return;
81         }
82
83         va_start(ap, fmt);
84         msg = talloc_vasprintf(ctdb, fmt, ap);
85         if (msg == NULL) {
86                 ctdb_fatal(ctdb, "Unable to allocate error in ctdb_send_error\n");
87         }
88         va_end(ap);
89
90         msglen = strlen(msg)+1;
91         len = offsetof(struct ctdb_reply_error, msg);
92         r = ctdb_transport_allocate(ctdb, msg, CTDB_REPLY_ERROR, len + msglen, 
93                                     struct ctdb_reply_error);
94         CTDB_NO_MEMORY_FATAL(ctdb, r);
95
96         r->hdr.destnode  = hdr->srcnode;
97         r->hdr.reqid     = hdr->reqid;
98         r->status        = status;
99         r->msglen        = msglen;
100         memcpy(&r->msg[0], msg, msglen);
101
102         ctdb_queue_packet(ctdb, &r->hdr);
103
104         talloc_free(msg);
105 }
106
107
108 /**
109  * send a redirect reply
110  *
111  * The logic behind this function is this:
112  *
113  * A client wants to grab a record and sends a CTDB_REQ_CALL packet
114  * to its local ctdb (ctdb_request_call). If the node is not itself
115  * the record's DMASTER, it first redirects the packet to  the
116  * record's LMASTER. The LMASTER then redirects the call packet to
117  * the current DMASTER. Note that this works because of this: When
118  * a record is migrated off a node, then the new DMASTER is stored
119  * in the record's copy on the former DMASTER.
120  */
121 static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
122                                     struct ctdb_db_context *ctdb_db,
123                                     TDB_DATA key,
124                                     struct ctdb_req_call *c, 
125                                     struct ctdb_ltdb_header *header)
126 {
127         uint32_t lmaster = ctdb_lmaster(ctdb, &key);
128
129         c->hdr.destnode = lmaster;
130         if (ctdb->pnn == lmaster) {
131                 c->hdr.destnode = header->dmaster;
132         }
133         c->hopcount++;
134
135         if (c->hopcount%100 > 95) {
136                 DEBUG(DEBUG_WARNING,("High hopcount %d dbid:%s "
137                         "key:0x%08x reqid=%08x pnn:%d src:%d lmaster:%d "
138                         "header->dmaster:%d dst:%d\n",
139                         c->hopcount, ctdb_db->db_name, ctdb_hash(&key),
140                         c->hdr.reqid, ctdb->pnn, c->hdr.srcnode, lmaster,
141                         header->dmaster, c->hdr.destnode));
142         }
143
144         ctdb_queue_packet(ctdb, &c->hdr);
145 }
146
147
148 /*
149   send a dmaster reply
150
151   caller must have the chainlock before calling this routine. Caller must be
152   the lmaster
153 */
154 static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
155                                     struct ctdb_ltdb_header *header,
156                                     TDB_DATA key, TDB_DATA data,
157                                     uint32_t new_dmaster,
158                                     uint32_t reqid)
159 {
160         struct ctdb_context *ctdb = ctdb_db->ctdb;
161         struct ctdb_reply_dmaster *r;
162         int ret, len;
163         TALLOC_CTX *tmp_ctx;
164
165         if (ctdb->pnn != ctdb_lmaster(ctdb, &key)) {
166                 DEBUG(DEBUG_ALERT,(__location__ " Caller is not lmaster!\n"));
167                 return;
168         }
169
170         header->dmaster = new_dmaster;
171         ret = ctdb_ltdb_store(ctdb_db, key, header, data);
172         if (ret != 0) {
173                 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply unable to update dmaster");
174                 return;
175         }
176
177         if (ctdb->methods == NULL) {
178                 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply cant update dmaster since transport is down");
179                 return;
180         }
181
182         /* put the packet on a temporary context, allowing us to safely free
183            it below even if ctdb_reply_dmaster() has freed it already */
184         tmp_ctx = talloc_new(ctdb);
185
186         /* send the CTDB_REPLY_DMASTER */
187         len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize + sizeof(uint32_t);
188         r = ctdb_transport_allocate(ctdb, tmp_ctx, CTDB_REPLY_DMASTER, len,
189                                     struct ctdb_reply_dmaster);
190         CTDB_NO_MEMORY_FATAL(ctdb, r);
191
192         r->hdr.destnode  = new_dmaster;
193         r->hdr.reqid     = reqid;
194         r->hdr.generation = ctdb_db->generation;
195         r->rsn           = header->rsn;
196         r->keylen        = key.dsize;
197         r->datalen       = data.dsize;
198         r->db_id         = ctdb_db->db_id;
199         memcpy(&r->data[0], key.dptr, key.dsize);
200         memcpy(&r->data[key.dsize], data.dptr, data.dsize);
201         memcpy(&r->data[key.dsize+data.dsize], &header->flags, sizeof(uint32_t));
202
203         ctdb_queue_packet(ctdb, &r->hdr);
204
205         talloc_free(tmp_ctx);
206 }
207
208 /*
209   send a dmaster request (give another node the dmaster for a record)
210
211   This is always sent to the lmaster, which ensures that the lmaster
212   always knows who the dmaster is. The lmaster will then send a
213   CTDB_REPLY_DMASTER to the new dmaster
214 */
215 static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db, 
216                                    struct ctdb_req_call *c, 
217                                    struct ctdb_ltdb_header *header,
218                                    TDB_DATA *key, TDB_DATA *data)
219 {
220         struct ctdb_req_dmaster *r;
221         struct ctdb_context *ctdb = ctdb_db->ctdb;
222         int len;
223         uint32_t lmaster = ctdb_lmaster(ctdb, key);
224
225         if (ctdb->methods == NULL) {
226                 ctdb_fatal(ctdb, "Failed ctdb_call_send_dmaster since transport is down");
227                 return;
228         }
229
230         if (data->dsize != 0) {
231                 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
232         }
233
234         if (lmaster == ctdb->pnn) {
235                 ctdb_send_dmaster_reply(ctdb_db, header, *key, *data, 
236                                         c->hdr.srcnode, c->hdr.reqid);
237                 return;
238         }
239         
240         len = offsetof(struct ctdb_req_dmaster, data) + key->dsize + data->dsize
241                         + sizeof(uint32_t);
242         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_DMASTER, len, 
243                                     struct ctdb_req_dmaster);
244         CTDB_NO_MEMORY_FATAL(ctdb, r);
245         r->hdr.destnode  = lmaster;
246         r->hdr.reqid     = c->hdr.reqid;
247         r->hdr.generation = ctdb_db->generation;
248         r->db_id         = c->db_id;
249         r->rsn           = header->rsn;
250         r->dmaster       = c->hdr.srcnode;
251         r->keylen        = key->dsize;
252         r->datalen       = data->dsize;
253         memcpy(&r->data[0], key->dptr, key->dsize);
254         memcpy(&r->data[key->dsize], data->dptr, data->dsize);
255         memcpy(&r->data[key->dsize + data->dsize], &header->flags, sizeof(uint32_t));
256
257         header->dmaster = c->hdr.srcnode;
258         if (ctdb_ltdb_store(ctdb_db, *key, header, *data) != 0) {
259                 ctdb_fatal(ctdb, "Failed to store record in ctdb_call_send_dmaster");
260         }
261         
262         ctdb_queue_packet(ctdb, &r->hdr);
263
264         talloc_free(r);
265 }
266
267 static void ctdb_sticky_pindown_timeout(struct event_context *ev, struct timed_event *te, 
268                                        struct timeval t, void *private_data)
269 {
270         struct ctdb_sticky_record *sr = talloc_get_type(private_data, 
271                                                        struct ctdb_sticky_record);
272
273         DEBUG(DEBUG_ERR,("Pindown timeout db:%s  unstick record\n", sr->ctdb_db->db_name));
274         if (sr->pindown != NULL) {
275                 talloc_free(sr->pindown);
276                 sr->pindown = NULL;
277         }
278 }
279
280 static int
281 ctdb_set_sticky_pindown(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
282 {
283         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
284         uint32_t *k;
285         struct ctdb_sticky_record *sr;
286
287         k = ctdb_key_to_idkey(tmp_ctx, key);
288         if (k == NULL) {
289                 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
290                 talloc_free(tmp_ctx);
291                 return -1;
292         }
293
294         sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
295         if (sr == NULL) {
296                 talloc_free(tmp_ctx);
297                 return 0;
298         }
299
300         talloc_free(tmp_ctx);
301
302         if (sr->pindown == NULL) {
303                 DEBUG(DEBUG_ERR,("Pinning down record in %s for %d ms\n", ctdb_db->db_name, ctdb->tunable.sticky_pindown));
304                 sr->pindown = talloc_new(sr);
305                 if (sr->pindown == NULL) {
306                         DEBUG(DEBUG_ERR,("Failed to allocate pindown context for sticky record\n"));
307                         return -1;
308                 }
309                 event_add_timed(ctdb->ev, sr->pindown, timeval_current_ofs(ctdb->tunable.sticky_pindown / 1000, (ctdb->tunable.sticky_pindown * 1000) % 1000000), ctdb_sticky_pindown_timeout, sr);
310         }
311
312         return 0;
313 }
314
315 /*
316   called when a CTDB_REPLY_DMASTER packet comes in, or when the lmaster
317   gets a CTDB_REQUEST_DMASTER for itself. We become the dmaster.
318
319   must be called with the chainlock held. This function releases the chainlock
320 */
321 static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
322                                 struct ctdb_req_header *hdr,
323                                 TDB_DATA key, TDB_DATA data,
324                                 uint64_t rsn, uint32_t record_flags)
325 {
326         struct ctdb_call_state *state;
327         struct ctdb_context *ctdb = ctdb_db->ctdb;
328         struct ctdb_ltdb_header header;
329         int ret;
330
331         DEBUG(DEBUG_DEBUG,("pnn %u dmaster response %08x\n", ctdb->pnn, ctdb_hash(&key)));
332
333         ZERO_STRUCT(header);
334         header.rsn = rsn;
335         header.dmaster = ctdb->pnn;
336         header.flags = record_flags;
337
338         state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
339
340         if (state) {
341                 if (state->call->flags & CTDB_CALL_FLAG_VACUUM_MIGRATION) {
342                         /*
343                          * We temporarily add the VACUUM_MIGRATED flag to
344                          * the record flags, so that ctdb_ltdb_store can
345                          * decide whether the record should be stored or
346                          * deleted.
347                          */
348                         header.flags |= CTDB_REC_FLAG_VACUUM_MIGRATED;
349                 }
350         }
351
352         if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
353                 ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
354
355                 ret = ctdb_ltdb_unlock(ctdb_db, key);
356                 if (ret != 0) {
357                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
358                 }
359                 return;
360         }
361
362         /* we just became DMASTER and this database is "sticky",
363            see if the record is flagged as "hot" and set up a pin-down
364            context to stop migrations for a little while if so
365         */
366         if (ctdb_db->sticky) {
367                 ctdb_set_sticky_pindown(ctdb, ctdb_db, key);
368         }
369
370         if (state == NULL) {
371                 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_become_dmaster from node %u\n",
372                          ctdb->pnn, hdr->reqid, hdr->srcnode));
373
374                 ret = ctdb_ltdb_unlock(ctdb_db, key);
375                 if (ret != 0) {
376                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
377                 }
378                 return;
379         }
380
381         if (key.dsize != state->call->key.dsize || memcmp(key.dptr, state->call->key.dptr, key.dsize)) {
382                 DEBUG(DEBUG_ERR, ("Got bogus DMASTER packet reqid:%u from node %u. Key does not match key held in matching idr.\n", hdr->reqid, hdr->srcnode));
383
384                 ret = ctdb_ltdb_unlock(ctdb_db, key);
385                 if (ret != 0) {
386                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
387                 }
388                 return;
389         }
390
391         if (hdr->reqid != state->reqid) {
392                 /* we found a record  but it was the wrong one */
393                 DEBUG(DEBUG_ERR, ("Dropped orphan in ctdb_become_dmaster with reqid:%u\n from node %u", hdr->reqid, hdr->srcnode));
394
395                 ret = ctdb_ltdb_unlock(ctdb_db, key);
396                 if (ret != 0) {
397                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
398                 }
399                 return;
400         }
401
402         ctdb_call_local(ctdb_db, state->call, &header, state, &data, true);
403
404         ret = ctdb_ltdb_unlock(ctdb_db, state->call->key);
405         if (ret != 0) {
406                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
407         }
408
409         state->state = CTDB_CALL_DONE;
410         if (state->async.fn) {
411                 state->async.fn(state);
412         }
413 }
414
415 struct dmaster_defer_call {
416         struct dmaster_defer_call *next, *prev;
417         struct ctdb_context *ctdb;
418         struct ctdb_req_header *hdr;
419 };
420
421 struct dmaster_defer_queue {
422         struct ctdb_db_context *ctdb_db;
423         uint32_t generation;
424         struct dmaster_defer_call *deferred_calls;
425 };
426
427 static void dmaster_defer_reprocess(struct tevent_context *ev,
428                                     struct tevent_timer *te,
429                                     struct timeval t,
430                                     void *private_data)
431 {
432         struct dmaster_defer_call *call = talloc_get_type(
433                 private_data, struct dmaster_defer_call);
434
435         ctdb_input_pkt(call->ctdb, call->hdr);
436         talloc_free(call);
437 }
438
439 static int dmaster_defer_queue_destructor(struct dmaster_defer_queue *ddq)
440 {
441         /* Ignore requests, if database recovery happens in-between. */
442         if (ddq->generation != ddq->ctdb_db->generation) {
443                 return 0;
444         }
445
446         while (ddq->deferred_calls != NULL) {
447                 struct dmaster_defer_call *call = ddq->deferred_calls;
448
449                 DLIST_REMOVE(ddq->deferred_calls, call);
450
451                 talloc_steal(call->ctdb, call);
452                 tevent_add_timer(call->ctdb->ev, call, timeval_zero(),
453                                  dmaster_defer_reprocess, call);
454         }
455         return 0;
456 }
457
458 static void *insert_ddq_callback(void *parm, void *data)
459 {
460         if (data) {
461                 talloc_free(data);
462         }
463         return parm;
464 }
465
466 /**
467  * This function is used to reigster a key in database that needs to be updated.
468  * Any requests for that key should get deferred till this is completed.
469  */
470 static int dmaster_defer_setup(struct ctdb_db_context *ctdb_db,
471                                struct ctdb_req_header *hdr,
472                                TDB_DATA key)
473 {
474         uint32_t *k;
475         struct dmaster_defer_queue *ddq;
476
477         k = ctdb_key_to_idkey(hdr, key);
478         if (k == NULL) {
479                 DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer setup\n"));
480                 return -1;
481         }
482
483         /* Already exists */
484         ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
485         if (ddq != NULL) {
486                 if (ddq->generation == ctdb_db->generation) {
487                         talloc_free(k);
488                         return 0;
489                 }
490
491                 /* Recovery ocurred - get rid of old queue. All the deferred
492                  * requests will be resent anyway from ctdb_call_resend_db.
493                  */
494                 talloc_free(ddq);
495         }
496
497         ddq = talloc(hdr, struct dmaster_defer_queue);
498         if (ddq == NULL) {
499                 DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer queue\n"));
500                 talloc_free(k);
501                 return -1;
502         }
503         ddq->ctdb_db = ctdb_db;
504         ddq->generation = hdr->generation;
505         ddq->deferred_calls = NULL;
506
507         trbt_insertarray32_callback(ctdb_db->defer_dmaster, k[0], k,
508                                     insert_ddq_callback, ddq);
509         talloc_set_destructor(ddq, dmaster_defer_queue_destructor);
510
511         talloc_free(k);
512         return 0;
513 }
514
515 static int dmaster_defer_add(struct ctdb_db_context *ctdb_db,
516                              struct ctdb_req_header *hdr,
517                              TDB_DATA key)
518 {
519         struct dmaster_defer_queue *ddq;
520         struct dmaster_defer_call *call;
521         uint32_t *k;
522
523         k = ctdb_key_to_idkey(hdr, key);
524         if (k == NULL) {
525                 DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer add\n"));
526                 return -1;
527         }
528
529         ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
530         if (ddq == NULL) {
531                 talloc_free(k);
532                 return -1;
533         }
534
535         talloc_free(k);
536
537         if (ddq->generation != hdr->generation) {
538                 talloc_set_destructor(ddq, NULL);
539                 talloc_free(ddq);
540                 return -1;
541         }
542
543         call = talloc(ddq, struct dmaster_defer_call);
544         if (call == NULL) {
545                 DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer call\n"));
546                 return -1;
547         }
548
549         call->ctdb = ctdb_db->ctdb;
550         call->hdr = talloc_steal(call, hdr);
551
552         DLIST_ADD_END(ddq->deferred_calls, call, NULL);
553
554         return 0;
555 }
556
557 /*
558   called when a CTDB_REQ_DMASTER packet comes in
559
560   this comes into the lmaster for a record when the current dmaster
561   wants to give up the dmaster role and give it to someone else
562 */
563 void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
564 {
565         struct ctdb_req_dmaster *c = (struct ctdb_req_dmaster *)hdr;
566         TDB_DATA key, data, data2;
567         struct ctdb_ltdb_header header;
568         struct ctdb_db_context *ctdb_db;
569         uint32_t record_flags = 0;
570         size_t len;
571         int ret;
572
573         ctdb_db = find_ctdb_db(ctdb, c->db_id);
574         if (!ctdb_db) {
575                 ctdb_send_error(ctdb, hdr, -1,
576                                 "Unknown database in request. db_id==0x%08x",
577                                 c->db_id);
578                 return;
579         }
580
581         if (hdr->generation != ctdb_db->generation) {
582                 DEBUG(DEBUG_DEBUG,
583                       ("ctdb operation %u request %u from node %u to %u had an"
584                        " invalid generation:%u while our generation is:%u\n",
585                        hdr->operation, hdr->reqid, hdr->srcnode, hdr->destnode,
586                        hdr->generation, ctdb_db->generation));
587                 return;
588         }
589
590         key.dptr = c->data;
591         key.dsize = c->keylen;
592         data.dptr = c->data + c->keylen;
593         data.dsize = c->datalen;
594         len = offsetof(struct ctdb_req_dmaster, data) + key.dsize + data.dsize
595                         + sizeof(uint32_t);
596         if (len <= c->hdr.length) {
597                 memcpy(&record_flags, &c->data[c->keylen + c->datalen],
598                        sizeof(record_flags));
599         }
600
601         dmaster_defer_setup(ctdb_db, hdr, key);
602
603         /* fetch the current record */
604         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
605                                            ctdb_call_input_pkt, ctdb, false);
606         if (ret == -1) {
607                 ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
608                 return;
609         }
610         if (ret == -2) {
611                 DEBUG(DEBUG_INFO,(__location__ " deferring ctdb_request_dmaster\n"));
612                 return;
613         }
614
615         if (ctdb_lmaster(ctdb, &key) != ctdb->pnn) {
616                 DEBUG(DEBUG_ALERT,("pnn %u dmaster request to non-lmaster lmaster=%u gen=%u curgen=%u\n",
617                          ctdb->pnn, ctdb_lmaster(ctdb, &key), 
618                          hdr->generation, ctdb->vnn_map->generation));
619                 ctdb_fatal(ctdb, "ctdb_req_dmaster to non-lmaster");
620         }
621
622         DEBUG(DEBUG_DEBUG,("pnn %u dmaster request on %08x for %u from %u\n", 
623                  ctdb->pnn, ctdb_hash(&key), c->dmaster, c->hdr.srcnode));
624
625         /* its a protocol error if the sending node is not the current dmaster */
626         if (header.dmaster != hdr->srcnode) {
627                 DEBUG(DEBUG_ALERT,("pnn %u dmaster request for new-dmaster %u from non-master %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u keyval=0x%08x\n",
628                          ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
629                          ctdb_db->db_id, hdr->generation, ctdb_db->generation,
630                          (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid,
631                          (key.dsize >= 4)?(*(uint32_t *)key.dptr):0));
632                 if (header.rsn != 0 || header.dmaster != ctdb->pnn) {
633                         DEBUG(DEBUG_ERR,("ctdb_req_dmaster from non-master. Force a recovery.\n"));
634
635                         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
636                         ctdb_ltdb_unlock(ctdb_db, key);
637                         return;
638                 }
639         }
640
641         if (header.rsn > c->rsn) {
642                 DEBUG(DEBUG_ALERT,("pnn %u dmaster request with older RSN new-dmaster %u from %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u\n",
643                          ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
644                          ctdb_db->db_id, hdr->generation, ctdb_db->generation,
645                          (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid));
646         }
647
648         /* use the rsn from the sending node */
649         header.rsn = c->rsn;
650
651         /* store the record flags from the sending node */
652         header.flags = record_flags;
653
654         /* check if the new dmaster is the lmaster, in which case we
655            skip the dmaster reply */
656         if (c->dmaster == ctdb->pnn) {
657                 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
658         } else {
659                 ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid);
660
661                 ret = ctdb_ltdb_unlock(ctdb_db, key);
662                 if (ret != 0) {
663                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
664                 }
665         }
666 }
667
668 static void ctdb_sticky_record_timeout(struct event_context *ev, struct timed_event *te, 
669                                        struct timeval t, void *private_data)
670 {
671         struct ctdb_sticky_record *sr = talloc_get_type(private_data, 
672                                                        struct ctdb_sticky_record);
673         talloc_free(sr);
674 }
675
676 static void *ctdb_make_sticky_record_callback(void *parm, void *data)
677 {
678         if (data) {
679                 DEBUG(DEBUG_ERR,("Already have sticky record registered. Free old %p and create new %p\n", data, parm));
680                 talloc_free(data);
681         }
682         return parm;
683 }
684
685 static int
686 ctdb_make_record_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
687 {
688         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
689         uint32_t *k;
690         struct ctdb_sticky_record *sr;
691
692         k = ctdb_key_to_idkey(tmp_ctx, key);
693         if (k == NULL) {
694                 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
695                 talloc_free(tmp_ctx);
696                 return -1;
697         }
698
699         sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
700         if (sr != NULL) {
701                 talloc_free(tmp_ctx);
702                 return 0;
703         }
704
705         sr = talloc(ctdb_db->sticky_records, struct ctdb_sticky_record);
706         if (sr == NULL) {
707                 talloc_free(tmp_ctx);
708                 DEBUG(DEBUG_ERR,("Failed to allocate sticky record structure\n"));
709                 return -1;
710         }
711
712         sr->ctdb    = ctdb;
713         sr->ctdb_db = ctdb_db;
714         sr->pindown = NULL;
715
716         DEBUG(DEBUG_ERR,("Make record sticky for %d seconds in db %s key:0x%08x.\n",
717                          ctdb->tunable.sticky_duration,
718                          ctdb_db->db_name, ctdb_hash(&key)));
719
720         trbt_insertarray32_callback(ctdb_db->sticky_records, k[0], &k[0], ctdb_make_sticky_record_callback, sr);
721
722         event_add_timed(ctdb->ev, sr, timeval_current_ofs(ctdb->tunable.sticky_duration, 0), ctdb_sticky_record_timeout, sr);
723
724         talloc_free(tmp_ctx);
725         return 0;
726 }
727
728 struct pinned_down_requeue_handle {
729         struct ctdb_context *ctdb;
730         struct ctdb_req_header *hdr;
731 };
732
733 struct pinned_down_deferred_call {
734         struct ctdb_context *ctdb;
735         struct ctdb_req_header *hdr;
736 };
737
738 static void pinned_down_requeue(struct event_context *ev, struct timed_event *te, 
739                        struct timeval t, void *private_data)
740 {
741         struct pinned_down_requeue_handle *handle = talloc_get_type(private_data, struct pinned_down_requeue_handle);
742         struct ctdb_context *ctdb = handle->ctdb;
743
744         talloc_steal(ctdb, handle->hdr);
745         ctdb_call_input_pkt(ctdb, handle->hdr);
746
747         talloc_free(handle);
748 }
749
750 static int pinned_down_destructor(struct pinned_down_deferred_call *pinned_down)
751 {
752         struct ctdb_context *ctdb = pinned_down->ctdb;
753         struct pinned_down_requeue_handle *handle = talloc(ctdb, struct pinned_down_requeue_handle);
754
755         handle->ctdb = pinned_down->ctdb;
756         handle->hdr  = pinned_down->hdr;
757         talloc_steal(handle, handle->hdr);
758
759         event_add_timed(ctdb->ev, handle, timeval_zero(), pinned_down_requeue, handle);
760
761         return 0;
762 }
763
764 static int
765 ctdb_defer_pinned_down_request(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr)
766 {
767         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
768         uint32_t *k;
769         struct ctdb_sticky_record *sr;
770         struct pinned_down_deferred_call *pinned_down;
771
772         k = ctdb_key_to_idkey(tmp_ctx, key);
773         if (k == NULL) {
774                 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
775                 talloc_free(tmp_ctx);
776                 return -1;
777         }
778
779         sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
780         if (sr == NULL) {
781                 talloc_free(tmp_ctx);
782                 return -1;
783         }
784
785         talloc_free(tmp_ctx);
786
787         if (sr->pindown == NULL) {
788                 return -1;
789         }
790         
791         pinned_down = talloc(sr->pindown, struct pinned_down_deferred_call);
792         if (pinned_down == NULL) {
793                 DEBUG(DEBUG_ERR,("Failed to allocate structure for deferred pinned down request\n"));
794                 return -1;
795         }
796
797         pinned_down->ctdb = ctdb;
798         pinned_down->hdr  = hdr;
799
800         talloc_set_destructor(pinned_down, pinned_down_destructor);
801         talloc_steal(pinned_down, hdr);
802
803         return 0;
804 }
805
806 static void
807 ctdb_update_db_stat_hot_keys(struct ctdb_db_context *ctdb_db, TDB_DATA key, int hopcount)
808 {
809         int i, id;
810
811         /* smallest value is always at index 0 */
812         if (hopcount <= ctdb_db->statistics.hot_keys[0].count) {
813                 return;
814         }
815
816         /* see if we already know this key */
817         for (i = 0; i < MAX_HOT_KEYS; i++) {
818                 if (key.dsize != ctdb_db->statistics.hot_keys[i].key.dsize) {
819                         continue;
820                 }
821                 if (memcmp(key.dptr, ctdb_db->statistics.hot_keys[i].key.dptr, key.dsize)) {
822                         continue;
823                 }
824                 /* found an entry for this key */
825                 if (hopcount <= ctdb_db->statistics.hot_keys[i].count) {
826                         return;
827                 }
828                 ctdb_db->statistics.hot_keys[i].count = hopcount;
829                 goto sort_keys;
830         }
831
832         if (ctdb_db->statistics.num_hot_keys < MAX_HOT_KEYS) {
833                 id = ctdb_db->statistics.num_hot_keys;
834                 ctdb_db->statistics.num_hot_keys++;
835         } else {
836                 id = 0;
837         }
838
839         if (ctdb_db->statistics.hot_keys[id].key.dptr != NULL) {
840                 talloc_free(ctdb_db->statistics.hot_keys[id].key.dptr);
841         }
842         ctdb_db->statistics.hot_keys[id].key.dsize = key.dsize;
843         ctdb_db->statistics.hot_keys[id].key.dptr  = talloc_memdup(ctdb_db, key.dptr, key.dsize);
844         ctdb_db->statistics.hot_keys[id].count = hopcount;
845         DEBUG(DEBUG_NOTICE,("Updated hot key database=%s key=0x%08x id=%d hop_count=%d\n",
846                             ctdb_db->db_name, ctdb_hash(&key), id, hopcount));
847
848 sort_keys:
849         for (i = 1; i < MAX_HOT_KEYS; i++) {
850                 if (ctdb_db->statistics.hot_keys[i].count == 0) {
851                         continue;
852                 }
853                 if (ctdb_db->statistics.hot_keys[i].count < ctdb_db->statistics.hot_keys[0].count) {
854                         hopcount = ctdb_db->statistics.hot_keys[i].count;
855                         ctdb_db->statistics.hot_keys[i].count = ctdb_db->statistics.hot_keys[0].count;
856                         ctdb_db->statistics.hot_keys[0].count = hopcount;
857
858                         key = ctdb_db->statistics.hot_keys[i].key;
859                         ctdb_db->statistics.hot_keys[i].key = ctdb_db->statistics.hot_keys[0].key;
860                         ctdb_db->statistics.hot_keys[0].key = key;
861                 }
862         }
863 }
864
865 /*
866   called when a CTDB_REQ_CALL packet comes in
867 */
868 void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
869 {
870         struct ctdb_req_call *c = (struct ctdb_req_call *)hdr;
871         TDB_DATA data;
872         struct ctdb_reply_call *r;
873         int ret, len;
874         struct ctdb_ltdb_header header;
875         struct ctdb_call *call;
876         struct ctdb_db_context *ctdb_db;
877         int tmp_count, bucket;
878
879         if (ctdb->methods == NULL) {
880                 DEBUG(DEBUG_INFO,(__location__ " Failed ctdb_request_call. Transport is DOWN\n"));
881                 return;
882         }
883
884         ctdb_db = find_ctdb_db(ctdb, c->db_id);
885         if (!ctdb_db) {
886                 ctdb_send_error(ctdb, hdr, -1,
887                                 "Unknown database in request. db_id==0x%08x",
888                                 c->db_id);
889                 return;
890         }
891
892         if (hdr->generation != ctdb_db->generation) {
893                 DEBUG(DEBUG_DEBUG,
894                       ("ctdb operation %u request %u from node %u to %u had an"
895                        " invalid generation:%u while our generation is:%u\n",
896                        hdr->operation, hdr->reqid, hdr->srcnode, hdr->destnode,
897                        hdr->generation, ctdb_db->generation));
898                 return;
899         }
900
901         call = talloc(hdr, struct ctdb_call);
902         CTDB_NO_MEMORY_FATAL(ctdb, call);
903
904         call->call_id  = c->callid;
905         call->key.dptr = c->data;
906         call->key.dsize = c->keylen;
907         call->call_data.dptr = c->data + c->keylen;
908         call->call_data.dsize = c->calldatalen;
909         call->reply_data.dptr  = NULL;
910         call->reply_data.dsize = 0;
911
912
913         /* If this record is pinned down we should defer the
914            request until the pindown times out
915         */
916         if (ctdb_db->sticky) {
917                 if (ctdb_defer_pinned_down_request(ctdb, ctdb_db, call->key, hdr) == 0) {
918                         DEBUG(DEBUG_WARNING,
919                               ("Defer request for pinned down record in %s\n", ctdb_db->db_name));
920                         talloc_free(call);
921                         return;
922                 }
923         }
924
925         if (dmaster_defer_add(ctdb_db, hdr, call->key) == 0) {
926                 talloc_free(call);
927                 return;
928         }
929
930         /* determine if we are the dmaster for this key. This also
931            fetches the record data (if any), thus avoiding a 2nd fetch of the data 
932            if the call will be answered locally */
933
934         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call->key, &header, hdr, &data,
935                                            ctdb_call_input_pkt, ctdb, false);
936         if (ret == -1) {
937                 ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
938                 talloc_free(call);
939                 return;
940         }
941         if (ret == -2) {
942                 DEBUG(DEBUG_INFO,(__location__ " deferred ctdb_request_call\n"));
943                 talloc_free(call);
944                 return;
945         }
946
947         /* Dont do READONLY if we dont have a tracking database */
948         if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
949                 c->flags &= ~CTDB_WANT_READONLY;
950         }
951
952         if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
953                 header.flags &= ~CTDB_REC_RO_FLAGS;
954                 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
955                 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
956                 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
957                         ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
958                 }
959                 /* and clear out the tracking data */
960                 if (tdb_delete(ctdb_db->rottdb, call->key) != 0) {
961                         DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
962                 }
963         }
964
965         /* if we are revoking, we must defer all other calls until the revoke
966          * had completed.
967          */
968         if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
969                 talloc_free(data.dptr);
970                 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
971
972                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
973                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
974                 }
975                 talloc_free(call);
976                 return;
977         }
978
979         /*
980          * If we are not the dmaster and are not hosting any delegations,
981          * then we redirect the request to the node than can answer it
982          * (the lmaster or the dmaster).
983          */
984         if ((header.dmaster != ctdb->pnn) 
985             && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) ) {
986                 talloc_free(data.dptr);
987                 ctdb_call_send_redirect(ctdb, ctdb_db, call->key, c, &header);
988
989                 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
990                 if (ret != 0) {
991                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
992                 }
993                 talloc_free(call);
994                 return;
995         }
996
997         if ( (!(c->flags & CTDB_WANT_READONLY))
998         && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
999                 header.flags   |= CTDB_REC_RO_REVOKING_READONLY;
1000                 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
1001                         ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
1002                 }
1003                 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1004
1005                 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, call->key, &header, data) != 0) {
1006                         ctdb_fatal(ctdb, "Failed to start record revoke");
1007                 }
1008                 talloc_free(data.dptr);
1009
1010                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
1011                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
1012                 }
1013                 talloc_free(call);
1014
1015                 return;
1016         }               
1017
1018         /* If this is the first request for delegation. bump rsn and set
1019          * the delegations flag
1020          */
1021         if ((c->flags & CTDB_WANT_READONLY)
1022         &&  (c->callid == CTDB_FETCH_WITH_HEADER_FUNC)
1023         &&  (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS))) {
1024                 header.rsn     += 3;
1025                 header.flags   |= CTDB_REC_RO_HAVE_DELEGATIONS;
1026                 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
1027                         ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
1028                 }
1029         }
1030         if ((c->flags & CTDB_WANT_READONLY) 
1031         &&  (call->call_id == CTDB_FETCH_WITH_HEADER_FUNC)) {
1032                 TDB_DATA tdata;
1033
1034                 tdata = tdb_fetch(ctdb_db->rottdb, call->key);
1035                 if (ctdb_trackingdb_add_pnn(ctdb, &tdata, c->hdr.srcnode) != 0) {
1036                         ctdb_fatal(ctdb, "Failed to add node to trackingdb");
1037                 }
1038                 if (tdb_store(ctdb_db->rottdb, call->key, tdata, TDB_REPLACE) != 0) {
1039                         ctdb_fatal(ctdb, "Failed to store trackingdb data");
1040                 }
1041                 free(tdata.dptr);
1042
1043                 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1044                 if (ret != 0) {
1045                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1046                 }
1047
1048                 len = offsetof(struct ctdb_reply_call, data) + data.dsize + sizeof(struct ctdb_ltdb_header);
1049                 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len, 
1050                                             struct ctdb_reply_call);
1051                 CTDB_NO_MEMORY_FATAL(ctdb, r);
1052                 r->hdr.destnode  = c->hdr.srcnode;
1053                 r->hdr.reqid     = c->hdr.reqid;
1054                 r->hdr.generation = ctdb_db->generation;
1055                 r->status        = 0;
1056                 r->datalen       = data.dsize + sizeof(struct ctdb_ltdb_header);
1057                 header.rsn      -= 2;
1058                 header.flags   |= CTDB_REC_RO_HAVE_READONLY;
1059                 header.flags   &= ~CTDB_REC_RO_HAVE_DELEGATIONS;
1060                 memcpy(&r->data[0], &header, sizeof(struct ctdb_ltdb_header));
1061
1062                 if (data.dsize) {
1063                         memcpy(&r->data[sizeof(struct ctdb_ltdb_header)], data.dptr, data.dsize);
1064                 }
1065
1066                 ctdb_queue_packet(ctdb, &r->hdr);
1067                 CTDB_INCREMENT_STAT(ctdb, total_ro_delegations);
1068                 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_delegations);
1069
1070                 talloc_free(r);
1071                 talloc_free(call);
1072                 return;
1073         }
1074
1075         CTDB_UPDATE_STAT(ctdb, max_hop_count, c->hopcount);
1076         tmp_count = c->hopcount;
1077         bucket = 0;
1078         while (tmp_count) {
1079                 tmp_count >>= 2;
1080                 bucket++;
1081         }
1082         if (bucket >= MAX_COUNT_BUCKETS) {
1083                 bucket = MAX_COUNT_BUCKETS - 1;
1084         }
1085         CTDB_INCREMENT_STAT(ctdb, hop_count_bucket[bucket]);
1086         CTDB_INCREMENT_DB_STAT(ctdb_db, hop_count_bucket[bucket]);
1087         ctdb_update_db_stat_hot_keys(ctdb_db, call->key, c->hopcount);
1088
1089         /* If this database supports sticky records, then check if the
1090            hopcount is big. If it is it means the record is hot and we
1091            should make it sticky.
1092         */
1093         if (ctdb_db->sticky && c->hopcount >= ctdb->tunable.hopcount_make_sticky) {
1094                 ctdb_make_record_sticky(ctdb, ctdb_db, call->key);
1095         }
1096
1097
1098         /* Try if possible to migrate the record off to the caller node.
1099          * From the clients perspective a fetch of the data is just as 
1100          * expensive as a migration.
1101          */
1102         if (c->hdr.srcnode != ctdb->pnn) {
1103                 if (ctdb_db->persistent_state) {
1104                         DEBUG(DEBUG_INFO, (__location__ " refusing migration"
1105                               " of key %s while transaction is active\n",
1106                               (char *)call->key.dptr));
1107                 } else {
1108                         DEBUG(DEBUG_DEBUG,("pnn %u starting migration of %08x to %u\n",
1109                                  ctdb->pnn, ctdb_hash(&(call->key)), c->hdr.srcnode));
1110                         ctdb_call_send_dmaster(ctdb_db, c, &header, &(call->key), &data);
1111                         talloc_free(data.dptr);
1112
1113                         ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1114                         if (ret != 0) {
1115                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1116                         }
1117                 }
1118                 talloc_free(call);
1119                 return;
1120         }
1121
1122         ret = ctdb_call_local(ctdb_db, call, &header, hdr, &data, true);
1123         if (ret != 0) {
1124                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_local failed\n"));
1125                 call->status = -1;
1126         }
1127
1128         ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1129         if (ret != 0) {
1130                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1131         }
1132
1133         len = offsetof(struct ctdb_reply_call, data) + call->reply_data.dsize;
1134         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len, 
1135                                     struct ctdb_reply_call);
1136         CTDB_NO_MEMORY_FATAL(ctdb, r);
1137         r->hdr.destnode  = hdr->srcnode;
1138         r->hdr.reqid     = hdr->reqid;
1139         r->hdr.generation = ctdb_db->generation;
1140         r->status        = call->status;
1141         r->datalen       = call->reply_data.dsize;
1142         if (call->reply_data.dsize) {
1143                 memcpy(&r->data[0], call->reply_data.dptr, call->reply_data.dsize);
1144         }
1145
1146         ctdb_queue_packet(ctdb, &r->hdr);
1147
1148         talloc_free(r);
1149         talloc_free(call);
1150 }
1151
1152 /**
1153  * called when a CTDB_REPLY_CALL packet comes in
1154  *
1155  * This packet comes in response to a CTDB_REQ_CALL request packet. It
1156  * contains any reply data from the call
1157  */
1158 void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1159 {
1160         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
1161         struct ctdb_call_state *state;
1162
1163         state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
1164         if (state == NULL) {
1165                 DEBUG(DEBUG_ERR, (__location__ " reqid %u not found\n", hdr->reqid));
1166                 return;
1167         }
1168
1169         if (hdr->reqid != state->reqid) {
1170                 /* we found a record  but it was the wrong one */
1171                 DEBUG(DEBUG_ERR, ("Dropped orphaned call reply with reqid:%u\n",hdr->reqid));
1172                 return;
1173         }
1174
1175         if (hdr->generation != state->generation) {
1176                 DEBUG(DEBUG_DEBUG,
1177                       ("ctdb operation %u request %u from node %u to %u had an"
1178                        " invalid generation:%u while our generation is:%u\n",
1179                        hdr->operation, hdr->reqid, hdr->srcnode, hdr->destnode,
1180                        hdr->generation, state->generation));
1181                 return;
1182         }
1183
1184
1185         /* read only delegation processing */
1186         /* If we got a FETCH_WITH_HEADER we should check if this is a ro
1187          * delegation since we may need to update the record header
1188          */
1189         if (state->c->callid == CTDB_FETCH_WITH_HEADER_FUNC) {
1190                 struct ctdb_db_context *ctdb_db = state->ctdb_db;
1191                 struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)&c->data[0];
1192                 struct ctdb_ltdb_header oldheader;
1193                 TDB_DATA key, data, olddata;
1194                 int ret;
1195
1196                 if (!(header->flags & CTDB_REC_RO_HAVE_READONLY)) {
1197                         goto finished_ro;
1198                         return;
1199                 }
1200
1201                 key.dsize = state->c->keylen;
1202                 key.dptr  = state->c->data;
1203                 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1204                                      ctdb_call_input_pkt, ctdb, false);
1205                 if (ret == -2) {
1206                         return;
1207                 }
1208                 if (ret != 0) {
1209                         DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_call\n"));
1210                         return;
1211                 }
1212
1213                 ret = ctdb_ltdb_fetch(ctdb_db, key, &oldheader, state, &olddata);
1214                 if (ret != 0) {
1215                         DEBUG(DEBUG_ERR, ("Failed to fetch old record in ctdb_reply_call\n"));
1216                         ctdb_ltdb_unlock(ctdb_db, key);
1217                         goto finished_ro;
1218                 }                       
1219
1220                 if (header->rsn <= oldheader.rsn) {
1221                         ctdb_ltdb_unlock(ctdb_db, key);
1222                         goto finished_ro;
1223                 }
1224
1225                 if (c->datalen < sizeof(struct ctdb_ltdb_header)) {
1226                         DEBUG(DEBUG_ERR,(__location__ " Got FETCH_WITH_HEADER reply with too little data: %d bytes\n", c->datalen));
1227                         ctdb_ltdb_unlock(ctdb_db, key);
1228                         goto finished_ro;
1229                 }
1230
1231                 data.dsize = c->datalen - sizeof(struct ctdb_ltdb_header);
1232                 data.dptr  = &c->data[sizeof(struct ctdb_ltdb_header)];
1233                 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
1234                 if (ret != 0) {
1235                         DEBUG(DEBUG_ERR, ("Failed to store new record in ctdb_reply_call\n"));
1236                         ctdb_ltdb_unlock(ctdb_db, key);
1237                         goto finished_ro;
1238                 }                       
1239
1240                 ctdb_ltdb_unlock(ctdb_db, key);
1241         }
1242 finished_ro:
1243
1244         state->call->reply_data.dptr = c->data;
1245         state->call->reply_data.dsize = c->datalen;
1246         state->call->status = c->status;
1247
1248         talloc_steal(state, c);
1249
1250         state->state = CTDB_CALL_DONE;
1251         if (state->async.fn) {
1252                 state->async.fn(state);
1253         }
1254 }
1255
1256
1257 /**
1258  * called when a CTDB_REPLY_DMASTER packet comes in
1259  *
1260  * This packet comes in from the lmaster in response to a CTDB_REQ_CALL
1261  * request packet. It means that the current dmaster wants to give us
1262  * the dmaster role.
1263  */
1264 void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1265 {
1266         struct ctdb_reply_dmaster *c = (struct ctdb_reply_dmaster *)hdr;
1267         struct ctdb_db_context *ctdb_db;
1268         TDB_DATA key, data;
1269         uint32_t record_flags = 0;
1270         size_t len;
1271         int ret;
1272
1273         ctdb_db = find_ctdb_db(ctdb, c->db_id);
1274         if (ctdb_db == NULL) {
1275                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_reply_dmaster\n", c->db_id));
1276                 return;
1277         }
1278
1279         if (hdr->generation != ctdb_db->generation) {
1280                 DEBUG(DEBUG_DEBUG,
1281                       ("ctdb operation %u request %u from node %u to %u had an"
1282                        " invalid generation:%u while our generation is:%u\n",
1283                        hdr->operation, hdr->reqid, hdr->srcnode, hdr->destnode,
1284                        hdr->generation, ctdb_db->generation));
1285                 return;
1286         }
1287
1288         key.dptr = c->data;
1289         key.dsize = c->keylen;
1290         data.dptr = &c->data[key.dsize];
1291         data.dsize = c->datalen;
1292         len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize
1293                 + sizeof(uint32_t);
1294         if (len <= c->hdr.length) {
1295                 memcpy(&record_flags, &c->data[c->keylen + c->datalen],
1296                        sizeof(record_flags));
1297         }
1298
1299         dmaster_defer_setup(ctdb_db, hdr, key);
1300
1301         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1302                                      ctdb_call_input_pkt, ctdb, false);
1303         if (ret == -2) {
1304                 return;
1305         }
1306         if (ret != 0) {
1307                 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_dmaster\n"));
1308                 return;
1309         }
1310
1311         ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
1312 }
1313
1314
1315 /*
1316   called when a CTDB_REPLY_ERROR packet comes in
1317 */
1318 void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1319 {
1320         struct ctdb_reply_error *c = (struct ctdb_reply_error *)hdr;
1321         struct ctdb_call_state *state;
1322
1323         state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
1324         if (state == NULL) {
1325                 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_reply_error\n",
1326                          ctdb->pnn, hdr->reqid));
1327                 return;
1328         }
1329
1330         if (hdr->reqid != state->reqid) {
1331                 /* we found a record  but it was the wrong one */
1332                 DEBUG(DEBUG_ERR, ("Dropped orphaned error reply with reqid:%u\n",hdr->reqid));
1333                 return;
1334         }
1335
1336         talloc_steal(state, c);
1337
1338         state->state  = CTDB_CALL_ERROR;
1339         state->errmsg = (char *)c->msg;
1340         if (state->async.fn) {
1341                 state->async.fn(state);
1342         }
1343 }
1344
1345
1346 /*
1347   destroy a ctdb_call
1348 */
1349 static int ctdb_call_destructor(struct ctdb_call_state *state)
1350 {
1351         DLIST_REMOVE(state->ctdb_db->pending_calls, state);
1352         reqid_remove(state->ctdb_db->ctdb->idr, state->reqid);
1353         return 0;
1354 }
1355
1356
1357 /*
1358   called when a ctdb_call needs to be resent after a reconfigure event
1359 */
1360 static void ctdb_call_resend(struct ctdb_call_state *state)
1361 {
1362         struct ctdb_context *ctdb = state->ctdb_db->ctdb;
1363
1364         state->generation = state->ctdb_db->generation;
1365
1366         /* use a new reqid, in case the old reply does eventually come in */
1367         reqid_remove(ctdb->idr, state->reqid);
1368         state->reqid = reqid_new(ctdb->idr, state);
1369         state->c->hdr.reqid = state->reqid;
1370
1371         /* update the generation count for this request, so its valid with the new vnn_map */
1372         state->c->hdr.generation = state->generation;
1373
1374         /* send the packet to ourselves, it will be redirected appropriately */
1375         state->c->hdr.destnode = ctdb->pnn;
1376
1377         ctdb_queue_packet(ctdb, &state->c->hdr);
1378         DEBUG(DEBUG_NOTICE,("resent ctdb_call\n"));
1379 }
1380
1381 /*
1382   resend all pending calls on recovery
1383  */
1384 void ctdb_call_resend_db(struct ctdb_db_context *ctdb_db)
1385 {
1386         struct ctdb_call_state *state, *next;
1387
1388         for (state = ctdb_db->pending_calls; state; state = next) {
1389                 next = state->next;
1390                 ctdb_call_resend(state);
1391         }
1392 }
1393
1394 void ctdb_call_resend_all(struct ctdb_context *ctdb)
1395 {
1396         struct ctdb_db_context *ctdb_db;
1397
1398         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
1399                 ctdb_call_resend_db(ctdb_db);
1400         }
1401 }
1402
1403 /*
1404   this allows the caller to setup a async.fn 
1405 */
1406 static void call_local_trigger(struct event_context *ev, struct timed_event *te, 
1407                        struct timeval t, void *private_data)
1408 {
1409         struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state);
1410         if (state->async.fn) {
1411                 state->async.fn(state);
1412         }
1413 }       
1414
1415
1416 /*
1417   construct an event driven local ctdb_call
1418
1419   this is used so that locally processed ctdb_call requests are processed
1420   in an event driven manner
1421 */
1422 struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db, 
1423                                              struct ctdb_call *call,
1424                                              struct ctdb_ltdb_header *header,
1425                                              TDB_DATA *data)
1426 {
1427         struct ctdb_call_state *state;
1428         struct ctdb_context *ctdb = ctdb_db->ctdb;
1429         int ret;
1430
1431         state = talloc_zero(ctdb_db, struct ctdb_call_state);
1432         CTDB_NO_MEMORY_NULL(ctdb, state);
1433
1434         talloc_steal(state, data->dptr);
1435
1436         state->state = CTDB_CALL_DONE;
1437         state->call  = talloc(state, struct ctdb_call);
1438         CTDB_NO_MEMORY_NULL(ctdb, state->call);
1439         *(state->call) = *call;
1440         state->ctdb_db = ctdb_db;
1441
1442         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
1443         if (ret != 0) {
1444                 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
1445         }
1446
1447         event_add_timed(ctdb->ev, state, timeval_zero(), call_local_trigger, state);
1448
1449         return state;
1450 }
1451
1452
1453 /*
1454   make a remote ctdb call - async send. Called in daemon context.
1455
1456   This constructs a ctdb_call request and queues it for processing. 
1457   This call never blocks.
1458 */
1459 struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctdb_db, 
1460                                                      struct ctdb_call *call, 
1461                                                      struct ctdb_ltdb_header *header)
1462 {
1463         uint32_t len;
1464         struct ctdb_call_state *state;
1465         struct ctdb_context *ctdb = ctdb_db->ctdb;
1466
1467         if (ctdb->methods == NULL) {
1468                 DEBUG(DEBUG_INFO,(__location__ " Failed send packet. Transport is down\n"));
1469                 return NULL;
1470         }
1471
1472         state = talloc_zero(ctdb_db, struct ctdb_call_state);
1473         CTDB_NO_MEMORY_NULL(ctdb, state);
1474         state->call = talloc(state, struct ctdb_call);
1475         CTDB_NO_MEMORY_NULL(ctdb, state->call);
1476
1477         state->reqid = reqid_new(ctdb->idr, state);
1478         state->ctdb_db = ctdb_db;
1479         talloc_set_destructor(state, ctdb_call_destructor);
1480
1481         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
1482         state->c = ctdb_transport_allocate(ctdb, state, CTDB_REQ_CALL, len, 
1483                                            struct ctdb_req_call);
1484         CTDB_NO_MEMORY_NULL(ctdb, state->c);
1485         state->c->hdr.destnode  = header->dmaster;
1486
1487         /* this limits us to 16k outstanding messages - not unreasonable */
1488         state->c->hdr.reqid     = state->reqid;
1489         state->c->hdr.generation = ctdb_db->generation;
1490         state->c->flags         = call->flags;
1491         state->c->db_id         = ctdb_db->db_id;
1492         state->c->callid        = call->call_id;
1493         state->c->hopcount      = 0;
1494         state->c->keylen        = call->key.dsize;
1495         state->c->calldatalen   = call->call_data.dsize;
1496         memcpy(&state->c->data[0], call->key.dptr, call->key.dsize);
1497         memcpy(&state->c->data[call->key.dsize], 
1498                call->call_data.dptr, call->call_data.dsize);
1499         *(state->call)              = *call;
1500         state->call->call_data.dptr = &state->c->data[call->key.dsize];
1501         state->call->key.dptr       = &state->c->data[0];
1502
1503         state->state  = CTDB_CALL_WAIT;
1504         state->generation = ctdb_db->generation;
1505
1506         DLIST_ADD(ctdb_db->pending_calls, state);
1507
1508         ctdb_queue_packet(ctdb, &state->c->hdr);
1509
1510         return state;
1511 }
1512
1513 /*
1514   make a remote ctdb call - async recv - called in daemon context
1515
1516   This is called when the program wants to wait for a ctdb_call to complete and get the 
1517   results. This call will block unless the call has already completed.
1518 */
1519 int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
1520 {
1521         while (state->state < CTDB_CALL_DONE) {
1522                 event_loop_once(state->ctdb_db->ctdb->ev);
1523         }
1524         if (state->state != CTDB_CALL_DONE) {
1525                 ctdb_set_error(state->ctdb_db->ctdb, "%s", state->errmsg);
1526                 talloc_free(state);
1527                 return -1;
1528         }
1529
1530         if (state->call->reply_data.dsize) {
1531                 call->reply_data.dptr = talloc_memdup(call,
1532                                                       state->call->reply_data.dptr,
1533                                                       state->call->reply_data.dsize);
1534                 call->reply_data.dsize = state->call->reply_data.dsize;
1535         } else {
1536                 call->reply_data.dptr = NULL;
1537                 call->reply_data.dsize = 0;
1538         }
1539         call->status = state->call->status;
1540         talloc_free(state);
1541         return 0;
1542 }
1543
1544
1545 /* 
1546    send a keepalive packet to the other node
1547 */
1548 void ctdb_send_keepalive(struct ctdb_context *ctdb, uint32_t destnode)
1549 {
1550         struct ctdb_req_keepalive *r;
1551         
1552         if (ctdb->methods == NULL) {
1553                 DEBUG(DEBUG_INFO,(__location__ " Failed to send keepalive. Transport is DOWN\n"));
1554                 return;
1555         }
1556
1557         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_KEEPALIVE,
1558                                     sizeof(struct ctdb_req_keepalive), 
1559                                     struct ctdb_req_keepalive);
1560         CTDB_NO_MEMORY_FATAL(ctdb, r);
1561         r->hdr.destnode  = destnode;
1562         r->hdr.reqid     = 0;
1563         
1564         CTDB_INCREMENT_STAT(ctdb, keepalive_packets_sent);
1565
1566         ctdb_queue_packet(ctdb, &r->hdr);
1567
1568         talloc_free(r);
1569 }
1570
1571
1572
1573 struct revokechild_deferred_call {
1574         struct ctdb_context *ctdb;
1575         struct ctdb_req_header *hdr;
1576         deferred_requeue_fn fn;
1577         void *ctx;
1578 };
1579
1580 struct revokechild_handle {
1581         struct revokechild_handle *next, *prev;
1582         struct ctdb_context *ctdb;
1583         struct ctdb_db_context *ctdb_db;
1584         struct fd_event *fde;
1585         int status;
1586         int fd[2];
1587         pid_t child;
1588         TDB_DATA key;
1589 };
1590
1591 struct revokechild_requeue_handle {
1592         struct ctdb_context *ctdb;
1593         struct ctdb_req_header *hdr;
1594         deferred_requeue_fn fn;
1595         void *ctx;
1596 };
1597
1598 static void deferred_call_requeue(struct event_context *ev, struct timed_event *te, 
1599                        struct timeval t, void *private_data)
1600 {
1601         struct revokechild_requeue_handle *requeue_handle = talloc_get_type(private_data, struct revokechild_requeue_handle);
1602
1603         requeue_handle->fn(requeue_handle->ctx, requeue_handle->hdr);
1604         talloc_free(requeue_handle);
1605 }
1606
1607 static int deferred_call_destructor(struct revokechild_deferred_call *deferred_call)
1608 {
1609         struct ctdb_context *ctdb = deferred_call->ctdb;
1610         struct revokechild_requeue_handle *requeue_handle = talloc(ctdb, struct revokechild_requeue_handle);
1611         struct ctdb_req_call *c = (struct ctdb_req_call *)deferred_call->hdr;
1612
1613         requeue_handle->ctdb = ctdb;
1614         requeue_handle->hdr  = deferred_call->hdr;
1615         requeue_handle->fn   = deferred_call->fn;
1616         requeue_handle->ctx  = deferred_call->ctx;
1617         talloc_steal(requeue_handle, requeue_handle->hdr);
1618
1619         /* when revoking, any READONLY requests have 1 second grace to let read/write finish first */
1620         event_add_timed(ctdb->ev, requeue_handle, timeval_current_ofs(c->flags & CTDB_WANT_READONLY ? 1 : 0, 0), deferred_call_requeue, requeue_handle);
1621
1622         return 0;
1623 }
1624
1625
1626 static int revokechild_destructor(struct revokechild_handle *rc)
1627 {
1628         if (rc->fde != NULL) {
1629                 talloc_free(rc->fde);
1630         }
1631
1632         if (rc->fd[0] != -1) {
1633                 close(rc->fd[0]);
1634         }
1635         if (rc->fd[1] != -1) {
1636                 close(rc->fd[1]);
1637         }
1638         ctdb_kill(rc->ctdb, rc->child, SIGKILL);
1639
1640         DLIST_REMOVE(rc->ctdb_db->revokechild_active, rc);
1641         return 0;
1642 }
1643
1644 static void revokechild_handler(struct event_context *ev, struct fd_event *fde, 
1645                              uint16_t flags, void *private_data)
1646 {
1647         struct revokechild_handle *rc = talloc_get_type(private_data, 
1648                                                      struct revokechild_handle);
1649         int ret;
1650         char c;
1651
1652         ret = sys_read(rc->fd[0], &c, 1);
1653         if (ret != 1) {
1654                 DEBUG(DEBUG_ERR,("Failed to read status from revokechild. errno:%d\n", errno));
1655                 rc->status = -1;
1656                 talloc_free(rc);
1657                 return;
1658         }
1659         if (c != 0) {
1660                 DEBUG(DEBUG_ERR,("revokechild returned failure. status:%d\n", c));
1661                 rc->status = -1;
1662                 talloc_free(rc);
1663                 return;
1664         }
1665
1666         talloc_free(rc);
1667 }
1668
1669 struct ctdb_revoke_state {
1670         struct ctdb_db_context *ctdb_db;
1671         TDB_DATA key;
1672         struct ctdb_ltdb_header *header;
1673         TDB_DATA data;
1674         int count;
1675         int status;
1676         int finished;
1677 };
1678
1679 static void update_record_cb(struct ctdb_client_control_state *state)
1680 {
1681         struct ctdb_revoke_state *revoke_state;
1682         int ret;
1683         int32_t res;
1684
1685         if (state == NULL) {
1686                 return;
1687         }
1688         revoke_state = state->async.private_data;
1689
1690         state->async.fn = NULL;
1691         ret = ctdb_control_recv(state->ctdb, state, state, NULL, &res, NULL);
1692         if ((ret != 0) || (res != 0)) {
1693                 DEBUG(DEBUG_ERR,("Recv for revoke update record failed ret:%d res:%d\n", ret, res));
1694                 revoke_state->status = -1;
1695         }
1696
1697         revoke_state->count--;
1698         if (revoke_state->count <= 0) {
1699                 revoke_state->finished = 1;
1700         }
1701 }
1702
1703 static void revoke_send_cb(struct ctdb_context *ctdb, uint32_t pnn, void *private_data)
1704 {
1705         struct ctdb_revoke_state *revoke_state = private_data;
1706         struct ctdb_client_control_state *state;
1707
1708         state = ctdb_ctrl_updaterecord_send(ctdb, revoke_state, timeval_current_ofs(ctdb->tunable.control_timeout,0), pnn, revoke_state->ctdb_db, revoke_state->key, revoke_state->header, revoke_state->data);
1709         if (state == NULL) {
1710                 DEBUG(DEBUG_ERR,("Failure to send update record to revoke readonly delegation\n"));
1711                 revoke_state->status = -1;
1712                 return;
1713         }
1714         state->async.fn           = update_record_cb;
1715         state->async.private_data = revoke_state;
1716
1717         revoke_state->count++;
1718
1719 }
1720
1721 static void ctdb_revoke_timeout_handler(struct event_context *ev, struct timed_event *te, 
1722                               struct timeval yt, void *private_data)
1723 {
1724         struct ctdb_revoke_state *state = private_data;
1725
1726         DEBUG(DEBUG_ERR,("Timed out waiting for revoke to finish\n"));
1727         state->finished = 1;
1728         state->status   = -1;
1729 }
1730
1731 static int ctdb_revoke_all_delegations(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA tdata, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1732 {
1733         struct ctdb_revoke_state *state = talloc_zero(ctdb, struct ctdb_revoke_state);
1734         struct ctdb_ltdb_header new_header;
1735         TDB_DATA new_data;
1736
1737         state->ctdb_db = ctdb_db;
1738         state->key     = key;
1739         state->header  = header;
1740         state->data    = data;
1741  
1742         ctdb_trackingdb_traverse(ctdb, tdata, revoke_send_cb, state);
1743
1744         event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0), ctdb_revoke_timeout_handler, state);
1745
1746         while (state->finished == 0) {
1747                 event_loop_once(ctdb->ev);
1748         }
1749
1750         if (ctdb_ltdb_lock(ctdb_db, key) != 0) {
1751                 DEBUG(DEBUG_ERR,("Failed to chainlock the database in revokechild\n"));
1752                 talloc_free(state);
1753                 return -1;
1754         }
1755         if (ctdb_ltdb_fetch(ctdb_db, key, &new_header, state, &new_data) != 0) {
1756                 ctdb_ltdb_unlock(ctdb_db, key);
1757                 DEBUG(DEBUG_ERR,("Failed for fetch tdb record in revokechild\n"));
1758                 talloc_free(state);
1759                 return -1;
1760         }
1761         header->rsn++;
1762         if (new_header.rsn > header->rsn) {
1763                 ctdb_ltdb_unlock(ctdb_db, key);
1764                 DEBUG(DEBUG_ERR,("RSN too high in tdb record in revokechild\n"));
1765                 talloc_free(state);
1766                 return -1;
1767         }
1768         if ( (new_header.flags & (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS)) != (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS) ) {
1769                 ctdb_ltdb_unlock(ctdb_db, key);
1770                 DEBUG(DEBUG_ERR,("Flags are wrong in tdb record in revokechild\n"));
1771                 talloc_free(state);
1772                 return -1;
1773         }
1774
1775         /*
1776          * If revoke on all nodes succeed, revoke is complete.  Otherwise,
1777          * remove CTDB_REC_RO_REVOKING_READONLY flag and retry.
1778          */
1779         if (state->status == 0) {
1780                 new_header.rsn++;
1781                 new_header.flags |= CTDB_REC_RO_REVOKE_COMPLETE;
1782         } else {
1783                 DEBUG(DEBUG_NOTICE, ("Revoke all delegations failed, retrying.\n"));
1784                 new_header.flags &= ~CTDB_REC_RO_REVOKING_READONLY;
1785         }
1786         if (ctdb_ltdb_store(ctdb_db, key, &new_header, new_data) != 0) {
1787                 ctdb_ltdb_unlock(ctdb_db, key);
1788                 DEBUG(DEBUG_ERR,("Failed to write new record in revokechild\n"));
1789                 talloc_free(state);
1790                 return -1;
1791         }
1792         ctdb_ltdb_unlock(ctdb_db, key);
1793
1794         talloc_free(state);
1795         return 0;
1796 }
1797
1798
1799 int ctdb_start_revoke_ro_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1800 {
1801         TDB_DATA tdata;
1802         struct revokechild_handle *rc;
1803         pid_t parent = getpid();
1804         int ret;
1805
1806         header->flags &= ~(CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY);
1807         header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1808         header->rsn   -= 1;
1809
1810         if ((rc = talloc_zero(ctdb_db, struct revokechild_handle)) == NULL) {
1811                 DEBUG(DEBUG_ERR,("Failed to allocate revokechild_handle\n"));
1812                 return -1;
1813         }
1814
1815         tdata = tdb_fetch(ctdb_db->rottdb, key);
1816         if (tdata.dsize > 0) {
1817                 uint8_t *tmp;
1818
1819                 tmp = tdata.dptr;
1820                 tdata.dptr = talloc_memdup(rc, tdata.dptr, tdata.dsize);
1821                 free(tmp);
1822         }
1823
1824         rc->status    = 0;
1825         rc->ctdb      = ctdb;
1826         rc->ctdb_db   = ctdb_db;
1827         rc->fd[0]     = -1;
1828         rc->fd[1]     = -1;
1829
1830         talloc_set_destructor(rc, revokechild_destructor);
1831
1832         rc->key.dsize = key.dsize;
1833         rc->key.dptr  = talloc_memdup(rc, key.dptr, key.dsize);
1834         if (rc->key.dptr == NULL) {
1835                 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1836                 talloc_free(rc);
1837                 return -1;
1838         }
1839
1840         ret = pipe(rc->fd);
1841         if (ret != 0) {
1842                 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1843                 talloc_free(rc);
1844                 return -1;
1845         }
1846
1847
1848         rc->child = ctdb_fork(ctdb);
1849         if (rc->child == (pid_t)-1) {
1850                 DEBUG(DEBUG_ERR,("Failed to fork child for revokechild\n"));
1851                 talloc_free(rc);
1852                 return -1;
1853         }
1854
1855         if (rc->child == 0) {
1856                 char c = 0;
1857                 close(rc->fd[0]);
1858                 debug_extra = talloc_asprintf(NULL, "revokechild-%s:", ctdb_db->db_name);
1859
1860                 ctdb_set_process_name("ctdb_revokechild");
1861                 if (switch_from_server_to_client(ctdb, "revokechild-%s", ctdb_db->db_name) != 0) {
1862                         DEBUG(DEBUG_ERR,("Failed to switch from server to client for revokechild process\n"));
1863                         c = 1;
1864                         goto child_finished;
1865                 }
1866
1867                 c = ctdb_revoke_all_delegations(ctdb, ctdb_db, tdata, key, header, data);
1868
1869 child_finished:
1870                 sys_write(rc->fd[1], &c, 1);
1871                 /* make sure we die when our parent dies */
1872                 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
1873                         sleep(5);
1874                 }
1875                 _exit(0);
1876         }
1877
1878         close(rc->fd[1]);
1879         rc->fd[1] = -1;
1880         set_close_on_exec(rc->fd[0]);
1881
1882         /* This is an active revokechild child process */
1883         DLIST_ADD_END(ctdb_db->revokechild_active, rc, NULL);
1884
1885         rc->fde = event_add_fd(ctdb->ev, rc, rc->fd[0],
1886                                    EVENT_FD_READ, revokechild_handler,
1887                                    (void *)rc);
1888         if (rc->fde == NULL) {
1889                 DEBUG(DEBUG_ERR,("Failed to set up fd event for revokechild process\n"));
1890                 talloc_free(rc);
1891         }
1892         tevent_fd_set_auto_close(rc->fde);
1893
1894         return 0;
1895 }
1896
1897 int ctdb_add_revoke_deferred_call(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr, deferred_requeue_fn fn, void *call_context)
1898 {
1899         struct revokechild_handle *rc;
1900         struct revokechild_deferred_call *deferred_call;
1901
1902         for (rc = ctdb_db->revokechild_active; rc; rc = rc->next) {
1903                 if (rc->key.dsize == 0) {
1904                         continue;
1905                 }
1906                 if (rc->key.dsize != key.dsize) {
1907                         continue;
1908                 }
1909                 if (!memcmp(rc->key.dptr, key.dptr, key.dsize)) {
1910                         break;
1911                 }
1912         }
1913
1914         if (rc == NULL) {
1915                 DEBUG(DEBUG_ERR,("Failed to add deferred call to revoke list. revoke structure not found\n"));
1916                 return -1;
1917         }
1918
1919         deferred_call = talloc(rc, struct revokechild_deferred_call);
1920         if (deferred_call == NULL) {
1921                 DEBUG(DEBUG_ERR,("Failed to allocate deferred call structure for revoking record\n"));
1922                 return -1;
1923         }
1924
1925         deferred_call->ctdb = ctdb;
1926         deferred_call->hdr  = hdr;
1927         deferred_call->fn   = fn;
1928         deferred_call->ctx  = call_context;
1929
1930         talloc_set_destructor(deferred_call, deferred_call_destructor);
1931         talloc_steal(deferred_call, hdr);
1932
1933         return 0;
1934 }