- added a --torture option to all ctdb tools. This sets
[samba.git] / ctdb / common / ctdb_call.c
1 /* 
2    ctdb_call protocol code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Lesser General Public
8    License as published by the Free Software Foundation; either
9    version 2 of the License, or (at your option) any later version.
10
11    This library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Lesser General Public License for more details.
15
16    You should have received a copy of the GNU Lesser General Public
17    License along with this library; if not, write to the Free Software
18    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19 */
20 /*
21   see http://wiki.samba.org/index.php/Samba_%26_Clustering for
22   protocol design and packet details
23 */
24 #include "includes.h"
25 #include "lib/events/events.h"
26 #include "lib/tdb/include/tdb.h"
27 #include "system/network.h"
28 #include "system/filesys.h"
29 #include "../include/ctdb_private.h"
30
31 /*
32   find the ctdb_db from a db index
33  */
34  struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id)
35 {
36         struct ctdb_db_context *ctdb_db;
37
38         for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
39                 if (ctdb_db->db_id == id) {
40                         break;
41                 }
42         }
43         return ctdb_db;
44 }
45
46
47 /*
48   local version of ctdb_call
49 */
50 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
51                     struct ctdb_ltdb_header *header, TDB_DATA *data,
52                     uint32_t caller)
53 {
54         struct ctdb_call_info *c;
55         struct ctdb_registered_call *fn;
56         struct ctdb_context *ctdb = ctdb_db->ctdb;
57         
58         c = talloc(ctdb, struct ctdb_call_info);
59         CTDB_NO_MEMORY(ctdb, c);
60
61         c->key = call->key;
62         c->call_data = &call->call_data;
63         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
64         c->record_data.dsize = data->dsize;
65         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
66         c->new_data = NULL;
67         c->reply_data = NULL;
68         c->status = 0;
69
70         for (fn=ctdb_db->calls;fn;fn=fn->next) {
71                 if (fn->id == call->call_id) break;
72         }
73         if (fn == NULL) {
74                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
75                 talloc_free(c);
76                 return -1;
77         }
78
79         if (fn->fn(c) != 0) {
80                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
81                 talloc_free(c);
82                 return -1;
83         }
84
85         if (header->laccessor != caller) {
86                 header->lacount = 0;
87         }
88         header->laccessor = caller;
89         header->lacount++;
90
91         /* we need to force the record to be written out if this was a remote access,
92            so that the lacount is updated */
93         if (c->new_data == NULL && header->laccessor != ctdb->vnn) {
94                 c->new_data = &c->record_data;
95         }
96
97         if (c->new_data) {
98                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
99                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
100                         talloc_free(c);
101                         return -1;
102                 }
103         }
104
105         if (c->reply_data) {
106                 call->reply_data = *c->reply_data;
107                 talloc_steal(ctdb, call->reply_data.dptr);
108                 talloc_set_name_const(call->reply_data.dptr, __location__);
109         } else {
110                 call->reply_data.dptr = NULL;
111                 call->reply_data.dsize = 0;
112         }
113         call->status = c->status;
114
115         talloc_free(c);
116
117         return 0;
118 }
119
120 /*
121   send an error reply
122 */
123 static void ctdb_send_error(struct ctdb_context *ctdb, 
124                             struct ctdb_req_header *hdr, uint32_t status,
125                             const char *fmt, ...) PRINTF_ATTRIBUTE(4,5);
126 static void ctdb_send_error(struct ctdb_context *ctdb, 
127                             struct ctdb_req_header *hdr, uint32_t status,
128                             const char *fmt, ...)
129 {
130         va_list ap;
131         struct ctdb_reply_error *r;
132         char *msg;
133         int msglen, len;
134
135         va_start(ap, fmt);
136         msg = talloc_vasprintf(ctdb, fmt, ap);
137         if (msg == NULL) {
138                 ctdb_fatal(ctdb, "Unable to allocate error in ctdb_send_error\n");
139         }
140         va_end(ap);
141
142         msglen = strlen(msg)+1;
143         len = offsetof(struct ctdb_reply_error, msg);
144         r = ctdb->methods->allocate_pkt(msg, len + msglen);
145         CTDB_NO_MEMORY_FATAL(ctdb, r);
146         talloc_set_name_const(r, "send_error packet");
147
148         r->hdr.length    = len + msglen;
149         r->hdr.ctdb_magic = CTDB_MAGIC;
150         r->hdr.ctdb_version = CTDB_VERSION;
151         r->hdr.operation = CTDB_REPLY_ERROR;
152         r->hdr.destnode  = hdr->srcnode;
153         r->hdr.srcnode   = ctdb->vnn;
154         r->hdr.reqid     = hdr->reqid;
155         r->status        = status;
156         r->msglen        = msglen;
157         memcpy(&r->msg[0], msg, msglen);
158
159         ctdb_queue_packet(ctdb, &r->hdr);
160
161         talloc_free(msg);
162 }
163
164
165 /*
166   send a redirect reply
167 */
168 static void ctdb_call_send_redirect(struct ctdb_context *ctdb, 
169                                     struct ctdb_req_call *c, 
170                                     struct ctdb_ltdb_header *header)
171 {
172         struct ctdb_reply_redirect *r;
173
174         r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r));
175         CTDB_NO_MEMORY_FATAL(ctdb, r);
176         talloc_set_name_const(r, "send_redirect packet");
177         r->hdr.length = sizeof(*r);
178         r->hdr.ctdb_magic = CTDB_MAGIC;
179         r->hdr.ctdb_version = CTDB_VERSION;
180         r->hdr.operation = CTDB_REPLY_REDIRECT;
181         r->hdr.destnode  = c->hdr.srcnode;
182         r->hdr.srcnode   = ctdb->vnn;
183         r->hdr.reqid     = c->hdr.reqid;
184         r->dmaster       = header->dmaster;
185
186         ctdb_queue_packet(ctdb, &r->hdr);
187
188         talloc_free(r);
189 }
190
191 /*
192   send a dmaster request (give another node the dmaster for a record)
193
194   This is always sent to the lmaster, which ensures that the lmaster
195   always knows who the dmaster is. The lmaster will then send a
196   CTDB_REPLY_DMASTER to the new dmaster
197 */
198 static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db, 
199                                    struct ctdb_req_call *c, 
200                                    struct ctdb_ltdb_header *header,
201                                    TDB_DATA *key, TDB_DATA *data)
202 {
203         struct ctdb_req_dmaster *r;
204         struct ctdb_context *ctdb = ctdb_db->ctdb;
205         int len;
206         
207         len = offsetof(struct ctdb_req_dmaster, data) + key->dsize + data->dsize;
208         r = ctdb->methods->allocate_pkt(ctdb, len);
209         CTDB_NO_MEMORY_FATAL(ctdb, r);
210         talloc_set_name_const(r, "send_dmaster packet");
211         r->hdr.length    = len;
212         r->hdr.ctdb_magic = CTDB_MAGIC;
213         r->hdr.ctdb_version = CTDB_VERSION;
214         r->hdr.operation = CTDB_REQ_DMASTER;
215         r->hdr.destnode  = ctdb_lmaster(ctdb, key);
216         r->hdr.srcnode   = ctdb->vnn;
217         r->hdr.reqid     = c->hdr.reqid;
218         r->db_id         = c->db_id;
219         r->dmaster       = c->hdr.srcnode;
220         r->keylen        = key->dsize;
221         r->datalen       = data->dsize;
222         memcpy(&r->data[0], key->dptr, key->dsize);
223         memcpy(&r->data[key->dsize], data->dptr, data->dsize);
224
225         if (r->hdr.destnode == ctdb->vnn) {
226                 /* we are the lmaster - don't send to ourselves */
227                 ctdb_recv_pkt(ctdb, (uint8_t *)&r->hdr, r->hdr.length);
228                 return;
229         } else {
230                 ctdb_queue_packet(ctdb, &r->hdr);
231
232                 /* update the ltdb to record the new dmaster */
233                 header->dmaster = r->hdr.destnode;
234                 ctdb_ltdb_store(ctdb_db, *key, header, *data);
235         }
236
237         talloc_free(r);
238 }
239
240
241 /*
242   called when a CTDB_REQ_DMASTER packet comes in
243
244   this comes into the lmaster for a record when the current dmaster
245   wants to give up the dmaster role and give it to someone else
246 */
247 void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
248 {
249         struct ctdb_req_dmaster *c = (struct ctdb_req_dmaster *)hdr;
250         struct ctdb_reply_dmaster *r;
251         TDB_DATA key, data, data2;
252         struct ctdb_ltdb_header header;
253         struct ctdb_db_context *ctdb_db;
254         int ret, len;
255         TALLOC_CTX *tmp_ctx;
256
257         key.dptr = c->data;
258         key.dsize = c->keylen;
259         data.dptr = c->data + c->keylen;
260         data.dsize = c->datalen;
261
262         ctdb_db = find_ctdb_db(ctdb, c->db_id);
263         if (!ctdb_db) {
264                 ctdb_send_error(ctdb, hdr, -1,
265                                 "Unknown database in request. db_id==0x%08x",
266                                 c->db_id);
267                 return;
268         }
269         
270         /* fetch the current record */
271         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
272                                            ctdb_recv_raw_pkt, ctdb);
273         if (ret == -1) {
274                 ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
275                 return;
276         }
277         if (ret == -2) {
278                 DEBUG(2,(__location__ " deferring ctdb_request_dmaster\n"));
279                 return;
280         }
281         
282         /* its a protocol error if the sending node is not the current dmaster */
283         if (header.dmaster != hdr->srcnode) {
284                 ctdb_fatal(ctdb, "dmaster request from non-master");
285                 return;
286         }
287         
288         header.dmaster = c->dmaster;
289         ret = ctdb_ltdb_store(ctdb_db, key, &header, data);
290         ctdb_ltdb_unlock(ctdb_db, key);
291         if (ret != 0) {
292                 ctdb_fatal(ctdb, "ctdb_req_dmaster unable to update dmaster");
293                 return;
294         }
295
296         /* put the packet on a temporary context, allowing us to safely free
297            it below even if ctdb_reply_dmaster() has freed it already */
298         tmp_ctx = talloc_new(ctdb);
299
300         /* send the CTDB_REPLY_DMASTER */
301         len = offsetof(struct ctdb_reply_dmaster, data) + data.dsize;
302         r = ctdb->methods->allocate_pkt(tmp_ctx, len);
303         CTDB_NO_MEMORY_FATAL(ctdb, r);
304
305         talloc_set_name_const(r, "reply_dmaster packet");
306         r->hdr.length    = len;
307         r->hdr.ctdb_magic = CTDB_MAGIC;
308         r->hdr.ctdb_version = CTDB_VERSION;
309         r->hdr.operation = CTDB_REPLY_DMASTER;
310         r->hdr.destnode  = c->dmaster;
311         r->hdr.srcnode   = ctdb->vnn;
312         r->hdr.reqid     = hdr->reqid;
313         r->datalen       = data.dsize;
314         memcpy(&r->data[0], data.dptr, data.dsize);
315
316         if (r->hdr.destnode == r->hdr.srcnode) {
317                 ctdb_reply_dmaster(ctdb, &r->hdr);
318         } else {
319                 ctdb_queue_packet(ctdb, &r->hdr);
320         }
321
322         talloc_free(tmp_ctx);
323 }
324
325
326 /*
327   called when a CTDB_REQ_CALL packet comes in
328 */
329 void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
330 {
331         struct ctdb_req_call *c = (struct ctdb_req_call *)hdr;
332         TDB_DATA data;
333         struct ctdb_reply_call *r;
334         int ret, len;
335         struct ctdb_ltdb_header header;
336         struct ctdb_call call;
337         struct ctdb_db_context *ctdb_db;
338
339         ctdb_db = find_ctdb_db(ctdb, c->db_id);
340         if (!ctdb_db) {
341                 ctdb_send_error(ctdb, hdr, -1,
342                                 "Unknown database in request. db_id==0x%08x",
343                                 c->db_id);
344                 return;
345         }
346
347         call.call_id  = c->callid;
348         call.key.dptr = c->data;
349         call.key.dsize = c->keylen;
350         call.call_data.dptr = c->data + c->keylen;
351         call.call_data.dsize = c->calldatalen;
352
353         /* determine if we are the dmaster for this key. This also
354            fetches the record data (if any), thus avoiding a 2nd fetch of the data 
355            if the call will be answered locally */
356
357         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call.key, &header, hdr, &data,
358                                            ctdb_recv_raw_pkt, ctdb);
359         if (ret == -1) {
360                 ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
361                 return;
362         }
363         if (ret == -2) {
364                 DEBUG(2,(__location__ " deferred ctdb_request_call\n"));
365                 return;
366         }
367
368         /* if we are not the dmaster, then send a redirect to the
369            requesting node */
370         if (header.dmaster != ctdb->vnn) {
371                 ctdb_call_send_redirect(ctdb, c, &header);
372                 talloc_free(data.dptr);
373                 ctdb_ltdb_unlock(ctdb_db, call.key);
374                 return;
375         }
376
377         /* if this nodes has done enough consecutive calls on the same record
378            then give them the record
379            or if the node requested an immediate migration
380         */
381         if ( (header.laccessor == c->hdr.srcnode
382               && header.lacount >= ctdb->max_lacount)
383            || c->flags&CTDB_IMMEDIATE_MIGRATION ) {
384                 ctdb_call_send_dmaster(ctdb_db, c, &header, &call.key, &data);
385                 talloc_free(data.dptr);
386                 ctdb_ltdb_unlock(ctdb_db, call.key);
387                 return;
388         }
389
390         ctdb_call_local(ctdb_db, &call, &header, &data, c->hdr.srcnode);
391
392         ctdb_ltdb_unlock(ctdb_db, call.key);
393
394         len = offsetof(struct ctdb_reply_call, data) + call.reply_data.dsize;
395         r = ctdb->methods->allocate_pkt(ctdb, len);
396         CTDB_NO_MEMORY_FATAL(ctdb, r);
397         talloc_set_name_const(r, "reply_call packet");
398         r->hdr.length    = len;
399         r->hdr.ctdb_magic = CTDB_MAGIC;
400         r->hdr.ctdb_version = CTDB_VERSION;
401         r->hdr.operation = CTDB_REPLY_CALL;
402         r->hdr.destnode  = hdr->srcnode;
403         r->hdr.srcnode   = hdr->destnode;
404         r->hdr.reqid     = hdr->reqid;
405         r->status        = call.status;
406         r->datalen       = call.reply_data.dsize;
407         if (call.reply_data.dsize) {
408                 memcpy(&r->data[0], call.reply_data.dptr, call.reply_data.dsize);
409                 talloc_free(call.reply_data.dptr);
410         }
411
412         ctdb_queue_packet(ctdb, &r->hdr);
413
414         talloc_free(r);
415 }
416
417 /*
418   called when a CTDB_REPLY_CALL packet comes in
419
420   This packet comes in response to a CTDB_REQ_CALL request packet. It
421   contains any reply data from the call
422 */
423 void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
424 {
425         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
426         struct ctdb_call_state *state;
427
428         state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_call_state);
429         if (state == NULL) {
430                 DEBUG(0, ("reqid %d not found\n", hdr->reqid));
431                 return;
432         }
433
434         state->call.reply_data.dptr = c->data;
435         state->call.reply_data.dsize = c->datalen;
436         state->call.status = c->status;
437
438         talloc_steal(state, c);
439
440         state->state = CTDB_CALL_DONE;
441         if (state->async.fn) {
442                 state->async.fn(state);
443         }
444 }
445
446 /*
447   called when a CTDB_REPLY_DMASTER packet comes in
448
449   This packet comes in from the lmaster response to a CTDB_REQ_CALL
450   request packet. It means that the current dmaster wants to give us
451   the dmaster role
452 */
453 void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
454 {
455         struct ctdb_reply_dmaster *c = (struct ctdb_reply_dmaster *)hdr;
456         struct ctdb_call_state *state;
457         struct ctdb_db_context *ctdb_db;
458         TDB_DATA data;
459
460         state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_call_state);
461         if (state == NULL) {
462                 return;
463         }
464
465         ctdb_db = state->ctdb_db;
466
467         data.dptr = c->data;
468         data.dsize = c->datalen;
469
470         talloc_steal(state, c);
471
472         /* we're now the dmaster - update our local ltdb with new header
473            and data */
474         state->header.dmaster = ctdb->vnn;
475
476         if (ctdb_ltdb_store(ctdb_db, state->call.key, &state->header, data) != 0) {
477                 ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
478                 return;
479         }
480
481         ctdb_call_local(ctdb_db, &state->call, &state->header, &data, ctdb->vnn);
482
483         talloc_steal(state, state->call.reply_data.dptr);
484
485         state->state = CTDB_CALL_DONE;
486         if (state->async.fn) {
487                 state->async.fn(state);
488         }
489 }
490
491
492 /*
493   called when a CTDB_REPLY_ERROR packet comes in
494 */
495 void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
496 {
497         struct ctdb_reply_error *c = (struct ctdb_reply_error *)hdr;
498         struct ctdb_call_state *state;
499
500         state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_call_state);
501         if (state == NULL) return;
502
503         talloc_steal(state, c);
504
505         state->state  = CTDB_CALL_ERROR;
506         state->errmsg = (char *)c->msg;
507         if (state->async.fn) {
508                 state->async.fn(state);
509         }
510 }
511
512
513 /*
514   called when a CTDB_REPLY_REDIRECT packet comes in
515
516   This packet arrives when we have sent a CTDB_REQ_CALL request and
517   the node that received it is not the dmaster for the given key. We
518   are given a hint as to what node to try next.
519 */
520 void ctdb_reply_redirect(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
521 {
522         struct ctdb_reply_redirect *c = (struct ctdb_reply_redirect *)hdr;
523         struct ctdb_call_state *state;
524
525         state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_call_state);
526         if (state == NULL) return;
527
528         talloc_steal(state, c);
529         
530         /* don't allow for too many redirects */
531         if (state->redirect_count++ == CTDB_MAX_REDIRECT) {
532                 c->dmaster = ctdb_lmaster(ctdb, &state->call.key);
533         }
534
535         /* send it off again */
536         state->node = ctdb->nodes[c->dmaster];
537         state->c->hdr.destnode = c->dmaster;
538
539         ctdb_queue_packet(ctdb, &state->c->hdr);
540 }
541
542 /*
543   destroy a ctdb_call
544 */
545 static int ctdb_call_destructor(struct ctdb_call_state *state)
546 {
547         idr_remove(state->node->ctdb->idr, state->c->hdr.reqid);
548         return 0;
549 }
550
551
552 /*
553   called when a ctdb_call times out
554 */
555 void ctdb_call_timeout(struct event_context *ev, struct timed_event *te, 
556                        struct timeval t, void *private_data)
557 {
558         struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state);
559         state->state = CTDB_CALL_ERROR;
560         ctdb_set_error(state->node->ctdb, "ctdb_call %u timed out",
561                        state->c->hdr.reqid);
562         if (state->async.fn) {
563                 state->async.fn(state);
564         }
565 }
566
567 /*
568   this allows the caller to setup a async.fn 
569 */
570 static void call_local_trigger(struct event_context *ev, struct timed_event *te, 
571                        struct timeval t, void *private_data)
572 {
573         struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state);
574         if (state->async.fn) {
575                 state->async.fn(state);
576         }
577 }       
578
579
580 /*
581   construct an event driven local ctdb_call
582
583   this is used so that locally processed ctdb_call requests are processed
584   in an event driven manner
585 */
586 struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db, 
587                                              struct ctdb_call *call,
588                                              struct ctdb_ltdb_header *header,
589                                              TDB_DATA *data)
590 {
591         struct ctdb_call_state *state;
592         struct ctdb_context *ctdb = ctdb_db->ctdb;
593         int ret;
594
595         state = talloc_zero(ctdb_db, struct ctdb_call_state);
596         CTDB_NO_MEMORY_NULL(ctdb, state);
597
598         talloc_steal(state, data->dptr);
599
600         state->state = CTDB_CALL_DONE;
601         state->node = ctdb->nodes[ctdb->vnn];
602         state->call = *call;
603         state->ctdb_db = ctdb_db;
604
605         ret = ctdb_call_local(ctdb_db, &state->call, header, data, ctdb->vnn);
606         talloc_steal(state, state->call.reply_data.dptr);
607
608         event_add_timed(ctdb->ev, state, timeval_zero(), call_local_trigger, state);
609
610         return state;
611 }
612
613
614 /*
615   make a remote ctdb call - async send. Called in daemon context.
616
617   This constructs a ctdb_call request and queues it for processing. 
618   This call never blocks.
619 */
620 struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctdb_db, 
621                                                      struct ctdb_call *call, 
622                                                      struct ctdb_ltdb_header *header)
623 {
624         uint32_t len;
625         struct ctdb_call_state *state;
626         struct ctdb_context *ctdb = ctdb_db->ctdb;
627
628         state = talloc_zero(ctdb_db, struct ctdb_call_state);
629         CTDB_NO_MEMORY_NULL(ctdb, state);
630
631         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
632         state->c = ctdb->methods->allocate_pkt(state, len);
633         CTDB_NO_MEMORY_NULL(ctdb, state->c);
634         talloc_set_name_const(state->c, "req_call packet");
635
636         state->c->hdr.length    = len;
637         state->c->hdr.ctdb_magic = CTDB_MAGIC;
638         state->c->hdr.ctdb_version = CTDB_VERSION;
639         state->c->hdr.operation = CTDB_REQ_CALL;
640         state->c->hdr.destnode  = header->dmaster;
641         state->c->hdr.srcnode   = ctdb->vnn;
642         /* this limits us to 16k outstanding messages - not unreasonable */
643         state->c->hdr.reqid     = idr_get_new(ctdb->idr, state, 0xFFFF);
644         state->c->flags         = call->flags;
645         state->c->db_id         = ctdb_db->db_id;
646         state->c->callid        = call->call_id;
647         state->c->keylen        = call->key.dsize;
648         state->c->calldatalen   = call->call_data.dsize;
649         memcpy(&state->c->data[0], call->key.dptr, call->key.dsize);
650         memcpy(&state->c->data[call->key.dsize], 
651                call->call_data.dptr, call->call_data.dsize);
652         state->call                = *call;
653         state->call.call_data.dptr = &state->c->data[call->key.dsize];
654         state->call.key.dptr       = &state->c->data[0];
655
656         state->node   = ctdb->nodes[header->dmaster];
657         state->state  = CTDB_CALL_WAIT;
658         state->header = *header;
659         state->ctdb_db = ctdb_db;
660
661         talloc_set_destructor(state, ctdb_call_destructor);
662
663         ctdb_queue_packet(ctdb, &state->c->hdr);
664
665         event_add_timed(ctdb->ev, state, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0), 
666                         ctdb_call_timeout, state);
667         return state;
668 }
669
670 /*
671   make a remote ctdb call - async recv - called in daemon context
672
673   This is called when the program wants to wait for a ctdb_call to complete and get the 
674   results. This call will block unless the call has already completed.
675 */
676 int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
677 {
678         while (state->state < CTDB_CALL_DONE) {
679                 event_loop_once(state->node->ctdb->ev);
680         }
681         if (state->state != CTDB_CALL_DONE) {
682                 ctdb_set_error(state->node->ctdb, "%s", state->errmsg);
683                 talloc_free(state);
684                 return -1;
685         }
686
687         if (state->call.reply_data.dsize) {
688                 call->reply_data.dptr = talloc_memdup(state->node->ctdb,
689                                                       state->call.reply_data.dptr,
690                                                       state->call.reply_data.dsize);
691                 call->reply_data.dsize = state->call.reply_data.dsize;
692         } else {
693                 call->reply_data.dptr = NULL;
694                 call->reply_data.dsize = 0;
695         }
696         call->status = state->call.status;
697         talloc_free(state);
698         return 0;
699 }
700
701