ctdb-server: Only set destructor if required
[kai/samba-autobuild/.git] / ctdb / server / ctdb_call.c
1 /* 
2    ctdb_call protocol code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19 /*
20   see http://wiki.samba.org/index.php/Samba_%26_Clustering for
21   protocol design and packet details
22 */
23 #include "replace.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26
27 #include <talloc.h>
28 #include <tevent.h>
29
30 #include "lib/util/dlinklist.h"
31 #include "lib/util/debug.h"
32 #include "lib/util/samba_util.h"
33 #include "lib/util/sys_rw.h"
34 #include "lib/util/util_process.h"
35
36 #include "ctdb_private.h"
37 #include "ctdb_client.h"
38
39 #include "common/rb_tree.h"
40 #include "common/reqid.h"
41 #include "common/system.h"
42 #include "common/common.h"
43 #include "common/logging.h"
44 #include "common/hash_count.h"
45
/*
  per-key state for a record that has been made sticky; instances are
  stored in ctdb_db->sticky_records
*/
struct ctdb_sticky_record {
        struct ctdb_context *ctdb;
        struct ctdb_db_context *ctdb_db;
        /* talloc context that exists only while the record is pinned
           down; NULL otherwise. The pindown timer and any requests
           deferred during the pindown are allocated under it. */
        TDB_CONTEXT *pindown;
};
51
52 /*
53   find the ctdb_db from a db index
54  */
55  struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id)
56 {
57         struct ctdb_db_context *ctdb_db;
58
59         for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
60                 if (ctdb_db->db_id == id) {
61                         break;
62                 }
63         }
64         return ctdb_db;
65 }
66
67 /*
68   a varient of input packet that can be used in lock requeue
69 */
70 static void ctdb_call_input_pkt(void *p, struct ctdb_req_header *hdr)
71 {
72         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
73         ctdb_input_pkt(ctdb, hdr);
74 }
75
76
77 /*
78   send an error reply
79 */
80 static void ctdb_send_error(struct ctdb_context *ctdb, 
81                             struct ctdb_req_header *hdr, uint32_t status,
82                             const char *fmt, ...) PRINTF_ATTRIBUTE(4,5);
83 static void ctdb_send_error(struct ctdb_context *ctdb, 
84                             struct ctdb_req_header *hdr, uint32_t status,
85                             const char *fmt, ...)
86 {
87         va_list ap;
88         struct ctdb_reply_error_old *r;
89         char *msg;
90         int msglen, len;
91
92         if (ctdb->methods == NULL) {
93                 DEBUG(DEBUG_INFO,(__location__ " Failed to send error. Transport is DOWN\n"));
94                 return;
95         }
96
97         va_start(ap, fmt);
98         msg = talloc_vasprintf(ctdb, fmt, ap);
99         if (msg == NULL) {
100                 ctdb_fatal(ctdb, "Unable to allocate error in ctdb_send_error\n");
101         }
102         va_end(ap);
103
104         msglen = strlen(msg)+1;
105         len = offsetof(struct ctdb_reply_error_old, msg);
106         r = ctdb_transport_allocate(ctdb, msg, CTDB_REPLY_ERROR, len + msglen, 
107                                     struct ctdb_reply_error_old);
108         CTDB_NO_MEMORY_FATAL(ctdb, r);
109
110         r->hdr.destnode  = hdr->srcnode;
111         r->hdr.reqid     = hdr->reqid;
112         r->status        = status;
113         r->msglen        = msglen;
114         memcpy(&r->msg[0], msg, msglen);
115
116         ctdb_queue_packet(ctdb, &r->hdr);
117
118         talloc_free(msg);
119 }
120
121
122 /**
123  * send a redirect reply
124  *
125  * The logic behind this function is this:
126  *
127  * A client wants to grab a record and sends a CTDB_REQ_CALL packet
128  * to its local ctdb (ctdb_request_call). If the node is not itself
129  * the record's DMASTER, it first redirects the packet to  the
130  * record's LMASTER. The LMASTER then redirects the call packet to
131  * the current DMASTER. Note that this works because of this: When
132  * a record is migrated off a node, then the new DMASTER is stored
133  * in the record's copy on the former DMASTER.
134  */
135 static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
136                                     struct ctdb_db_context *ctdb_db,
137                                     TDB_DATA key,
138                                     struct ctdb_req_call_old *c, 
139                                     struct ctdb_ltdb_header *header)
140 {
141         uint32_t lmaster = ctdb_lmaster(ctdb, &key);
142
143         c->hdr.destnode = lmaster;
144         if (ctdb->pnn == lmaster) {
145                 c->hdr.destnode = header->dmaster;
146         }
147         c->hopcount++;
148
149         if (c->hopcount%100 > 95) {
150                 DEBUG(DEBUG_WARNING,("High hopcount %d dbid:%s "
151                         "key:0x%08x reqid=%08x pnn:%d src:%d lmaster:%d "
152                         "header->dmaster:%d dst:%d\n",
153                         c->hopcount, ctdb_db->db_name, ctdb_hash(&key),
154                         c->hdr.reqid, ctdb->pnn, c->hdr.srcnode, lmaster,
155                         header->dmaster, c->hdr.destnode));
156         }
157
158         ctdb_queue_packet(ctdb, &c->hdr);
159 }
160
161
162 /*
163   send a dmaster reply
164
165   caller must have the chainlock before calling this routine. Caller must be
166   the lmaster
167 */
168 static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
169                                     struct ctdb_ltdb_header *header,
170                                     TDB_DATA key, TDB_DATA data,
171                                     uint32_t new_dmaster,
172                                     uint32_t reqid)
173 {
174         struct ctdb_context *ctdb = ctdb_db->ctdb;
175         struct ctdb_reply_dmaster_old *r;
176         int ret, len;
177         TALLOC_CTX *tmp_ctx;
178
179         if (ctdb->pnn != ctdb_lmaster(ctdb, &key)) {
180                 DEBUG(DEBUG_ALERT,(__location__ " Caller is not lmaster!\n"));
181                 return;
182         }
183
184         header->dmaster = new_dmaster;
185         ret = ctdb_ltdb_store(ctdb_db, key, header, data);
186         if (ret != 0) {
187                 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply unable to update dmaster");
188                 return;
189         }
190
191         if (ctdb->methods == NULL) {
192                 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply cant update dmaster since transport is down");
193                 return;
194         }
195
196         /* put the packet on a temporary context, allowing us to safely free
197            it below even if ctdb_reply_dmaster() has freed it already */
198         tmp_ctx = talloc_new(ctdb);
199
200         /* send the CTDB_REPLY_DMASTER */
201         len = offsetof(struct ctdb_reply_dmaster_old, data) + key.dsize + data.dsize + sizeof(uint32_t);
202         r = ctdb_transport_allocate(ctdb, tmp_ctx, CTDB_REPLY_DMASTER, len,
203                                     struct ctdb_reply_dmaster_old);
204         CTDB_NO_MEMORY_FATAL(ctdb, r);
205
206         r->hdr.destnode  = new_dmaster;
207         r->hdr.reqid     = reqid;
208         r->hdr.generation = ctdb_db->generation;
209         r->rsn           = header->rsn;
210         r->keylen        = key.dsize;
211         r->datalen       = data.dsize;
212         r->db_id         = ctdb_db->db_id;
213         memcpy(&r->data[0], key.dptr, key.dsize);
214         memcpy(&r->data[key.dsize], data.dptr, data.dsize);
215         memcpy(&r->data[key.dsize+data.dsize], &header->flags, sizeof(uint32_t));
216
217         ctdb_queue_packet(ctdb, &r->hdr);
218
219         talloc_free(tmp_ctx);
220 }
221
222 /*
223   send a dmaster request (give another node the dmaster for a record)
224
225   This is always sent to the lmaster, which ensures that the lmaster
226   always knows who the dmaster is. The lmaster will then send a
227   CTDB_REPLY_DMASTER to the new dmaster
228 */
229 static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db, 
230                                    struct ctdb_req_call_old *c, 
231                                    struct ctdb_ltdb_header *header,
232                                    TDB_DATA *key, TDB_DATA *data)
233 {
234         struct ctdb_req_dmaster_old *r;
235         struct ctdb_context *ctdb = ctdb_db->ctdb;
236         int len;
237         uint32_t lmaster = ctdb_lmaster(ctdb, key);
238
239         if (ctdb->methods == NULL) {
240                 ctdb_fatal(ctdb, "Failed ctdb_call_send_dmaster since transport is down");
241                 return;
242         }
243
244         if (data->dsize != 0) {
245                 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
246         }
247
248         if (lmaster == ctdb->pnn) {
249                 ctdb_send_dmaster_reply(ctdb_db, header, *key, *data, 
250                                         c->hdr.srcnode, c->hdr.reqid);
251                 return;
252         }
253         
254         len = offsetof(struct ctdb_req_dmaster_old, data) + key->dsize + data->dsize
255                         + sizeof(uint32_t);
256         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_DMASTER, len, 
257                                     struct ctdb_req_dmaster_old);
258         CTDB_NO_MEMORY_FATAL(ctdb, r);
259         r->hdr.destnode  = lmaster;
260         r->hdr.reqid     = c->hdr.reqid;
261         r->hdr.generation = ctdb_db->generation;
262         r->db_id         = c->db_id;
263         r->rsn           = header->rsn;
264         r->dmaster       = c->hdr.srcnode;
265         r->keylen        = key->dsize;
266         r->datalen       = data->dsize;
267         memcpy(&r->data[0], key->dptr, key->dsize);
268         memcpy(&r->data[key->dsize], data->dptr, data->dsize);
269         memcpy(&r->data[key->dsize + data->dsize], &header->flags, sizeof(uint32_t));
270
271         header->dmaster = c->hdr.srcnode;
272         if (ctdb_ltdb_store(ctdb_db, *key, header, *data) != 0) {
273                 ctdb_fatal(ctdb, "Failed to store record in ctdb_call_send_dmaster");
274         }
275         
276         ctdb_queue_packet(ctdb, &r->hdr);
277
278         talloc_free(r);
279 }
280
281 static void ctdb_sticky_pindown_timeout(struct tevent_context *ev,
282                                         struct tevent_timer *te,
283                                         struct timeval t, void *private_data)
284 {
285         struct ctdb_sticky_record *sr = talloc_get_type(private_data, 
286                                                        struct ctdb_sticky_record);
287
288         DEBUG(DEBUG_ERR,("Pindown timeout db:%s  unstick record\n", sr->ctdb_db->db_name));
289         if (sr->pindown != NULL) {
290                 talloc_free(sr->pindown);
291                 sr->pindown = NULL;
292         }
293 }
294
295 static int
296 ctdb_set_sticky_pindown(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
297 {
298         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
299         uint32_t *k;
300         struct ctdb_sticky_record *sr;
301
302         k = ctdb_key_to_idkey(tmp_ctx, key);
303         if (k == NULL) {
304                 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
305                 talloc_free(tmp_ctx);
306                 return -1;
307         }
308
309         sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
310         if (sr == NULL) {
311                 talloc_free(tmp_ctx);
312                 return 0;
313         }
314
315         talloc_free(tmp_ctx);
316
317         if (sr->pindown == NULL) {
318                 DEBUG(DEBUG_ERR,("Pinning down record in %s for %d ms\n", ctdb_db->db_name, ctdb->tunable.sticky_pindown));
319                 sr->pindown = talloc_new(sr);
320                 if (sr->pindown == NULL) {
321                         DEBUG(DEBUG_ERR,("Failed to allocate pindown context for sticky record\n"));
322                         return -1;
323                 }
324                 tevent_add_timer(ctdb->ev, sr->pindown,
325                                  timeval_current_ofs(ctdb->tunable.sticky_pindown / 1000,
326                                                      (ctdb->tunable.sticky_pindown * 1000) % 1000000),
327                                  ctdb_sticky_pindown_timeout, sr);
328         }
329
330         return 0;
331 }
332
333 /*
334   called when a CTDB_REPLY_DMASTER packet comes in, or when the lmaster
335   gets a CTDB_REQUEST_DMASTER for itself. We become the dmaster.
336
337   must be called with the chainlock held. This function releases the chainlock
338 */
339 static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
340                                 struct ctdb_req_header *hdr,
341                                 TDB_DATA key, TDB_DATA data,
342                                 uint64_t rsn, uint32_t record_flags)
343 {
344         struct ctdb_call_state *state;
345         struct ctdb_context *ctdb = ctdb_db->ctdb;
346         struct ctdb_ltdb_header header;
347         int ret;
348
349         DEBUG(DEBUG_DEBUG,("pnn %u dmaster response %08x\n", ctdb->pnn, ctdb_hash(&key)));
350
351         ZERO_STRUCT(header);
352         header.rsn = rsn;
353         header.dmaster = ctdb->pnn;
354         header.flags = record_flags;
355
356         state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
357
358         if (state) {
359                 if (state->call->flags & CTDB_CALL_FLAG_VACUUM_MIGRATION) {
360                         /*
361                          * We temporarily add the VACUUM_MIGRATED flag to
362                          * the record flags, so that ctdb_ltdb_store can
363                          * decide whether the record should be stored or
364                          * deleted.
365                          */
366                         header.flags |= CTDB_REC_FLAG_VACUUM_MIGRATED;
367                 }
368         }
369
370         if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
371                 ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
372
373                 ret = ctdb_ltdb_unlock(ctdb_db, key);
374                 if (ret != 0) {
375                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
376                 }
377                 return;
378         }
379
380         /* we just became DMASTER and this database is "sticky",
381            see if the record is flagged as "hot" and set up a pin-down
382            context to stop migrations for a little while if so
383         */
384         if (ctdb_db_sticky(ctdb_db)) {
385                 ctdb_set_sticky_pindown(ctdb, ctdb_db, key);
386         }
387
388         if (state == NULL) {
389                 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_become_dmaster from node %u\n",
390                          ctdb->pnn, hdr->reqid, hdr->srcnode));
391
392                 ret = ctdb_ltdb_unlock(ctdb_db, key);
393                 if (ret != 0) {
394                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
395                 }
396                 return;
397         }
398
399         if (key.dsize != state->call->key.dsize || memcmp(key.dptr, state->call->key.dptr, key.dsize)) {
400                 DEBUG(DEBUG_ERR, ("Got bogus DMASTER packet reqid:%u from node %u. Key does not match key held in matching idr.\n", hdr->reqid, hdr->srcnode));
401
402                 ret = ctdb_ltdb_unlock(ctdb_db, key);
403                 if (ret != 0) {
404                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
405                 }
406                 return;
407         }
408
409         if (hdr->reqid != state->reqid) {
410                 /* we found a record  but it was the wrong one */
411                 DEBUG(DEBUG_ERR, ("Dropped orphan in ctdb_become_dmaster with reqid:%u\n from node %u", hdr->reqid, hdr->srcnode));
412
413                 ret = ctdb_ltdb_unlock(ctdb_db, key);
414                 if (ret != 0) {
415                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
416                 }
417                 return;
418         }
419
420         (void) hash_count_increment(ctdb_db->migratedb, key);
421
422         ctdb_call_local(ctdb_db, state->call, &header, state, &data, true);
423
424         ret = ctdb_ltdb_unlock(ctdb_db, state->call->key);
425         if (ret != 0) {
426                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
427         }
428
429         state->state = CTDB_CALL_DONE;
430         if (state->async.fn) {
431                 state->async.fn(state);
432         }
433 }
434
/*
  one request packet parked on a dmaster_defer_queue until the queue is
  destroyed
*/
struct dmaster_defer_call {
        /* links for the queue's deferred_calls list */
        struct dmaster_defer_call *next, *prev;
        struct ctdb_context *ctdb;
        /* the deferred packet; owned by this structure (talloc child) */
        struct ctdb_req_header *hdr;
};
440
/*
  per-key queue of requests deferred while a dmaster update for that key
  is in progress; stored in ctdb_db->defer_dmaster
*/
struct dmaster_defer_queue {
        struct ctdb_db_context *ctdb_db;
        /* generation of the request that created the queue; compared
           with the database generation to detect an intervening
           recovery */
        uint32_t generation;
        struct dmaster_defer_call *deferred_calls;
};
446
447 static void dmaster_defer_reprocess(struct tevent_context *ev,
448                                     struct tevent_timer *te,
449                                     struct timeval t,
450                                     void *private_data)
451 {
452         struct dmaster_defer_call *call = talloc_get_type(
453                 private_data, struct dmaster_defer_call);
454
455         ctdb_input_pkt(call->ctdb, call->hdr);
456         talloc_free(call);
457 }
458
459 static int dmaster_defer_queue_destructor(struct dmaster_defer_queue *ddq)
460 {
461         /* Ignore requests, if database recovery happens in-between. */
462         if (ddq->generation != ddq->ctdb_db->generation) {
463                 return 0;
464         }
465
466         while (ddq->deferred_calls != NULL) {
467                 struct dmaster_defer_call *call = ddq->deferred_calls;
468
469                 DLIST_REMOVE(ddq->deferred_calls, call);
470
471                 talloc_steal(call->ctdb, call);
472                 tevent_add_timer(call->ctdb->ev, call, timeval_zero(),
473                                  dmaster_defer_reprocess, call);
474         }
475         return 0;
476 }
477
/*
  insert callback for the defer_dmaster tree: frees any entry already
  stored under the key and returns the new entry to be stored
*/
static void *insert_ddq_callback(void *parm, void *data)
{
        if (data != NULL) {
                talloc_free(data);
        }

        return parm;
}
485
486 /**
487  * This function is used to reigster a key in database that needs to be updated.
488  * Any requests for that key should get deferred till this is completed.
489  */
490 static int dmaster_defer_setup(struct ctdb_db_context *ctdb_db,
491                                struct ctdb_req_header *hdr,
492                                TDB_DATA key)
493 {
494         uint32_t *k;
495         struct dmaster_defer_queue *ddq;
496
497         k = ctdb_key_to_idkey(hdr, key);
498         if (k == NULL) {
499                 DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer setup\n"));
500                 return -1;
501         }
502
503         /* Already exists */
504         ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
505         if (ddq != NULL) {
506                 if (ddq->generation == ctdb_db->generation) {
507                         talloc_free(k);
508                         return 0;
509                 }
510
511                 /* Recovery ocurred - get rid of old queue. All the deferred
512                  * requests will be resent anyway from ctdb_call_resend_db.
513                  */
514                 talloc_free(ddq);
515         }
516
517         ddq = talloc(hdr, struct dmaster_defer_queue);
518         if (ddq == NULL) {
519                 DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer queue\n"));
520                 talloc_free(k);
521                 return -1;
522         }
523         ddq->ctdb_db = ctdb_db;
524         ddq->generation = hdr->generation;
525         ddq->deferred_calls = NULL;
526
527         trbt_insertarray32_callback(ctdb_db->defer_dmaster, k[0], k,
528                                     insert_ddq_callback, ddq);
529         talloc_set_destructor(ddq, dmaster_defer_queue_destructor);
530
531         talloc_free(k);
532         return 0;
533 }
534
535 static int dmaster_defer_add(struct ctdb_db_context *ctdb_db,
536                              struct ctdb_req_header *hdr,
537                              TDB_DATA key)
538 {
539         struct dmaster_defer_queue *ddq;
540         struct dmaster_defer_call *call;
541         uint32_t *k;
542
543         k = ctdb_key_to_idkey(hdr, key);
544         if (k == NULL) {
545                 DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer add\n"));
546                 return -1;
547         }
548
549         ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
550         if (ddq == NULL) {
551                 talloc_free(k);
552                 return -1;
553         }
554
555         talloc_free(k);
556
557         if (ddq->generation != hdr->generation) {
558                 talloc_set_destructor(ddq, NULL);
559                 talloc_free(ddq);
560                 return -1;
561         }
562
563         call = talloc(ddq, struct dmaster_defer_call);
564         if (call == NULL) {
565                 DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer call\n"));
566                 return -1;
567         }
568
569         call->ctdb = ctdb_db->ctdb;
570         call->hdr = talloc_steal(call, hdr);
571
572         DLIST_ADD_END(ddq->deferred_calls, call);
573
574         return 0;
575 }
576
577 /*
578   called when a CTDB_REQ_DMASTER packet comes in
579
580   this comes into the lmaster for a record when the current dmaster
581   wants to give up the dmaster role and give it to someone else
582 */
583 void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
584 {
585         struct ctdb_req_dmaster_old *c = (struct ctdb_req_dmaster_old *)hdr;
586         TDB_DATA key, data, data2;
587         struct ctdb_ltdb_header header;
588         struct ctdb_db_context *ctdb_db;
589         uint32_t record_flags = 0;
590         size_t len;
591         int ret;
592
593         key.dptr = c->data;
594         key.dsize = c->keylen;
595         data.dptr = c->data + c->keylen;
596         data.dsize = c->datalen;
597         len = offsetof(struct ctdb_req_dmaster_old, data) + key.dsize + data.dsize
598                         + sizeof(uint32_t);
599         if (len <= c->hdr.length) {
600                 memcpy(&record_flags, &c->data[c->keylen + c->datalen],
601                        sizeof(record_flags));
602         }
603
604         ctdb_db = find_ctdb_db(ctdb, c->db_id);
605         if (!ctdb_db) {
606                 ctdb_send_error(ctdb, hdr, -1,
607                                 "Unknown database in request. db_id==0x%08x",
608                                 c->db_id);
609                 return;
610         }
611
612         dmaster_defer_setup(ctdb_db, hdr, key);
613
614         /* fetch the current record */
615         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
616                                            ctdb_call_input_pkt, ctdb, false);
617         if (ret == -1) {
618                 ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
619                 return;
620         }
621         if (ret == -2) {
622                 DEBUG(DEBUG_INFO,(__location__ " deferring ctdb_request_dmaster\n"));
623                 return;
624         }
625
626         if (ctdb_lmaster(ctdb, &key) != ctdb->pnn) {
627                 DEBUG(DEBUG_ERR, ("dmaster request to non-lmaster "
628                                   "db=%s lmaster=%u gen=%u curgen=%u\n",
629                                   ctdb_db->db_name, ctdb_lmaster(ctdb, &key),
630                                   hdr->generation, ctdb_db->generation));
631                 ctdb_fatal(ctdb, "ctdb_req_dmaster to non-lmaster");
632         }
633
634         DEBUG(DEBUG_DEBUG,("pnn %u dmaster request on %08x for %u from %u\n", 
635                  ctdb->pnn, ctdb_hash(&key), c->dmaster, c->hdr.srcnode));
636
637         /* its a protocol error if the sending node is not the current dmaster */
638         if (header.dmaster != hdr->srcnode) {
639                 DEBUG(DEBUG_ALERT,("pnn %u dmaster request for new-dmaster %u from non-master %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u keyval=0x%08x\n",
640                          ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
641                          ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
642                          (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid,
643                          (key.dsize >= 4)?(*(uint32_t *)key.dptr):0));
644                 if (header.rsn != 0 || header.dmaster != ctdb->pnn) {
645                         DEBUG(DEBUG_ERR,("ctdb_req_dmaster from non-master. Force a recovery.\n"));
646
647                         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
648                         ctdb_ltdb_unlock(ctdb_db, key);
649                         return;
650                 }
651         }
652
653         if (header.rsn > c->rsn) {
654                 DEBUG(DEBUG_ALERT,("pnn %u dmaster request with older RSN new-dmaster %u from %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u\n",
655                          ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
656                          ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
657                          (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid));
658         }
659
660         /* use the rsn from the sending node */
661         header.rsn = c->rsn;
662
663         /* store the record flags from the sending node */
664         header.flags = record_flags;
665
666         /* check if the new dmaster is the lmaster, in which case we
667            skip the dmaster reply */
668         if (c->dmaster == ctdb->pnn) {
669                 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
670         } else {
671                 ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid);
672
673                 ret = ctdb_ltdb_unlock(ctdb_db, key);
674                 if (ret != 0) {
675                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
676                 }
677         }
678 }
679
680 static void ctdb_sticky_record_timeout(struct tevent_context *ev,
681                                        struct tevent_timer *te,
682                                        struct timeval t, void *private_data)
683 {
684         struct ctdb_sticky_record *sr = talloc_get_type(private_data, 
685                                                        struct ctdb_sticky_record);
686         talloc_free(sr);
687 }
688
689 static void *ctdb_make_sticky_record_callback(void *parm, void *data)
690 {
691         if (data) {
692                 DEBUG(DEBUG_ERR,("Already have sticky record registered. Free old %p and create new %p\n", data, parm));
693                 talloc_free(data);
694         }
695         return parm;
696 }
697
698 static int
699 ctdb_make_record_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
700 {
701         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
702         uint32_t *k;
703         struct ctdb_sticky_record *sr;
704
705         k = ctdb_key_to_idkey(tmp_ctx, key);
706         if (k == NULL) {
707                 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
708                 talloc_free(tmp_ctx);
709                 return -1;
710         }
711
712         sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
713         if (sr != NULL) {
714                 talloc_free(tmp_ctx);
715                 return 0;
716         }
717
718         sr = talloc(ctdb_db->sticky_records, struct ctdb_sticky_record);
719         if (sr == NULL) {
720                 talloc_free(tmp_ctx);
721                 DEBUG(DEBUG_ERR,("Failed to allocate sticky record structure\n"));
722                 return -1;
723         }
724
725         sr->ctdb    = ctdb;
726         sr->ctdb_db = ctdb_db;
727         sr->pindown = NULL;
728
729         DEBUG(DEBUG_ERR,("Make record sticky for %d seconds in db %s key:0x%08x.\n",
730                          ctdb->tunable.sticky_duration,
731                          ctdb_db->db_name, ctdb_hash(&key)));
732
733         trbt_insertarray32_callback(ctdb_db->sticky_records, k[0], &k[0], ctdb_make_sticky_record_callback, sr);
734
735         tevent_add_timer(ctdb->ev, sr,
736                          timeval_current_ofs(ctdb->tunable.sticky_duration, 0),
737                          ctdb_sticky_record_timeout, sr);
738
739         talloc_free(tmp_ctx);
740         return 0;
741 }
742
/*
  carries a deferred request from pinned_down_destructor() to the
  pinned_down_requeue() timer callback
*/
struct pinned_down_requeue_handle {
        struct ctdb_context *ctdb;
        /* the request packet to feed back into packet processing */
        struct ctdb_req_header *hdr;
};
747
/*
  a request deferred while its record is pinned down; allocated under
  the sticky record's pindown context, and requeued by its destructor
  when that context is freed
*/
struct pinned_down_deferred_call {
        struct ctdb_context *ctdb;
        /* the deferred request packet */
        struct ctdb_req_header *hdr;
};
752
753 static void pinned_down_requeue(struct tevent_context *ev,
754                                 struct tevent_timer *te,
755                                 struct timeval t, void *private_data)
756 {
757         struct pinned_down_requeue_handle *handle = talloc_get_type(private_data, struct pinned_down_requeue_handle);
758         struct ctdb_context *ctdb = handle->ctdb;
759
760         talloc_steal(ctdb, handle->hdr);
761         ctdb_call_input_pkt(ctdb, handle->hdr);
762
763         talloc_free(handle);
764 }
765
766 static int pinned_down_destructor(struct pinned_down_deferred_call *pinned_down)
767 {
768         struct ctdb_context *ctdb = pinned_down->ctdb;
769         struct pinned_down_requeue_handle *handle = talloc(ctdb, struct pinned_down_requeue_handle);
770
771         handle->ctdb = pinned_down->ctdb;
772         handle->hdr  = pinned_down->hdr;
773         talloc_steal(handle, handle->hdr);
774
775         tevent_add_timer(ctdb->ev, handle, timeval_zero(),
776                          pinned_down_requeue, handle);
777
778         return 0;
779 }
780
781 static int
782 ctdb_defer_pinned_down_request(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr)
783 {
784         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
785         uint32_t *k;
786         struct ctdb_sticky_record *sr;
787         struct pinned_down_deferred_call *pinned_down;
788
789         k = ctdb_key_to_idkey(tmp_ctx, key);
790         if (k == NULL) {
791                 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
792                 talloc_free(tmp_ctx);
793                 return -1;
794         }
795
796         sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
797         if (sr == NULL) {
798                 talloc_free(tmp_ctx);
799                 return -1;
800         }
801
802         talloc_free(tmp_ctx);
803
804         if (sr->pindown == NULL) {
805                 return -1;
806         }
807         
808         pinned_down = talloc(sr->pindown, struct pinned_down_deferred_call);
809         if (pinned_down == NULL) {
810                 DEBUG(DEBUG_ERR,("Failed to allocate structure for deferred pinned down request\n"));
811                 return -1;
812         }
813
814         pinned_down->ctdb = ctdb;
815         pinned_down->hdr  = hdr;
816
817         talloc_set_destructor(pinned_down, pinned_down_destructor);
818         talloc_steal(pinned_down, hdr);
819
820         return 0;
821 }
822
823 static void
824 ctdb_update_db_stat_hot_keys(struct ctdb_db_context *ctdb_db, TDB_DATA key,
825                              int count)
826 {
827         int i, id;
828         char *keystr;
829
830         /* smallest value is always at index 0 */
831         if (count <= ctdb_db->statistics.hot_keys[0].count) {
832                 return;
833         }
834
835         /* see if we already know this key */
836         for (i = 0; i < MAX_HOT_KEYS; i++) {
837                 if (key.dsize != ctdb_db->statistics.hot_keys[i].key.dsize) {
838                         continue;
839                 }
840                 if (memcmp(key.dptr, ctdb_db->statistics.hot_keys[i].key.dptr, key.dsize)) {
841                         continue;
842                 }
843                 /* found an entry for this key */
844                 if (count <= ctdb_db->statistics.hot_keys[i].count) {
845                         return;
846                 }
847                 ctdb_db->statistics.hot_keys[i].count = count;
848                 goto sort_keys;
849         }
850
851         if (ctdb_db->statistics.num_hot_keys < MAX_HOT_KEYS) {
852                 id = ctdb_db->statistics.num_hot_keys;
853                 ctdb_db->statistics.num_hot_keys++;
854         } else {
855                 id = 0;
856         }
857
858         if (ctdb_db->statistics.hot_keys[id].key.dptr != NULL) {
859                 talloc_free(ctdb_db->statistics.hot_keys[id].key.dptr);
860         }
861         ctdb_db->statistics.hot_keys[id].key.dsize = key.dsize;
862         ctdb_db->statistics.hot_keys[id].key.dptr  = talloc_memdup(ctdb_db, key.dptr, key.dsize);
863         ctdb_db->statistics.hot_keys[id].count = count;
864
865         keystr = hex_encode_talloc(ctdb_db,
866                                    (unsigned char *)key.dptr, key.dsize);
867         DEBUG(DEBUG_NOTICE,("Updated hot key database=%s key=%s id=%d "
868                             "count=%d\n", ctdb_db->db_name,
869                             keystr ? keystr : "" , id, count));
870         talloc_free(keystr);
871
872 sort_keys:
873         for (i = 1; i < MAX_HOT_KEYS; i++) {
874                 if (ctdb_db->statistics.hot_keys[i].count == 0) {
875                         continue;
876                 }
877                 if (ctdb_db->statistics.hot_keys[i].count < ctdb_db->statistics.hot_keys[0].count) {
878                         count = ctdb_db->statistics.hot_keys[i].count;
879                         ctdb_db->statistics.hot_keys[i].count = ctdb_db->statistics.hot_keys[0].count;
880                         ctdb_db->statistics.hot_keys[0].count = count;
881
882                         key = ctdb_db->statistics.hot_keys[i].key;
883                         ctdb_db->statistics.hot_keys[i].key = ctdb_db->statistics.hot_keys[0].key;
884                         ctdb_db->statistics.hot_keys[0].key = key;
885                 }
886         }
887 }
888
889 /*
890   called when a CTDB_REQ_CALL packet comes in
891 */
/*
  Process one CTDB_REQ_CALL packet (hdr is treated as a
  struct ctdb_req_call_old).

  In order, the function:
   - drops the packet if the transport is down (ctdb->methods == NULL)
     and sends an error reply if the database id is unknown;
   - may hand the packet over for later re-processing: pinned-down
     sticky record, dmaster_defer_add(), record lock not immediately
     available (ret == -2), or a read-only revoke in progress;
   - redirects the request if this node is not the dmaster and holds no
     delegations for the record;
   - starts a revoke when a request without CTDB_WANT_READONLY hits a
     record that has delegation/read-only flags set;
   - answers read-only delegation requests (CTDB_WANT_READONLY together
     with CTDB_FETCH_WITH_HEADER_FUNC) with header + data;
   - otherwise updates hop count statistics, optionally makes the record
     sticky, and either migrates the record to the requesting node or
     runs the call locally and sends a CTDB_REPLY_CALL.

  The temporary "call" structure is a talloc child of hdr and is freed on
  every exit path after it has been allocated.
*/
892 void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
893 {
894         struct ctdb_req_call_old *c = (struct ctdb_req_call_old *)hdr;
895         TDB_DATA data;
896         struct ctdb_reply_call_old *r;
897         int ret, len;
898         struct ctdb_ltdb_header header;
899         struct ctdb_call *call;
900         struct ctdb_db_context *ctdb_db;
901         int tmp_count, bucket;
902
903         if (ctdb->methods == NULL) {
904                 DEBUG(DEBUG_INFO,(__location__ " Failed ctdb_request_call. Transport is DOWN\n"));
905                 return;
906         }
907
908
909         ctdb_db = find_ctdb_db(ctdb, c->db_id);
910         if (!ctdb_db) {
911                 ctdb_send_error(ctdb, hdr, -1,
912                                 "Unknown database in request. db_id==0x%08x",
913                                 c->db_id);
914                 return;
915         }
916
917         call = talloc(hdr, struct ctdb_call);
918         CTDB_NO_MEMORY_FATAL(ctdb, call);
919
            /* key and call_data are not copied: both point into the
               packet payload c->data (key first, call data after it) */
920         call->call_id  = c->callid;
921         call->key.dptr = c->data;
922         call->key.dsize = c->keylen;
923         call->call_data.dptr = c->data + c->keylen;
924         call->call_data.dsize = c->calldatalen;
925         call->reply_data.dptr  = NULL;
926         call->reply_data.dsize = 0;
927
928
929         /* If this record is pinned down we should defer the
930            request until the pindown times out
931         */
932         if (ctdb_db_sticky(ctdb_db)) {
                    /* a return of 0 means hdr was re-parented to the
                       deferred-call object, so only "call" is freed here */
933                 if (ctdb_defer_pinned_down_request(ctdb, ctdb_db, call->key, hdr) == 0) {
934                         DEBUG(DEBUG_WARNING,
935                               ("Defer request for pinned down record in %s\n", ctdb_db->db_name));
936                         talloc_free(call);
937                         return;
938                 }
939         }
940
            /* presumably a return of 0 means the request was queued behind
               a pending dmaster change for this key - confirm against
               dmaster_defer_add() */
941         if (dmaster_defer_add(ctdb_db, hdr, call->key) == 0) {
942                 talloc_free(call);
943                 return;
944         }
945
946         /* determine if we are the dmaster for this key. This also
947            fetches the record data (if any), thus avoiding a 2nd fetch of the data 
948            if the call will be answered locally */
949
950         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call->key, &header, hdr, &data,
951                                            ctdb_call_input_pkt, ctdb, false);
            /* -1: the fetch failed, report an error to the sender */
