Remove explicit include of lib/tevent/tevent.h.
[samba.git] / ctdb / server / ctdb_call.c
1 /* 
2    ctdb_call protocol code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19 /*
20   see http://wiki.samba.org/index.php/Samba_%26_Clustering for
21   protocol design and packet details
22 */
23 #include "includes.h"
24 #include "lib/tdb/include/tdb.h"
25 #include "lib/util/dlinklist.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "../include/ctdb_private.h"
29 #include "../common/rb_tree.h"
30
31 struct ctdb_sticky_record {
32         struct ctdb_context *ctdb;
33         struct ctdb_db_context *ctdb_db;
34         TDB_CONTEXT *pindown;
35 };
36
37 /*
38   find the ctdb_db from a db index
39  */
40  struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id)
41 {
42         struct ctdb_db_context *ctdb_db;
43
44         for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
45                 if (ctdb_db->db_id == id) {
46                         break;
47                 }
48         }
49         return ctdb_db;
50 }
51
52 /*
53   a varient of input packet that can be used in lock requeue
54 */
55 static void ctdb_call_input_pkt(void *p, struct ctdb_req_header *hdr)
56 {
57         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
58         ctdb_input_pkt(ctdb, hdr);
59 }
60
61
62 /*
63   send an error reply
64 */
65 static void ctdb_send_error(struct ctdb_context *ctdb, 
66                             struct ctdb_req_header *hdr, uint32_t status,
67                             const char *fmt, ...) PRINTF_ATTRIBUTE(4,5);
68 static void ctdb_send_error(struct ctdb_context *ctdb, 
69                             struct ctdb_req_header *hdr, uint32_t status,
70                             const char *fmt, ...)
71 {
72         va_list ap;
73         struct ctdb_reply_error *r;
74         char *msg;
75         int msglen, len;
76
77         if (ctdb->methods == NULL) {
78                 DEBUG(DEBUG_INFO,(__location__ " Failed to send error. Transport is DOWN\n"));
79                 return;
80         }
81
82         va_start(ap, fmt);
83         msg = talloc_vasprintf(ctdb, fmt, ap);
84         if (msg == NULL) {
85                 ctdb_fatal(ctdb, "Unable to allocate error in ctdb_send_error\n");
86         }
87         va_end(ap);
88
89         msglen = strlen(msg)+1;
90         len = offsetof(struct ctdb_reply_error, msg);
91         r = ctdb_transport_allocate(ctdb, msg, CTDB_REPLY_ERROR, len + msglen, 
92                                     struct ctdb_reply_error);
93         CTDB_NO_MEMORY_FATAL(ctdb, r);
94
95         r->hdr.destnode  = hdr->srcnode;
96         r->hdr.reqid     = hdr->reqid;
97         r->status        = status;
98         r->msglen        = msglen;
99         memcpy(&r->msg[0], msg, msglen);
100
101         ctdb_queue_packet(ctdb, &r->hdr);
102
103         talloc_free(msg);
104 }
105
106
107 /**
108  * send a redirect reply
109  *
110  * The logic behind this function is this:
111  *
112  * A client wants to grab a record and sends a CTDB_REQ_CALL packet
113  * to its local ctdb (ctdb_request_call). If the node is not itself
114  * the record's DMASTER, it first redirects the packet to  the
115  * record's LMASTER. The LMASTER then redirects the call packet to
116  * the current DMASTER. But there is a race: The record may have
117  * been migrated off the DMASTER while the redirected packet is
118  * on the wire (or in the local queue). So in case the record has
119  * migrated off the new destinaton of the call packet, instead of
120  * going back to the LMASTER to get the new DMASTER, we try to
121  * reduce round-trips by first chasing the record a couple of times
122  * before giving up the direct chase and finally going back to the
123  * LMASTER (again). Note that this works because of this: When
124  * a record is migrated off a node, then the new DMASTER is stored
125  * in the record's copy on the former DMASTER.
126  *
127  * The maximum number of attempts for direct chase to make before
128  * going back to the LMASTER is configurable by the tunable
129  * "MaxRedirectCount".
130  */
131 static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
132                                     struct ctdb_db_context *ctdb_db,
133                                     TDB_DATA key,
134                                     struct ctdb_req_call *c, 
135                                     struct ctdb_ltdb_header *header)
136 {
137         
138         uint32_t lmaster = ctdb_lmaster(ctdb, &key);
139
140         c->hdr.destnode = lmaster;
141         if (ctdb->pnn == lmaster) {
142                 c->hdr.destnode = header->dmaster;
143         }
144         c->hopcount++;
145
146         if (c->hopcount%100 == 99) {
147                 DEBUG(DEBUG_WARNING,("High hopcount %d dbid:0x%08x "
148                         "key:0x%08x pnn:%d src:%d lmaster:%d "
149                         "header->dmaster:%d dst:%d\n",
150                         c->hopcount, ctdb_db->db_id, ctdb_hash(&key),
151                         ctdb->pnn, c->hdr.srcnode, lmaster,
152                         header->dmaster, c->hdr.destnode));
153         }
154
155         ctdb_queue_packet(ctdb, &c->hdr);
156 }
157
158
159 /*
160   send a dmaster reply
161
162   caller must have the chainlock before calling this routine. Caller must be
163   the lmaster
164 */
165 static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
166                                     struct ctdb_ltdb_header *header,
167                                     TDB_DATA key, TDB_DATA data,
168                                     uint32_t new_dmaster,
169                                     uint32_t reqid)
170 {
171         struct ctdb_context *ctdb = ctdb_db->ctdb;
172         struct ctdb_reply_dmaster *r;
173         int ret, len;
174         TALLOC_CTX *tmp_ctx;
175
176         if (ctdb->pnn != ctdb_lmaster(ctdb, &key)) {
177                 DEBUG(DEBUG_ALERT,(__location__ " Caller is not lmaster!\n"));
178                 return;
179         }
180
181         header->dmaster = new_dmaster;
182         ret = ctdb_ltdb_store(ctdb_db, key, header, data);
183         if (ret != 0) {
184                 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply unable to update dmaster");
185                 return;
186         }
187
188         if (ctdb->methods == NULL) {
189                 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply cant update dmaster since transport is down");
190                 return;
191         }
192
193         /* put the packet on a temporary context, allowing us to safely free
194            it below even if ctdb_reply_dmaster() has freed it already */
195         tmp_ctx = talloc_new(ctdb);
196
197         /* send the CTDB_REPLY_DMASTER */
198         len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize + sizeof(uint32_t);
199         r = ctdb_transport_allocate(ctdb, tmp_ctx, CTDB_REPLY_DMASTER, len,
200                                     struct ctdb_reply_dmaster);
201         CTDB_NO_MEMORY_FATAL(ctdb, r);
202
203         r->hdr.destnode  = new_dmaster;
204         r->hdr.reqid     = reqid;
205         r->rsn           = header->rsn;
206         r->keylen        = key.dsize;
207         r->datalen       = data.dsize;
208         r->db_id         = ctdb_db->db_id;
209         memcpy(&r->data[0], key.dptr, key.dsize);
210         memcpy(&r->data[key.dsize], data.dptr, data.dsize);
211         memcpy(&r->data[key.dsize+data.dsize], &header->flags, sizeof(uint32_t));
212
213         ctdb_queue_packet(ctdb, &r->hdr);
214
215         talloc_free(tmp_ctx);
216 }
217
218 /*
219   send a dmaster request (give another node the dmaster for a record)
220
221   This is always sent to the lmaster, which ensures that the lmaster
222   always knows who the dmaster is. The lmaster will then send a
223   CTDB_REPLY_DMASTER to the new dmaster
224 */
225 static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db, 
226                                    struct ctdb_req_call *c, 
227                                    struct ctdb_ltdb_header *header,
228                                    TDB_DATA *key, TDB_DATA *data)
229 {
230         struct ctdb_req_dmaster *r;
231         struct ctdb_context *ctdb = ctdb_db->ctdb;
232         int len;
233         uint32_t lmaster = ctdb_lmaster(ctdb, key);
234
235         if (ctdb->methods == NULL) {
236                 ctdb_fatal(ctdb, "Failed ctdb_call_send_dmaster since transport is down");
237                 return;
238         }
239
240         if (data->dsize != 0) {
241                 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
242         }
243
244         if (lmaster == ctdb->pnn) {
245                 ctdb_send_dmaster_reply(ctdb_db, header, *key, *data, 
246                                         c->hdr.srcnode, c->hdr.reqid);
247                 return;
248         }
249         
250         len = offsetof(struct ctdb_req_dmaster, data) + key->dsize + data->dsize
251                         + sizeof(uint32_t);
252         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_DMASTER, len, 
253                                     struct ctdb_req_dmaster);
254         CTDB_NO_MEMORY_FATAL(ctdb, r);
255         r->hdr.destnode  = lmaster;
256         r->hdr.reqid     = c->hdr.reqid;
257         r->db_id         = c->db_id;
258         r->rsn           = header->rsn;
259         r->dmaster       = c->hdr.srcnode;
260         r->keylen        = key->dsize;
261         r->datalen       = data->dsize;
262         memcpy(&r->data[0], key->dptr, key->dsize);
263         memcpy(&r->data[key->dsize], data->dptr, data->dsize);
264         memcpy(&r->data[key->dsize + data->dsize], &header->flags, sizeof(uint32_t));
265
266         header->dmaster = c->hdr.srcnode;
267         if (ctdb_ltdb_store(ctdb_db, *key, header, *data) != 0) {
268                 ctdb_fatal(ctdb, "Failed to store record in ctdb_call_send_dmaster");
269         }
270         
271         ctdb_queue_packet(ctdb, &r->hdr);
272
273         talloc_free(r);
274 }
275
276 static void ctdb_sticky_pindown_timeout(struct event_context *ev, struct timed_event *te, 
277                                        struct timeval t, void *private_data)
278 {
279         struct ctdb_sticky_record *sr = talloc_get_type(private_data, 
280                                                        struct ctdb_sticky_record);
281
282         DEBUG(DEBUG_ERR,("Pindown timeout db:%s  unstick record\n", sr->ctdb_db->db_name));
283         if (sr->pindown != NULL) {
284                 talloc_free(sr->pindown);
285                 sr->pindown = NULL;
286         }
287 }
288
289 static int
290 ctdb_set_sticky_pindown(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
291 {
292         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
293         uint32_t *k;
294         struct ctdb_sticky_record *sr;
295
296         k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
297         if (k == NULL) {
298                 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
299                 talloc_free(tmp_ctx);
300                 return -1;
301         }
302
303         k[0] = (key.dsize + 3) / 4 + 1;
304         memcpy(&k[1], key.dptr, key.dsize);
305
306         sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
307         if (sr == NULL) {
308                 talloc_free(tmp_ctx);
309                 return 0;
310         }
311
312         talloc_free(tmp_ctx);
313
314         if (sr->pindown == NULL) {
315                 DEBUG(DEBUG_ERR,("Pinning down record in %s for %d ms\n", ctdb_db->db_name, ctdb->tunable.sticky_pindown));
316                 sr->pindown = talloc_new(sr);
317                 if (sr->pindown == NULL) {
318                         DEBUG(DEBUG_ERR,("Failed to allocate pindown context for sticky record\n"));
319                         return -1;
320                 }
321                 event_add_timed(ctdb->ev, sr->pindown, timeval_current_ofs(ctdb->tunable.sticky_pindown / 1000, (ctdb->tunable.sticky_pindown * 1000) % 1000000), ctdb_sticky_pindown_timeout, sr);
322         }
323
324         return 0;
325 }
326
327 /*
328   called when a CTDB_REPLY_DMASTER packet comes in, or when the lmaster
329   gets a CTDB_REQUEST_DMASTER for itself. We become the dmaster.
330
331   must be called with the chainlock held. This function releases the chainlock
332 */
333 static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
334                                 struct ctdb_req_header *hdr,
335                                 TDB_DATA key, TDB_DATA data,
336                                 uint64_t rsn, uint32_t record_flags)
337 {
338         struct ctdb_call_state *state;
339         struct ctdb_context *ctdb = ctdb_db->ctdb;
340         struct ctdb_ltdb_header header;
341         int ret;
342
343         DEBUG(DEBUG_DEBUG,("pnn %u dmaster response %08x\n", ctdb->pnn, ctdb_hash(&key)));
344
345         ZERO_STRUCT(header);
346         header.rsn = rsn + 1;
347         header.dmaster = ctdb->pnn;
348         header.flags = record_flags;
349
350         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
351
352         if (state) {
353                 if (state->call->flags & CTDB_CALL_FLAG_VACUUM_MIGRATION) {
354                         /*
355                          * We temporarily add the VACUUM_MIGRATED flag to
356                          * the record flags, so that ctdb_ltdb_store can
357                          * decide whether the record should be stored or
358                          * deleted.
359                          */
360                         header.flags |= CTDB_REC_FLAG_VACUUM_MIGRATED;
361                 }
362         }
363
364         if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
365                 ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
366
367                 ret = ctdb_ltdb_unlock(ctdb_db, key);
368                 if (ret != 0) {
369                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
370                 }
371                 return;
372         }
373
374         /* we just became DMASTER and this database is "sticky",
375            see if the record is flagged as "hot" and set up a pin-down
376            context to stop migrations for a little while if so
377         */
378         if (ctdb_db->sticky) {
379                 ctdb_set_sticky_pindown(ctdb, ctdb_db, key);
380         }
381
382         if (state == NULL) {
383                 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_become_dmaster from node %u\n",
384                          ctdb->pnn, hdr->reqid, hdr->srcnode));
385
386                 ret = ctdb_ltdb_unlock(ctdb_db, key);
387                 if (ret != 0) {
388                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
389                 }
390                 return;
391         }
392
393         if (key.dsize != state->call->key.dsize || memcmp(key.dptr, state->call->key.dptr, key.dsize)) {
394                 DEBUG(DEBUG_ERR, ("Got bogus DMASTER packet reqid:%u from node %u. Key does not match key held in matching idr.\n", hdr->reqid, hdr->srcnode));
395
396                 ret = ctdb_ltdb_unlock(ctdb_db, key);
397                 if (ret != 0) {
398                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
399                 }
400                 return;
401         }
402
403         if (hdr->reqid != state->reqid) {
404                 /* we found a record  but it was the wrong one */
405                 DEBUG(DEBUG_ERR, ("Dropped orphan in ctdb_become_dmaster with reqid:%u\n from node %u", hdr->reqid, hdr->srcnode));
406
407                 ret = ctdb_ltdb_unlock(ctdb_db, key);
408                 if (ret != 0) {
409                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
410                 }
411                 return;
412         }
413
414         ctdb_call_local(ctdb_db, state->call, &header, state, &data, true, ctdb->pnn);
415
416         ret = ctdb_ltdb_unlock(ctdb_db, state->call->key);
417         if (ret != 0) {
418                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
419         }
420
421         state->state = CTDB_CALL_DONE;
422         if (state->async.fn) {
423                 state->async.fn(state);
424         }
425 }
426
427
428
429 /*
430   called when a CTDB_REQ_DMASTER packet comes in
431
432   this comes into the lmaster for a record when the current dmaster
433   wants to give up the dmaster role and give it to someone else
434 */
435 void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
436 {
437         struct ctdb_req_dmaster *c = (struct ctdb_req_dmaster *)hdr;
438         TDB_DATA key, data, data2;
439         struct ctdb_ltdb_header header;
440         struct ctdb_db_context *ctdb_db;
441         uint32_t record_flags = 0;
442         size_t len;
443         int ret;
444
445         key.dptr = c->data;
446         key.dsize = c->keylen;
447         data.dptr = c->data + c->keylen;
448         data.dsize = c->datalen;
449         len = offsetof(struct ctdb_req_dmaster, data) + key.dsize + data.dsize
450                         + sizeof(uint32_t);
451         if (len <= c->hdr.length) {
452                 record_flags = *(uint32_t *)&c->data[c->keylen + c->datalen];
453         }
454
455         ctdb_db = find_ctdb_db(ctdb, c->db_id);
456         if (!ctdb_db) {
457                 ctdb_send_error(ctdb, hdr, -1,
458                                 "Unknown database in request. db_id==0x%08x",
459                                 c->db_id);
460                 return;
461         }
462         
463         /* fetch the current record */
464         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
465                                            ctdb_call_input_pkt, ctdb, False);
466         if (ret == -1) {
467                 ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
468                 return;
469         }
470         if (ret == -2) {
471                 DEBUG(DEBUG_INFO,(__location__ " deferring ctdb_request_dmaster\n"));
472                 return;
473         }
474
475         if (ctdb_lmaster(ctdb, &key) != ctdb->pnn) {
476                 DEBUG(DEBUG_ALERT,("pnn %u dmaster request to non-lmaster lmaster=%u gen=%u curgen=%u\n",
477                          ctdb->pnn, ctdb_lmaster(ctdb, &key), 
478                          hdr->generation, ctdb->vnn_map->generation));
479                 ctdb_fatal(ctdb, "ctdb_req_dmaster to non-lmaster");
480         }
481
482         DEBUG(DEBUG_DEBUG,("pnn %u dmaster request on %08x for %u from %u\n", 
483                  ctdb->pnn, ctdb_hash(&key), c->dmaster, c->hdr.srcnode));
484
485         /* its a protocol error if the sending node is not the current dmaster */
486         if (header.dmaster != hdr->srcnode) {
487                 DEBUG(DEBUG_ALERT,("pnn %u dmaster request for new-dmaster %u from non-master %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u keyval=0x%08x\n",
488                          ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
489                          ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
490                          (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid,
491                          (key.dsize >= 4)?(*(uint32_t *)key.dptr):0));
492                 if (header.rsn != 0 || header.dmaster != ctdb->pnn) {
493                         DEBUG(DEBUG_ERR,("ctdb_req_dmaster from non-master. Force a recovery.\n"));
494
495                         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
496                         ctdb_ltdb_unlock(ctdb_db, key);
497                         return;
498                 }
499         }
500
501         if (header.rsn > c->rsn) {
502                 DEBUG(DEBUG_ALERT,("pnn %u dmaster request with older RSN new-dmaster %u from %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u\n",
503                          ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
504                          ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
505                          (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid));
506         }
507
508         /* use the rsn from the sending node */
509         header.rsn = c->rsn;
510
511         /* store the record flags from the sending node */
512         header.flags = record_flags;
513
514         /* check if the new dmaster is the lmaster, in which case we
515            skip the dmaster reply */
516         if (c->dmaster == ctdb->pnn) {
517                 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
518         } else {
519                 ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid);
520
521                 ret = ctdb_ltdb_unlock(ctdb_db, key);
522                 if (ret != 0) {
523                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
524                 }
525         }
526 }
527
528 static void ctdb_sticky_record_timeout(struct event_context *ev, struct timed_event *te, 
529                                        struct timeval t, void *private_data)
530 {
531         struct ctdb_sticky_record *sr = talloc_get_type(private_data, 
532                                                        struct ctdb_sticky_record);
533         talloc_free(sr);
534 }
535
536 static void *ctdb_make_sticky_record_callback(void *parm, void *data)
537 {
538         if (data) {
539                 DEBUG(DEBUG_ERR,("Already have sticky record registered. Free old %p and create new %p\n", data, parm));
540                 talloc_free(data);
541         }
542         return parm;
543 }
544
545 static int
546 ctdb_make_record_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
547 {
548         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
549         uint32_t *k;
550         struct ctdb_sticky_record *sr;
551
552         k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
553         if (k == NULL) {
554                 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
555                 talloc_free(tmp_ctx);
556                 return -1;
557         }
558
559         k[0] = (key.dsize + 3) / 4 + 1;
560         memcpy(&k[1], key.dptr, key.dsize);
561
562         sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
563         if (sr != NULL) {
564                 talloc_free(tmp_ctx);
565                 return 0;
566         }
567
568         sr = talloc(ctdb_db->sticky_records, struct ctdb_sticky_record);
569         if (sr == NULL) {
570                 talloc_free(tmp_ctx);
571                 DEBUG(DEBUG_ERR,("Failed to allocate sticky record structure\n"));
572                 return -1;
573         }
574
575         sr->ctdb    = ctdb;
576         sr->ctdb_db = ctdb_db;
577         sr->pindown = NULL;
578
579         DEBUG(DEBUG_ERR,("Make record sticky in db %s\n", ctdb_db->db_name));
580
581         trbt_insertarray32_callback(ctdb_db->sticky_records, k[0], &k[0], ctdb_make_sticky_record_callback, sr);
582
583         event_add_timed(ctdb->ev, sr, timeval_current_ofs(ctdb->tunable.sticky_duration, 0), ctdb_sticky_record_timeout, sr);
584
585         talloc_free(tmp_ctx);
586         return 0;
587 }
588
589 struct pinned_down_requeue_handle {
590         struct ctdb_context *ctdb;
591         struct ctdb_req_header *hdr;
592 };
593
594 struct pinned_down_deferred_call {
595         struct ctdb_context *ctdb;
596         struct ctdb_req_header *hdr;
597 };
598
599 static void pinned_down_requeue(struct event_context *ev, struct timed_event *te, 
600                        struct timeval t, void *private_data)
601 {
602         struct pinned_down_requeue_handle *handle = talloc_get_type(private_data, struct pinned_down_requeue_handle);
603         struct ctdb_context *ctdb = handle->ctdb;
604
605         talloc_steal(ctdb, handle->hdr);
606         ctdb_call_input_pkt(ctdb, handle->hdr);
607
608         talloc_free(handle);
609 }
610
611 static int pinned_down_destructor(struct pinned_down_deferred_call *pinned_down)
612 {
613         struct ctdb_context *ctdb = pinned_down->ctdb;
614         struct pinned_down_requeue_handle *handle = talloc(ctdb, struct pinned_down_requeue_handle);
615
616         handle->ctdb = pinned_down->ctdb;
617         handle->hdr  = pinned_down->hdr;
618         talloc_steal(handle, handle->hdr);
619
620         event_add_timed(ctdb->ev, handle, timeval_zero(), pinned_down_requeue, handle);
621
622         return 0;
623 }
624
625 static int
626 ctdb_defer_pinned_down_request(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr)
627 {
628         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
629         uint32_t *k;
630         struct ctdb_sticky_record *sr;
631         struct pinned_down_deferred_call *pinned_down;
632
633         k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
634         if (k == NULL) {
635                 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
636                 talloc_free(tmp_ctx);
637                 return -1;
638         }
639
640         k[0] = (key.dsize + 3) / 4 + 1;
641         memcpy(&k[1], key.dptr, key.dsize);
642
643         sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
644         if (sr == NULL) {
645                 talloc_free(tmp_ctx);
646                 return -1;
647         }
648
649         talloc_free(tmp_ctx);
650
651         if (sr->pindown == NULL) {
652                 return -1;
653         }
654         
655         pinned_down = talloc(sr->pindown, struct pinned_down_deferred_call);
656         if (pinned_down == NULL) {
657                 DEBUG(DEBUG_ERR,("Failed to allocate structure for deferred pinned down request\n"));
658                 return -1;
659         }
660
661         pinned_down->ctdb = ctdb;
662         pinned_down->hdr  = hdr;
663
664         talloc_set_destructor(pinned_down, pinned_down_destructor);
665         talloc_steal(pinned_down, hdr);
666
667         return 0;
668 }
669
670 /*
671   called when a CTDB_REQ_CALL packet comes in
672 */
673 void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
674 {
675         struct ctdb_req_call *c = (struct ctdb_req_call *)hdr;
676         TDB_DATA data;
677         struct ctdb_reply_call *r;
678         int ret, len;
679         struct ctdb_ltdb_header header;
680         struct ctdb_call *call;
681         struct ctdb_db_context *ctdb_db;
682         int tmp_count, bucket;
683
684         if (ctdb->methods == NULL) {
685                 DEBUG(DEBUG_INFO,(__location__ " Failed ctdb_request_call. Transport is DOWN\n"));
686                 return;
687         }
688
689
690         ctdb_db = find_ctdb_db(ctdb, c->db_id);
691         if (!ctdb_db) {
692                 ctdb_send_error(ctdb, hdr, -1,
693                                 "Unknown database in request. db_id==0x%08x",
694                                 c->db_id);
695                 return;
696         }
697
698         call = talloc(hdr, struct ctdb_call);
699         CTDB_NO_MEMORY_FATAL(ctdb, call);
700
701         call->call_id  = c->callid;
702         call->key.dptr = c->data;
703         call->key.dsize = c->keylen;
704         call->call_data.dptr = c->data + c->keylen;
705         call->call_data.dsize = c->calldatalen;
706         call->reply_data.dptr  = NULL;
707         call->reply_data.dsize = 0;
708
709
710         /* If this record is pinned down we should defer the
711            request until the pindown times out
712         */
713         if (ctdb_db->sticky) {
714                 if (ctdb_defer_pinned_down_request(ctdb, ctdb_db, call->key, hdr) == 0) {
715                   DEBUG(DEBUG_WARNING,("Defer request for pinned down record in %s\n", ctdb_db->db_name));
716                         return;
717                 }
718         }
719
720
721         /* determine if we are the dmaster for this key. This also
722            fetches the record data (if any), thus avoiding a 2nd fetch of the data 
723            if the call will be answered locally */
724
725         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call->key, &header, hdr, &data,
726                                            ctdb_call_input_pkt, ctdb, False);
727         if (ret == -1) {
728                 ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
729                 return;
730         }
731         if (ret == -2) {
732                 DEBUG(DEBUG_INFO,(__location__ " deferred ctdb_request_call\n"));
733                 return;
734         }
735
736         /* Dont do READONLY if we dont have a tracking database */
737         if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
738                 c->flags &= ~CTDB_WANT_READONLY;
739         }
740
741         if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
742                 header.flags &= ~(CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY|CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_REVOKE_COMPLETE);
743                 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
744                 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
745                 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
746                         ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
747                 }
748                 /* and clear out the tracking data */
749                 if (tdb_delete(ctdb_db->rottdb, call->key) != 0) {
750                         DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
751                 }
752         }
753
754         /* if we are revoking, we must defer all other calls until the revoke
755          * had completed.
756          */
757         if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
758                 talloc_free(data.dptr);
759                 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
760
761                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
762                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
763                 }
764                 talloc_free(call);
765                 return;
766         }
767
768         /* if we are not the dmaster and are not hosting any delegations,
769            then send a redirect to the requesting node */
770         if ((header.dmaster != ctdb->pnn) 
771             && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) ) {
772                 talloc_free(data.dptr);
773                 ctdb_call_send_redirect(ctdb, ctdb_db, call->key, c, &header);
774
775                 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
776                 if (ret != 0) {
777                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
778                 }
779                 return;
780         }
781
782         if ( (!(c->flags & CTDB_WANT_READONLY))
783         && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
784                 header.flags   |= CTDB_REC_RO_REVOKING_READONLY;
785                 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
786                         ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
787                 }
788                 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
789
790                 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, call->key, &header, data) != 0) {
791                         ctdb_fatal(ctdb, "Failed to start record revoke");
792                 }
793                 talloc_free(data.dptr);
794
795                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
796                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
797                 }
798                 talloc_free(call);
799
800                 return;
801         }               
802
803         /* If this is the first request for delegation. bump rsn and set
804          * the delegations flag
805          */
806         if ((c->flags & CTDB_WANT_READONLY)
807         &&  (c->callid == CTDB_FETCH_WITH_HEADER_FUNC)
808         &&  (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS))) {
809                 header.rsn     += 3;
810                 header.flags   |= CTDB_REC_RO_HAVE_DELEGATIONS;
811                 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
812                         ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
813                 }
814         }
815         if ((c->flags & CTDB_WANT_READONLY) 
816         &&  (call->call_id == CTDB_FETCH_WITH_HEADER_FUNC)) {
817                 TDB_DATA tdata;
818
819                 tdata = tdb_fetch(ctdb_db->rottdb, call->key);
820                 if (ctdb_trackingdb_add_pnn(ctdb, &tdata, c->hdr.srcnode) != 0) {
821                         ctdb_fatal(ctdb, "Failed to add node to trackingdb");
822                 }
823                 if (tdb_store(ctdb_db->rottdb, call->key, tdata, TDB_REPLACE) != 0) {
824                         ctdb_fatal(ctdb, "Failed to store trackingdb data");
825                 }
826                 free(tdata.dptr);
827
828                 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
829                 if (ret != 0) {
830                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
831                 }
832
833                 len = offsetof(struct ctdb_reply_call, data) + data.dsize + sizeof(struct ctdb_ltdb_header);
834                 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len, 
835                                             struct ctdb_reply_call);
836                 CTDB_NO_MEMORY_FATAL(ctdb, r);
837                 r->hdr.destnode  = c->hdr.srcnode;
838                 r->hdr.reqid     = c->hdr.reqid;
839                 r->status        = 0;
840                 r->datalen       = data.dsize + sizeof(struct ctdb_ltdb_header);
841                 header.rsn      -= 2;
842                 header.flags   |= CTDB_REC_RO_HAVE_READONLY;
843                 header.flags   &= ~CTDB_REC_RO_HAVE_DELEGATIONS;
844                 memcpy(&r->data[0], &header, sizeof(struct ctdb_ltdb_header));
845
846                 if (data.dsize) {
847                         memcpy(&r->data[sizeof(struct ctdb_ltdb_header)], data.dptr, data.dsize);
848                 }
849
850                 ctdb_queue_packet(ctdb, &r->hdr);
851                 CTDB_INCREMENT_STAT(ctdb, total_ro_delegations);
852                 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_delegations);
853
854                 talloc_free(r);
855                 return;
856         }
857
858         CTDB_UPDATE_STAT(ctdb, max_hop_count, c->hopcount);
859         tmp_count = c->hopcount;
860         bucket = 0;
861         while (tmp_count) {
862                 tmp_count >>= 2;
863                 bucket++;
864         }
865         if (bucket >= MAX_COUNT_BUCKETS) {
866                 bucket = MAX_COUNT_BUCKETS - 1;
867         }
868         CTDB_INCREMENT_STAT(ctdb, hop_count_bucket[bucket]);
869         CTDB_INCREMENT_DB_STAT(ctdb_db, hop_count_bucket[bucket]);
870
871
872         /* If this database supports sticky records, then check if the
873            hopcount is big. If it is it means the record is hot and we
874            should make it sticky.
875         */
876         if (ctdb_db->sticky && c->hopcount >= ctdb->tunable.hopcount_make_sticky) {
877                 DEBUG(DEBUG_ERR, ("Hot record in database %s. Hopcount is %d. Make record sticky for %d seconds\n", ctdb_db->db_name, c->hopcount, ctdb->tunable.sticky_duration));
878                 ctdb_make_record_sticky(ctdb, ctdb_db, call->key);
879         }
880
881
882         /* if this nodes has done enough consecutive calls on the same record
883            then give them the record
884            or if the node requested an immediate migration
885         */
886         if ( c->hdr.srcnode != ctdb->pnn &&
887              ((header.laccessor == c->hdr.srcnode
888                && header.lacount >= ctdb->tunable.max_lacount
889                && ctdb->tunable.max_lacount != 0)
890               || (c->flags & CTDB_IMMEDIATE_MIGRATION)) ) {
891                 if (ctdb_db->transaction_active) {
892                         DEBUG(DEBUG_INFO, (__location__ " refusing migration"
893                               " of key %s while transaction is active\n",
894                               (char *)call->key.dptr));
895                 } else {
896                         DEBUG(DEBUG_DEBUG,("pnn %u starting migration of %08x to %u\n",
897                                  ctdb->pnn, ctdb_hash(&(call->key)), c->hdr.srcnode));
898                         ctdb_call_send_dmaster(ctdb_db, c, &header, &(call->key), &data);
899                         talloc_free(data.dptr);
900
901                         ret = ctdb_ltdb_unlock(ctdb_db, call->key);
902                         if (ret != 0) {
903                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
904                         }
905                         return;
906                 }
907         }
908
909         ret = ctdb_call_local(ctdb_db, call, &header, hdr, &data, true, c->hdr.srcnode);
910         if (ret != 0) {
911                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_local failed\n"));
912                 call->status = -1;
913         }
914
915         ret = ctdb_ltdb_unlock(ctdb_db, call->key);
916         if (ret != 0) {
917                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
918         }
919
920         len = offsetof(struct ctdb_reply_call, data) + call->reply_data.dsize;
921         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len, 
922                                     struct ctdb_reply_call);
923         CTDB_NO_MEMORY_FATAL(ctdb, r);
924         r->hdr.destnode  = hdr->srcnode;
925         r->hdr.reqid     = hdr->reqid;
926         r->status        = call->status;
927         r->datalen       = call->reply_data.dsize;
928         if (call->reply_data.dsize) {
929                 memcpy(&r->data[0], call->reply_data.dptr, call->reply_data.dsize);
930         }
931
932         ctdb_queue_packet(ctdb, &r->hdr);
933
934         talloc_free(r);
935 }
936
937 /*
938   called when a CTDB_REPLY_CALL packet comes in
939
940   This packet comes in response to a CTDB_REQ_CALL request packet. It
941   contains any reply data from the call
942 */
943 void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
944 {
945         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
946         struct ctdb_call_state *state;
947
948         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
949         if (state == NULL) {
950                 DEBUG(DEBUG_ERR, (__location__ " reqid %u not found\n", hdr->reqid));
951                 return;
952         }
953
954         if (hdr->reqid != state->reqid) {
955                 /* we found a record  but it was the wrong one */
956                 DEBUG(DEBUG_ERR, ("Dropped orphaned call reply with reqid:%u\n",hdr->reqid));
957                 return;
958         }
959
960
961         /* read only delegation processing */
962         /* If we got a FETCH_WITH_HEADER we should check if this is a ro
963          * delegation since we may need to update the record header
964          */
965         if (state->c->callid == CTDB_FETCH_WITH_HEADER_FUNC) {
966                 struct ctdb_db_context *ctdb_db = state->ctdb_db;
967                 struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)&c->data[0];
968                 struct ctdb_ltdb_header oldheader;
969                 TDB_DATA key, data, olddata;
970                 int ret;
971
972                 if (!(header->flags & CTDB_REC_RO_HAVE_READONLY)) {
973                         goto finished_ro;
974                         return;
975                 }
976
977                 key.dsize = state->c->keylen;
978                 key.dptr  = state->c->data;
979                 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
980                                      ctdb_call_input_pkt, ctdb, False);
981                 if (ret == -2) {
982                         return;
983                 }
984                 if (ret != 0) {
985                         DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_call\n"));
986                         return;
987                 }
988
989                 ret = ctdb_ltdb_fetch(ctdb_db, key, &oldheader, state, &olddata);
990                 if (ret != 0) {
991                         DEBUG(DEBUG_ERR, ("Failed to fetch old record in ctdb_reply_call\n"));
992                         ctdb_ltdb_unlock(ctdb_db, key);
993                         goto finished_ro;
994                 }                       
995
996                 if (header->rsn <= oldheader.rsn) {
997                         ctdb_ltdb_unlock(ctdb_db, key);
998                         goto finished_ro;
999                 }
1000
1001                 if (c->datalen < sizeof(struct ctdb_ltdb_header)) {
1002                         DEBUG(DEBUG_ERR,(__location__ " Got FETCH_WITH_HEADER reply with too little data: %d bytes\n", c->datalen));
1003                         ctdb_ltdb_unlock(ctdb_db, key);
1004                         goto finished_ro;
1005                 }
1006
1007                 data.dsize = c->datalen - sizeof(struct ctdb_ltdb_header);
1008                 data.dptr  = &c->data[sizeof(struct ctdb_ltdb_header)];
1009                 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
1010                 if (ret != 0) {
1011                         DEBUG(DEBUG_ERR, ("Failed to store new record in ctdb_reply_call\n"));
1012                         ctdb_ltdb_unlock(ctdb_db, key);
1013                         goto finished_ro;
1014                 }                       
1015
1016                 ctdb_ltdb_unlock(ctdb_db, key);
1017         }
1018 finished_ro:
1019
1020         state->call->reply_data.dptr = c->data;
1021         state->call->reply_data.dsize = c->datalen;
1022         state->call->status = c->status;
1023
1024         talloc_steal(state, c);
1025
1026         state->state = CTDB_CALL_DONE;
1027         if (state->async.fn) {
1028                 state->async.fn(state);
1029         }
1030 }
1031
1032
1033 /*
1034   called when a CTDB_REPLY_DMASTER packet comes in
1035
1036   This packet comes in from the lmaster response to a CTDB_REQ_CALL
1037   request packet. It means that the current dmaster wants to give us
1038   the dmaster role
1039 */
1040 void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1041 {
1042         struct ctdb_reply_dmaster *c = (struct ctdb_reply_dmaster *)hdr;
1043         struct ctdb_db_context *ctdb_db;
1044         TDB_DATA key, data;
1045         uint32_t record_flags = 0;
1046         size_t len;
1047         int ret;
1048
1049         ctdb_db = find_ctdb_db(ctdb, c->db_id);
1050         if (ctdb_db == NULL) {
1051                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_reply_dmaster\n", c->db_id));
1052                 return;
1053         }
1054         
1055         key.dptr = c->data;
1056         key.dsize = c->keylen;
1057         data.dptr = &c->data[key.dsize];
1058         data.dsize = c->datalen;
1059         len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize
1060                 + sizeof(uint32_t);
1061         if (len <= c->hdr.length) {
1062                 record_flags = *(uint32_t *)&c->data[c->keylen + c->datalen];
1063         }
1064
1065         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1066                                      ctdb_call_input_pkt, ctdb, False);
1067         if (ret == -2) {
1068                 return;
1069         }
1070         if (ret != 0) {
1071                 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_dmaster\n"));
1072                 return;
1073         }
1074
1075         ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
1076 }
1077
1078
1079 /*
1080   called when a CTDB_REPLY_ERROR packet comes in
1081 */
1082 void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1083 {
1084         struct ctdb_reply_error *c = (struct ctdb_reply_error *)hdr;
1085         struct ctdb_call_state *state;
1086
1087         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
1088         if (state == NULL) {
1089                 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_reply_error\n",
1090                          ctdb->pnn, hdr->reqid));
1091                 return;
1092         }
1093
1094         if (hdr->reqid != state->reqid) {
1095                 /* we found a record  but it was the wrong one */
1096                 DEBUG(DEBUG_ERR, ("Dropped orphaned error reply with reqid:%u\n",hdr->reqid));
1097                 return;
1098         }
1099
1100         talloc_steal(state, c);
1101
1102         state->state  = CTDB_CALL_ERROR;
1103         state->errmsg = (char *)c->msg;
1104         if (state->async.fn) {
1105                 state->async.fn(state);
1106         }
1107 }
1108
1109
1110 /*
1111   destroy a ctdb_call
1112 */
1113 static int ctdb_call_destructor(struct ctdb_call_state *state)
1114 {
1115         DLIST_REMOVE(state->ctdb_db->ctdb->pending_calls, state);
1116         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
1117         return 0;
1118 }
1119
1120
1121 /*
1122   called when a ctdb_call needs to be resent after a reconfigure event
1123 */
1124 static void ctdb_call_resend(struct ctdb_call_state *state)
1125 {
1126         struct ctdb_context *ctdb = state->ctdb_db->ctdb;
1127
1128         state->generation = ctdb->vnn_map->generation;
1129
1130         /* use a new reqid, in case the old reply does eventually come in */
1131         ctdb_reqid_remove(ctdb, state->reqid);
1132         state->reqid = ctdb_reqid_new(ctdb, state);
1133         state->c->hdr.reqid = state->reqid;
1134
1135         /* update the generation count for this request, so its valid with the new vnn_map */
1136         state->c->hdr.generation = state->generation;
1137
1138         /* send the packet to ourselves, it will be redirected appropriately */
1139         state->c->hdr.destnode = ctdb->pnn;
1140
1141         ctdb_queue_packet(ctdb, &state->c->hdr);
1142         DEBUG(DEBUG_NOTICE,("resent ctdb_call\n"));
1143 }
1144
1145 /*
1146   resend all pending calls on recovery
1147  */
1148 void ctdb_call_resend_all(struct ctdb_context *ctdb)
1149 {
1150         struct ctdb_call_state *state, *next;
1151         for (state=ctdb->pending_calls;state;state=next) {
1152                 next = state->next;
1153                 ctdb_call_resend(state);
1154         }
1155 }
1156
1157 /*
1158   this allows the caller to setup a async.fn 
1159 */
1160 static void call_local_trigger(struct event_context *ev, struct timed_event *te, 
1161                        struct timeval t, void *private_data)
1162 {
1163         struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state);
1164         if (state->async.fn) {
1165                 state->async.fn(state);
1166         }
1167 }       
1168
1169
1170 /*
1171   construct an event driven local ctdb_call
1172
1173   this is used so that locally processed ctdb_call requests are processed
1174   in an event driven manner
1175 */
1176 struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db, 
1177                                              struct ctdb_call *call,
1178                                              struct ctdb_ltdb_header *header,
1179                                              TDB_DATA *data)
1180 {
1181         struct ctdb_call_state *state;
1182         struct ctdb_context *ctdb = ctdb_db->ctdb;
1183         int ret;
1184
1185         state = talloc_zero(ctdb_db, struct ctdb_call_state);
1186         CTDB_NO_MEMORY_NULL(ctdb, state);
1187
1188         talloc_steal(state, data->dptr);
1189
1190         state->state = CTDB_CALL_DONE;
1191         state->call  = talloc(state, struct ctdb_call);
1192         CTDB_NO_MEMORY_NULL(ctdb, state->call);
1193         *(state->call) = *call;
1194         state->ctdb_db = ctdb_db;
1195
1196         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true, ctdb->pnn);
1197         if (ret != 0) {
1198                 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
1199         }
1200
1201         event_add_timed(ctdb->ev, state, timeval_zero(), call_local_trigger, state);
1202
1203         return state;
1204 }
1205
1206
1207 /*
1208   make a remote ctdb call - async send. Called in daemon context.
1209
1210   This constructs a ctdb_call request and queues it for processing. 
1211   This call never blocks.
1212 */
1213 struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctdb_db, 
1214                                                      struct ctdb_call *call, 
1215                                                      struct ctdb_ltdb_header *header)
1216 {
1217         uint32_t len;
1218         struct ctdb_call_state *state;
1219         struct ctdb_context *ctdb = ctdb_db->ctdb;
1220
1221         if (ctdb->methods == NULL) {
1222                 DEBUG(DEBUG_INFO,(__location__ " Failed send packet. Transport is down\n"));
1223                 return NULL;
1224         }
1225
1226         state = talloc_zero(ctdb_db, struct ctdb_call_state);
1227         CTDB_NO_MEMORY_NULL(ctdb, state);
1228         state->call = talloc(state, struct ctdb_call);
1229         CTDB_NO_MEMORY_NULL(ctdb, state->call);
1230
1231         state->reqid = ctdb_reqid_new(ctdb, state);
1232         state->ctdb_db = ctdb_db;
1233         talloc_set_destructor(state, ctdb_call_destructor);
1234
1235         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
1236         state->c = ctdb_transport_allocate(ctdb, state, CTDB_REQ_CALL, len, 
1237                                            struct ctdb_req_call);
1238         CTDB_NO_MEMORY_NULL(ctdb, state->c);
1239         state->c->hdr.destnode  = header->dmaster;
1240
1241         /* this limits us to 16k outstanding messages - not unreasonable */
1242         state->c->hdr.reqid     = state->reqid;
1243         state->c->flags         = call->flags;
1244         state->c->db_id         = ctdb_db->db_id;
1245         state->c->callid        = call->call_id;
1246         state->c->hopcount      = 0;
1247         state->c->keylen        = call->key.dsize;
1248         state->c->calldatalen   = call->call_data.dsize;
1249         memcpy(&state->c->data[0], call->key.dptr, call->key.dsize);
1250         memcpy(&state->c->data[call->key.dsize], 
1251                call->call_data.dptr, call->call_data.dsize);
1252         *(state->call)              = *call;
1253         state->call->call_data.dptr = &state->c->data[call->key.dsize];
1254         state->call->key.dptr       = &state->c->data[0];
1255
1256         state->state  = CTDB_CALL_WAIT;
1257         state->generation = ctdb->vnn_map->generation;
1258
1259         DLIST_ADD(ctdb->pending_calls, state);
1260
1261         ctdb_queue_packet(ctdb, &state->c->hdr);
1262
1263         return state;
1264 }
1265
1266 /*
1267   make a remote ctdb call - async recv - called in daemon context
1268
1269   This is called when the program wants to wait for a ctdb_call to complete and get the 
1270   results. This call will block unless the call has already completed.
1271 */
1272 int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
1273 {
1274         while (state->state < CTDB_CALL_DONE) {
1275                 event_loop_once(state->ctdb_db->ctdb->ev);
1276         }
1277         if (state->state != CTDB_CALL_DONE) {
1278                 ctdb_set_error(state->ctdb_db->ctdb, "%s", state->errmsg);
1279                 talloc_free(state);
1280                 return -1;
1281         }
1282
1283         if (state->call->reply_data.dsize) {
1284                 call->reply_data.dptr = talloc_memdup(call,
1285                                                       state->call->reply_data.dptr,
1286                                                       state->call->reply_data.dsize);
1287                 call->reply_data.dsize = state->call->reply_data.dsize;
1288         } else {
1289                 call->reply_data.dptr = NULL;
1290                 call->reply_data.dsize = 0;
1291         }
1292         call->status = state->call->status;
1293         talloc_free(state);
1294         return 0;
1295 }
1296
1297
1298 /* 
1299    send a keepalive packet to the other node
1300 */
1301 void ctdb_send_keepalive(struct ctdb_context *ctdb, uint32_t destnode)
1302 {
1303         struct ctdb_req_keepalive *r;
1304         
1305         if (ctdb->methods == NULL) {
1306                 DEBUG(DEBUG_INFO,(__location__ " Failed to send keepalive. Transport is DOWN\n"));
1307                 return;
1308         }
1309
1310         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_KEEPALIVE,
1311                                     sizeof(struct ctdb_req_keepalive), 
1312                                     struct ctdb_req_keepalive);
1313         CTDB_NO_MEMORY_FATAL(ctdb, r);
1314         r->hdr.destnode  = destnode;
1315         r->hdr.reqid     = 0;
1316         
1317         CTDB_INCREMENT_STAT(ctdb, keepalive_packets_sent);
1318
1319         ctdb_queue_packet(ctdb, &r->hdr);
1320
1321         talloc_free(r);
1322 }
1323
1324
1325
1326 struct revokechild_deferred_call {
1327         struct ctdb_context *ctdb;
1328         struct ctdb_req_header *hdr;
1329         deferred_requeue_fn fn;
1330         void *ctx;
1331 };
1332
1333 struct revokechild_handle {
1334         struct revokechild_handle *next, *prev;
1335         struct ctdb_context *ctdb;
1336         struct ctdb_db_context *ctdb_db;
1337         struct fd_event *fde;
1338         int status;
1339         int fd[2];
1340         pid_t child;
1341         TDB_DATA key;
1342 };
1343
1344 struct revokechild_requeue_handle {
1345         struct ctdb_context *ctdb;
1346         struct ctdb_req_header *hdr;
1347         deferred_requeue_fn fn;
1348         void *ctx;
1349 };
1350
1351 static void deferred_call_requeue(struct event_context *ev, struct timed_event *te, 
1352                        struct timeval t, void *private_data)
1353 {
1354         struct revokechild_requeue_handle *requeue_handle = talloc_get_type(private_data, struct revokechild_requeue_handle);
1355
1356         requeue_handle->fn(requeue_handle->ctx, requeue_handle->hdr);
1357         talloc_free(requeue_handle);
1358 }
1359
1360 static int deferred_call_destructor(struct revokechild_deferred_call *deferred_call)
1361 {
1362         struct ctdb_context *ctdb = deferred_call->ctdb;
1363         struct revokechild_requeue_handle *requeue_handle = talloc(ctdb, struct revokechild_requeue_handle);
1364         struct ctdb_req_call *c = (struct ctdb_req_call *)deferred_call->hdr;
1365
1366         requeue_handle->ctdb = ctdb;
1367         requeue_handle->hdr  = deferred_call->hdr;
1368         requeue_handle->fn   = deferred_call->fn;
1369         requeue_handle->ctx  = deferred_call->ctx;
1370         talloc_steal(requeue_handle, requeue_handle->hdr);
1371
1372         /* when revoking, any READONLY requests have 1 second grace to let read/write finish first */
1373         event_add_timed(ctdb->ev, requeue_handle, timeval_current_ofs(c->flags & CTDB_WANT_READONLY ? 1 : 0, 0), deferred_call_requeue, requeue_handle);
1374
1375         return 0;
1376 }
1377
1378
1379 static int revokechild_destructor(struct revokechild_handle *rc)
1380 {
1381         if (rc->fde != NULL) {
1382                 talloc_free(rc->fde);
1383         }
1384
1385         if (rc->fd[0] != -1) {
1386                 close(rc->fd[0]);
1387         }
1388         if (rc->fd[1] != -1) {
1389                 close(rc->fd[1]);
1390         }
1391         kill(rc->child, SIGKILL);
1392
1393         DLIST_REMOVE(rc->ctdb_db->revokechild_active, rc);
1394         return 0;
1395 }
1396
1397 static void revokechild_handler(struct event_context *ev, struct fd_event *fde, 
1398                              uint16_t flags, void *private_data)
1399 {
1400         struct revokechild_handle *rc = talloc_get_type(private_data, 
1401                                                      struct revokechild_handle);
1402         int ret;
1403         char c;
1404
1405         ret = read(rc->fd[0], &c, 1);
1406         if (ret != 1) {
1407                 DEBUG(DEBUG_ERR,("Failed to read status from revokechild. errno:%d\n", errno));
1408                 rc->status = -1;
1409                 talloc_free(rc);
1410                 return;
1411         }
1412         if (c != 0) {
1413                 DEBUG(DEBUG_ERR,("revokechild returned failure. status:%d\n", c));
1414                 rc->status = -1;
1415                 talloc_free(rc);
1416                 return;
1417         }
1418
1419         talloc_free(rc);
1420 }
1421
1422 struct ctdb_revoke_state {
1423         struct ctdb_db_context *ctdb_db;
1424         TDB_DATA key;
1425         struct ctdb_ltdb_header *header;
1426         TDB_DATA data;
1427         int count;
1428         int status;
1429         int finished;
1430 };
1431
1432 static void update_record_cb(struct ctdb_client_control_state *state)
1433 {
1434         struct ctdb_revoke_state *revoke_state;
1435         int ret;
1436         int32_t res;
1437
1438         if (state == NULL) {
1439                 return;
1440         }
1441         revoke_state = state->async.private_data;
1442
1443         state->async.fn = NULL;
1444         ret = ctdb_control_recv(state->ctdb, state, state, NULL, &res, NULL);
1445         if ((ret != 0) || (res != 0)) {
1446                 DEBUG(DEBUG_ERR,("Recv for revoke update record failed ret:%d res:%d\n", ret, res));
1447                 revoke_state->status = -1;
1448         }
1449
1450         revoke_state->count--;
1451         if (revoke_state->count <= 0) {
1452                 revoke_state->finished = 1;
1453         }
1454 }
1455
1456 static void revoke_send_cb(struct ctdb_context *ctdb, uint32_t pnn, void *private_data)
1457 {
1458         struct ctdb_revoke_state *revoke_state = private_data;
1459         struct ctdb_client_control_state *state;
1460
1461         state = ctdb_ctrl_updaterecord_send(ctdb, revoke_state, timeval_current_ofs(5,0), pnn, revoke_state->ctdb_db, revoke_state->key, revoke_state->header, revoke_state->data);
1462         if (state == NULL) {
1463                 DEBUG(DEBUG_ERR,("Failure to send update record to revoke readonly delegation\n"));
1464                 revoke_state->status = -1;
1465                 return;
1466         }
1467         state->async.fn           = update_record_cb;
1468         state->async.private_data = revoke_state;
1469
1470         revoke_state->count++;
1471
1472 }
1473
1474 static void ctdb_revoke_timeout_handler(struct event_context *ev, struct timed_event *te, 
1475                               struct timeval yt, void *private_data)
1476 {
1477         struct ctdb_revoke_state *state = private_data;
1478
1479         DEBUG(DEBUG_ERR,("Timed out waiting for revoke to finish\n"));
1480         state->finished = 1;
1481         state->status   = -1;
1482 }
1483
1484 static int ctdb_revoke_all_delegations(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA tdata, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1485 {
1486         struct ctdb_revoke_state *state = talloc_zero(ctdb, struct ctdb_revoke_state);
1487         int status;
1488
1489         state->ctdb_db = ctdb_db;
1490         state->key     = key;
1491         state->header  = header;
1492         state->data    = data;
1493  
1494         ctdb_trackingdb_traverse(ctdb, tdata, revoke_send_cb, state);
1495
1496         event_add_timed(ctdb->ev, state, timeval_current_ofs(5, 0), ctdb_revoke_timeout_handler, state);
1497
1498         while (state->finished == 0) {
1499                 event_loop_once(ctdb->ev);
1500         }
1501
1502         status = state->status;
1503
1504         if (status == 0) {
1505                 struct ctdb_ltdb_header new_header;
1506                 TDB_DATA new_data;
1507
1508                 if (ctdb_ltdb_lock(ctdb_db, key) != 0) {
1509                         DEBUG(DEBUG_ERR,("Failed to chainlock the database in revokechild\n"));
1510                         talloc_free(state);
1511                         return -1;
1512                 }
1513                 if (ctdb_ltdb_fetch(ctdb_db, key, &new_header, state, &new_data) != 0) {
1514                         ctdb_ltdb_unlock(ctdb_db, key);
1515                         DEBUG(DEBUG_ERR,("Failed for fetch tdb record in revokechild\n"));
1516                         talloc_free(state);
1517                         return -1;
1518                 }
1519                 header->rsn++;
1520                 if (new_header.rsn > header->rsn) {
1521                         ctdb_ltdb_unlock(ctdb_db, key);
1522                         DEBUG(DEBUG_ERR,("RSN too high in tdb record in revokechild\n"));
1523                         talloc_free(state);
1524                         return -1;
1525                 }
1526                 if ( (new_header.flags & (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS)) != (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS) ) {
1527                         ctdb_ltdb_unlock(ctdb_db, key);
1528                         DEBUG(DEBUG_ERR,("Flags are wrong in tdb record in revokechild\n"));
1529                         talloc_free(state);
1530                         return -1;
1531                 }
1532                 new_header.rsn++;
1533                 new_header.flags |= CTDB_REC_RO_REVOKE_COMPLETE;
1534                 if (ctdb_ltdb_store(ctdb_db, key, &new_header, new_data) != 0) {
1535                         ctdb_ltdb_unlock(ctdb_db, key);
1536                         DEBUG(DEBUG_ERR,("Failed to write new record in revokechild\n"));
1537                         talloc_free(state);
1538                         return -1;
1539                 }
1540                 ctdb_ltdb_unlock(ctdb_db, key);
1541         }
1542
1543         talloc_free(state);
1544         return status;
1545 }
1546
1547
1548 int ctdb_start_revoke_ro_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1549 {
1550         TDB_DATA tdata;
1551         struct revokechild_handle *rc;
1552         pid_t parent = getpid();
1553         int ret;
1554
1555         header->flags &= ~(CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY);
1556         header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1557         header->rsn   -= 1;
1558
1559         if ((rc = talloc_zero(ctdb_db, struct revokechild_handle)) == NULL) {
1560                 DEBUG(DEBUG_ERR,("Failed to allocate revokechild_handle\n"));
1561                 return -1;
1562         }
1563
1564         tdata = tdb_fetch(ctdb_db->rottdb, key);
1565         if (tdata.dsize > 0) {
1566                 uint8_t *tmp;
1567
1568                 tmp = tdata.dptr;
1569                 tdata.dptr = talloc_memdup(rc, tdata.dptr, tdata.dsize);
1570                 free(tmp);
1571         }
1572
1573         rc->status    = 0;
1574         rc->ctdb      = ctdb;
1575         rc->ctdb_db   = ctdb_db;
1576         rc->fd[0]     = -1;
1577         rc->fd[1]     = -1;
1578
1579         talloc_set_destructor(rc, revokechild_destructor);
1580
1581         rc->key.dsize = key.dsize;
1582         rc->key.dptr  = talloc_memdup(rc, key.dptr, key.dsize);
1583         if (rc->key.dptr == NULL) {
1584                 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1585                 talloc_free(rc);
1586                 return -1;
1587         }
1588
1589         ret = pipe(rc->fd);
1590         if (ret != 0) {
1591                 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1592                 talloc_free(rc);
1593                 return -1;
1594         }
1595
1596
1597         rc->child = ctdb_fork(ctdb);
1598         if (rc->child == (pid_t)-1) {
1599                 DEBUG(DEBUG_ERR,("Failed to fork child for revokechild\n"));
1600                 talloc_free(rc);
1601                 return -1;
1602         }
1603
1604         if (rc->child == 0) {
1605                 char c = 0;
1606                 close(rc->fd[0]);
1607                 debug_extra = talloc_asprintf(NULL, "revokechild-%s:", ctdb_db->db_name);
1608
1609                 if (switch_from_server_to_client(ctdb, "revokechild-%s", ctdb_db->db_name) != 0) {
1610                         DEBUG(DEBUG_ERR,("Failed to switch from server to client for revokechild process\n"));
1611                         c = 1;
1612                         goto child_finished;
1613                 }
1614
1615                 c = ctdb_revoke_all_delegations(ctdb, ctdb_db, tdata, key, header, data);
1616
1617 child_finished:
1618                 write(rc->fd[1], &c, 1);
1619                 /* make sure we die when our parent dies */
1620                 while (kill(parent, 0) == 0 || errno != ESRCH) {
1621                         sleep(5);
1622                 }
1623                 _exit(0);
1624         }
1625
1626         close(rc->fd[1]);
1627         rc->fd[1] = -1;
1628         set_close_on_exec(rc->fd[0]);
1629
1630         /* This is an active revokechild child process */
1631         DLIST_ADD_END(ctdb_db->revokechild_active, rc, NULL);
1632
1633         rc->fde = event_add_fd(ctdb->ev, rc, rc->fd[0],
1634                                    EVENT_FD_READ, revokechild_handler,
1635                                    (void *)rc);
1636         if (rc->fde == NULL) {
1637                 DEBUG(DEBUG_ERR,("Failed to set up fd event for revokechild process\n"));
1638                 talloc_free(rc);
1639         }
1640         tevent_fd_set_auto_close(rc->fde);
1641
1642         return 0;
1643 }
1644
1645 int ctdb_add_revoke_deferred_call(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr, deferred_requeue_fn fn, void *call_context)
1646 {
1647         struct revokechild_handle *rc;
1648         struct revokechild_deferred_call *deferred_call;
1649
1650         for (rc = ctdb_db->revokechild_active; rc; rc = rc->next) {
1651                 if (rc->key.dsize == 0) {
1652                         continue;
1653                 }
1654                 if (rc->key.dsize != key.dsize) {
1655                         continue;
1656                 }
1657                 if (!memcmp(rc->key.dptr, key.dptr, key.dsize)) {
1658                         break;
1659                 }
1660         }
1661
1662         if (rc == NULL) {
1663                 DEBUG(DEBUG_ERR,("Failed to add deferred call to revoke list. revoke structure not found\n"));
1664                 return -1;
1665         }
1666
1667         deferred_call = talloc(rc, struct revokechild_deferred_call);
1668         if (deferred_call == NULL) {
1669                 DEBUG(DEBUG_ERR,("Failed to allocate deferred call structure for revoking record\n"));
1670                 return -1;
1671         }
1672
1673         deferred_call->ctdb = ctdb;
1674         deferred_call->hdr  = hdr;
1675         deferred_call->fn   = fn;
1676         deferred_call->ctx  = call_context;
1677
1678         talloc_set_destructor(deferred_call, deferred_call_destructor);
1679         talloc_steal(deferred_call, hdr);
1680
1681         return 0;
1682 }