ctdb-daemon: Fix some strict-aliasing warnings
[vlendec/samba-autobuild/.git] / ctdb / server / ctdb_call.c
1 /* 
2    ctdb_call protocol code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19 /*
20   see http://wiki.samba.org/index.php/Samba_%26_Clustering for
21   protocol design and packet details
22 */
23 #include "includes.h"
24 #include "tdb.h"
25 #include "lib/util/dlinklist.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "../include/ctdb_private.h"
29 #include "../common/rb_tree.h"
30
31 struct ctdb_sticky_record {
32         struct ctdb_context *ctdb;
33         struct ctdb_db_context *ctdb_db;
34         TDB_CONTEXT *pindown;
35 };
36
37 /*
38   find the ctdb_db from a db index
39  */
40  struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id)
41 {
42         struct ctdb_db_context *ctdb_db;
43
44         for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
45                 if (ctdb_db->db_id == id) {
46                         break;
47                 }
48         }
49         return ctdb_db;
50 }
51
52 /*
53   a varient of input packet that can be used in lock requeue
54 */
55 static void ctdb_call_input_pkt(void *p, struct ctdb_req_header *hdr)
56 {
57         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
58         ctdb_input_pkt(ctdb, hdr);
59 }
60
61
62 /*
63   send an error reply
64 */
65 static void ctdb_send_error(struct ctdb_context *ctdb, 
66                             struct ctdb_req_header *hdr, uint32_t status,
67                             const char *fmt, ...) PRINTF_ATTRIBUTE(4,5);
68 static void ctdb_send_error(struct ctdb_context *ctdb, 
69                             struct ctdb_req_header *hdr, uint32_t status,
70                             const char *fmt, ...)
71 {
72         va_list ap;
73         struct ctdb_reply_error *r;
74         char *msg;
75         int msglen, len;
76
77         if (ctdb->methods == NULL) {
78                 DEBUG(DEBUG_INFO,(__location__ " Failed to send error. Transport is DOWN\n"));
79                 return;
80         }
81
82         va_start(ap, fmt);
83         msg = talloc_vasprintf(ctdb, fmt, ap);
84         if (msg == NULL) {
85                 ctdb_fatal(ctdb, "Unable to allocate error in ctdb_send_error\n");
86         }
87         va_end(ap);
88
89         msglen = strlen(msg)+1;
90         len = offsetof(struct ctdb_reply_error, msg);
91         r = ctdb_transport_allocate(ctdb, msg, CTDB_REPLY_ERROR, len + msglen, 
92                                     struct ctdb_reply_error);
93         CTDB_NO_MEMORY_FATAL(ctdb, r);
94
95         r->hdr.destnode  = hdr->srcnode;
96         r->hdr.reqid     = hdr->reqid;
97         r->status        = status;
98         r->msglen        = msglen;
99         memcpy(&r->msg[0], msg, msglen);
100
101         ctdb_queue_packet(ctdb, &r->hdr);
102
103         talloc_free(msg);
104 }
105
106
107 /**
108  * send a redirect reply
109  *
110  * The logic behind this function is this:
111  *
112  * A client wants to grab a record and sends a CTDB_REQ_CALL packet
113  * to its local ctdb (ctdb_request_call). If the node is not itself
114  * the record's DMASTER, it first redirects the packet to  the
115  * record's LMASTER. The LMASTER then redirects the call packet to
116  * the current DMASTER. Note that this works because of this: When
117  * a record is migrated off a node, then the new DMASTER is stored
118  * in the record's copy on the former DMASTER.
119  */
120 static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
121                                     struct ctdb_db_context *ctdb_db,
122                                     TDB_DATA key,
123                                     struct ctdb_req_call *c, 
124                                     struct ctdb_ltdb_header *header)
125 {
126         uint32_t lmaster = ctdb_lmaster(ctdb, &key);
127
128         c->hdr.destnode = lmaster;
129         if (ctdb->pnn == lmaster) {
130                 c->hdr.destnode = header->dmaster;
131         }
132         c->hopcount++;
133
134         if (c->hopcount%100 > 95) {
135                 DEBUG(DEBUG_WARNING,("High hopcount %d dbid:%s "
136                         "key:0x%08x reqid=%08x pnn:%d src:%d lmaster:%d "
137                         "header->dmaster:%d dst:%d\n",
138                         c->hopcount, ctdb_db->db_name, ctdb_hash(&key),
139                         c->hdr.reqid, ctdb->pnn, c->hdr.srcnode, lmaster,
140                         header->dmaster, c->hdr.destnode));
141         }
142
143         ctdb_queue_packet(ctdb, &c->hdr);
144 }
145
146
147 /*
148   send a dmaster reply
149
150   caller must have the chainlock before calling this routine. Caller must be
151   the lmaster
152 */
153 static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
154                                     struct ctdb_ltdb_header *header,
155                                     TDB_DATA key, TDB_DATA data,
156                                     uint32_t new_dmaster,
157                                     uint32_t reqid)
158 {
159         struct ctdb_context *ctdb = ctdb_db->ctdb;
160         struct ctdb_reply_dmaster *r;
161         int ret, len;
162         TALLOC_CTX *tmp_ctx;
163
164         if (ctdb->pnn != ctdb_lmaster(ctdb, &key)) {
165                 DEBUG(DEBUG_ALERT,(__location__ " Caller is not lmaster!\n"));
166                 return;
167         }
168
169         header->dmaster = new_dmaster;
170         ret = ctdb_ltdb_store(ctdb_db, key, header, data);
171         if (ret != 0) {
172                 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply unable to update dmaster");
173                 return;
174         }
175
176         if (ctdb->methods == NULL) {
177                 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply cant update dmaster since transport is down");
178                 return;
179         }
180
181         /* put the packet on a temporary context, allowing us to safely free
182            it below even if ctdb_reply_dmaster() has freed it already */
183         tmp_ctx = talloc_new(ctdb);
184
185         /* send the CTDB_REPLY_DMASTER */
186         len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize + sizeof(uint32_t);
187         r = ctdb_transport_allocate(ctdb, tmp_ctx, CTDB_REPLY_DMASTER, len,
188                                     struct ctdb_reply_dmaster);
189         CTDB_NO_MEMORY_FATAL(ctdb, r);
190
191         r->hdr.destnode  = new_dmaster;
192         r->hdr.reqid     = reqid;
193         r->rsn           = header->rsn;
194         r->keylen        = key.dsize;
195         r->datalen       = data.dsize;
196         r->db_id         = ctdb_db->db_id;
197         memcpy(&r->data[0], key.dptr, key.dsize);
198         memcpy(&r->data[key.dsize], data.dptr, data.dsize);
199         memcpy(&r->data[key.dsize+data.dsize], &header->flags, sizeof(uint32_t));
200
201         ctdb_queue_packet(ctdb, &r->hdr);
202
203         talloc_free(tmp_ctx);
204 }
205
206 /*
207   send a dmaster request (give another node the dmaster for a record)
208
209   This is always sent to the lmaster, which ensures that the lmaster
210   always knows who the dmaster is. The lmaster will then send a
211   CTDB_REPLY_DMASTER to the new dmaster
212 */
213 static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db, 
214                                    struct ctdb_req_call *c, 
215                                    struct ctdb_ltdb_header *header,
216                                    TDB_DATA *key, TDB_DATA *data)
217 {
218         struct ctdb_req_dmaster *r;
219         struct ctdb_context *ctdb = ctdb_db->ctdb;
220         int len;
221         uint32_t lmaster = ctdb_lmaster(ctdb, key);
222
223         if (ctdb->methods == NULL) {
224                 ctdb_fatal(ctdb, "Failed ctdb_call_send_dmaster since transport is down");
225                 return;
226         }
227
228         if (data->dsize != 0) {
229                 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
230         }
231
232         if (lmaster == ctdb->pnn) {
233                 ctdb_send_dmaster_reply(ctdb_db, header, *key, *data, 
234                                         c->hdr.srcnode, c->hdr.reqid);
235                 return;
236         }
237         
238         len = offsetof(struct ctdb_req_dmaster, data) + key->dsize + data->dsize
239                         + sizeof(uint32_t);
240         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_DMASTER, len, 
241                                     struct ctdb_req_dmaster);
242         CTDB_NO_MEMORY_FATAL(ctdb, r);
243         r->hdr.destnode  = lmaster;
244         r->hdr.reqid     = c->hdr.reqid;
245         r->db_id         = c->db_id;
246         r->rsn           = header->rsn;
247         r->dmaster       = c->hdr.srcnode;
248         r->keylen        = key->dsize;
249         r->datalen       = data->dsize;
250         memcpy(&r->data[0], key->dptr, key->dsize);
251         memcpy(&r->data[key->dsize], data->dptr, data->dsize);
252         memcpy(&r->data[key->dsize + data->dsize], &header->flags, sizeof(uint32_t));
253
254         header->dmaster = c->hdr.srcnode;
255         if (ctdb_ltdb_store(ctdb_db, *key, header, *data) != 0) {
256                 ctdb_fatal(ctdb, "Failed to store record in ctdb_call_send_dmaster");
257         }
258         
259         ctdb_queue_packet(ctdb, &r->hdr);
260
261         talloc_free(r);
262 }
263
264 static void ctdb_sticky_pindown_timeout(struct event_context *ev, struct timed_event *te, 
265                                        struct timeval t, void *private_data)
266 {
267         struct ctdb_sticky_record *sr = talloc_get_type(private_data, 
268                                                        struct ctdb_sticky_record);
269
270         DEBUG(DEBUG_ERR,("Pindown timeout db:%s  unstick record\n", sr->ctdb_db->db_name));
271         if (sr->pindown != NULL) {
272                 talloc_free(sr->pindown);
273                 sr->pindown = NULL;
274         }
275 }
276
277 static int
278 ctdb_set_sticky_pindown(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
279 {
280         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
281         uint32_t *k;
282         struct ctdb_sticky_record *sr;
283
284         k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
285         if (k == NULL) {
286                 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
287                 talloc_free(tmp_ctx);
288                 return -1;
289         }
290
291         k[0] = (key.dsize + 3) / 4 + 1;
292         memcpy(&k[1], key.dptr, key.dsize);
293
294         sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
295         if (sr == NULL) {
296                 talloc_free(tmp_ctx);
297                 return 0;
298         }
299
300         talloc_free(tmp_ctx);
301
302         if (sr->pindown == NULL) {
303                 DEBUG(DEBUG_ERR,("Pinning down record in %s for %d ms\n", ctdb_db->db_name, ctdb->tunable.sticky_pindown));
304                 sr->pindown = talloc_new(sr);
305                 if (sr->pindown == NULL) {
306                         DEBUG(DEBUG_ERR,("Failed to allocate pindown context for sticky record\n"));
307                         return -1;
308                 }
309                 event_add_timed(ctdb->ev, sr->pindown, timeval_current_ofs(ctdb->tunable.sticky_pindown / 1000, (ctdb->tunable.sticky_pindown * 1000) % 1000000), ctdb_sticky_pindown_timeout, sr);
310         }
311
312         return 0;
313 }
314
315 /*
316   called when a CTDB_REPLY_DMASTER packet comes in, or when the lmaster
317   gets a CTDB_REQUEST_DMASTER for itself. We become the dmaster.
318
319   must be called with the chainlock held. This function releases the chainlock
320 */
321 static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
322                                 struct ctdb_req_header *hdr,
323                                 TDB_DATA key, TDB_DATA data,
324                                 uint64_t rsn, uint32_t record_flags)
325 {
326         struct ctdb_call_state *state;
327         struct ctdb_context *ctdb = ctdb_db->ctdb;
328         struct ctdb_ltdb_header header;
329         int ret;
330
331         DEBUG(DEBUG_DEBUG,("pnn %u dmaster response %08x\n", ctdb->pnn, ctdb_hash(&key)));
332
333         ZERO_STRUCT(header);
334         header.rsn = rsn;
335         header.dmaster = ctdb->pnn;
336         header.flags = record_flags;
337
338         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
339
340         if (state) {
341                 if (state->call->flags & CTDB_CALL_FLAG_VACUUM_MIGRATION) {
342                         /*
343                          * We temporarily add the VACUUM_MIGRATED flag to
344                          * the record flags, so that ctdb_ltdb_store can
345                          * decide whether the record should be stored or
346                          * deleted.
347                          */
348                         header.flags |= CTDB_REC_FLAG_VACUUM_MIGRATED;
349                 }
350         }
351
352         if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
353                 ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
354
355                 ret = ctdb_ltdb_unlock(ctdb_db, key);
356                 if (ret != 0) {
357                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
358                 }
359                 return;
360         }
361
362         /* we just became DMASTER and this database is "sticky",
363            see if the record is flagged as "hot" and set up a pin-down
364            context to stop migrations for a little while if so
365         */
366         if (ctdb_db->sticky) {
367                 ctdb_set_sticky_pindown(ctdb, ctdb_db, key);
368         }
369
370         if (state == NULL) {
371                 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_become_dmaster from node %u\n",
372                          ctdb->pnn, hdr->reqid, hdr->srcnode));
373
374                 ret = ctdb_ltdb_unlock(ctdb_db, key);
375                 if (ret != 0) {
376                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
377                 }
378                 return;
379         }
380
381         if (key.dsize != state->call->key.dsize || memcmp(key.dptr, state->call->key.dptr, key.dsize)) {
382                 DEBUG(DEBUG_ERR, ("Got bogus DMASTER packet reqid:%u from node %u. Key does not match key held in matching idr.\n", hdr->reqid, hdr->srcnode));
383
384                 ret = ctdb_ltdb_unlock(ctdb_db, key);
385                 if (ret != 0) {
386                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
387                 }
388                 return;
389         }
390
391         if (hdr->reqid != state->reqid) {
392                 /* we found a record  but it was the wrong one */
393                 DEBUG(DEBUG_ERR, ("Dropped orphan in ctdb_become_dmaster with reqid:%u\n from node %u", hdr->reqid, hdr->srcnode));
394
395                 ret = ctdb_ltdb_unlock(ctdb_db, key);
396                 if (ret != 0) {
397                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
398                 }
399                 return;
400         }
401
402         ctdb_call_local(ctdb_db, state->call, &header, state, &data, true);
403
404         ret = ctdb_ltdb_unlock(ctdb_db, state->call->key);
405         if (ret != 0) {
406                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
407         }
408
409         state->state = CTDB_CALL_DONE;
410         if (state->async.fn) {
411                 state->async.fn(state);
412         }
413 }
414
415
416
417 /*
418   called when a CTDB_REQ_DMASTER packet comes in
419
420   this comes into the lmaster for a record when the current dmaster
421   wants to give up the dmaster role and give it to someone else
422 */
423 void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
424 {
425         struct ctdb_req_dmaster *c = (struct ctdb_req_dmaster *)hdr;
426         TDB_DATA key, data, data2;
427         struct ctdb_ltdb_header header;
428         struct ctdb_db_context *ctdb_db;
429         uint32_t record_flags = 0;
430         size_t len;
431         int ret;
432
433         key.dptr = c->data;
434         key.dsize = c->keylen;
435         data.dptr = c->data + c->keylen;
436         data.dsize = c->datalen;
437         len = offsetof(struct ctdb_req_dmaster, data) + key.dsize + data.dsize
438                         + sizeof(uint32_t);
439         if (len <= c->hdr.length) {
440                 memcpy(&record_flags, &c->data[c->keylen + c->datalen],
441                        sizeof(record_flags));
442         }
443
444         ctdb_db = find_ctdb_db(ctdb, c->db_id);
445         if (!ctdb_db) {
446                 ctdb_send_error(ctdb, hdr, -1,
447                                 "Unknown database in request. db_id==0x%08x",
448                                 c->db_id);
449                 return;
450         }
451         
452         /* fetch the current record */
453         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
454                                            ctdb_call_input_pkt, ctdb, false);
455         if (ret == -1) {
456                 ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
457                 return;
458         }
459         if (ret == -2) {
460                 DEBUG(DEBUG_INFO,(__location__ " deferring ctdb_request_dmaster\n"));
461                 return;
462         }
463
464         if (ctdb_lmaster(ctdb, &key) != ctdb->pnn) {
465                 DEBUG(DEBUG_ALERT,("pnn %u dmaster request to non-lmaster lmaster=%u gen=%u curgen=%u\n",
466                          ctdb->pnn, ctdb_lmaster(ctdb, &key), 
467                          hdr->generation, ctdb->vnn_map->generation));
468                 ctdb_fatal(ctdb, "ctdb_req_dmaster to non-lmaster");
469         }
470
471         DEBUG(DEBUG_DEBUG,("pnn %u dmaster request on %08x for %u from %u\n", 
472                  ctdb->pnn, ctdb_hash(&key), c->dmaster, c->hdr.srcnode));
473
474         /* its a protocol error if the sending node is not the current dmaster */
475         if (header.dmaster != hdr->srcnode) {
476                 DEBUG(DEBUG_ALERT,("pnn %u dmaster request for new-dmaster %u from non-master %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u keyval=0x%08x\n",
477                          ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
478                          ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
479                          (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid,
480                          (key.dsize >= 4)?(*(uint32_t *)key.dptr):0));
481                 if (header.rsn != 0 || header.dmaster != ctdb->pnn) {
482                         DEBUG(DEBUG_ERR,("ctdb_req_dmaster from non-master. Force a recovery.\n"));
483
484                         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
485                         ctdb_ltdb_unlock(ctdb_db, key);
486                         return;
487                 }
488         }
489
490         if (header.rsn > c->rsn) {
491                 DEBUG(DEBUG_ALERT,("pnn %u dmaster request with older RSN new-dmaster %u from %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u\n",
492                          ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
493                          ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
494                          (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid));
495         }
496
497         /* use the rsn from the sending node */
498         header.rsn = c->rsn;
499
500         /* store the record flags from the sending node */
501         header.flags = record_flags;
502
503         /* check if the new dmaster is the lmaster, in which case we
504            skip the dmaster reply */
505         if (c->dmaster == ctdb->pnn) {
506                 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
507         } else {
508                 ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid);
509
510                 ret = ctdb_ltdb_unlock(ctdb_db, key);
511                 if (ret != 0) {
512                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
513                 }
514         }
515 }
516
517 static void ctdb_sticky_record_timeout(struct event_context *ev, struct timed_event *te, 
518                                        struct timeval t, void *private_data)
519 {
520         struct ctdb_sticky_record *sr = talloc_get_type(private_data, 
521                                                        struct ctdb_sticky_record);
522         talloc_free(sr);
523 }
524
525 static void *ctdb_make_sticky_record_callback(void *parm, void *data)
526 {
527         if (data) {
528                 DEBUG(DEBUG_ERR,("Already have sticky record registered. Free old %p and create new %p\n", data, parm));
529                 talloc_free(data);
530         }
531         return parm;
532 }
533
534 static int
535 ctdb_make_record_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
536 {
537         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
538         uint32_t *k;
539         struct ctdb_sticky_record *sr;
540
541         k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
542         if (k == NULL) {
543                 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
544                 talloc_free(tmp_ctx);
545                 return -1;
546         }
547
548         k[0] = (key.dsize + 3) / 4 + 1;
549         memcpy(&k[1], key.dptr, key.dsize);
550
551         sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
552         if (sr != NULL) {
553                 talloc_free(tmp_ctx);
554                 return 0;
555         }
556
557         sr = talloc(ctdb_db->sticky_records, struct ctdb_sticky_record);
558         if (sr == NULL) {
559                 talloc_free(tmp_ctx);
560                 DEBUG(DEBUG_ERR,("Failed to allocate sticky record structure\n"));
561                 return -1;
562         }
563
564         sr->ctdb    = ctdb;
565         sr->ctdb_db = ctdb_db;
566         sr->pindown = NULL;
567
568         DEBUG(DEBUG_ERR,("Make record sticky for %d seconds in db %s key:0x%08x.\n",
569                          ctdb->tunable.sticky_duration,
570                          ctdb_db->db_name, ctdb_hash(&key)));
571
572         trbt_insertarray32_callback(ctdb_db->sticky_records, k[0], &k[0], ctdb_make_sticky_record_callback, sr);
573
574         event_add_timed(ctdb->ev, sr, timeval_current_ofs(ctdb->tunable.sticky_duration, 0), ctdb_sticky_record_timeout, sr);
575
576         talloc_free(tmp_ctx);
577         return 0;
578 }
579
580 struct pinned_down_requeue_handle {
581         struct ctdb_context *ctdb;
582         struct ctdb_req_header *hdr;
583 };
584
585 struct pinned_down_deferred_call {
586         struct ctdb_context *ctdb;
587         struct ctdb_req_header *hdr;
588 };
589
590 static void pinned_down_requeue(struct event_context *ev, struct timed_event *te, 
591                        struct timeval t, void *private_data)
592 {
593         struct pinned_down_requeue_handle *handle = talloc_get_type(private_data, struct pinned_down_requeue_handle);
594         struct ctdb_context *ctdb = handle->ctdb;
595
596         talloc_steal(ctdb, handle->hdr);
597         ctdb_call_input_pkt(ctdb, handle->hdr);
598
599         talloc_free(handle);
600 }
601
602 static int pinned_down_destructor(struct pinned_down_deferred_call *pinned_down)
603 {
604         struct ctdb_context *ctdb = pinned_down->ctdb;
605         struct pinned_down_requeue_handle *handle = talloc(ctdb, struct pinned_down_requeue_handle);
606
607         handle->ctdb = pinned_down->ctdb;
608         handle->hdr  = pinned_down->hdr;
609         talloc_steal(handle, handle->hdr);
610
611         event_add_timed(ctdb->ev, handle, timeval_zero(), pinned_down_requeue, handle);
612
613         return 0;
614 }
615
616 static int
617 ctdb_defer_pinned_down_request(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr)
618 {
619         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
620         uint32_t *k;
621         struct ctdb_sticky_record *sr;
622         struct pinned_down_deferred_call *pinned_down;
623
624         k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
625         if (k == NULL) {
626                 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
627                 talloc_free(tmp_ctx);
628                 return -1;
629         }
630
631         k[0] = (key.dsize + 3) / 4 + 1;
632         memcpy(&k[1], key.dptr, key.dsize);
633
634         sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
635         if (sr == NULL) {
636                 talloc_free(tmp_ctx);
637                 return -1;
638         }
639
640         talloc_free(tmp_ctx);
641
642         if (sr->pindown == NULL) {
643                 return -1;
644         }
645         
646         pinned_down = talloc(sr->pindown, struct pinned_down_deferred_call);
647         if (pinned_down == NULL) {
648                 DEBUG(DEBUG_ERR,("Failed to allocate structure for deferred pinned down request\n"));
649                 return -1;
650         }
651
652         pinned_down->ctdb = ctdb;
653         pinned_down->hdr  = hdr;
654
655         talloc_set_destructor(pinned_down, pinned_down_destructor);
656         talloc_steal(pinned_down, hdr);
657
658         return 0;
659 }
660
661 static void
662 ctdb_update_db_stat_hot_keys(struct ctdb_db_context *ctdb_db, TDB_DATA key, int hopcount)
663 {
664         int i, id;
665
666         /* smallest value is always at index 0 */
667         if (hopcount <= ctdb_db->statistics.hot_keys[0].count) {
668                 return;
669         }
670
671         /* see if we already know this key */
672         for (i = 0; i < MAX_HOT_KEYS; i++) {
673                 if (key.dsize != ctdb_db->statistics.hot_keys[i].key.dsize) {
674                         continue;
675                 }
676                 if (memcmp(key.dptr, ctdb_db->statistics.hot_keys[i].key.dptr, key.dsize)) {
677                         continue;
678                 }
679                 /* found an entry for this key */
680                 if (hopcount <= ctdb_db->statistics.hot_keys[i].count) {
681                         return;
682                 }
683                 ctdb_db->statistics.hot_keys[i].count = hopcount;
684                 goto sort_keys;
685         }
686
687         if (ctdb_db->statistics.num_hot_keys < MAX_HOT_KEYS) {
688                 id = ctdb_db->statistics.num_hot_keys;
689                 ctdb_db->statistics.num_hot_keys++;
690         } else {
691                 id = 0;
692         }
693
694         if (ctdb_db->statistics.hot_keys[id].key.dptr != NULL) {
695                 talloc_free(ctdb_db->statistics.hot_keys[id].key.dptr);
696         }
697         ctdb_db->statistics.hot_keys[id].key.dsize = key.dsize;
698         ctdb_db->statistics.hot_keys[id].key.dptr  = talloc_memdup(ctdb_db, key.dptr, key.dsize);
699         ctdb_db->statistics.hot_keys[id].count = hopcount;
700         DEBUG(DEBUG_NOTICE,("Updated hot key database=%s key=0x%08x id=%d hop_count=%d\n",
701                             ctdb_db->db_name, ctdb_hash(&key), id, hopcount));
702
703 sort_keys:
704         for (i = 1; i < MAX_HOT_KEYS; i++) {
705                 if (ctdb_db->statistics.hot_keys[i].count == 0) {
706                         continue;
707                 }
708                 if (ctdb_db->statistics.hot_keys[i].count < ctdb_db->statistics.hot_keys[0].count) {
709                         hopcount = ctdb_db->statistics.hot_keys[i].count;
710                         ctdb_db->statistics.hot_keys[i].count = ctdb_db->statistics.hot_keys[0].count;
711                         ctdb_db->statistics.hot_keys[0].count = hopcount;
712
713                         key = ctdb_db->statistics.hot_keys[i].key;
714                         ctdb_db->statistics.hot_keys[i].key = ctdb_db->statistics.hot_keys[0].key;
715                         ctdb_db->statistics.hot_keys[0].key = key;
716                 }
717         }
718 }
719
720 /*
721   called when a CTDB_REQ_CALL packet comes in
722 */
723 void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
724 {
725         struct ctdb_req_call *c = (struct ctdb_req_call *)hdr;
726         TDB_DATA data;
727         struct ctdb_reply_call *r;
728         int ret, len;
729         struct ctdb_ltdb_header header;
730         struct ctdb_call *call;
731         struct ctdb_db_context *ctdb_db;
732         int tmp_count, bucket;
733
734         if (ctdb->methods == NULL) {
735                 DEBUG(DEBUG_INFO,(__location__ " Failed ctdb_request_call. Transport is DOWN\n"));
736                 return;
737         }
738
739
740         ctdb_db = find_ctdb_db(ctdb, c->db_id);
741         if (!ctdb_db) {
742                 ctdb_send_error(ctdb, hdr, -1,
743                                 "Unknown database in request. db_id==0x%08x",
744                                 c->db_id);
745                 return;
746         }
747
748         call = talloc(hdr, struct ctdb_call);
749         CTDB_NO_MEMORY_FATAL(ctdb, call);
750
751         call->call_id  = c->callid;
752         call->key.dptr = c->data;
753         call->key.dsize = c->keylen;
754         call->call_data.dptr = c->data + c->keylen;
755         call->call_data.dsize = c->calldatalen;
756         call->reply_data.dptr  = NULL;
757         call->reply_data.dsize = 0;
758
759
760         /* If this record is pinned down we should defer the
761            request until the pindown times out
762         */
763         if (ctdb_db->sticky) {
764                 if (ctdb_defer_pinned_down_request(ctdb, ctdb_db, call->key, hdr) == 0) {
765                         DEBUG(DEBUG_WARNING,
766                               ("Defer request for pinned down record in %s\n", ctdb_db->db_name));
767                         talloc_free(call);
768                         return;
769                 }
770         }
771
772
773         /* determine if we are the dmaster for this key. This also
774            fetches the record data (if any), thus avoiding a 2nd fetch of the data 
775            if the call will be answered locally */
776
777         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call->key, &header, hdr, &data,
778                                            ctdb_call_input_pkt, ctdb, false);
779         if (ret == -1) {
780                 ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
781                 talloc_free(call);
782                 return;
783         }
784         if (ret == -2) {
785                 DEBUG(DEBUG_INFO,(__location__ " deferred ctdb_request_call\n"));
786                 talloc_free(call);
787                 return;
788         }
789
790         /* Dont do READONLY if we dont have a tracking database */
791         if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
792                 c->flags &= ~CTDB_WANT_READONLY;
793         }
794
795         if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
796                 header.flags &= ~CTDB_REC_RO_FLAGS;
797                 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
798                 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
799                 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
800                         ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
801                 }
802                 /* and clear out the tracking data */
803                 if (tdb_delete(ctdb_db->rottdb, call->key) != 0) {
804                         DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
805                 }
806         }
807
808         /* if we are revoking, we must defer all other calls until the revoke
809          * had completed.
810          */
811         if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
812                 talloc_free(data.dptr);
813                 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
814
815                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
816                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
817                 }
818                 talloc_free(call);
819                 return;
820         }
821
822         /*
823          * If we are not the dmaster and are not hosting any delegations,
824          * then we redirect the request to the node than can answer it
825          * (the lmaster or the dmaster).
826          */
827         if ((header.dmaster != ctdb->pnn) 
828             && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) ) {
829                 talloc_free(data.dptr);
830                 ctdb_call_send_redirect(ctdb, ctdb_db, call->key, c, &header);
831
832                 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
833                 if (ret != 0) {
834                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
835                 }
836                 talloc_free(call);
837                 return;
838         }
839
840         if ( (!(c->flags & CTDB_WANT_READONLY))
841         && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
842                 header.flags   |= CTDB_REC_RO_REVOKING_READONLY;
843                 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
844                         ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
845                 }
846                 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
847
848                 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, call->key, &header, data) != 0) {
849                         ctdb_fatal(ctdb, "Failed to start record revoke");
850                 }
851                 talloc_free(data.dptr);
852
853                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
854                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
855                 }
856                 talloc_free(call);
857
858                 return;
859         }               
860
861         /* If this is the first request for delegation. bump rsn and set
862          * the delegations flag
863          */
864         if ((c->flags & CTDB_WANT_READONLY)
865         &&  (c->callid == CTDB_FETCH_WITH_HEADER_FUNC)
866         &&  (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS))) {
867                 header.rsn     += 3;
868                 header.flags   |= CTDB_REC_RO_HAVE_DELEGATIONS;
869                 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
870                         ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
871                 }
872         }
873         if ((c->flags & CTDB_WANT_READONLY) 
874         &&  (call->call_id == CTDB_FETCH_WITH_HEADER_FUNC)) {
875                 TDB_DATA tdata;
876
877                 tdata = tdb_fetch(ctdb_db->rottdb, call->key);
878                 if (ctdb_trackingdb_add_pnn(ctdb, &tdata, c->hdr.srcnode) != 0) {
879                         ctdb_fatal(ctdb, "Failed to add node to trackingdb");
880                 }
881                 if (tdb_store(ctdb_db->rottdb, call->key, tdata, TDB_REPLACE) != 0) {
882                         ctdb_fatal(ctdb, "Failed to store trackingdb data");
883                 }
884                 free(tdata.dptr);
885
886                 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
887                 if (ret != 0) {
888                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
889                 }
890
891                 len = offsetof(struct ctdb_reply_call, data) + data.dsize + sizeof(struct ctdb_ltdb_header);
892                 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len, 
893                                             struct ctdb_reply_call);
894                 CTDB_NO_MEMORY_FATAL(ctdb, r);
895                 r->hdr.destnode  = c->hdr.srcnode;
896                 r->hdr.reqid     = c->hdr.reqid;
897                 r->status        = 0;
898                 r->datalen       = data.dsize + sizeof(struct ctdb_ltdb_header);
899                 header.rsn      -= 2;
900                 header.flags   |= CTDB_REC_RO_HAVE_READONLY;
901                 header.flags   &= ~CTDB_REC_RO_HAVE_DELEGATIONS;
902                 memcpy(&r->data[0], &header, sizeof(struct ctdb_ltdb_header));
903
904                 if (data.dsize) {
905                         memcpy(&r->data[sizeof(struct ctdb_ltdb_header)], data.dptr, data.dsize);
906                 }
907
908                 ctdb_queue_packet(ctdb, &r->hdr);
909                 CTDB_INCREMENT_STAT(ctdb, total_ro_delegations);
910                 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_delegations);
911
912                 talloc_free(r);
913                 talloc_free(call);
914                 return;
915         }
916
917         CTDB_UPDATE_STAT(ctdb, max_hop_count, c->hopcount);
918         tmp_count = c->hopcount;
919         bucket = 0;
920         while (tmp_count) {
921                 tmp_count >>= 2;
922                 bucket++;
923         }
924         if (bucket >= MAX_COUNT_BUCKETS) {
925                 bucket = MAX_COUNT_BUCKETS - 1;
926         }
927         CTDB_INCREMENT_STAT(ctdb, hop_count_bucket[bucket]);
928         CTDB_INCREMENT_DB_STAT(ctdb_db, hop_count_bucket[bucket]);
929         ctdb_update_db_stat_hot_keys(ctdb_db, call->key, c->hopcount);
930
931         /* If this database supports sticky records, then check if the
932            hopcount is big. If it is it means the record is hot and we
933            should make it sticky.
934         */
935         if (ctdb_db->sticky && c->hopcount >= ctdb->tunable.hopcount_make_sticky) {
936                 ctdb_make_record_sticky(ctdb, ctdb_db, call->key);
937         }
938
939
940         /* Try if possible to migrate the record off to the caller node.
941          * From the clients perspective a fetch of the data is just as 
942          * expensive as a migration.
943          */
944         if (c->hdr.srcnode != ctdb->pnn) {
945                 if (ctdb_db->persistent_state) {
946                         DEBUG(DEBUG_INFO, (__location__ " refusing migration"
947                               " of key %s while transaction is active\n",
948                               (char *)call->key.dptr));
949                 } else {
950                         DEBUG(DEBUG_DEBUG,("pnn %u starting migration of %08x to %u\n",
951                                  ctdb->pnn, ctdb_hash(&(call->key)), c->hdr.srcnode));
952                         ctdb_call_send_dmaster(ctdb_db, c, &header, &(call->key), &data);
953                         talloc_free(data.dptr);
954
955                         ret = ctdb_ltdb_unlock(ctdb_db, call->key);
956                         if (ret != 0) {
957                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
958                         }
959                 }
960                 talloc_free(call);
961                 return;
962         }
963
964         ret = ctdb_call_local(ctdb_db, call, &header, hdr, &data, true);
965         if (ret != 0) {
966                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_local failed\n"));
967                 call->status = -1;
968         }
969
970         ret = ctdb_ltdb_unlock(ctdb_db, call->key);
971         if (ret != 0) {
972                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
973         }
974
975         len = offsetof(struct ctdb_reply_call, data) + call->reply_data.dsize;
976         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len, 
977                                     struct ctdb_reply_call);
978         CTDB_NO_MEMORY_FATAL(ctdb, r);
979         r->hdr.destnode  = hdr->srcnode;
980         r->hdr.reqid     = hdr->reqid;
981         r->status        = call->status;
982         r->datalen       = call->reply_data.dsize;
983         if (call->reply_data.dsize) {
984                 memcpy(&r->data[0], call->reply_data.dptr, call->reply_data.dsize);
985         }
986
987         ctdb_queue_packet(ctdb, &r->hdr);
988
989         talloc_free(r);
990         talloc_free(call);
991 }
992
993 /**
994  * called when a CTDB_REPLY_CALL packet comes in
995  *
996  * This packet comes in response to a CTDB_REQ_CALL request packet. It
997  * contains any reply data from the call
998  */
999 void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1000 {
1001         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
1002         struct ctdb_call_state *state;
1003
1004         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
1005         if (state == NULL) {
1006                 DEBUG(DEBUG_ERR, (__location__ " reqid %u not found\n", hdr->reqid));
1007                 return;
1008         }
1009
1010         if (hdr->reqid != state->reqid) {
1011                 /* we found a record  but it was the wrong one */
1012                 DEBUG(DEBUG_ERR, ("Dropped orphaned call reply with reqid:%u\n",hdr->reqid));
1013                 return;
1014         }
1015
1016
1017         /* read only delegation processing */
1018         /* If we got a FETCH_WITH_HEADER we should check if this is a ro
1019          * delegation since we may need to update the record header
1020          */
1021         if (state->c->callid == CTDB_FETCH_WITH_HEADER_FUNC) {
1022                 struct ctdb_db_context *ctdb_db = state->ctdb_db;
1023                 struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)&c->data[0];
1024                 struct ctdb_ltdb_header oldheader;
1025                 TDB_DATA key, data, olddata;
1026                 int ret;
1027
1028                 if (!(header->flags & CTDB_REC_RO_HAVE_READONLY)) {
1029                         goto finished_ro;
1030                         return;
1031                 }
1032
1033                 key.dsize = state->c->keylen;
1034                 key.dptr  = state->c->data;
1035                 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1036                                      ctdb_call_input_pkt, ctdb, false);
1037                 if (ret == -2) {
1038                         return;
1039                 }
1040                 if (ret != 0) {
1041                         DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_call\n"));
1042                         return;
1043                 }
1044
1045                 ret = ctdb_ltdb_fetch(ctdb_db, key, &oldheader, state, &olddata);
1046                 if (ret != 0) {
1047                         DEBUG(DEBUG_ERR, ("Failed to fetch old record in ctdb_reply_call\n"));
1048                         ctdb_ltdb_unlock(ctdb_db, key);
1049                         goto finished_ro;
1050                 }                       
1051
1052                 if (header->rsn <= oldheader.rsn) {
1053                         ctdb_ltdb_unlock(ctdb_db, key);
1054                         goto finished_ro;
1055                 }
1056
1057                 if (c->datalen < sizeof(struct ctdb_ltdb_header)) {
1058                         DEBUG(DEBUG_ERR,(__location__ " Got FETCH_WITH_HEADER reply with too little data: %d bytes\n", c->datalen));
1059                         ctdb_ltdb_unlock(ctdb_db, key);
1060                         goto finished_ro;
1061                 }
1062
1063                 data.dsize = c->datalen - sizeof(struct ctdb_ltdb_header);
1064                 data.dptr  = &c->data[sizeof(struct ctdb_ltdb_header)];
1065                 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
1066                 if (ret != 0) {
1067                         DEBUG(DEBUG_ERR, ("Failed to store new record in ctdb_reply_call\n"));
1068                         ctdb_ltdb_unlock(ctdb_db, key);
1069                         goto finished_ro;
1070                 }                       
1071
1072                 ctdb_ltdb_unlock(ctdb_db, key);
1073         }
1074 finished_ro:
1075
1076         state->call->reply_data.dptr = c->data;
1077         state->call->reply_data.dsize = c->datalen;
1078         state->call->status = c->status;
1079
1080         talloc_steal(state, c);
1081
1082         state->state = CTDB_CALL_DONE;
1083         if (state->async.fn) {
1084                 state->async.fn(state);
1085         }
1086 }
1087
1088
1089 /**
1090  * called when a CTDB_REPLY_DMASTER packet comes in
1091  *
1092  * This packet comes in from the lmaster in response to a CTDB_REQ_CALL
1093  * request packet. It means that the current dmaster wants to give us
1094  * the dmaster role.
1095  */
1096 void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1097 {
1098         struct ctdb_reply_dmaster *c = (struct ctdb_reply_dmaster *)hdr;
1099         struct ctdb_db_context *ctdb_db;
1100         TDB_DATA key, data;
1101         uint32_t record_flags = 0;
1102         size_t len;
1103         int ret;
1104
1105         ctdb_db = find_ctdb_db(ctdb, c->db_id);
1106         if (ctdb_db == NULL) {
1107                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_reply_dmaster\n", c->db_id));
1108                 return;
1109         }
1110         
1111         key.dptr = c->data;
1112         key.dsize = c->keylen;
1113         data.dptr = &c->data[key.dsize];
1114         data.dsize = c->datalen;
1115         len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize
1116                 + sizeof(uint32_t);
1117         if (len <= c->hdr.length) {
1118                 memcpy(&record_flags, &c->data[c->keylen + c->datalen],
1119                        sizeof(record_flags));
1120         }
1121
1122         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1123                                      ctdb_call_input_pkt, ctdb, false);
1124         if (ret == -2) {
1125                 return;
1126         }
1127         if (ret != 0) {
1128                 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_dmaster\n"));
1129                 return;
1130         }
1131
1132         ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
1133 }
1134
1135
1136 /*
1137   called when a CTDB_REPLY_ERROR packet comes in
1138 */
1139 void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1140 {
1141         struct ctdb_reply_error *c = (struct ctdb_reply_error *)hdr;
1142         struct ctdb_call_state *state;
1143
1144         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
1145         if (state == NULL) {
1146                 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_reply_error\n",
1147                          ctdb->pnn, hdr->reqid));
1148                 return;
1149         }
1150
1151         if (hdr->reqid != state->reqid) {
1152                 /* we found a record  but it was the wrong one */
1153                 DEBUG(DEBUG_ERR, ("Dropped orphaned error reply with reqid:%u\n",hdr->reqid));
1154                 return;
1155         }
1156
1157         talloc_steal(state, c);
1158
1159         state->state  = CTDB_CALL_ERROR;
1160         state->errmsg = (char *)c->msg;
1161         if (state->async.fn) {
1162                 state->async.fn(state);
1163         }
1164 }
1165
1166
1167 /*
1168   destroy a ctdb_call
1169 */
1170 static int ctdb_call_destructor(struct ctdb_call_state *state)
1171 {
1172         DLIST_REMOVE(state->ctdb_db->ctdb->pending_calls, state);
1173         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
1174         return 0;
1175 }
1176
1177
1178 /*
1179   called when a ctdb_call needs to be resent after a reconfigure event
1180 */
1181 static void ctdb_call_resend(struct ctdb_call_state *state)
1182 {
1183         struct ctdb_context *ctdb = state->ctdb_db->ctdb;
1184
1185         state->generation = ctdb->vnn_map->generation;
1186
1187         /* use a new reqid, in case the old reply does eventually come in */
1188         ctdb_reqid_remove(ctdb, state->reqid);
1189         state->reqid = ctdb_reqid_new(ctdb, state);
1190         state->c->hdr.reqid = state->reqid;
1191
1192         /* update the generation count for this request, so its valid with the new vnn_map */
1193         state->c->hdr.generation = state->generation;
1194
1195         /* send the packet to ourselves, it will be redirected appropriately */
1196         state->c->hdr.destnode = ctdb->pnn;
1197
1198         ctdb_queue_packet(ctdb, &state->c->hdr);
1199         DEBUG(DEBUG_NOTICE,("resent ctdb_call\n"));
1200 }
1201
1202 /*
1203   resend all pending calls on recovery
1204  */
1205 void ctdb_call_resend_all(struct ctdb_context *ctdb)
1206 {
1207         struct ctdb_call_state *state, *next;
1208         for (state=ctdb->pending_calls;state;state=next) {
1209                 next = state->next;
1210                 ctdb_call_resend(state);
1211         }
1212 }
1213
1214 /*
1215   this allows the caller to setup a async.fn 
1216 */
1217 static void call_local_trigger(struct event_context *ev, struct timed_event *te, 
1218                        struct timeval t, void *private_data)
1219 {
1220         struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state);
1221         if (state->async.fn) {
1222                 state->async.fn(state);
1223         }
1224 }       
1225
1226
1227 /*
1228   construct an event driven local ctdb_call
1229
1230   this is used so that locally processed ctdb_call requests are processed
1231   in an event driven manner
1232 */
1233 struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db, 
1234                                              struct ctdb_call *call,
1235                                              struct ctdb_ltdb_header *header,
1236                                              TDB_DATA *data)
1237 {
1238         struct ctdb_call_state *state;
1239         struct ctdb_context *ctdb = ctdb_db->ctdb;
1240         int ret;
1241
1242         state = talloc_zero(ctdb_db, struct ctdb_call_state);
1243         CTDB_NO_MEMORY_NULL(ctdb, state);
1244
1245         talloc_steal(state, data->dptr);
1246
1247         state->state = CTDB_CALL_DONE;
1248         state->call  = talloc(state, struct ctdb_call);
1249         CTDB_NO_MEMORY_NULL(ctdb, state->call);
1250         *(state->call) = *call;
1251         state->ctdb_db = ctdb_db;
1252
1253         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
1254         if (ret != 0) {
1255                 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
1256         }
1257
1258         event_add_timed(ctdb->ev, state, timeval_zero(), call_local_trigger, state);
1259
1260         return state;
1261 }
1262
1263
1264 /*
1265   make a remote ctdb call - async send. Called in daemon context.
1266
1267   This constructs a ctdb_call request and queues it for processing. 
1268   This call never blocks.
1269 */
1270 struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctdb_db, 
1271                                                      struct ctdb_call *call, 
1272                                                      struct ctdb_ltdb_header *header)
1273 {
1274         uint32_t len;
1275         struct ctdb_call_state *state;
1276         struct ctdb_context *ctdb = ctdb_db->ctdb;
1277
1278         if (ctdb->methods == NULL) {
1279                 DEBUG(DEBUG_INFO,(__location__ " Failed send packet. Transport is down\n"));
1280                 return NULL;
1281         }
1282
1283         state = talloc_zero(ctdb_db, struct ctdb_call_state);
1284         CTDB_NO_MEMORY_NULL(ctdb, state);
1285         state->call = talloc(state, struct ctdb_call);
1286         CTDB_NO_MEMORY_NULL(ctdb, state->call);
1287
1288         state->reqid = ctdb_reqid_new(ctdb, state);
1289         state->ctdb_db = ctdb_db;
1290         talloc_set_destructor(state, ctdb_call_destructor);
1291
1292         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
1293         state->c = ctdb_transport_allocate(ctdb, state, CTDB_REQ_CALL, len, 
1294                                            struct ctdb_req_call);
1295         CTDB_NO_MEMORY_NULL(ctdb, state->c);
1296         state->c->hdr.destnode  = header->dmaster;
1297
1298         /* this limits us to 16k outstanding messages - not unreasonable */
1299         state->c->hdr.reqid     = state->reqid;
1300         state->c->flags         = call->flags;
1301         state->c->db_id         = ctdb_db->db_id;
1302         state->c->callid        = call->call_id;
1303         state->c->hopcount      = 0;
1304         state->c->keylen        = call->key.dsize;
1305         state->c->calldatalen   = call->call_data.dsize;
1306         memcpy(&state->c->data[0], call->key.dptr, call->key.dsize);
1307         memcpy(&state->c->data[call->key.dsize], 
1308                call->call_data.dptr, call->call_data.dsize);
1309         *(state->call)              = *call;
1310         state->call->call_data.dptr = &state->c->data[call->key.dsize];
1311         state->call->key.dptr       = &state->c->data[0];
1312
1313         state->state  = CTDB_CALL_WAIT;
1314         state->generation = ctdb->vnn_map->generation;
1315
1316         DLIST_ADD(ctdb->pending_calls, state);
1317
1318         ctdb_queue_packet(ctdb, &state->c->hdr);
1319
1320         return state;
1321 }
1322
1323 /*
1324   make a remote ctdb call - async recv - called in daemon context
1325
1326   This is called when the program wants to wait for a ctdb_call to complete and get the 
1327   results. This call will block unless the call has already completed.
1328 */
1329 int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
1330 {
1331         while (state->state < CTDB_CALL_DONE) {
1332                 event_loop_once(state->ctdb_db->ctdb->ev);
1333         }
1334         if (state->state != CTDB_CALL_DONE) {
1335                 ctdb_set_error(state->ctdb_db->ctdb, "%s", state->errmsg);
1336                 talloc_free(state);
1337                 return -1;
1338         }
1339
1340         if (state->call->reply_data.dsize) {
1341                 call->reply_data.dptr = talloc_memdup(call,
1342                                                       state->call->reply_data.dptr,
1343                                                       state->call->reply_data.dsize);
1344                 call->reply_data.dsize = state->call->reply_data.dsize;
1345         } else {
1346                 call->reply_data.dptr = NULL;
1347                 call->reply_data.dsize = 0;
1348         }
1349         call->status = state->call->status;
1350         talloc_free(state);
1351         return 0;
1352 }
1353
1354
1355 /* 
1356    send a keepalive packet to the other node
1357 */
1358 void ctdb_send_keepalive(struct ctdb_context *ctdb, uint32_t destnode)
1359 {
1360         struct ctdb_req_keepalive *r;
1361         
1362         if (ctdb->methods == NULL) {
1363                 DEBUG(DEBUG_INFO,(__location__ " Failed to send keepalive. Transport is DOWN\n"));
1364                 return;
1365         }
1366
1367         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_KEEPALIVE,
1368                                     sizeof(struct ctdb_req_keepalive), 
1369                                     struct ctdb_req_keepalive);
1370         CTDB_NO_MEMORY_FATAL(ctdb, r);
1371         r->hdr.destnode  = destnode;
1372         r->hdr.reqid     = 0;
1373         
1374         CTDB_INCREMENT_STAT(ctdb, keepalive_packets_sent);
1375
1376         ctdb_queue_packet(ctdb, &r->hdr);
1377
1378         talloc_free(r);
1379 }
1380
1381
1382
1383 struct revokechild_deferred_call {
1384         struct ctdb_context *ctdb;
1385         struct ctdb_req_header *hdr;
1386         deferred_requeue_fn fn;
1387         void *ctx;
1388 };
1389
1390 struct revokechild_handle {
1391         struct revokechild_handle *next, *prev;
1392         struct ctdb_context *ctdb;
1393         struct ctdb_db_context *ctdb_db;
1394         struct fd_event *fde;
1395         int status;
1396         int fd[2];
1397         pid_t child;
1398         TDB_DATA key;
1399 };
1400
1401 struct revokechild_requeue_handle {
1402         struct ctdb_context *ctdb;
1403         struct ctdb_req_header *hdr;
1404         deferred_requeue_fn fn;
1405         void *ctx;
1406 };
1407
1408 static void deferred_call_requeue(struct event_context *ev, struct timed_event *te, 
1409                        struct timeval t, void *private_data)
1410 {
1411         struct revokechild_requeue_handle *requeue_handle = talloc_get_type(private_data, struct revokechild_requeue_handle);
1412
1413         requeue_handle->fn(requeue_handle->ctx, requeue_handle->hdr);
1414         talloc_free(requeue_handle);
1415 }
1416
1417 static int deferred_call_destructor(struct revokechild_deferred_call *deferred_call)
1418 {
1419         struct ctdb_context *ctdb = deferred_call->ctdb;
1420         struct revokechild_requeue_handle *requeue_handle = talloc(ctdb, struct revokechild_requeue_handle);
1421         struct ctdb_req_call *c = (struct ctdb_req_call *)deferred_call->hdr;
1422
1423         requeue_handle->ctdb = ctdb;
1424         requeue_handle->hdr  = deferred_call->hdr;
1425         requeue_handle->fn   = deferred_call->fn;
1426         requeue_handle->ctx  = deferred_call->ctx;
1427         talloc_steal(requeue_handle, requeue_handle->hdr);
1428
1429         /* when revoking, any READONLY requests have 1 second grace to let read/write finish first */
1430         event_add_timed(ctdb->ev, requeue_handle, timeval_current_ofs(c->flags & CTDB_WANT_READONLY ? 1 : 0, 0), deferred_call_requeue, requeue_handle);
1431
1432         return 0;
1433 }
1434
1435
1436 static int revokechild_destructor(struct revokechild_handle *rc)
1437 {
1438         if (rc->fde != NULL) {
1439                 talloc_free(rc->fde);
1440         }
1441
1442         if (rc->fd[0] != -1) {
1443                 close(rc->fd[0]);
1444         }
1445         if (rc->fd[1] != -1) {
1446                 close(rc->fd[1]);
1447         }
1448         ctdb_kill(rc->ctdb, rc->child, SIGKILL);
1449
1450         DLIST_REMOVE(rc->ctdb_db->revokechild_active, rc);
1451         return 0;
1452 }
1453
1454 static void revokechild_handler(struct event_context *ev, struct fd_event *fde, 
1455                              uint16_t flags, void *private_data)
1456 {
1457         struct revokechild_handle *rc = talloc_get_type(private_data, 
1458                                                      struct revokechild_handle);
1459         int ret;
1460         char c;
1461
1462         ret = sys_read(rc->fd[0], &c, 1);
1463         if (ret != 1) {
1464                 DEBUG(DEBUG_ERR,("Failed to read status from revokechild. errno:%d\n", errno));
1465                 rc->status = -1;
1466                 talloc_free(rc);
1467                 return;
1468         }
1469         if (c != 0) {
1470                 DEBUG(DEBUG_ERR,("revokechild returned failure. status:%d\n", c));
1471                 rc->status = -1;
1472                 talloc_free(rc);
1473                 return;
1474         }
1475
1476         talloc_free(rc);
1477 }
1478
1479 struct ctdb_revoke_state {
1480         struct ctdb_db_context *ctdb_db;
1481         TDB_DATA key;
1482         struct ctdb_ltdb_header *header;
1483         TDB_DATA data;
1484         int count;
1485         int status;
1486         int finished;
1487 };
1488
1489 static void update_record_cb(struct ctdb_client_control_state *state)
1490 {
1491         struct ctdb_revoke_state *revoke_state;
1492         int ret;
1493         int32_t res;
1494
1495         if (state == NULL) {
1496                 return;
1497         }
1498         revoke_state = state->async.private_data;
1499
1500         state->async.fn = NULL;
1501         ret = ctdb_control_recv(state->ctdb, state, state, NULL, &res, NULL);
1502         if ((ret != 0) || (res != 0)) {
1503                 DEBUG(DEBUG_ERR,("Recv for revoke update record failed ret:%d res:%d\n", ret, res));
1504                 revoke_state->status = -1;
1505         }
1506
1507         revoke_state->count--;
1508         if (revoke_state->count <= 0) {
1509                 revoke_state->finished = 1;
1510         }
1511 }
1512
1513 static void revoke_send_cb(struct ctdb_context *ctdb, uint32_t pnn, void *private_data)
1514 {
1515         struct ctdb_revoke_state *revoke_state = private_data;
1516         struct ctdb_client_control_state *state;
1517
1518         state = ctdb_ctrl_updaterecord_send(ctdb, revoke_state, timeval_current_ofs(ctdb->tunable.control_timeout,0), pnn, revoke_state->ctdb_db, revoke_state->key, revoke_state->header, revoke_state->data);
1519         if (state == NULL) {
1520                 DEBUG(DEBUG_ERR,("Failure to send update record to revoke readonly delegation\n"));
1521                 revoke_state->status = -1;
1522                 return;
1523         }
1524         state->async.fn           = update_record_cb;
1525         state->async.private_data = revoke_state;
1526
1527         revoke_state->count++;
1528
1529 }
1530
1531 static void ctdb_revoke_timeout_handler(struct event_context *ev, struct timed_event *te, 
1532                               struct timeval yt, void *private_data)
1533 {
1534         struct ctdb_revoke_state *state = private_data;
1535
1536         DEBUG(DEBUG_ERR,("Timed out waiting for revoke to finish\n"));
1537         state->finished = 1;
1538         state->status   = -1;
1539 }
1540
1541 static int ctdb_revoke_all_delegations(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA tdata, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1542 {
1543         struct ctdb_revoke_state *state = talloc_zero(ctdb, struct ctdb_revoke_state);
1544         struct ctdb_ltdb_header new_header;
1545         TDB_DATA new_data;
1546
1547         state->ctdb_db = ctdb_db;
1548         state->key     = key;
1549         state->header  = header;
1550         state->data    = data;
1551  
1552         ctdb_trackingdb_traverse(ctdb, tdata, revoke_send_cb, state);
1553
1554         event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0), ctdb_revoke_timeout_handler, state);
1555
1556         while (state->finished == 0) {
1557                 event_loop_once(ctdb->ev);
1558         }
1559
1560         if (ctdb_ltdb_lock(ctdb_db, key) != 0) {
1561                 DEBUG(DEBUG_ERR,("Failed to chainlock the database in revokechild\n"));
1562                 talloc_free(state);
1563                 return -1;
1564         }
1565         if (ctdb_ltdb_fetch(ctdb_db, key, &new_header, state, &new_data) != 0) {
1566                 ctdb_ltdb_unlock(ctdb_db, key);
1567                 DEBUG(DEBUG_ERR,("Failed for fetch tdb record in revokechild\n"));
1568                 talloc_free(state);
1569                 return -1;
1570         }
1571         header->rsn++;
1572         if (new_header.rsn > header->rsn) {
1573                 ctdb_ltdb_unlock(ctdb_db, key);
1574                 DEBUG(DEBUG_ERR,("RSN too high in tdb record in revokechild\n"));
1575                 talloc_free(state);
1576                 return -1;
1577         }
1578         if ( (new_header.flags & (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS)) != (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS) ) {
1579                 ctdb_ltdb_unlock(ctdb_db, key);
1580                 DEBUG(DEBUG_ERR,("Flags are wrong in tdb record in revokechild\n"));
1581                 talloc_free(state);
1582                 return -1;
1583         }
1584
1585         /*
1586          * If revoke on all nodes succeed, revoke is complete.  Otherwise,
1587          * remove CTDB_REC_RO_REVOKING_READONLY flag and retry.
1588          */
1589         if (state->status == 0) {
1590                 new_header.rsn++;
1591                 new_header.flags |= CTDB_REC_RO_REVOKE_COMPLETE;
1592         } else {
1593                 DEBUG(DEBUG_NOTICE, ("Revoke all delegations failed, retrying.\n"));
1594                 new_header.flags &= ~CTDB_REC_RO_REVOKING_READONLY;
1595         }
1596         if (ctdb_ltdb_store(ctdb_db, key, &new_header, new_data) != 0) {
1597                 ctdb_ltdb_unlock(ctdb_db, key);
1598                 DEBUG(DEBUG_ERR,("Failed to write new record in revokechild\n"));
1599                 talloc_free(state);
1600                 return -1;
1601         }
1602         ctdb_ltdb_unlock(ctdb_db, key);
1603
1604         talloc_free(state);
1605         return 0;
1606 }
1607
1608
1609 int ctdb_start_revoke_ro_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1610 {
1611         TDB_DATA tdata;
1612         struct revokechild_handle *rc;
1613         pid_t parent = getpid();
1614         int ret;
1615
1616         header->flags &= ~(CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY);
1617         header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1618         header->rsn   -= 1;
1619
1620         if ((rc = talloc_zero(ctdb_db, struct revokechild_handle)) == NULL) {
1621                 DEBUG(DEBUG_ERR,("Failed to allocate revokechild_handle\n"));
1622                 return -1;
1623         }
1624
1625         tdata = tdb_fetch(ctdb_db->rottdb, key);
1626         if (tdata.dsize > 0) {
1627                 uint8_t *tmp;
1628
1629                 tmp = tdata.dptr;
1630                 tdata.dptr = talloc_memdup(rc, tdata.dptr, tdata.dsize);
1631                 free(tmp);
1632         }
1633
1634         rc->status    = 0;
1635         rc->ctdb      = ctdb;
1636         rc->ctdb_db   = ctdb_db;
1637         rc->fd[0]     = -1;
1638         rc->fd[1]     = -1;
1639
1640         talloc_set_destructor(rc, revokechild_destructor);
1641
1642         rc->key.dsize = key.dsize;
1643         rc->key.dptr  = talloc_memdup(rc, key.dptr, key.dsize);
1644         if (rc->key.dptr == NULL) {
1645                 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1646                 talloc_free(rc);
1647                 return -1;
1648         }
1649
1650         ret = pipe(rc->fd);
1651         if (ret != 0) {
1652                 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1653                 talloc_free(rc);
1654                 return -1;
1655         }
1656
1657
1658         rc->child = ctdb_fork(ctdb);
1659         if (rc->child == (pid_t)-1) {
1660                 DEBUG(DEBUG_ERR,("Failed to fork child for revokechild\n"));
1661                 talloc_free(rc);
1662                 return -1;
1663         }
1664
1665         if (rc->child == 0) {
1666                 char c = 0;
1667                 close(rc->fd[0]);
1668                 debug_extra = talloc_asprintf(NULL, "revokechild-%s:", ctdb_db->db_name);
1669
1670                 ctdb_set_process_name("ctdb_revokechild");
1671                 if (switch_from_server_to_client(ctdb, "revokechild-%s", ctdb_db->db_name) != 0) {
1672                         DEBUG(DEBUG_ERR,("Failed to switch from server to client for revokechild process\n"));
1673                         c = 1;
1674                         goto child_finished;
1675                 }
1676
1677                 c = ctdb_revoke_all_delegations(ctdb, ctdb_db, tdata, key, header, data);
1678
1679 child_finished:
1680                 sys_write(rc->fd[1], &c, 1);
1681                 /* make sure we die when our parent dies */
1682                 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
1683                         sleep(5);
1684                 }
1685                 _exit(0);
1686         }
1687
1688         close(rc->fd[1]);
1689         rc->fd[1] = -1;
1690         set_close_on_exec(rc->fd[0]);
1691
1692         /* This is an active revokechild child process */
1693         DLIST_ADD_END(ctdb_db->revokechild_active, rc, NULL);
1694
1695         rc->fde = event_add_fd(ctdb->ev, rc, rc->fd[0],
1696                                    EVENT_FD_READ, revokechild_handler,
1697                                    (void *)rc);
1698         if (rc->fde == NULL) {
1699                 DEBUG(DEBUG_ERR,("Failed to set up fd event for revokechild process\n"));
1700                 talloc_free(rc);
1701         }
1702         tevent_fd_set_auto_close(rc->fde);
1703
1704         return 0;
1705 }
1706
1707 int ctdb_add_revoke_deferred_call(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr, deferred_requeue_fn fn, void *call_context)
1708 {
1709         struct revokechild_handle *rc;
1710         struct revokechild_deferred_call *deferred_call;
1711
1712         for (rc = ctdb_db->revokechild_active; rc; rc = rc->next) {
1713                 if (rc->key.dsize == 0) {
1714                         continue;
1715                 }
1716                 if (rc->key.dsize != key.dsize) {
1717                         continue;
1718                 }
1719                 if (!memcmp(rc->key.dptr, key.dptr, key.dsize)) {
1720                         break;
1721                 }
1722         }
1723
1724         if (rc == NULL) {
1725                 DEBUG(DEBUG_ERR,("Failed to add deferred call to revoke list. revoke structure not found\n"));
1726                 return -1;
1727         }
1728
1729         deferred_call = talloc(rc, struct revokechild_deferred_call);
1730         if (deferred_call == NULL) {
1731                 DEBUG(DEBUG_ERR,("Failed to allocate deferred call structure for revoking record\n"));
1732                 return -1;
1733         }
1734
1735         deferred_call->ctdb = ctdb;
1736         deferred_call->hdr  = hdr;
1737         deferred_call->fn   = fn;
1738         deferred_call->ctx  = call_context;
1739
1740         talloc_set_destructor(deferred_call, deferred_call_destructor);
1741         talloc_steal(deferred_call, hdr);
1742
1743         return 0;
1744 }