952         if (ret == -1) {
953                 ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
954                 talloc_free(call);
955                 return;
956         }
            /* -2: the request was deferred; ctdb_call_input_pkt was passed
               above as the function to re-process the packet with */
957         if (ret == -2) {
958                 DEBUG(DEBUG_INFO,(__location__ " deferred ctdb_request_call\n"));
959                 talloc_free(call);
960                 return;
961         }
962
963         /* Dont do READONLY if we don't have a tracking database */
964         if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db_readonly(ctdb_db)) {
965                 c->flags &= ~CTDB_WANT_READONLY;
966         }
967
            /* a finished revoke is cleaned up lazily here: clear all
               read-only flags in the stored header and delete the record
               from the tracking database (rottdb) */
968         if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
969                 header.flags &= ~CTDB_REC_RO_FLAGS;
970                 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
971                 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
972                 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
973                         ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
974                 }
975                 /* and clear out the tracking data */
976                 if (tdb_delete(ctdb_db->rottdb, call->key) != 0) {
977                         DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
978                 }
979         }
980
981         /* if we are revoking, we must defer all other calls until the revoke
982          * had completed.
983          */
984         if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
985                 talloc_free(data.dptr);
                    /* NOTE(review): ret is assigned but never checked here,
                       unlike the other ctdb_ltdb_unlock() call sites below */
