ctdb: Print key as hex string instead of just the hash in hot record message
[vlendec/samba-autobuild/.git] / ctdb / server / ctdb_call.c
1 /* 
2    ctdb_call protocol code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19 /*
20   see http://wiki.samba.org/index.php/Samba_%26_Clustering for
21   protocol design and packet details
22 */
23 #include "replace.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26
27 #include <talloc.h>
28 #include <tevent.h>
29
30 #include "lib/util/dlinklist.h"
31 #include "lib/util/debug.h"
32 #include "lib/util/samba_util.h"
33 #include "lib/util/sys_rw.h"
34 #include "lib/util/util_process.h"
35
36 #include "ctdb_private.h"
37 #include "ctdb_client.h"
38
39 #include "common/rb_tree.h"
40 #include "common/reqid.h"
41 #include "common/system.h"
42 #include "common/common.h"
43 #include "common/logging.h"
44 #include "common/hash_count.h"
45
46 struct ctdb_sticky_record {
47         struct ctdb_context *ctdb;
48         struct ctdb_db_context *ctdb_db;
49         TDB_CONTEXT *pindown;
50 };
51
52 /*
53   find the ctdb_db from a db index
54  */
55  struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id)
56 {
57         struct ctdb_db_context *ctdb_db;
58
59         for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
60                 if (ctdb_db->db_id == id) {
61                         break;
62                 }
63         }
64         return ctdb_db;
65 }
66
67 /*
68   a varient of input packet that can be used in lock requeue
69 */
70 static void ctdb_call_input_pkt(void *p, struct ctdb_req_header *hdr)
71 {
72         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
73         ctdb_input_pkt(ctdb, hdr);
74 }
75
76
77 /*
78   send an error reply
79 */
80 static void ctdb_send_error(struct ctdb_context *ctdb, 
81                             struct ctdb_req_header *hdr, uint32_t status,
82                             const char *fmt, ...) PRINTF_ATTRIBUTE(4,5);
83 static void ctdb_send_error(struct ctdb_context *ctdb, 
84                             struct ctdb_req_header *hdr, uint32_t status,
85                             const char *fmt, ...)
86 {
87         va_list ap;
88         struct ctdb_reply_error_old *r;
89         char *msg;
90         int msglen, len;
91
92         if (ctdb->methods == NULL) {
93                 DEBUG(DEBUG_INFO,(__location__ " Failed to send error. Transport is DOWN\n"));
94                 return;
95         }
96
97         va_start(ap, fmt);
98         msg = talloc_vasprintf(ctdb, fmt, ap);
99         if (msg == NULL) {
100                 ctdb_fatal(ctdb, "Unable to allocate error in ctdb_send_error\n");
101         }
102         va_end(ap);
103
104         msglen = strlen(msg)+1;
105         len = offsetof(struct ctdb_reply_error_old, msg);
106         r = ctdb_transport_allocate(ctdb, msg, CTDB_REPLY_ERROR, len + msglen, 
107                                     struct ctdb_reply_error_old);
108         CTDB_NO_MEMORY_FATAL(ctdb, r);
109
110         r->hdr.destnode  = hdr->srcnode;
111         r->hdr.reqid     = hdr->reqid;
112         r->status        = status;
113         r->msglen        = msglen;
114         memcpy(&r->msg[0], msg, msglen);
115
116         ctdb_queue_packet(ctdb, &r->hdr);
117
118         talloc_free(msg);
119 }
120
121
122 /**
123  * send a redirect reply
124  *
125  * The logic behind this function is this:
126  *
127  * A client wants to grab a record and sends a CTDB_REQ_CALL packet
128  * to its local ctdb (ctdb_request_call). If the node is not itself
129  * the record's DMASTER, it first redirects the packet to  the
130  * record's LMASTER. The LMASTER then redirects the call packet to
131  * the current DMASTER. Note that this works because of this: When
132  * a record is migrated off a node, then the new DMASTER is stored
133  * in the record's copy on the former DMASTER.
134  */
135 static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
136                                     struct ctdb_db_context *ctdb_db,
137                                     TDB_DATA key,
138                                     struct ctdb_req_call_old *c, 
139                                     struct ctdb_ltdb_header *header)
140 {
141         uint32_t lmaster = ctdb_lmaster(ctdb, &key);
142
143         c->hdr.destnode = lmaster;
144         if (ctdb->pnn == lmaster) {
145                 c->hdr.destnode = header->dmaster;
146         }
147         c->hopcount++;
148
149         if (c->hopcount%100 > 95) {
150                 DEBUG(DEBUG_WARNING,("High hopcount %d dbid:%s "
151                         "key:0x%08x reqid=%08x pnn:%d src:%d lmaster:%d "
152                         "header->dmaster:%d dst:%d\n",
153                         c->hopcount, ctdb_db->db_name, ctdb_hash(&key),
154                         c->hdr.reqid, ctdb->pnn, c->hdr.srcnode, lmaster,
155                         header->dmaster, c->hdr.destnode));
156         }
157
158         ctdb_queue_packet(ctdb, &c->hdr);
159 }
160
161
162 /*
163   send a dmaster reply
164
165   caller must have the chainlock before calling this routine. Caller must be
166   the lmaster
167 */
168 static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
169                                     struct ctdb_ltdb_header *header,
170                                     TDB_DATA key, TDB_DATA data,
171                                     uint32_t new_dmaster,
172                                     uint32_t reqid)
173 {
174         struct ctdb_context *ctdb = ctdb_db->ctdb;
175         struct ctdb_reply_dmaster_old *r;
176         int ret, len;
177         TALLOC_CTX *tmp_ctx;
178
179         if (ctdb->pnn != ctdb_lmaster(ctdb, &key)) {
180                 DEBUG(DEBUG_ALERT,(__location__ " Caller is not lmaster!\n"));
181                 return;
182         }
183
184         header->dmaster = new_dmaster;
185         ret = ctdb_ltdb_store(ctdb_db, key, header, data);
186         if (ret != 0) {
187                 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply unable to update dmaster");
188                 return;
189         }
190
191         if (ctdb->methods == NULL) {
192                 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply cant update dmaster since transport is down");
193                 return;
194         }
195
196         /* put the packet on a temporary context, allowing us to safely free
197            it below even if ctdb_reply_dmaster() has freed it already */
198         tmp_ctx = talloc_new(ctdb);
199
200         /* send the CTDB_REPLY_DMASTER */
201         len = offsetof(struct ctdb_reply_dmaster_old, data) + key.dsize + data.dsize + sizeof(uint32_t);
202         r = ctdb_transport_allocate(ctdb, tmp_ctx, CTDB_REPLY_DMASTER, len,
203                                     struct ctdb_reply_dmaster_old);
204         CTDB_NO_MEMORY_FATAL(ctdb, r);
205
206         r->hdr.destnode  = new_dmaster;
207         r->hdr.reqid     = reqid;
208         r->hdr.generation = ctdb_db->generation;
209         r->rsn           = header->rsn;
210         r->keylen        = key.dsize;
211         r->datalen       = data.dsize;
212         r->db_id         = ctdb_db->db_id;
213         memcpy(&r->data[0], key.dptr, key.dsize);
214         memcpy(&r->data[key.dsize], data.dptr, data.dsize);
215         memcpy(&r->data[key.dsize+data.dsize], &header->flags, sizeof(uint32_t));
216
217         ctdb_queue_packet(ctdb, &r->hdr);
218
219         talloc_free(tmp_ctx);
220 }
221
222 /*
223   send a dmaster request (give another node the dmaster for a record)
224
225   This is always sent to the lmaster, which ensures that the lmaster
226   always knows who the dmaster is. The lmaster will then send a
227   CTDB_REPLY_DMASTER to the new dmaster
228 */
229 static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db, 
230                                    struct ctdb_req_call_old *c, 
231                                    struct ctdb_ltdb_header *header,
232                                    TDB_DATA *key, TDB_DATA *data)
233 {
234         struct ctdb_req_dmaster_old *r;
235         struct ctdb_context *ctdb = ctdb_db->ctdb;
236         int len;
237         uint32_t lmaster = ctdb_lmaster(ctdb, key);
238
239         if (ctdb->methods == NULL) {
240                 ctdb_fatal(ctdb, "Failed ctdb_call_send_dmaster since transport is down");
241                 return;
242         }
243
244         if (data->dsize != 0) {
245                 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
246         }
247
248         if (lmaster == ctdb->pnn) {
249                 ctdb_send_dmaster_reply(ctdb_db, header, *key, *data, 
250                                         c->hdr.srcnode, c->hdr.reqid);
251                 return;
252         }
253         
254         len = offsetof(struct ctdb_req_dmaster_old, data) + key->dsize + data->dsize
255                         + sizeof(uint32_t);
256         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_DMASTER, len, 
257                                     struct ctdb_req_dmaster_old);
258         CTDB_NO_MEMORY_FATAL(ctdb, r);
259         r->hdr.destnode  = lmaster;
260         r->hdr.reqid     = c->hdr.reqid;
261         r->hdr.generation = ctdb_db->generation;
262         r->db_id         = c->db_id;
263         r->rsn           = header->rsn;
264         r->dmaster       = c->hdr.srcnode;
265         r->keylen        = key->dsize;
266         r->datalen       = data->dsize;
267         memcpy(&r->data[0], key->dptr, key->dsize);
268         memcpy(&r->data[key->dsize], data->dptr, data->dsize);
269         memcpy(&r->data[key->dsize + data->dsize], &header->flags, sizeof(uint32_t));
270
271         header->dmaster = c->hdr.srcnode;
272         if (ctdb_ltdb_store(ctdb_db, *key, header, *data) != 0) {
273                 ctdb_fatal(ctdb, "Failed to store record in ctdb_call_send_dmaster");
274         }
275         
276         ctdb_queue_packet(ctdb, &r->hdr);
277
278         talloc_free(r);
279 }
280
281 static void ctdb_sticky_pindown_timeout(struct tevent_context *ev,
282                                         struct tevent_timer *te,
283                                         struct timeval t, void *private_data)
284 {
285         struct ctdb_sticky_record *sr = talloc_get_type(private_data, 
286                                                        struct ctdb_sticky_record);
287
288         DEBUG(DEBUG_ERR,("Pindown timeout db:%s  unstick record\n", sr->ctdb_db->db_name));
289         if (sr->pindown != NULL) {
290                 talloc_free(sr->pindown);
291                 sr->pindown = NULL;
292         }
293 }
294
295 static int
296 ctdb_set_sticky_pindown(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
297 {
298         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
299         uint32_t *k;
300         struct ctdb_sticky_record *sr;
301
302         k = ctdb_key_to_idkey(tmp_ctx, key);
303         if (k == NULL) {
304                 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
305                 talloc_free(tmp_ctx);
306                 return -1;
307         }
308
309         sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
310         if (sr == NULL) {
311                 talloc_free(tmp_ctx);
312                 return 0;
313         }
314
315         talloc_free(tmp_ctx);
316
317         if (sr->pindown == NULL) {
318                 DEBUG(DEBUG_ERR,("Pinning down record in %s for %d ms\n", ctdb_db->db_name, ctdb->tunable.sticky_pindown));
319                 sr->pindown = talloc_new(sr);
320                 if (sr->pindown == NULL) {
321                         DEBUG(DEBUG_ERR,("Failed to allocate pindown context for sticky record\n"));
322                         return -1;
323                 }
324                 tevent_add_timer(ctdb->ev, sr->pindown,
325                                  timeval_current_ofs(ctdb->tunable.sticky_pindown / 1000,
326                                                      (ctdb->tunable.sticky_pindown * 1000) % 1000000),
327                                  ctdb_sticky_pindown_timeout, sr);
328         }
329
330         return 0;
331 }
332
333 /*
334   called when a CTDB_REPLY_DMASTER packet comes in, or when the lmaster
335   gets a CTDB_REQUEST_DMASTER for itself. We become the dmaster.
336
337   must be called with the chainlock held. This function releases the chainlock
338 */
339 static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
340                                 struct ctdb_req_header *hdr,
341                                 TDB_DATA key, TDB_DATA data,
342                                 uint64_t rsn, uint32_t record_flags)
343 {
344         struct ctdb_call_state *state;
345         struct ctdb_context *ctdb = ctdb_db->ctdb;
346         struct ctdb_ltdb_header header;
347         int ret;
348
349         DEBUG(DEBUG_DEBUG,("pnn %u dmaster response %08x\n", ctdb->pnn, ctdb_hash(&key)));
350
351         ZERO_STRUCT(header);
352         header.rsn = rsn;
353         header.dmaster = ctdb->pnn;
354         header.flags = record_flags;
355
356         state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
357
358         if (state) {
359                 if (state->call->flags & CTDB_CALL_FLAG_VACUUM_MIGRATION) {
360                         /*
361                          * We temporarily add the VACUUM_MIGRATED flag to
362                          * the record flags, so that ctdb_ltdb_store can
363                          * decide whether the record should be stored or
364                          * deleted.
365                          */
366                         header.flags |= CTDB_REC_FLAG_VACUUM_MIGRATED;
367                 }
368         }
369
370         if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
371                 ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
372
373                 ret = ctdb_ltdb_unlock(ctdb_db, key);
374                 if (ret != 0) {
375                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
376                 }
377                 return;
378         }
379
380         /* we just became DMASTER and this database is "sticky",
381            see if the record is flagged as "hot" and set up a pin-down
382            context to stop migrations for a little while if so
383         */
384         if (ctdb_db->sticky) {
385                 ctdb_set_sticky_pindown(ctdb, ctdb_db, key);
386         }
387
388         if (state == NULL) {
389                 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_become_dmaster from node %u\n",
390                          ctdb->pnn, hdr->reqid, hdr->srcnode));
391
392                 ret = ctdb_ltdb_unlock(ctdb_db, key);
393                 if (ret != 0) {
394                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
395                 }
396                 return;
397         }
398
399         if (key.dsize != state->call->key.dsize || memcmp(key.dptr, state->call->key.dptr, key.dsize)) {
400                 DEBUG(DEBUG_ERR, ("Got bogus DMASTER packet reqid:%u from node %u. Key does not match key held in matching idr.\n", hdr->reqid, hdr->srcnode));
401
402                 ret = ctdb_ltdb_unlock(ctdb_db, key);
403                 if (ret != 0) {
404                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
405                 }
406                 return;
407         }
408
409         if (hdr->reqid != state->reqid) {
410                 /* we found a record  but it was the wrong one */
411                 DEBUG(DEBUG_ERR, ("Dropped orphan in ctdb_become_dmaster with reqid:%u\n from node %u", hdr->reqid, hdr->srcnode));
412
413                 ret = ctdb_ltdb_unlock(ctdb_db, key);
414                 if (ret != 0) {
415                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
416                 }
417                 return;
418         }
419
420         (void) hash_count_increment(ctdb_db->migratedb, key);
421
422         ctdb_call_local(ctdb_db, state->call, &header, state, &data, true);
423
424         ret = ctdb_ltdb_unlock(ctdb_db, state->call->key);
425         if (ret != 0) {
426                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
427         }
428
429         state->state = CTDB_CALL_DONE;
430         if (state->async.fn) {
431                 state->async.fn(state);
432         }
433 }
434
/*
  one request packet parked on a dmaster_defer_queue; next/prev are the
  list links used by the DLIST macros
*/
struct dmaster_defer_call {
        struct dmaster_defer_call *next;
        struct dmaster_defer_call *prev;
        struct ctdb_context *ctdb;
        struct ctdb_req_header *hdr;
};
440
/*
  per-key queue of requests deferred while a dmaster update is in
  progress; generation is the generation the queue was created for and
  is compared against the database generation before the queued calls
  are reprocessed
*/
struct dmaster_defer_queue {
        struct ctdb_db_context    *ctdb_db;
        uint32_t                   generation;
        struct dmaster_defer_call *deferred_calls;
};
446
447 static void dmaster_defer_reprocess(struct tevent_context *ev,
448                                     struct tevent_timer *te,
449                                     struct timeval t,
450                                     void *private_data)
451 {
452         struct dmaster_defer_call *call = talloc_get_type(
453                 private_data, struct dmaster_defer_call);
454
455         ctdb_input_pkt(call->ctdb, call->hdr);
456         talloc_free(call);
457 }
458
459 static int dmaster_defer_queue_destructor(struct dmaster_defer_queue *ddq)
460 {
461         /* Ignore requests, if database recovery happens in-between. */
462         if (ddq->generation != ddq->ctdb_db->generation) {
463                 return 0;
464         }
465
466         while (ddq->deferred_calls != NULL) {
467                 struct dmaster_defer_call *call = ddq->deferred_calls;
468
469                 DLIST_REMOVE(ddq->deferred_calls, call);
470
471                 talloc_steal(call->ctdb, call);
472                 tevent_add_timer(call->ctdb->ev, call, timeval_zero(),
473                                  dmaster_defer_reprocess, call);
474         }
475         return 0;
476 }
477
/*
  insert callback for the defer_dmaster tree: frees the entry that was
  stored before (data), if there was one, and returns the new entry
  (parm)
*/
static void *insert_ddq_callback(void *parm, void *data)
{
        if (data != NULL) {
                talloc_free(data);
        }

        return parm;
}
485
486 /**
487  * This function is used to reigster a key in database that needs to be updated.
488  * Any requests for that key should get deferred till this is completed.
489  */
490 static int dmaster_defer_setup(struct ctdb_db_context *ctdb_db,
491                                struct ctdb_req_header *hdr,
492                                TDB_DATA key)
493 {
494         uint32_t *k;
495         struct dmaster_defer_queue *ddq;
496
497         k = ctdb_key_to_idkey(hdr, key);
498         if (k == NULL) {
499                 DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer setup\n"));
500                 return -1;
501         }
502
503         /* Already exists */
504         ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
505         if (ddq != NULL) {
506                 if (ddq->generation == ctdb_db->generation) {
507                         talloc_free(k);
508                         return 0;
509                 }
510
511                 /* Recovery ocurred - get rid of old queue. All the deferred
512                  * requests will be resent anyway from ctdb_call_resend_db.
513                  */
514                 talloc_free(ddq);
515         }
516
517         ddq = talloc(hdr, struct dmaster_defer_queue);
518         if (ddq == NULL) {
519                 DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer queue\n"));
520                 talloc_free(k);
521                 return -1;
522         }
523         ddq->ctdb_db = ctdb_db;
524         ddq->generation = hdr->generation;
525         ddq->deferred_calls = NULL;
526
527         trbt_insertarray32_callback(ctdb_db->defer_dmaster, k[0], k,
528                                     insert_ddq_callback, ddq);
529         talloc_set_destructor(ddq, dmaster_defer_queue_destructor);
530
531         talloc_free(k);
532         return 0;
533 }
534
535 static int dmaster_defer_add(struct ctdb_db_context *ctdb_db,
536                              struct ctdb_req_header *hdr,
537                              TDB_DATA key)
538 {
539         struct dmaster_defer_queue *ddq;
540         struct dmaster_defer_call *call;
541         uint32_t *k;
542
543         k = ctdb_key_to_idkey(hdr, key);
544         if (k == NULL) {
545                 DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer add\n"));
546                 return -1;
547         }
548
549         ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
550         if (ddq == NULL) {
551                 talloc_free(k);
552                 return -1;
553         }
554
555         talloc_free(k);
556
557         if (ddq->generation != hdr->generation) {
558                 talloc_set_destructor(ddq, NULL);
559                 talloc_free(ddq);
560                 return -1;
561         }
562
563         call = talloc(ddq, struct dmaster_defer_call);
564         if (call == NULL) {
565                 DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer call\n"));
566                 return -1;
567         }
568
569         call->ctdb = ctdb_db->ctdb;
570         call->hdr = talloc_steal(call, hdr);
571
572         DLIST_ADD_END(ddq->deferred_calls, call);
573
574         return 0;
575 }
576
577 /*
578   called when a CTDB_REQ_DMASTER packet comes in
579
580   this comes into the lmaster for a record when the current dmaster
581   wants to give up the dmaster role and give it to someone else
582 */
583 void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
584 {
585         struct ctdb_req_dmaster_old *c = (struct ctdb_req_dmaster_old *)hdr;
586         TDB_DATA key, data, data2;
587         struct ctdb_ltdb_header header;
588         struct ctdb_db_context *ctdb_db;
589         uint32_t record_flags = 0;
590         size_t len;
591         int ret;
592
593         key.dptr = c->data;
594         key.dsize = c->keylen;
595         data.dptr = c->data + c->keylen;
596         data.dsize = c->datalen;
597         len = offsetof(struct ctdb_req_dmaster_old, data) + key.dsize + data.dsize
598                         + sizeof(uint32_t);
599         if (len <= c->hdr.length) {
600                 memcpy(&record_flags, &c->data[c->keylen + c->datalen],
601                        sizeof(record_flags));
602         }
603
604         ctdb_db = find_ctdb_db(ctdb, c->db_id);
605         if (!ctdb_db) {
606                 ctdb_send_error(ctdb, hdr, -1,
607                                 "Unknown database in request. db_id==0x%08x",
608                                 c->db_id);
609                 return;
610         }
611
612         dmaster_defer_setup(ctdb_db, hdr, key);
613
614         /* fetch the current record */
615         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
616                                            ctdb_call_input_pkt, ctdb, false);
617         if (ret == -1) {
618                 ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
619                 return;
620         }
621         if (ret == -2) {
622                 DEBUG(DEBUG_INFO,(__location__ " deferring ctdb_request_dmaster\n"));
623                 return;
624         }
625
626         if (ctdb_lmaster(ctdb, &key) != ctdb->pnn) {
627                 DEBUG(DEBUG_ERR, ("dmaster request to non-lmaster "
628                                   "db=%s lmaster=%u gen=%u curgen=%u\n",
629                                   ctdb_db->db_name, ctdb_lmaster(ctdb, &key),
630                                   hdr->generation, ctdb_db->generation));
631                 ctdb_fatal(ctdb, "ctdb_req_dmaster to non-lmaster");
632         }
633
634         DEBUG(DEBUG_DEBUG,("pnn %u dmaster request on %08x for %u from %u\n", 
635                  ctdb->pnn, ctdb_hash(&key), c->dmaster, c->hdr.srcnode));
636
637         /* its a protocol error if the sending node is not the current dmaster */
638         if (header.dmaster != hdr->srcnode) {
639                 DEBUG(DEBUG_ALERT,("pnn %u dmaster request for new-dmaster %u from non-master %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u keyval=0x%08x\n",
640                          ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
641                          ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
642                          (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid,
643                          (key.dsize >= 4)?(*(uint32_t *)key.dptr):0));
644                 if (header.rsn != 0 || header.dmaster != ctdb->pnn) {
645                         DEBUG(DEBUG_ERR,("ctdb_req_dmaster from non-master. Force a recovery.\n"));
646
647                         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
648                         ctdb_ltdb_unlock(ctdb_db, key);
649                         return;
650                 }
651         }
652
653         if (header.rsn > c->rsn) {
654                 DEBUG(DEBUG_ALERT,("pnn %u dmaster request with older RSN new-dmaster %u from %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u\n",
655                          ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
656                          ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
657                          (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid));
658         }
659
660         /* use the rsn from the sending node */
661         header.rsn = c->rsn;
662
663         /* store the record flags from the sending node */
664         header.flags = record_flags;
665
666         /* check if the new dmaster is the lmaster, in which case we
667            skip the dmaster reply */
668         if (c->dmaster == ctdb->pnn) {
669                 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
670         } else {
671                 ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid);
672
673                 ret = ctdb_ltdb_unlock(ctdb_db, key);
674                 if (ret != 0) {
675                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
676                 }
677         }
678 }
679
680 static void ctdb_sticky_record_timeout(struct tevent_context *ev,
681                                        struct tevent_timer *te,
682                                        struct timeval t, void *private_data)
683 {
684         struct ctdb_sticky_record *sr = talloc_get_type(private_data, 
685                                                        struct ctdb_sticky_record);
686         talloc_free(sr);
687 }
688
689 static void *ctdb_make_sticky_record_callback(void *parm, void *data)
690 {
691         if (data) {
692                 DEBUG(DEBUG_ERR,("Already have sticky record registered. Free old %p and create new %p\n", data, parm));
693                 talloc_free(data);
694         }
695         return parm;
696 }
697
698 static int
699 ctdb_make_record_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
700 {
701         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
702         uint32_t *k;
703         struct ctdb_sticky_record *sr;
704
705         k = ctdb_key_to_idkey(tmp_ctx, key);
706         if (k == NULL) {
707                 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
708                 talloc_free(tmp_ctx);
709                 return -1;
710         }
711
712         sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
713         if (sr != NULL) {
714                 talloc_free(tmp_ctx);
715                 return 0;
716         }
717
718         sr = talloc(ctdb_db->sticky_records, struct ctdb_sticky_record);
719         if (sr == NULL) {
720                 talloc_free(tmp_ctx);
721                 DEBUG(DEBUG_ERR,("Failed to allocate sticky record structure\n"));
722                 return -1;
723         }
724
725         sr->ctdb    = ctdb;
726         sr->ctdb_db = ctdb_db;
727         sr->pindown = NULL;
728
729         DEBUG(DEBUG_ERR,("Make record sticky for %d seconds in db %s key:0x%08x.\n",
730                          ctdb->tunable.sticky_duration,
731                          ctdb_db->db_name, ctdb_hash(&key)));
732
733         trbt_insertarray32_callback(ctdb_db->sticky_records, k[0], &k[0], ctdb_make_sticky_record_callback, sr);
734
735         tevent_add_timer(ctdb->ev, sr,
736                          timeval_current_ofs(ctdb->tunable.sticky_duration, 0),
737                          ctdb_sticky_record_timeout, sr);
738
739         talloc_free(tmp_ctx);
740         return 0;
741 }
742
/*
  carries a deferred request packet from pinned_down_destructor() to the
  pinned_down_requeue() timer callback
*/
struct pinned_down_requeue_handle {
        struct ctdb_context    *ctdb;
        struct ctdb_req_header *hdr;
};
747
/*
  a request packet held back while its record is pinned down; allocated
  under the sticky record's pindown context by
  ctdb_defer_pinned_down_request()
*/
struct pinned_down_deferred_call {
        struct ctdb_context    *ctdb;
        struct ctdb_req_header *hdr;
};
752
753 static void pinned_down_requeue(struct tevent_context *ev,
754                                 struct tevent_timer *te,
755                                 struct timeval t, void *private_data)
756 {
757         struct pinned_down_requeue_handle *handle = talloc_get_type(private_data, struct pinned_down_requeue_handle);
758         struct ctdb_context *ctdb = handle->ctdb;
759
760         talloc_steal(ctdb, handle->hdr);
761         ctdb_call_input_pkt(ctdb, handle->hdr);
762
763         talloc_free(handle);
764 }
765
766 static int pinned_down_destructor(struct pinned_down_deferred_call *pinned_down)
767 {
768         struct ctdb_context *ctdb = pinned_down->ctdb;
769         struct pinned_down_requeue_handle *handle = talloc(ctdb, struct pinned_down_requeue_handle);
770
771         handle->ctdb = pinned_down->ctdb;
772         handle->hdr  = pinned_down->hdr;
773         talloc_steal(handle, handle->hdr);
774
775         tevent_add_timer(ctdb->ev, handle, timeval_zero(),
776                          pinned_down_requeue, handle);
777
778         return 0;
779 }
780
781 static int
782 ctdb_defer_pinned_down_request(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr)
783 {
784         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
785         uint32_t *k;
786         struct ctdb_sticky_record *sr;
787         struct pinned_down_deferred_call *pinned_down;
788
789         k = ctdb_key_to_idkey(tmp_ctx, key);
790         if (k == NULL) {
791                 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
792                 talloc_free(tmp_ctx);
793                 return -1;
794         }
795
796         sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
797         if (sr == NULL) {
798                 talloc_free(tmp_ctx);
799                 return -1;
800         }
801
802         talloc_free(tmp_ctx);
803
804         if (sr->pindown == NULL) {
805                 return -1;
806         }
807         
808         pinned_down = talloc(sr->pindown, struct pinned_down_deferred_call);
809         if (pinned_down == NULL) {
810                 DEBUG(DEBUG_ERR,("Failed to allocate structure for deferred pinned down request\n"));
811                 return -1;
812         }
813
814         pinned_down->ctdb = ctdb;
815         pinned_down->hdr  = hdr;
816
817         talloc_set_destructor(pinned_down, pinned_down_destructor);
818         talloc_steal(pinned_down, hdr);
819
820         return 0;
821 }
822
823 static void
824 ctdb_update_db_stat_hot_keys(struct ctdb_db_context *ctdb_db, TDB_DATA key,
825                              int count)
826 {
827         int i, id;
828         char *keystr;
829
830         /* smallest value is always at index 0 */
831         if (count <= ctdb_db->statistics.hot_keys[0].count) {
832                 return;
833         }
834
835         /* see if we already know this key */
836         for (i = 0; i < MAX_HOT_KEYS; i++) {
837                 if (key.dsize != ctdb_db->statistics.hot_keys[i].key.dsize) {
838                         continue;
839                 }
840                 if (memcmp(key.dptr, ctdb_db->statistics.hot_keys[i].key.dptr, key.dsize)) {
841                         continue;
842                 }
843                 /* found an entry for this key */
844                 if (count <= ctdb_db->statistics.hot_keys[i].count) {
845                         return;
846                 }
847                 ctdb_db->statistics.hot_keys[i].count = count;
848                 goto sort_keys;
849         }
850
851         if (ctdb_db->statistics.num_hot_keys < MAX_HOT_KEYS) {
852                 id = ctdb_db->statistics.num_hot_keys;
853                 ctdb_db->statistics.num_hot_keys++;
854         } else {
855                 id = 0;
856         }
857
858         if (ctdb_db->statistics.hot_keys[id].key.dptr != NULL) {
859                 talloc_free(ctdb_db->statistics.hot_keys[id].key.dptr);
860         }
861         ctdb_db->statistics.hot_keys[id].key.dsize = key.dsize;
862         ctdb_db->statistics.hot_keys[id].key.dptr  = talloc_memdup(ctdb_db, key.dptr, key.dsize);
863         ctdb_db->statistics.hot_keys[id].count = count;
864
865         keystr = hex_encode_talloc(ctdb_db,
866                                    (unsigned char *)key.dptr, key.dsize);
867         DEBUG(DEBUG_NOTICE,("Updated hot key database=%s key=%s id=%d "
868                             "count=%d\n", ctdb_db->db_name,
869                             keystr ? keystr : "" , id, count));
870         talloc_free(keystr);
871
872 sort_keys:
873         for (i = 1; i < MAX_HOT_KEYS; i++) {
874                 if (ctdb_db->statistics.hot_keys[i].count == 0) {
875                         continue;
876                 }
877                 if (ctdb_db->statistics.hot_keys[i].count < ctdb_db->statistics.hot_keys[0].count) {
878                         count = ctdb_db->statistics.hot_keys[i].count;
879                         ctdb_db->statistics.hot_keys[i].count = ctdb_db->statistics.hot_keys[0].count;
880                         ctdb_db->statistics.hot_keys[0].count = count;
881
882                         key = ctdb_db->statistics.hot_keys[i].key;
883                         ctdb_db->statistics.hot_keys[i].key = ctdb_db->statistics.hot_keys[0].key;
884                         ctdb_db->statistics.hot_keys[0].key = key;
885                 }
886         }
887 }
888
889 /*
890   called when a CTDB_REQ_CALL packet comes in
891 */
892 void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
893 {
894         struct ctdb_req_call_old *c = (struct ctdb_req_call_old *)hdr;
895         TDB_DATA data;
896         struct ctdb_reply_call_old *r;
897         int ret, len;
898         struct ctdb_ltdb_header header;
899         struct ctdb_call *call;
900         struct ctdb_db_context *ctdb_db;
901         int tmp_count, bucket;
902
903         if (ctdb->methods == NULL) {
904                 DEBUG(DEBUG_INFO,(__location__ " Failed ctdb_request_call. Transport is DOWN\n"));
905                 return;
906         }
907
908
909         ctdb_db = find_ctdb_db(ctdb, c->db_id);
910         if (!ctdb_db) {
911                 ctdb_send_error(ctdb, hdr, -1,
912                                 "Unknown database in request. db_id==0x%08x",
913                                 c->db_id);
914                 return;
915         }
916
917         call = talloc(hdr, struct ctdb_call);
918         CTDB_NO_MEMORY_FATAL(ctdb, call);
919
920         call->call_id  = c->callid;
921         call->key.dptr = c->data;
922         call->key.dsize = c->keylen;
923         call->call_data.dptr = c->data + c->keylen;
924         call->call_data.dsize = c->calldatalen;
925         call->reply_data.dptr  = NULL;
926         call->reply_data.dsize = 0;
927
928
929         /* If this record is pinned down we should defer the
930            request until the pindown times out
931         */
932         if (ctdb_db->sticky) {
933                 if (ctdb_defer_pinned_down_request(ctdb, ctdb_db, call->key, hdr) == 0) {
934                         DEBUG(DEBUG_WARNING,
935                               ("Defer request for pinned down record in %s\n", ctdb_db->db_name));
936                         talloc_free(call);
937                         return;
938                 }
939         }
940
941         if (dmaster_defer_add(ctdb_db, hdr, call->key) == 0) {
942                 talloc_free(call);
943                 return;
944         }
945
946         /* determine if we are the dmaster for this key. This also
947            fetches the record data (if any), thus avoiding a 2nd fetch of the data 
948            if the call will be answered locally */
949
950         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call->key, &header, hdr, &data,
951                                            ctdb_call_input_pkt, ctdb, false);
952         if (ret == -1) {
953                 ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
954                 talloc_free(call);
955                 return;
956         }
957         if (ret == -2) {
958                 DEBUG(DEBUG_INFO,(__location__ " deferred ctdb_request_call\n"));
959                 talloc_free(call);
960                 return;
961         }
962
963         /* Dont do READONLY if we don't have a tracking database */
964         if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
965                 c->flags &= ~CTDB_WANT_READONLY;
966         }
967
968         if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
969                 header.flags &= ~CTDB_REC_RO_FLAGS;
970                 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
971                 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
972                 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
973                         ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
974                 }
975                 /* and clear out the tracking data */
976                 if (tdb_delete(ctdb_db->rottdb, call->key) != 0) {
977                         DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
978                 }
979         }
980
981         /* if we are revoking, we must defer all other calls until the revoke
982          * had completed.
983          */
984         if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
985                 talloc_free(data.dptr);
986                 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
987
988                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
989                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
990                 }
991                 talloc_free(call);
992                 return;
993         }
994
995         /*
996          * If we are not the dmaster and are not hosting any delegations,
997          * then we redirect the request to the node than can answer it
998          * (the lmaster or the dmaster).
999          */
1000         if ((header.dmaster != ctdb->pnn) 
1001             && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) ) {
1002                 talloc_free(data.dptr);
1003                 ctdb_call_send_redirect(ctdb, ctdb_db, call->key, c, &header);
1004
1005                 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1006                 if (ret != 0) {
1007                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1008                 }
1009                 talloc_free(call);
1010                 return;
1011         }
1012
1013         if ( (!(c->flags & CTDB_WANT_READONLY))
1014         && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
1015                 header.flags   |= CTDB_REC_RO_REVOKING_READONLY;
1016                 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
1017                         ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
1018                 }
1019                 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1020
1021                 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, call->key, &header, data) != 0) {
1022                         ctdb_fatal(ctdb, "Failed to start record revoke");
1023                 }
1024                 talloc_free(data.dptr);
1025
1026                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
1027                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
1028                 }
1029                 talloc_free(call);
1030
1031                 return;
1032         }               
1033
1034         /* If this is the first request for delegation. bump rsn and set
1035          * the delegations flag
1036          */
1037         if ((c->flags & CTDB_WANT_READONLY)
1038         &&  (c->callid == CTDB_FETCH_WITH_HEADER_FUNC)
1039         &&  (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS))) {
1040                 header.rsn     += 3;
1041                 header.flags   |= CTDB_REC_RO_HAVE_DELEGATIONS;
1042                 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
1043                         ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
1044                 }
1045         }
1046         if ((c->flags & CTDB_WANT_READONLY) 
1047         &&  (call->call_id == CTDB_FETCH_WITH_HEADER_FUNC)) {
1048                 TDB_DATA tdata;
1049
1050                 tdata = tdb_fetch(ctdb_db->rottdb, call->key);
1051                 if (ctdb_trackingdb_add_pnn(ctdb, &tdata, c->hdr.srcnode) != 0) {
1052                         ctdb_fatal(ctdb, "Failed to add node to trackingdb");
1053                 }
1054                 if (tdb_store(ctdb_db->rottdb, call->key, tdata, TDB_REPLACE) != 0) {
1055                         ctdb_fatal(ctdb, "Failed to store trackingdb data");
1056                 }
1057                 free(tdata.dptr);
1058
1059                 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1060                 if (ret != 0) {
1061                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1062                 }
1063
1064                 len = offsetof(struct ctdb_reply_call_old, data) + data.dsize + sizeof(struct ctdb_ltdb_header);
1065                 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len, 
1066                                             struct ctdb_reply_call_old);
1067                 CTDB_NO_MEMORY_FATAL(ctdb, r);
1068                 r->hdr.destnode  = c->hdr.srcnode;
1069                 r->hdr.reqid     = c->hdr.reqid;
1070                 r->hdr.generation = ctdb_db->generation;
1071                 r->status        = 0;
1072                 r->datalen       = data.dsize + sizeof(struct ctdb_ltdb_header);
1073                 header.rsn      -= 2;
1074                 header.flags   |= CTDB_REC_RO_HAVE_READONLY;
1075                 header.flags   &= ~CTDB_REC_RO_HAVE_DELEGATIONS;
1076                 memcpy(&r->data[0], &header, sizeof(struct ctdb_ltdb_header));
1077
1078                 if (data.dsize) {
1079                         memcpy(&r->data[sizeof(struct ctdb_ltdb_header)], data.dptr, data.dsize);
1080                 }
1081
1082                 ctdb_queue_packet(ctdb, &r->hdr);
1083                 CTDB_INCREMENT_STAT(ctdb, total_ro_delegations);
1084                 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_delegations);
1085
1086                 talloc_free(r);
1087                 talloc_free(call);
1088                 return;
1089         }
1090
1091         CTDB_UPDATE_STAT(ctdb, max_hop_count, c->hopcount);
1092         tmp_count = c->hopcount;
1093         bucket = 0;
1094         while (tmp_count) {
1095                 tmp_count >>= 2;
1096                 bucket++;
1097         }
1098         if (bucket >= MAX_COUNT_BUCKETS) {
1099                 bucket = MAX_COUNT_BUCKETS - 1;
1100         }
1101         CTDB_INCREMENT_STAT(ctdb, hop_count_bucket[bucket]);
1102         CTDB_INCREMENT_DB_STAT(ctdb_db, hop_count_bucket[bucket]);
1103
1104         /* If this database supports sticky records, then check if the
1105            hopcount is big. If it is it means the record is hot and we
1106            should make it sticky.
1107         */
1108         if (ctdb_db->sticky && c->hopcount >= ctdb->tunable.hopcount_make_sticky) {
1109                 ctdb_make_record_sticky(ctdb, ctdb_db, call->key);
1110         }
1111
1112
1113         /* Try if possible to migrate the record off to the caller node.
1114          * From the clients perspective a fetch of the data is just as 
1115          * expensive as a migration.
1116          */
1117         if (c->hdr.srcnode != ctdb->pnn) {
1118                 if (ctdb_db->persistent_state) {
1119                         DEBUG(DEBUG_INFO, (__location__ " refusing migration"
1120                               " of key %s while transaction is active\n",
1121                               (char *)call->key.dptr));
1122                 } else {
1123                         DEBUG(DEBUG_DEBUG,("pnn %u starting migration of %08x to %u\n",
1124                                  ctdb->pnn, ctdb_hash(&(call->key)), c->hdr.srcnode));
1125                         ctdb_call_send_dmaster(ctdb_db, c, &header, &(call->key), &data);
1126                         talloc_free(data.dptr);
1127
1128                         ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1129                         if (ret != 0) {
1130                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1131                         }
1132                 }
1133                 talloc_free(call);
1134                 return;
1135         }
1136
1137         ret = ctdb_call_local(ctdb_db, call, &header, hdr, &data, true);
1138         if (ret != 0) {
1139                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_local failed\n"));
1140                 call->status = -1;
1141         }
1142
1143         ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1144         if (ret != 0) {
1145                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1146         }
1147
1148         len = offsetof(struct ctdb_reply_call_old, data) + call->reply_data.dsize;
1149         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len, 
1150                                     struct ctdb_reply_call_old);
1151         CTDB_NO_MEMORY_FATAL(ctdb, r);
1152         r->hdr.destnode  = hdr->srcnode;
1153         r->hdr.reqid     = hdr->reqid;
1154         r->hdr.generation = ctdb_db->generation;
1155         r->status        = call->status;
1156         r->datalen       = call->reply_data.dsize;
1157         if (call->reply_data.dsize) {
1158                 memcpy(&r->data[0], call->reply_data.dptr, call->reply_data.dsize);
1159         }
1160
1161         ctdb_queue_packet(ctdb, &r->hdr);
1162
1163         talloc_free(r);
1164         talloc_free(call);
1165 }
1166
1167 /**
1168  * called when a CTDB_REPLY_CALL packet comes in
1169  *
1170  * This packet comes in response to a CTDB_REQ_CALL request packet. It
1171  * contains any reply data from the call
1172  */
1173 void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1174 {
1175         struct ctdb_reply_call_old *c = (struct ctdb_reply_call_old *)hdr;
1176         struct ctdb_call_state *state;
1177
1178         state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
1179         if (state == NULL) {
1180                 DEBUG(DEBUG_ERR, (__location__ " reqid %u not found\n", hdr->reqid));
1181                 return;
1182         }
1183
1184         if (hdr->reqid != state->reqid) {
1185                 /* we found a record  but it was the wrong one */
1186                 DEBUG(DEBUG_ERR, ("Dropped orphaned call reply with reqid:%u\n",hdr->reqid));
1187                 return;
1188         }
1189
1190
1191         /* read only delegation processing */
1192         /* If we got a FETCH_WITH_HEADER we should check if this is a ro
1193          * delegation since we may need to update the record header
1194          */
1195         if (state->c->callid == CTDB_FETCH_WITH_HEADER_FUNC) {
1196                 struct ctdb_db_context *ctdb_db = state->ctdb_db;
1197                 struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)&c->data[0];
1198                 struct ctdb_ltdb_header oldheader;
1199                 TDB_DATA key, data, olddata;
1200                 int ret;
1201
1202                 if (!(header->flags & CTDB_REC_RO_HAVE_READONLY)) {
1203                         goto finished_ro;
1204                         return;
1205                 }
1206
1207                 key.dsize = state->c->keylen;
1208                 key.dptr  = state->c->data;
1209                 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1210                                      ctdb_call_input_pkt, ctdb, false);
1211                 if (ret == -2) {
1212                         return;
1213                 }
1214                 if (ret != 0) {
1215                         DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_call\n"));
1216                         return;
1217                 }
1218
1219                 ret = ctdb_ltdb_fetch(ctdb_db, key, &oldheader, state, &olddata);
1220                 if (ret != 0) {
1221                         DEBUG(DEBUG_ERR, ("Failed to fetch old record in ctdb_reply_call\n"));
1222                         ctdb_ltdb_unlock(ctdb_db, key);
1223                         goto finished_ro;
1224                 }                       
1225
1226                 if (header->rsn <= oldheader.rsn) {
1227                         ctdb_ltdb_unlock(ctdb_db, key);
1228                         goto finished_ro;
1229                 }
1230
1231                 if (c->datalen < sizeof(struct ctdb_ltdb_header)) {
1232                         DEBUG(DEBUG_ERR,(__location__ " Got FETCH_WITH_HEADER reply with too little data: %d bytes\n", c->datalen));
1233                         ctdb_ltdb_unlock(ctdb_db, key);
1234                         goto finished_ro;
1235                 }
1236
1237                 data.dsize = c->datalen - sizeof(struct ctdb_ltdb_header);
1238                 data.dptr  = &c->data[sizeof(struct ctdb_ltdb_header)];
1239                 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
1240                 if (ret != 0) {
1241                         DEBUG(DEBUG_ERR, ("Failed to store new record in ctdb_reply_call\n"));
1242                         ctdb_ltdb_unlock(ctdb_db, key);
1243                         goto finished_ro;
1244                 }                       
1245
1246                 ctdb_ltdb_unlock(ctdb_db, key);
1247         }
1248 finished_ro:
1249
1250         state->call->reply_data.dptr = c->data;
1251         state->call->reply_data.dsize = c->datalen;
1252         state->call->status = c->status;
1253
1254         talloc_steal(state, c);
1255
1256         state->state = CTDB_CALL_DONE;
1257         if (state->async.fn) {
1258                 state->async.fn(state);
1259         }
1260 }
1261
1262
1263 /**
1264  * called when a CTDB_REPLY_DMASTER packet comes in
1265  *
1266  * This packet comes in from the lmaster in response to a CTDB_REQ_CALL
1267  * request packet. It means that the current dmaster wants to give us
1268  * the dmaster role.
1269  */
1270 void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1271 {
1272         struct ctdb_reply_dmaster_old *c = (struct ctdb_reply_dmaster_old *)hdr;
1273         struct ctdb_db_context *ctdb_db;
1274         TDB_DATA key, data;
1275         uint32_t record_flags = 0;
1276         size_t len;
1277         int ret;
1278
1279         ctdb_db = find_ctdb_db(ctdb, c->db_id);
1280         if (ctdb_db == NULL) {
1281                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_reply_dmaster\n", c->db_id));
1282                 return;
1283         }
1284         
1285         key.dptr = c->data;
1286         key.dsize = c->keylen;
1287         data.dptr = &c->data[key.dsize];
1288         data.dsize = c->datalen;
1289         len = offsetof(struct ctdb_reply_dmaster_old, data) + key.dsize + data.dsize
1290                 + sizeof(uint32_t);
1291         if (len <= c->hdr.length) {
1292                 memcpy(&record_flags, &c->data[c->keylen + c->datalen],
1293                        sizeof(record_flags));
1294         }
1295
1296         dmaster_defer_setup(ctdb_db, hdr, key);
1297
1298         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1299                                      ctdb_call_input_pkt, ctdb, false);
1300         if (ret == -2) {
1301                 return;
1302         }
1303         if (ret != 0) {
1304                 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_dmaster\n"));
1305                 return;
1306         }
1307
1308         ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
1309 }
1310
1311
1312 /*
1313   called when a CTDB_REPLY_ERROR packet comes in
1314 */
1315 void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1316 {
1317         struct ctdb_reply_error_old *c = (struct ctdb_reply_error_old *)hdr;
1318         struct ctdb_call_state *state;
1319
1320         state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
1321         if (state == NULL) {
1322                 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_reply_error\n",
1323                          ctdb->pnn, hdr->reqid));
1324                 return;
1325         }
1326
1327         if (hdr->reqid != state->reqid) {
1328                 /* we found a record  but it was the wrong one */
1329                 DEBUG(DEBUG_ERR, ("Dropped orphaned error reply with reqid:%u\n",hdr->reqid));
1330                 return;
1331         }
1332
1333         talloc_steal(state, c);
1334
1335         state->state  = CTDB_CALL_ERROR;
1336         state->errmsg = (char *)c->msg;
1337         if (state->async.fn) {
1338                 state->async.fn(state);
1339         }
1340 }
1341
1342
1343 /*
1344   destroy a ctdb_call
1345 */
1346 static int ctdb_call_destructor(struct ctdb_call_state *state)
1347 {
1348         DLIST_REMOVE(state->ctdb_db->pending_calls, state);
1349         reqid_remove(state->ctdb_db->ctdb->idr, state->reqid);
1350         return 0;
1351 }
1352
1353
1354 /*
1355   called when a ctdb_call needs to be resent after a reconfigure event
1356 */
1357 static void ctdb_call_resend(struct ctdb_call_state *state)
1358 {
1359         struct ctdb_context *ctdb = state->ctdb_db->ctdb;
1360
1361         state->generation = state->ctdb_db->generation;
1362
1363         /* use a new reqid, in case the old reply does eventually come in */
1364         reqid_remove(ctdb->idr, state->reqid);
1365         state->reqid = reqid_new(ctdb->idr, state);
1366         state->c->hdr.reqid = state->reqid;
1367
1368         /* update the generation count for this request, so its valid with the new vnn_map */
1369         state->c->hdr.generation = state->generation;
1370
1371         /* send the packet to ourselves, it will be redirected appropriately */
1372         state->c->hdr.destnode = ctdb->pnn;
1373
1374         ctdb_queue_packet(ctdb, &state->c->hdr);
1375         DEBUG(DEBUG_NOTICE,("resent ctdb_call for db %s reqid %u generation %u\n",
1376                             state->ctdb_db->db_name, state->reqid, state->generation));
1377 }
1378
1379 /*
1380   resend all pending calls on recovery
1381  */
1382 void ctdb_call_resend_db(struct ctdb_db_context *ctdb_db)
1383 {
1384         struct ctdb_call_state *state, *next;
1385
1386         for (state = ctdb_db->pending_calls; state; state = next) {
1387                 next = state->next;
1388                 ctdb_call_resend(state);
1389         }
1390 }
1391
1392 void ctdb_call_resend_all(struct ctdb_context *ctdb)
1393 {
1394         struct ctdb_db_context *ctdb_db;
1395
1396         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
1397                 ctdb_call_resend_db(ctdb_db);
1398         }
1399 }
1400
1401 /*
1402   this allows the caller to setup a async.fn 
1403 */
1404 static void call_local_trigger(struct tevent_context *ev,
1405                                struct tevent_timer *te,
1406                                struct timeval t, void *private_data)
1407 {
1408         struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state);
1409         if (state->async.fn) {
1410                 state->async.fn(state);
1411         }
1412 }       
1413
1414
1415 /*
1416   construct an event driven local ctdb_call
1417
1418   this is used so that locally processed ctdb_call requests are processed
1419   in an event driven manner
1420 */
1421 struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db, 
1422                                              struct ctdb_call *call,
1423                                              struct ctdb_ltdb_header *header,
1424                                              TDB_DATA *data)
1425 {
1426         struct ctdb_call_state *state;
1427         struct ctdb_context *ctdb = ctdb_db->ctdb;
1428         int ret;
1429
1430         state = talloc_zero(ctdb_db, struct ctdb_call_state);
1431         CTDB_NO_MEMORY_NULL(ctdb, state);
1432
1433         talloc_steal(state, data->dptr);
1434
1435         state->state = CTDB_CALL_DONE;
1436         state->call  = talloc(state, struct ctdb_call);
1437         CTDB_NO_MEMORY_NULL(ctdb, state->call);
1438         *(state->call) = *call;
1439         state->ctdb_db = ctdb_db;
1440
1441         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
1442         if (ret != 0) {
1443                 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
1444         }
1445
1446         tevent_add_timer(ctdb->ev, state, timeval_zero(),
1447                          call_local_trigger, state);
1448
1449         return state;
1450 }
1451
1452
1453 /*
1454   make a remote ctdb call - async send. Called in daemon context.
1455
1456   This constructs a ctdb_call request and queues it for processing. 
1457   This call never blocks.
1458 */
1459 struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctdb_db, 
1460                                                      struct ctdb_call *call, 
1461                                                      struct ctdb_ltdb_header *header)
1462 {
1463         uint32_t len;
1464         struct ctdb_call_state *state;
1465         struct ctdb_context *ctdb = ctdb_db->ctdb;
1466
1467         if (ctdb->methods == NULL) {
1468                 DEBUG(DEBUG_INFO,(__location__ " Failed send packet. Transport is down\n"));
1469                 return NULL;
1470         }
1471
1472         state = talloc_zero(ctdb_db, struct ctdb_call_state);
1473         CTDB_NO_MEMORY_NULL(ctdb, state);
1474         state->call = talloc(state, struct ctdb_call);
1475         CTDB_NO_MEMORY_NULL(ctdb, state->call);
1476
1477         state->reqid = reqid_new(ctdb->idr, state);
1478         state->ctdb_db = ctdb_db;
1479         talloc_set_destructor(state, ctdb_call_destructor);
1480
1481         len = offsetof(struct ctdb_req_call_old, data) + call->key.dsize + call->call_data.dsize;
1482         state->c = ctdb_transport_allocate(ctdb, state, CTDB_REQ_CALL, len, 
1483                                            struct ctdb_req_call_old);
1484         CTDB_NO_MEMORY_NULL(ctdb, state->c);
1485         state->c->hdr.destnode  = header->dmaster;
1486
1487         /* this limits us to 16k outstanding messages - not unreasonable */
1488         state->c->hdr.reqid     = state->reqid;
1489         state->c->hdr.generation = ctdb_db->generation;
1490         state->c->flags         = call->flags;
1491         state->c->db_id         = ctdb_db->db_id;
1492         state->c->callid        = call->call_id;
1493         state->c->hopcount      = 0;
1494         state->c->keylen        = call->key.dsize;
1495         state->c->calldatalen   = call->call_data.dsize;
1496         memcpy(&state->c->data[0], call->key.dptr, call->key.dsize);
1497         memcpy(&state->c->data[call->key.dsize], 
1498                call->call_data.dptr, call->call_data.dsize);
1499         *(state->call)              = *call;
1500         state->call->call_data.dptr = &state->c->data[call->key.dsize];
1501         state->call->key.dptr       = &state->c->data[0];
1502
1503         state->state  = CTDB_CALL_WAIT;
1504         state->generation = ctdb_db->generation;
1505
1506         DLIST_ADD(ctdb_db->pending_calls, state);
1507
1508         ctdb_queue_packet(ctdb, &state->c->hdr);
1509
1510         return state;
1511 }
1512
1513 /*
1514   make a remote ctdb call - async recv - called in daemon context
1515
1516   This is called when the program wants to wait for a ctdb_call to complete and get the 
1517   results. This call will block unless the call has already completed.
1518 */
1519 int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
1520 {
1521         while (state->state < CTDB_CALL_DONE) {
1522                 tevent_loop_once(state->ctdb_db->ctdb->ev);
1523         }
1524         if (state->state != CTDB_CALL_DONE) {
1525                 ctdb_set_error(state->ctdb_db->ctdb, "%s", state->errmsg);
1526                 talloc_free(state);
1527                 return -1;
1528         }
1529
1530         if (state->call->reply_data.dsize) {
1531                 call->reply_data.dptr = talloc_memdup(call,
1532                                                       state->call->reply_data.dptr,
1533                                                       state->call->reply_data.dsize);
1534                 call->reply_data.dsize = state->call->reply_data.dsize;
1535         } else {
1536                 call->reply_data.dptr = NULL;
1537                 call->reply_data.dsize = 0;
1538         }
1539         call->status = state->call->status;
1540         talloc_free(state);
1541         return 0;
1542 }
1543
1544
1545 /* 
1546    send a keepalive packet to the other node
1547 */
1548 void ctdb_send_keepalive(struct ctdb_context *ctdb, uint32_t destnode)
1549 {
1550         struct ctdb_req_keepalive_old *r;
1551         
1552         if (ctdb->methods == NULL) {
1553                 DEBUG(DEBUG_INFO,(__location__ " Failed to send keepalive. Transport is DOWN\n"));
1554                 return;
1555         }
1556
1557         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_KEEPALIVE,
1558                                     sizeof(struct ctdb_req_keepalive_old), 
1559                                     struct ctdb_req_keepalive_old);
1560         CTDB_NO_MEMORY_FATAL(ctdb, r);
1561         r->hdr.destnode  = destnode;
1562         r->hdr.reqid     = 0;
1563         
1564         CTDB_INCREMENT_STAT(ctdb, keepalive_packets_sent);
1565
1566         ctdb_queue_packet(ctdb, &r->hdr);
1567
1568         talloc_free(r);
1569 }
1570
1571
1572
1573 struct revokechild_deferred_call {
1574         struct ctdb_context *ctdb;
1575         struct ctdb_req_header *hdr;
1576         deferred_requeue_fn fn;
1577         void *ctx;
1578 };
1579
1580 struct revokechild_handle {
1581         struct revokechild_handle *next, *prev;
1582         struct ctdb_context *ctdb;
1583         struct ctdb_db_context *ctdb_db;
1584         struct tevent_fd *fde;
1585         int status;
1586         int fd[2];
1587         pid_t child;
1588         TDB_DATA key;
1589 };
1590
1591 struct revokechild_requeue_handle {
1592         struct ctdb_context *ctdb;
1593         struct ctdb_req_header *hdr;
1594         deferred_requeue_fn fn;
1595         void *ctx;
1596 };
1597
1598 static void deferred_call_requeue(struct tevent_context *ev,
1599                                   struct tevent_timer *te,
1600                                   struct timeval t, void *private_data)
1601 {
1602         struct revokechild_requeue_handle *requeue_handle = talloc_get_type(private_data, struct revokechild_requeue_handle);
1603
1604         requeue_handle->fn(requeue_handle->ctx, requeue_handle->hdr);
1605         talloc_free(requeue_handle);
1606 }
1607
1608 static int deferred_call_destructor(struct revokechild_deferred_call *deferred_call)
1609 {
1610         struct ctdb_context *ctdb = deferred_call->ctdb;
1611         struct revokechild_requeue_handle *requeue_handle = talloc(ctdb, struct revokechild_requeue_handle);
1612
1613         requeue_handle->ctdb = ctdb;
1614         requeue_handle->hdr  = deferred_call->hdr;
1615         requeue_handle->fn   = deferred_call->fn;
1616         requeue_handle->ctx  = deferred_call->ctx;
1617         talloc_steal(requeue_handle, requeue_handle->hdr);
1618
1619         /* Always delay revoke requests.  Either wait for the read/write
1620          * operation to complete, or if revoking failed wait for recovery to
1621          * complete
1622          */
1623         tevent_add_timer(ctdb->ev, requeue_handle,
1624                          timeval_current_ofs(1, 0),
1625                          deferred_call_requeue, requeue_handle);
1626
1627         return 0;
1628 }
1629
1630
1631 static int revokechild_destructor(struct revokechild_handle *rc)
1632 {
1633         if (rc->fde != NULL) {
1634                 talloc_free(rc->fde);
1635         }
1636
1637         if (rc->fd[0] != -1) {
1638                 close(rc->fd[0]);
1639         }
1640         if (rc->fd[1] != -1) {
1641                 close(rc->fd[1]);
1642         }
1643         ctdb_kill(rc->ctdb, rc->child, SIGKILL);
1644
1645         DLIST_REMOVE(rc->ctdb_db->revokechild_active, rc);
1646         return 0;
1647 }
1648
1649 static void revokechild_handler(struct tevent_context *ev,
1650                                 struct tevent_fd *fde,
1651                                 uint16_t flags, void *private_data)
1652 {
1653         struct revokechild_handle *rc = talloc_get_type(private_data, 
1654                                                      struct revokechild_handle);
1655         int ret;
1656         char c;
1657
1658         ret = sys_read(rc->fd[0], &c, 1);
1659         if (ret != 1) {
1660                 DEBUG(DEBUG_ERR,("Failed to read status from revokechild. errno:%d\n", errno));
1661                 rc->status = -1;
1662                 talloc_free(rc);
1663                 return;
1664         }
1665         if (c != 0) {
1666                 DEBUG(DEBUG_ERR,("revokechild returned failure. status:%d\n", c));
1667                 rc->status = -1;
1668                 talloc_free(rc);
1669                 return;
1670         }
1671
1672         talloc_free(rc);
1673 }
1674
1675 struct ctdb_revoke_state {
1676         struct ctdb_db_context *ctdb_db;
1677         TDB_DATA key;
1678         struct ctdb_ltdb_header *header;
1679         TDB_DATA data;
1680         int count;
1681         int status;
1682         int finished;
1683 };
1684
1685 static void update_record_cb(struct ctdb_client_control_state *state)
1686 {
1687         struct ctdb_revoke_state *revoke_state;
1688         int ret;
1689         int32_t res;
1690
1691         if (state == NULL) {
1692                 return;
1693         }
1694         revoke_state = state->async.private_data;
1695
1696         state->async.fn = NULL;
1697         ret = ctdb_control_recv(state->ctdb, state, state, NULL, &res, NULL);
1698         if ((ret != 0) || (res != 0)) {
1699                 DEBUG(DEBUG_ERR,("Recv for revoke update record failed ret:%d res:%d\n", ret, res));
1700                 revoke_state->status = -1;
1701         }
1702
1703         revoke_state->count--;
1704         if (revoke_state->count <= 0) {
1705                 revoke_state->finished = 1;
1706         }
1707 }
1708
1709 static void revoke_send_cb(struct ctdb_context *ctdb, uint32_t pnn, void *private_data)
1710 {
1711         struct ctdb_revoke_state *revoke_state = private_data;
1712         struct ctdb_client_control_state *state;
1713
1714         state = ctdb_ctrl_updaterecord_send(ctdb, revoke_state, timeval_current_ofs(ctdb->tunable.control_timeout,0), pnn, revoke_state->ctdb_db, revoke_state->key, revoke_state->header, revoke_state->data);
1715         if (state == NULL) {
1716                 DEBUG(DEBUG_ERR,("Failure to send update record to revoke readonly delegation\n"));
1717                 revoke_state->status = -1;
1718                 return;
1719         }
1720         state->async.fn           = update_record_cb;
1721         state->async.private_data = revoke_state;
1722
1723         revoke_state->count++;
1724
1725 }
1726
1727 static void ctdb_revoke_timeout_handler(struct tevent_context *ev,
1728                                         struct tevent_timer *te,
1729                                         struct timeval yt, void *private_data)
1730 {
1731         struct ctdb_revoke_state *state = private_data;
1732
1733         DEBUG(DEBUG_ERR,("Timed out waiting for revoke to finish\n"));
1734         state->finished = 1;
1735         state->status   = -1;
1736 }
1737
1738 static int ctdb_revoke_all_delegations(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA tdata, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1739 {
1740         struct ctdb_revoke_state *state = talloc_zero(ctdb, struct ctdb_revoke_state);
1741         struct ctdb_ltdb_header new_header;
1742         TDB_DATA new_data;
1743
1744         state->ctdb_db = ctdb_db;
1745         state->key     = key;
1746         state->header  = header;
1747         state->data    = data;
1748  
1749         ctdb_trackingdb_traverse(ctdb, tdata, revoke_send_cb, state);
1750
1751         tevent_add_timer(ctdb->ev, state,
1752                          timeval_current_ofs(ctdb->tunable.control_timeout, 0),
1753                          ctdb_revoke_timeout_handler, state);
1754
1755         while (state->finished == 0) {
1756                 tevent_loop_once(ctdb->ev);
1757         }
1758
1759         if (ctdb_ltdb_lock(ctdb_db, key) != 0) {
1760                 DEBUG(DEBUG_ERR,("Failed to chainlock the database in revokechild\n"));
1761                 talloc_free(state);
1762                 return -1;
1763         }
1764         if (ctdb_ltdb_fetch(ctdb_db, key, &new_header, state, &new_data) != 0) {
1765                 ctdb_ltdb_unlock(ctdb_db, key);
1766                 DEBUG(DEBUG_ERR,("Failed for fetch tdb record in revokechild\n"));
1767                 talloc_free(state);
1768                 return -1;
1769         }
1770         header->rsn++;
1771         if (new_header.rsn > header->rsn) {
1772                 ctdb_ltdb_unlock(ctdb_db, key);
1773                 DEBUG(DEBUG_ERR,("RSN too high in tdb record in revokechild\n"));
1774                 talloc_free(state);
1775                 return -1;
1776         }
1777         if ( (new_header.flags & (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS)) != (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS) ) {
1778                 ctdb_ltdb_unlock(ctdb_db, key);
1779                 DEBUG(DEBUG_ERR,("Flags are wrong in tdb record in revokechild\n"));
1780                 talloc_free(state);
1781                 return -1;
1782         }
1783
1784         /*
1785          * If revoke on all nodes succeed, revoke is complete.  Otherwise,
1786          * remove CTDB_REC_RO_REVOKING_READONLY flag and retry.
1787          */
1788         if (state->status == 0) {
1789                 new_header.rsn++;
1790                 new_header.flags |= CTDB_REC_RO_REVOKE_COMPLETE;
1791         } else {
1792                 DEBUG(DEBUG_NOTICE, ("Revoke all delegations failed, retrying.\n"));
1793                 new_header.flags &= ~CTDB_REC_RO_REVOKING_READONLY;
1794         }
1795         if (ctdb_ltdb_store(ctdb_db, key, &new_header, new_data) != 0) {
1796                 ctdb_ltdb_unlock(ctdb_db, key);
1797                 DEBUG(DEBUG_ERR,("Failed to write new record in revokechild\n"));
1798                 talloc_free(state);
1799                 return -1;
1800         }
1801         ctdb_ltdb_unlock(ctdb_db, key);
1802
1803         talloc_free(state);
1804         return 0;
1805 }
1806
1807
1808 int ctdb_start_revoke_ro_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1809 {
1810         TDB_DATA tdata;
1811         struct revokechild_handle *rc;
1812         pid_t parent = getpid();
1813         int ret;
1814
1815         header->flags &= ~(CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY);
1816         header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1817         header->rsn   -= 1;
1818
1819         if ((rc = talloc_zero(ctdb_db, struct revokechild_handle)) == NULL) {
1820                 DEBUG(DEBUG_ERR,("Failed to allocate revokechild_handle\n"));
1821                 return -1;
1822         }
1823
1824         tdata = tdb_fetch(ctdb_db->rottdb, key);
1825         if (tdata.dsize > 0) {
1826                 uint8_t *tmp;
1827
1828                 tmp = tdata.dptr;
1829                 tdata.dptr = talloc_memdup(rc, tdata.dptr, tdata.dsize);
1830                 free(tmp);
1831         }
1832
1833         rc->status    = 0;
1834         rc->ctdb      = ctdb;
1835         rc->ctdb_db   = ctdb_db;
1836         rc->fd[0]     = -1;
1837         rc->fd[1]     = -1;
1838
1839         talloc_set_destructor(rc, revokechild_destructor);
1840
1841         rc->key.dsize = key.dsize;
1842         rc->key.dptr  = talloc_memdup(rc, key.dptr, key.dsize);
1843         if (rc->key.dptr == NULL) {
1844                 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1845                 talloc_free(rc);
1846                 return -1;
1847         }
1848
1849         ret = pipe(rc->fd);
1850         if (ret != 0) {
1851                 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1852                 talloc_free(rc);
1853                 return -1;
1854         }
1855
1856
1857         rc->child = ctdb_fork(ctdb);
1858         if (rc->child == (pid_t)-1) {
1859                 DEBUG(DEBUG_ERR,("Failed to fork child for revokechild\n"));
1860                 talloc_free(rc);
1861                 return -1;
1862         }
1863
1864         if (rc->child == 0) {
1865                 char c = 0;
1866                 close(rc->fd[0]);
1867
1868                 prctl_set_comment("ctdb_revokechild");
1869                 if (switch_from_server_to_client(ctdb) != 0) {
1870                         DEBUG(DEBUG_ERR,("Failed to switch from server to client for revokechild process\n"));
1871                         c = 1;
1872                         goto child_finished;
1873                 }
1874
1875                 c = ctdb_revoke_all_delegations(ctdb, ctdb_db, tdata, key, header, data);
1876
1877 child_finished:
1878                 sys_write(rc->fd[1], &c, 1);
1879                 ctdb_wait_for_process_to_exit(parent);
1880                 _exit(0);
1881         }
1882
1883         close(rc->fd[1]);
1884         rc->fd[1] = -1;
1885         set_close_on_exec(rc->fd[0]);
1886
1887         /* This is an active revokechild child process */
1888         DLIST_ADD_END(ctdb_db->revokechild_active, rc);
1889
1890         rc->fde = tevent_add_fd(ctdb->ev, rc, rc->fd[0], TEVENT_FD_READ,
1891                                 revokechild_handler, (void *)rc);
1892         if (rc->fde == NULL) {
1893                 DEBUG(DEBUG_ERR,("Failed to set up fd event for revokechild process\n"));
1894                 talloc_free(rc);
1895         }
1896         tevent_fd_set_auto_close(rc->fde);
1897
1898         return 0;
1899 }
1900
1901 int ctdb_add_revoke_deferred_call(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr, deferred_requeue_fn fn, void *call_context)
1902 {
1903         struct revokechild_handle *rc;
1904         struct revokechild_deferred_call *deferred_call;
1905
1906         for (rc = ctdb_db->revokechild_active; rc; rc = rc->next) {
1907                 if (rc->key.dsize == 0) {
1908                         continue;
1909                 }
1910                 if (rc->key.dsize != key.dsize) {
1911                         continue;
1912                 }
1913                 if (!memcmp(rc->key.dptr, key.dptr, key.dsize)) {
1914                         break;
1915                 }
1916         }
1917
1918         if (rc == NULL) {
1919                 DEBUG(DEBUG_ERR,("Failed to add deferred call to revoke list. revoke structure not found\n"));
1920                 return -1;
1921         }
1922
1923         deferred_call = talloc(rc, struct revokechild_deferred_call);
1924         if (deferred_call == NULL) {
1925                 DEBUG(DEBUG_ERR,("Failed to allocate deferred call structure for revoking record\n"));
1926                 return -1;
1927         }
1928
1929         deferred_call->ctdb = ctdb;
1930         deferred_call->hdr  = hdr;
1931         deferred_call->fn   = fn;
1932         deferred_call->ctx  = call_context;
1933
1934         talloc_set_destructor(deferred_call, deferred_call_destructor);
1935         talloc_steal(deferred_call, hdr);
1936
1937         return 0;
1938 }
1939
1940 static void ctdb_migration_count_handler(TDB_DATA key, uint64_t counter,
1941                                          void *private_data)
1942 {
1943         struct ctdb_db_context *ctdb_db = talloc_get_type_abort(
1944                 private_data, struct ctdb_db_context);
1945         int value;
1946
1947         value = (counter < INT_MAX ? counter : INT_MAX);
1948         ctdb_update_db_stat_hot_keys(ctdb_db, key, value);
1949 }
1950
1951 static void ctdb_migration_cleandb_event(struct tevent_context *ev,
1952                                          struct tevent_timer *te,
1953                                          struct timeval current_time,
1954                                          void *private_data)
1955 {
1956         struct ctdb_db_context *ctdb_db = talloc_get_type_abort(
1957                 private_data, struct ctdb_db_context);
1958
1959         if (ctdb_db->migratedb == NULL) {
1960                 return;
1961         }
1962
1963         hash_count_expire(ctdb_db->migratedb, NULL);
1964
1965         te = tevent_add_timer(ctdb_db->ctdb->ev, ctdb_db->migratedb,
1966                               tevent_timeval_current_ofs(10, 0),
1967                               ctdb_migration_cleandb_event, ctdb_db);
1968         if (te == NULL) {
1969                 DEBUG(DEBUG_ERR,
1970                       ("Memory error in migration cleandb event for %s\n",
1971                        ctdb_db->db_name));
1972                 TALLOC_FREE(ctdb_db->migratedb);
1973         }
1974 }
1975
1976 int ctdb_migration_init(struct ctdb_db_context *ctdb_db)
1977 {
1978         struct timeval one_second = { 1, 0 };
1979         struct tevent_timer *te;
1980         int ret;
1981
1982         if (ctdb_db->persistent) {
1983                 return 0;
1984         }
1985
1986         ret = hash_count_init(ctdb_db, one_second,
1987                               ctdb_migration_count_handler, ctdb_db,
1988                               &ctdb_db->migratedb);
1989         if (ret != 0) {
1990                 DEBUG(DEBUG_ERR,
1991                       ("Memory error in migration init for %s\n",
1992                        ctdb_db->db_name));
1993                 return -1;
1994         }
1995
1996         te = tevent_add_timer(ctdb_db->ctdb->ev, ctdb_db->migratedb,
1997                               tevent_timeval_current_ofs(10, 0),
1998                               ctdb_migration_cleandb_event, ctdb_db);
1999         if (te == NULL) {
2000                 DEBUG(DEBUG_ERR,
2001                       ("Memory error in migration init for %s\n",
2002                        ctdb_db->db_name));
2003                 TALLOC_FREE(ctdb_db->migratedb);
2004                 return -1;
2005         }
2006
2007         return 0;
2008 }