986                 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
987
988                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
989                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
990                 }
991                 talloc_free(call);
992                 return;
993         }
994
995         /*
996          * If we are not the dmaster and are not hosting any delegations,
997          * then we redirect the request to the node than can answer it
998          * (the lmaster or the dmaster).
999          */
1000         if ((header.dmaster != ctdb->pnn) 
1001             && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) ) {
1002                 talloc_free(data.dptr);
1003                 ctdb_call_send_redirect(ctdb, ctdb_db, call->key, c, &header);
1004
1005                 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1006                 if (ret != 0) {
1007                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1008                 }
1009                 talloc_free(call);
1010                 return;
1011         }
1012
             /* a request that does not want a read-only copy hits a record
                with delegation flags set: mark the record as being revoked,
                start the revoke and queue this request until it is done */
1013         if ( (!(c->flags & CTDB_WANT_READONLY))
1014         && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
1015                 header.flags   |= CTDB_REC_RO_REVOKING_READONLY;
1016                 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
1017                         ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
1018                 }
                     /* NOTE(review): ret is assigned but never checked here */
1019                 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1020
1021                 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, call->key, &header, data) != 0) {
1022                         ctdb_fatal(ctdb, "Failed to start record revoke");
1023                 }
1024                 talloc_free(data.dptr);
1025
1026                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
1027                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
1028                 }
1029                 talloc_free(call);
1030
1031                 return;
1032         }               
1033
1034         /* If this is the first request for delegation. bump rsn and set
1035          * the delegations flag
1036          */
1037         if ((c->flags & CTDB_WANT_READONLY)
1038         &&  (c->callid == CTDB_FETCH_WITH_HEADER_FUNC)
1039         &&  (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS))) {
1040                 header.rsn     += 3;
1041                 header.flags   |= CTDB_REC_RO_HAVE_DELEGATIONS;
1042                 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
1043                         ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
1044                 }
1045         }
             /* serve a read-only delegation: remember the requesting node in
                the tracking database and reply with header + record data */
1046         if ((c->flags & CTDB_WANT_READONLY) 
1047         &&  (call->call_id == CTDB_FETCH_WITH_HEADER_FUNC)) {
1048                 TDB_DATA tdata;
1049
1050                 tdata = tdb_fetch(ctdb_db->rottdb, call->key);
1051                 if (ctdb_trackingdb_add_pnn(ctdb, &tdata, c->hdr.srcnode) != 0) {
1052                         ctdb_fatal(ctdb, "Failed to add node to trackingdb");
1053                 }
1054                 if (tdb_store(ctdb_db->rottdb, call->key, tdata, TDB_REPLACE) != 0) {
1055                         ctdb_fatal(ctdb, "Failed to store trackingdb data");
1056                 }
                     /* tdata.dptr is released with free(), not talloc_free() */
1057                 free(tdata.dptr);
1058
1059                 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1060                 if (ret != 0) {
1061                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1062                 }
1063
                     /* reply payload layout: struct ctdb_ltdb_header followed
                        by the record data */
1064                 len = offsetof(struct ctdb_reply_call_old, data) + data.dsize + sizeof(struct ctdb_ltdb_header);
1065                 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len, 
1066                                             struct ctdb_reply_call_old);
1067                 CTDB_NO_MEMORY_FATAL(ctdb, r);
1068                 r->hdr.destnode  = c->hdr.srcnode;
1069                 r->hdr.reqid     = c->hdr.reqid;
1070                 r->hdr.generation = ctdb_db->generation;
1071                 r->status        = 0;
1072                 r->datalen       = data.dsize + sizeof(struct ctdb_ltdb_header);
                     /* only the local copy of the header is modified from
                        here on (it is not stored again): the copy sent to the
                        requester has rsn lowered by 2, HAVE_READONLY set and
                        HAVE_DELEGATIONS cleared */
1073                 header.rsn      -= 2;
1074                 header.flags   |= CTDB_REC_RO_HAVE_READONLY;
1075                 header.flags   &= ~CTDB_REC_RO_HAVE_DELEGATIONS;
1076                 memcpy(&r->data[0], &header, sizeof(struct ctdb_ltdb_header));
1077
1078                 if (data.dsize) {
1079                         memcpy(&r->data[sizeof(struct ctdb_ltdb_header)], data.dptr, data.dsize);
1080                 }
1081
1082                 ctdb_queue_packet(ctdb, &r->hdr);
1083                 CTDB_INCREMENT_STAT(ctdb, total_ro_delegations);
1084                 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_delegations);
1085
1086                 talloc_free(r);
1087                 talloc_free(call);
1088                 return;
1089         }
1090
             /* hop count histogram: bucket is the number of 2-bit right
                shifts needed to reduce hopcount to zero, capped at the last
                bucket */
1091         CTDB_UPDATE_STAT(ctdb, max_hop_count, c->hopcount);
1092         tmp_count = c->hopcount;
1093         bucket = 0;
1094         while (tmp_count) {
1095                 tmp_count >>= 2;
1096                 bucket++;
1097         }
1098         if (bucket >= MAX_COUNT_BUCKETS) {
1099                 bucket = MAX_COUNT_BUCKETS - 1;
1100         }
1101         CTDB_INCREMENT_STAT(ctdb, hop_count_bucket[bucket]);
1102         CTDB_INCREMENT_DB_STAT(ctdb_db, hop_count_bucket[bucket]);
1103
1104         /* If this database supports sticky records, then check if the
1105            hopcount is big. If it is it means the record is hot and we
1106            should make it sticky.
1107         */
1108         if (ctdb_db_sticky(ctdb_db) &&
1109             c->hopcount >= ctdb->tunable.hopcount_make_sticky) {
1110                 ctdb_make_record_sticky(ctdb, ctdb_db, call->key);
1111         }
1112
1113
1114         /* Try if possible to migrate the record off to the caller node.
1115          * From the clients perspective a fetch of the data is just as 
1116          * expensive as a migration.
1117          */
1118         if (c->hdr.srcnode != ctdb->pnn) {
1119                 if (ctdb_db->persistent_state) {
                             /* NOTE(review): this branch neither calls
                                ctdb_ltdb_unlock() nor sends any reply in this
                                function - confirm that this is intended.
                                The key is also printed with %s, which assumes
                                it is NUL-terminated text - TODO confirm */
1120                         DEBUG(DEBUG_INFO, (__location__ " refusing migration"
1121                               " of key %s while transaction is active\n",
1122                               (char *)call->key.dptr));
1123                 } else {
1124                         DEBUG(DEBUG_DEBUG,("pnn %u starting migration of %08x to %u\n",
1125                                  ctdb->pnn, ctdb_hash(&(call->key)), c->hdr.srcnode));
1126                         ctdb_call_send_dmaster(ctdb_db, c, &header, &(call->key), &data);
1127                         talloc_free(data.dptr);
1128
1129                         ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1130                         if (ret != 0) {
1131                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1132                         }
1133                 }
1134                 talloc_free(call);
1135                 return;
1136         }
1137
             /* the request originated on this node: run the call locally; a
                failure is reported to the requester through call->status */
1138         ret = ctdb_call_local(ctdb_db, call, &header, hdr, &data, true);
1139         if (ret != 0) {
1140                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_local failed\n"));
1141                 call->status = -1;
1142         }
1143
1144         ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1145         if (ret != 0) {
1146                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1147         }
1148
1149         len = offsetof(struct ctdb_reply_call_old, data) + call->reply_data.dsize;
1150         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len, 
1151                                     struct ctdb_reply_call_old);
1152         CTDB_NO_MEMORY_FATAL(ctdb, r);
1153         r->hdr.destnode  = hdr->srcnode;
1154         r->hdr.reqid     = hdr->reqid;
1155         r->hdr.generation = ctdb_db->generation;
1156         r->status        = call->status;
1157         r->datalen       = call->reply_data.dsize;
1158         if (call->reply_data.dsize) {
1159                 memcpy(&r->data[0], call->reply_data.dptr, call->reply_data.dsize);
1160         }
1161
1162         ctdb_queue_packet(ctdb, &r->hdr);
1163
1164         talloc_free(r);
1165         talloc_free(call);
1166 }
1167
1168 /**
1169  * called when a CTDB_REPLY_CALL packet comes in
1170  *
1171  * This packet comes in response to a CTDB_REQ_CALL request packet. It
1172  * contains any reply data from the call
1173  */
1174 void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1175 {
1176         struct ctdb_reply_call_old *c = (struct ctdb_reply_call_old *)hdr;
1177         struct ctdb_call_state *state;
1178
1179         state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
1180         if (state == NULL) {
1181                 DEBUG(DEBUG_ERR, (__location__ " reqid %u not found\n", hdr->reqid));
1182                 return;
1183         }
1184
1185         if (hdr->reqid != state->reqid) {
1186                 /* we found a record  but it was the wrong one */
1187                 DEBUG(DEBUG_ERR, ("Dropped orphaned call reply with reqid:%u\n",hdr->reqid));
1188                 return;
1189         }
1190
1191
1192         /* read only delegation processing */
1193         /* If we got a FETCH_WITH_HEADER we should check if this is a ro
1194          * delegation since we may need to update the record header
1195          */
1196         if (state->c->callid == CTDB_FETCH_WITH_HEADER_FUNC) {
1197                 struct ctdb_db_context *ctdb_db = state->ctdb_db;
1198                 struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)&c->data[0];
1199                 struct ctdb_ltdb_header oldheader;
1200                 TDB_DATA key, data, olddata;
1201                 int ret;
1202
1203                 if (!(header->flags & CTDB_REC_RO_HAVE_READONLY)) {
1204                         goto finished_ro;
1205                         return;
1206                 }
1207
1208                 key.dsize = state->c->keylen;
1209                 key.dptr  = state->c->data;
1210                 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1211                                      ctdb_call_input_pkt, ctdb, false);
1212                 if (ret == -2) {
1213                         return;
1214                 }
1215                 if (ret != 0) {
1216                         DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_call\n"));
1217                         return;
1218                 }
1219
1220                 ret = ctdb_ltdb_fetch(ctdb_db, key, &oldheader, state, &olddata);
1221                 if (ret != 0) {
1222                         DEBUG(DEBUG_ERR, ("Failed to fetch old record in ctdb_reply_call\n"));
1223                         ctdb_ltdb_unlock(ctdb_db, key);
1224                         goto finished_ro;
1225                 }                       
1226
1227                 if (header->rsn <= oldheader.rsn) {
1228                         ctdb_ltdb_unlock(ctdb_db, key);
1229                         goto finished_ro;
1230                 }
1231
1232                 if (c->datalen < sizeof(struct ctdb_ltdb_header)) {
1233                         DEBUG(DEBUG_ERR,(__location__ " Got FETCH_WITH_HEADER reply with too little data: %d bytes\n", c->datalen));
1234                         ctdb_ltdb_unlock(ctdb_db, key);
1235                         goto finished_ro;
1236                 }
1237
1238                 data.dsize = c->datalen - sizeof(struct ctdb_ltdb_header);
1239                 data.dptr  = &c->data[sizeof(struct ctdb_ltdb_header)];
1240                 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
1241                 if (ret != 0) {
1242                         DEBUG(DEBUG_ERR, ("Failed to store new record in ctdb_reply_call\n"));
1243                         ctdb_ltdb_unlock(ctdb_db, key);
1244                         goto finished_ro;
1245                 }                       
1246
1247                 ctdb_ltdb_unlock(ctdb_db, key);
1248         }
1249 finished_ro:
1250
1251         state->call->reply_data.dptr = c->data;
1252         state->call->reply_data.dsize = c->datalen;
1253         state->call->status = c->status;
1254
1255         talloc_steal(state, c);
1256
1257         state->state = CTDB_CALL_DONE;
1258         if (state->async.fn) {
1259                 state->async.fn(state);
1260         }
1261 }
1262
1263
1264 /**
1265  * called when a CTDB_REPLY_DMASTER packet comes in
1266  *
1267  * This packet comes in from the lmaster in response to a CTDB_REQ_CALL
1268  * request packet. It means that the current dmaster wants to give us
1269  * the dmaster role.
1270  */
1271 void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1272 {
1273         struct ctdb_reply_dmaster_old *c = (struct ctdb_reply_dmaster_old *)hdr;
1274         struct ctdb_db_context *ctdb_db;
1275         TDB_DATA key, data;
1276         uint32_t record_flags = 0;
1277         size_t len;
1278         int ret;
1279
1280         ctdb_db = find_ctdb_db(ctdb, c->db_id);
1281         if (ctdb_db == NULL) {
1282                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_reply_dmaster\n", c->db_id));
1283                 return;
1284         }
1285         
1286         key.dptr = c->data;
1287         key.dsize = c->keylen;
1288         data.dptr = &c->data[key.dsize];
1289         data.dsize = c->datalen;
1290         len = offsetof(struct ctdb_reply_dmaster_old, data) + key.dsize + data.dsize
1291                 + sizeof(uint32_t);
1292         if (len <= c->hdr.length) {
1293                 memcpy(&record_flags, &c->data[c->keylen + c->datalen],
1294                        sizeof(record_flags));
1295         }
1296
1297         dmaster_defer_setup(ctdb_db, hdr, key);
1298
1299         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1300                                      ctdb_call_input_pkt, ctdb, false);
1301         if (ret == -2) {
1302                 return;
1303         }
1304         if (ret != 0) {
1305                 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_dmaster\n"));
1306                 return;
1307         }
1308
1309         ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
1310 }
1311
1312
1313 /*
1314   called when a CTDB_REPLY_ERROR packet comes in
1315 */
1316 void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1317 {
1318         struct ctdb_reply_error_old *c = (struct ctdb_reply_error_old *)hdr;
1319         struct ctdb_call_state *state;
1320
1321         state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
1322         if (state == NULL) {
1323                 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_reply_error\n",
1324                          ctdb->pnn, hdr->reqid));
1325                 return;
1326         }
1327
1328         if (hdr->reqid != state->reqid) {
1329                 /* we found a record  but it was the wrong one */
1330                 DEBUG(DEBUG_ERR, ("Dropped orphaned error reply with reqid:%u\n",hdr->reqid));
1331                 return;
1332         }
1333
1334         talloc_steal(state, c);
1335
1336         state->state  = CTDB_CALL_ERROR;
1337         state->errmsg = (char *)c->msg;
1338         if (state->async.fn) {
1339                 state->async.fn(state);
1340         }
1341 }
1342
1343
1344 /*
1345   destroy a ctdb_call
1346 */
1347 static int ctdb_call_destructor(struct ctdb_call_state *state)
1348 {
1349         DLIST_REMOVE(state->ctdb_db->pending_calls, state);
1350         reqid_remove(state->ctdb_db->ctdb->idr, state->reqid);
1351         return 0;
1352 }
1353
1354
1355 /*
1356   called when a ctdb_call needs to be resent after a reconfigure event
1357 */
1358 static void ctdb_call_resend(struct ctdb_call_state *state)
1359 {
1360         struct ctdb_context *ctdb = state->ctdb_db->ctdb;
1361
1362         state->generation = state->ctdb_db->generation;
1363
1364         /* use a new reqid, in case the old reply does eventually come in */
1365         reqid_remove(ctdb->idr, state->reqid);
1366         state->reqid = reqid_new(ctdb->idr, state);
1367         state->c->hdr.reqid = state->reqid;
1368
1369         /* update the generation count for this request, so its valid with the new vnn_map */
1370         state->c->hdr.generation = state->generation;
1371
1372         /* send the packet to ourselves, it will be redirected appropriately */
1373         state->c->hdr.destnode = ctdb->pnn;
1374
1375         ctdb_queue_packet(ctdb, &state->c->hdr);
1376         DEBUG(DEBUG_NOTICE,("resent ctdb_call for db %s reqid %u generation %u\n",
1377                             state->ctdb_db->db_name, state->reqid, state->generation));
1378 }
1379
1380 /*
1381   resend all pending calls on recovery
1382  */
1383 void ctdb_call_resend_db(struct ctdb_db_context *ctdb_db)
1384 {
1385         struct ctdb_call_state *state, *next;
1386
1387         for (state = ctdb_db->pending_calls; state; state = next) {
1388                 next = state->next;
1389                 ctdb_call_resend(state);
1390         }
1391 }
1392
1393 void ctdb_call_resend_all(struct ctdb_context *ctdb)
1394 {
1395         struct ctdb_db_context *ctdb_db;
1396
1397         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
1398                 ctdb_call_resend_db(ctdb_db);
1399         }
1400 }
1401
1402 /*
1403   this allows the caller to setup a async.fn 
1404 */
1405 static void call_local_trigger(struct tevent_context *ev,
1406                                struct tevent_timer *te,
1407                                struct timeval t, void *private_data)
1408 {
1409         struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state);
1410         if (state->async.fn) {
1411                 state->async.fn(state);
1412         }
1413 }       
1414
1415
1416 /*
1417   construct an event driven local ctdb_call
1418
1419   this is used so that locally processed ctdb_call requests are processed
1420   in an event driven manner
1421 */
1422 struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db, 
1423                                              struct ctdb_call *call,
1424                                              struct ctdb_ltdb_header *header,
1425                                              TDB_DATA *data)
1426 {
1427         struct ctdb_call_state *state;
1428         struct ctdb_context *ctdb = ctdb_db->ctdb;
1429         int ret;
1430
1431         state = talloc_zero(ctdb_db, struct ctdb_call_state);
1432         CTDB_NO_MEMORY_NULL(ctdb, state);
1433
1434         talloc_steal(state, data->dptr);
1435
1436         state->state = CTDB_CALL_DONE;
1437         state->call  = talloc(state, struct ctdb_call);
1438         CTDB_NO_MEMORY_NULL(ctdb, state->call);
1439         *(state->call) = *call;
1440         state->ctdb_db = ctdb_db;
1441
1442         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
1443         if (ret != 0) {
1444                 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
1445         }
1446
1447         tevent_add_timer(ctdb->ev, state, timeval_zero(),
1448                          call_local_trigger, state);
1449
1450         return state;
1451 }
1452
1453
1454 /*
1455   make a remote ctdb call - async send. Called in daemon context.
1456
1457   This constructs a ctdb_call request and queues it for processing. 
1458   This call never blocks.
1459 */
1460 struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctdb_db, 
1461                                                      struct ctdb_call *call, 
1462                                                      struct ctdb_ltdb_header *header)
1463 {
1464         uint32_t len;
1465         struct ctdb_call_state *state;
1466         struct ctdb_context *ctdb = ctdb_db->ctdb;
1467         struct ctdb_req_call_old *c;
1468
1469         if (ctdb->methods == NULL) {
1470                 DEBUG(DEBUG_INFO,(__location__ " Failed send packet. Transport is down\n"));
1471                 return NULL;
1472         }
1473
1474         state = talloc_zero(ctdb_db, struct ctdb_call_state);
1475         CTDB_NO_MEMORY_NULL(ctdb, state);
1476         state->call = talloc(state, struct ctdb_call);
1477         CTDB_NO_MEMORY_NULL(ctdb, state->call);
1478
1479         state->reqid = reqid_new(ctdb->idr, state);
1480         state->ctdb_db = ctdb_db;
1481         state->state  = CTDB_CALL_WAIT;
1482         state->generation = ctdb_db->generation;
1483
1484         len = offsetof(struct ctdb_req_call_old, data) + call->key.dsize +
1485                        call->call_data.dsize;
1486
1487         c = ctdb_transport_allocate(ctdb,
1488                                     state,
1489                                     CTDB_REQ_CALL,
1490                                     len,
1491                                     struct ctdb_req_call_old);
1492
1493         CTDB_NO_MEMORY_NULL(ctdb, c);
1494         state->c = c;
1495
1496         c->hdr.destnode  = header->dmaster;
1497         c->hdr.reqid     = state->reqid;
1498         c->hdr.generation = ctdb_db->generation;
1499         c->flags         = call->flags;
1500         c->db_id         = ctdb_db->db_id;
1501         c->callid        = call->call_id;
1502         c->hopcount      = 0;
1503         c->keylen        = call->key.dsize;
1504         c->calldatalen   = call->call_data.dsize;
1505
1506         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
1507         memcpy(&c->data[call->key.dsize],
1508                call->call_data.dptr,
1509                call->call_data.dsize);
1510
1511         *(state->call) = *call;
1512         state->call->call_data.dptr = &c->data[call->key.dsize];
1513         state->call->key.dptr       = &c->data[0];
1514
1515         DLIST_ADD(ctdb_db->pending_calls, state);
1516
1517         talloc_set_destructor(state, ctdb_call_destructor);
1518         ctdb_queue_packet(ctdb, &state->c->hdr);
1519
1520         return state;
1521 }
1522
1523 /*
1524   make a remote ctdb call - async recv - called in daemon context
1525
1526   This is called when the program wants to wait for a ctdb_call to complete and get the 
1527   results. This call will block unless the call has already completed.
1528 */
1529 int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
1530 {
1531         while (state->state < CTDB_CALL_DONE) {
1532                 tevent_loop_once(state->ctdb_db->ctdb->ev);
1533         }
1534         if (state->state != CTDB_CALL_DONE) {
1535                 ctdb_set_error(state->ctdb_db->ctdb, "%s", state->errmsg);
1536                 talloc_free(state);
1537                 return -1;
1538         }
1539
1540         if (state->call->reply_data.dsize) {
1541                 call->reply_data.dptr = talloc_memdup(call,
1542                                                       state->call->reply_data.dptr,
1543                                                       state->call->reply_data.dsize);
1544                 call->reply_data.dsize = state->call->reply_data.dsize;
1545         } else {
1546                 call->reply_data.dptr = NULL;
1547                 call->reply_data.dsize = 0;
1548         }
1549         call->status = state->call->status;
1550         talloc_free(state);
1551         return 0;
1552 }
1553
1554
1555 struct revokechild_deferred_call {
1556         struct revokechild_deferred_call *prev, *next;
1557         struct ctdb_context *ctdb;
1558         struct ctdb_req_header *hdr;
1559         deferred_requeue_fn fn;
1560         void *ctx;
1561         struct revokechild_handle *rev_hdl;
1562 };
1563
1564 struct revokechild_handle {
1565         struct revokechild_handle *next, *prev;
1566         struct ctdb_context *ctdb;
1567         struct ctdb_db_context *ctdb_db;
1568         struct tevent_fd *fde;
1569         int status;
1570         int fd[2];
1571         pid_t child;
1572         TDB_DATA key;
1573         struct revokechild_deferred_call *deferred_call_list;
1574 };
1575
1576 static void deferred_call_requeue(struct tevent_context *ev,
1577                                   struct tevent_timer *te,
1578                                   struct timeval t, void *private_data)
1579 {
1580         struct revokechild_deferred_call *dlist = talloc_get_type_abort(
1581                 private_data, struct revokechild_deferred_call);
1582
1583         while (dlist != NULL) {
1584                 struct revokechild_deferred_call *dcall = dlist;
1585
1586                 talloc_set_destructor(dcall, NULL);
1587                 DLIST_REMOVE(dlist, dcall);
1588                 dcall->fn(dcall->ctx, dcall->hdr);
1589                 talloc_free(dcall);
1590         }
1591 }
1592
1593 static int deferred_call_destructor(struct revokechild_deferred_call *dcall)
1594 {
1595         struct revokechild_handle *rev_hdl = dcall->rev_hdl;
1596
1597         DLIST_REMOVE(rev_hdl->deferred_call_list, dcall);
1598         return 0;
1599 }
1600
1601 static int revokechild_destructor(struct revokechild_handle *rev_hdl)
1602 {
1603         struct revokechild_deferred_call *now_list = NULL;
1604         struct revokechild_deferred_call *delay_list = NULL;
1605
1606         if (rev_hdl->fde != NULL) {
1607                 talloc_free(rev_hdl->fde);
1608         }
1609
1610         if (rev_hdl->fd[0] != -1) {
1611                 close(rev_hdl->fd[0]);
1612         }
1613         if (rev_hdl->fd[1] != -1) {
1614                 close(rev_hdl->fd[1]);
1615         }
1616         ctdb_kill(rev_hdl->ctdb, rev_hdl->child, SIGKILL);
1617
1618         DLIST_REMOVE(rev_hdl->ctdb_db->revokechild_active, rev_hdl);
1619
1620         while (rev_hdl->deferred_call_list != NULL) {
1621                 struct revokechild_deferred_call *dcall;
1622
1623                 dcall = rev_hdl->deferred_call_list;
1624                 DLIST_REMOVE(rev_hdl->deferred_call_list, dcall);
1625
1626                 /* If revoke is successful, then first process all the calls
1627                  * that need write access, and delay readonly requests by 1
1628                  * second grace.
1629                  *
1630                  * If revoke is unsuccessful, most likely because of node
1631                  * failure, delay all the pending requests, so database can
1632                  * be recovered.
1633                  */
1634
1635                 if (rev_hdl->status == 0) {
1636                         struct ctdb_req_call_old *c;
1637
1638                         c = (struct ctdb_req_call_old *)dcall->hdr;
1639                         if (c->flags & CTDB_WANT_READONLY) {
1640                                 DLIST_ADD(delay_list, dcall);
1641                         } else {
1642                                 DLIST_ADD(now_list, dcall);
1643                         }
1644                 } else {
1645                         DLIST_ADD(delay_list, dcall);
1646                 }
1647         }
1648
1649         if (now_list != NULL) {
1650                 tevent_add_timer(rev_hdl->ctdb->ev,
1651                                  rev_hdl->ctdb_db,
1652                                  tevent_timeval_current_ofs(0, 0),
1653                                  deferred_call_requeue,
1654                                  now_list);
1655         }
1656
1657         if (delay_list != NULL) {
1658                 tevent_add_timer(rev_hdl->ctdb->ev,
1659                                  rev_hdl->ctdb_db,
1660                                  tevent_timeval_current_ofs(1, 0),
1661                                  deferred_call_requeue,
1662                                  delay_list);
1663         }
1664
1665         return 0;
1666 }
1667
1668 static void revokechild_handler(struct tevent_context *ev,
1669                                 struct tevent_fd *fde,
1670                                 uint16_t flags, void *private_data)
1671 {
1672         struct revokechild_handle *rev_hdl =
1673                 talloc_get_type(private_data, struct revokechild_handle);
1674         int ret;
1675         char c;
1676
1677         ret = sys_read(rev_hdl->fd[0], &c, 1);
1678         if (ret != 1) {
1679                 DEBUG(DEBUG_ERR,("Failed to read status from revokechild. errno:%d\n", errno));
1680                 rev_hdl->status = -1;
1681                 talloc_free(rev_hdl);
1682                 return;
1683         }
1684         if (c != 0) {
1685                 DEBUG(DEBUG_ERR,("revokechild returned failure. status:%d\n", c));
1686                 rev_hdl->status = -1;
1687                 talloc_free(rev_hdl);
1688                 return;
1689         }
1690
1691         talloc_free(rev_hdl);
1692 }
1693
/*
  Bookkeeping shared by the callbacks used while revoking all read-only
  delegations of one record (see ctdb_revoke_all_delegations()).
 */
struct ctdb_revoke_state {
        struct ctdb_db_context *ctdb_db;
        /* key, header and data passed to every update record control */
        TDB_DATA key;
        struct ctdb_ltdb_header *header;
        TDB_DATA data;
        /* update record controls sent and not yet completed */
        int count;
        /* 0 until a send, a reply or the timeout reports failure, then -1 */
        int status;
        /* set to 1 when count drops to zero or the timeout fires */
        int finished;
};
1703
1704 static void update_record_cb(struct ctdb_client_control_state *state)
1705 {
1706         struct ctdb_revoke_state *revoke_state;
1707         int ret;
1708         int32_t res;
1709
1710         if (state == NULL) {
1711                 return;
1712         }
1713         revoke_state = state->async.private_data;
1714
1715         state->async.fn = NULL;
1716         ret = ctdb_control_recv(state->ctdb, state, state, NULL, &res, NULL);
1717         if ((ret != 0) || (res != 0)) {
1718                 DEBUG(DEBUG_ERR,("Recv for revoke update record failed ret:%d res:%d\n", ret, res));
1719                 revoke_state->status = -1;
1720         }
1721
1722         revoke_state->count--;
1723         if (revoke_state->count <= 0) {
1724                 revoke_state->finished = 1;
1725         }
1726 }
1727
1728 static void revoke_send_cb(struct ctdb_context *ctdb, uint32_t pnn, void *private_data)
1729 {
1730         struct ctdb_revoke_state *revoke_state = private_data;
1731         struct ctdb_client_control_state *state;
1732
1733         state = ctdb_ctrl_updaterecord_send(ctdb, revoke_state, timeval_current_ofs(ctdb->tunable.control_timeout,0), pnn, revoke_state->ctdb_db, revoke_state->key, revoke_state->header, revoke_state->data);
1734         if (state == NULL) {
1735                 DEBUG(DEBUG_ERR,("Failure to send update record to revoke readonly delegation\n"));
1736                 revoke_state->status = -1;
1737                 return;
1738         }
1739         state->async.fn           = update_record_cb;
1740         state->async.private_data = revoke_state;
1741
1742         revoke_state->count++;
1743
1744 }
1745
1746 static void ctdb_revoke_timeout_handler(struct tevent_context *ev,
1747                                         struct tevent_timer *te,
1748                                         struct timeval yt, void *private_data)
1749 {
1750         struct ctdb_revoke_state *state = private_data;
1751
1752         DEBUG(DEBUG_ERR,("Timed out waiting for revoke to finish\n"));
1753         state->finished = 1;
1754         state->status   = -1;
1755 }
1756
1757 static int ctdb_revoke_all_delegations(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA tdata, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1758 {
1759         struct ctdb_revoke_state *state = talloc_zero(ctdb, struct ctdb_revoke_state);
1760         struct ctdb_ltdb_header new_header;
1761         TDB_DATA new_data;
1762
1763         state->ctdb_db = ctdb_db;
1764         state->key     = key;
1765         state->header  = header;
1766         state->data    = data;
1767  
1768         ctdb_trackingdb_traverse(ctdb, tdata, revoke_send_cb, state);
1769
1770         tevent_add_timer(ctdb->ev, state,
1771                          timeval_current_ofs(ctdb->tunable.control_timeout, 0),
1772                          ctdb_revoke_timeout_handler, state);
1773
1774         while (state->finished == 0) {
1775                 tevent_loop_once(ctdb->ev);
1776         }
1777
1778         if (ctdb_ltdb_lock(ctdb_db, key) != 0) {
1779                 DEBUG(DEBUG_ERR,("Failed to chainlock the database in revokechild\n"));
1780                 talloc_free(state);
1781                 return -1;
1782         }
1783         if (ctdb_ltdb_fetch(ctdb_db, key, &new_header, state, &new_data) != 0) {
1784                 ctdb_ltdb_unlock(ctdb_db, key);
1785                 DEBUG(DEBUG_ERR,("Failed for fetch tdb record in revokechild\n"));
1786                 talloc_free(state);
1787                 return -1;
1788         }
1789         header->rsn++;
1790         if (new_header.rsn > header->rsn) {
1791                 ctdb_ltdb_unlock(ctdb_db, key);
1792                 DEBUG(DEBUG_ERR,("RSN too high in tdb record in revokechild\n"));
1793                 talloc_free(state);
1794                 return -1;
1795         }
1796         if ( (new_header.flags & (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS)) != (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS) ) {
1797                 ctdb_ltdb_unlock(ctdb_db, key);
1798                 DEBUG(DEBUG_ERR,("Flags are wrong in tdb record in revokechild\n"));
1799                 talloc_free(state);
1800                 return -1;
1801         }
1802
1803         /*
1804          * If revoke on all nodes succeed, revoke is complete.  Otherwise,
1805          * remove CTDB_REC_RO_REVOKING_READONLY flag and retry.
1806          */
1807         if (state->status == 0) {
1808                 new_header.rsn++;
1809                 new_header.flags |= CTDB_REC_RO_REVOKE_COMPLETE;
1810         } else {
1811                 DEBUG(DEBUG_NOTICE, ("Revoke all delegations failed, retrying.\n"));
1812                 new_header.flags &= ~CTDB_REC_RO_REVOKING_READONLY;
1813         }
1814         if (ctdb_ltdb_store(ctdb_db, key, &new_header, new_data) != 0) {
1815                 ctdb_ltdb_unlock(ctdb_db, key);
1816                 DEBUG(DEBUG_ERR,("Failed to write new record in revokechild\n"));
1817                 talloc_free(state);
1818                 return -1;
1819         }
1820         ctdb_ltdb_unlock(ctdb_db, key);
1821
1822         talloc_free(state);
1823         return 0;
1824 }
1825
1826
1827 int ctdb_start_revoke_ro_record(struct ctdb_context *ctdb,
1828                                 struct ctdb_db_context *ctdb_db,
1829                                 TDB_DATA key,
1830                                 struct ctdb_ltdb_header *header,
1831                                 TDB_DATA data)
1832 {
1833         TDB_DATA tdata;
1834         struct revokechild_handle *rev_hdl;
1835         pid_t parent = getpid();
1836         int ret;
1837
1838         header->flags &= ~(CTDB_REC_RO_REVOKING_READONLY |
1839                            CTDB_REC_RO_HAVE_DELEGATIONS |
1840                            CTDB_REC_RO_HAVE_READONLY);
1841
1842         header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1843         header->rsn   -= 1;
1844
1845         rev_hdl = talloc_zero(ctdb_db, struct revokechild_handle);
1846         if (rev_hdl == NULL) {
1847                 D_ERR("Failed to allocate revokechild_handle\n");
1848                 return -1;
1849         }
1850
1851         tdata = tdb_fetch(ctdb_db->rottdb, key);
1852         if (tdata.dsize > 0) {
1853                 uint8_t *tmp;
1854
1855                 tmp = tdata.dptr;
1856                 tdata.dptr = talloc_memdup(rev_hdl, tdata.dptr, tdata.dsize);
1857                 free(tmp);
1858         }
1859
1860         rev_hdl->status    = 0;
1861         rev_hdl->ctdb      = ctdb;
1862         rev_hdl->ctdb_db   = ctdb_db;
1863         rev_hdl->fd[0]     = -1;
1864         rev_hdl->fd[1]     = -1;
1865
1866         rev_hdl->key.dsize = key.dsize;
1867         rev_hdl->key.dptr  = talloc_memdup(rev_hdl, key.dptr, key.dsize);
1868         if (rev_hdl->key.dptr == NULL) {
1869                 D_ERR("Failed to allocate key for revokechild_handle\n");
1870                 goto err_out;
1871         }
1872
1873         ret = pipe(rev_hdl->fd);
1874         if (ret != 0) {
1875                 D_ERR("Failed to allocate key for revokechild_handle\n");
1876                 goto err_out;
1877         }
1878
1879
1880         rev_hdl->child = ctdb_fork(ctdb);
1881         if (rev_hdl->child == (pid_t)-1) {
1882                 D_ERR("Failed to fork child for revokechild\n");
1883                 goto err_out;
1884         }
1885
1886         if (rev_hdl->child == 0) {
1887                 char c = 0;
1888                 close(rev_hdl->fd[0]);
1889
1890                 prctl_set_comment("ctdb_revokechild");
1891                 if (switch_from_server_to_client(ctdb) != 0) {
1892                         D_ERR("Failed to switch from server to client "
1893                               "for revokechild process\n");
1894                         c = 1;
1895                         goto child_finished;
1896                 }
1897
1898                 c = ctdb_revoke_all_delegations(ctdb,
1899                                                 ctdb_db,
1900                                                 tdata,
1901                                                 key,
1902                                                 header,
1903                                                 data);
1904
1905 child_finished:
1906                 sys_write(rev_hdl->fd[1], &c, 1);
1907                 ctdb_wait_for_process_to_exit(parent);
1908                 _exit(0);
1909         }
1910
1911         close(rev_hdl->fd[1]);
1912         rev_hdl->fd[1] = -1;
1913         set_close_on_exec(rev_hdl->fd[0]);
1914
1915         rev_hdl->fde = tevent_add_fd(ctdb->ev,
1916                                      rev_hdl,
1917                                      rev_hdl->fd[0],
1918                                      TEVENT_FD_READ,
1919                                      revokechild_handler,
1920                                      (void *)rev_hdl);
1921
1922         if (rev_hdl->fde == NULL) {
1923                 D_ERR("Failed to set up fd event for revokechild process\n");
1924                 talloc_free(rev_hdl);
1925         }
1926         tevent_fd_set_auto_close(rev_hdl->fde);
1927
1928         /* This is an active revokechild child process */
1929         DLIST_ADD_END(ctdb_db->revokechild_active, rev_hdl);
1930         talloc_set_destructor(rev_hdl, revokechild_destructor);
1931
1932         return 0;
1933 err_out:
1934         talloc_free(rev_hdl);
1935         return -1;
1936 }
1937
1938 int ctdb_add_revoke_deferred_call(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr, deferred_requeue_fn fn, void *call_context)
1939 {
1940         struct revokechild_handle *rev_hdl;
1941         struct revokechild_deferred_call *deferred_call;
1942
1943         for (rev_hdl = ctdb_db->revokechild_active;
1944              rev_hdl;
1945              rev_hdl = rev_hdl->next) {
1946                 if (rev_hdl->key.dsize == 0) {
1947                         continue;
1948                 }
1949                 if (rev_hdl->key.dsize != key.dsize) {
1950                         continue;
1951                 }
1952                 if (!memcmp(rev_hdl->key.dptr, key.dptr, key.dsize)) {
1953                         break;
1954                 }
1955         }
1956
1957         if (rev_hdl == NULL) {
1958                 DEBUG(DEBUG_ERR,("Failed to add deferred call to revoke list. revoke structure not found\n"));
1959                 return -1;
1960         }
1961
1962         deferred_call = talloc(call_context, struct revokechild_deferred_call);
1963         if (deferred_call == NULL) {
1964                 DEBUG(DEBUG_ERR,("Failed to allocate deferred call structure for revoking record\n"));
1965                 return -1;
1966         }
1967
1968         deferred_call->ctdb = ctdb;
1969         deferred_call->hdr  = talloc_steal(deferred_call, hdr);
1970         deferred_call->fn   = fn;
1971         deferred_call->ctx  = call_context;
1972         deferred_call->rev_hdl   = rev_hdl;
1973
1974         talloc_set_destructor(deferred_call, deferred_call_destructor);
1975
1976         DLIST_ADD(rev_hdl->deferred_call_list, deferred_call);
1977
1978         return 0;
1979 }
1980
1981 static void ctdb_migration_count_handler(TDB_DATA key, uint64_t counter,
1982                                          void *private_data)
1983 {
1984         struct ctdb_db_context *ctdb_db = talloc_get_type_abort(
1985                 private_data, struct ctdb_db_context);
1986         int value;
1987
1988         value = (counter < INT_MAX ? counter : INT_MAX);
1989         ctdb_update_db_stat_hot_keys(ctdb_db, key, value);
1990 }
1991
1992 static void ctdb_migration_cleandb_event(struct tevent_context *ev,
1993                                          struct tevent_timer *te,
1994                                          struct timeval current_time,
1995                                          void *private_data)
1996 {
1997         struct ctdb_db_context *ctdb_db = talloc_get_type_abort(
1998                 private_data, struct ctdb_db_context);
1999
2000         if (ctdb_db->migratedb == NULL) {
2001                 return;
2002         }
2003
2004         hash_count_expire(ctdb_db->migratedb, NULL);
2005
2006         te = tevent_add_timer(ctdb_db->ctdb->ev, ctdb_db->migratedb,
2007                               tevent_timeval_current_ofs(10, 0),
2008                               ctdb_migration_cleandb_event, ctdb_db);
2009         if (te == NULL) {
2010                 DEBUG(DEBUG_ERR,
2011                       ("Memory error in migration cleandb event for %s\n",
2012                        ctdb_db->db_name));
2013                 TALLOC_FREE(ctdb_db->migratedb);
2014         }
2015 }
2016
2017 int ctdb_migration_init(struct ctdb_db_context *ctdb_db)
2018 {
2019         struct timeval one_second = { 1, 0 };
2020         struct tevent_timer *te;
2021         int ret;
2022
2023         if (! ctdb_db_volatile(ctdb_db)) {
2024                 return 0;
2025         }
2026
2027         ret = hash_count_init(ctdb_db, one_second,
2028                               ctdb_migration_count_handler, ctdb_db,
2029                               &ctdb_db->migratedb);
2030         if (ret != 0) {
2031                 DEBUG(DEBUG_ERR,
2032                       ("Memory error in migration init for %s\n",
2033                        ctdb_db->db_name));
2034                 return -1;
2035         }
2036
2037         te = tevent_add_timer(ctdb_db->ctdb->ev, ctdb_db->migratedb,
2038                               tevent_timeval_current_ofs(10, 0),
2039                               ctdb_migration_cleandb_event, ctdb_db);
2040         if (te == NULL) {
2041                 DEBUG(DEBUG_ERR,
2042                       ("Memory error in migration init for %s\n",
2043                        ctdb_db->db_name));
2044                 TALLOC_FREE(ctdb_db->migratedb);
2045                 return -1;
2046         }
2047
2048         return 0;
2049 }