r23795: more v2->v3 conversion
[samba.git] / source4 / cluster / ctdb / common / ctdb_call.c
1 /* 
2    ctdb_call protocol code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Lesser General Public
8    License as published by the Free Software Foundation; either
9    version 3 of the License, or (at your option) any later version.
10
11    This library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Lesser General Public License for more details.
15
16    You should have received a copy of the GNU Lesser General Public
17    License along with this library; if not, write to the Free Software
18    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19 */
20 /*
21   see http://wiki.samba.org/index.php/Samba_%26_Clustering for
22   protocol design and packet details
23 */
24 #include "includes.h"
25 #include "lib/events/events.h"
26 #include "lib/tdb/include/tdb.h"
27 #include "system/network.h"
28 #include "system/filesys.h"
29 #include "../include/ctdb_private.h"
30
31 /*
32   find the ctdb_db from a db index
33  */
34  struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id)
35 {
36         struct ctdb_db_context *ctdb_db;
37
38         for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
39                 if (ctdb_db->db_id == id) {
40                         break;
41                 }
42         }
43         return ctdb_db;
44 }
45
46
47 /*
48   local version of ctdb_call
49 */
50 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
51                     struct ctdb_ltdb_header *header, TDB_DATA *data,
52                     uint32_t caller)
53 {
54         struct ctdb_call_info *c;
55         struct ctdb_registered_call *fn;
56         struct ctdb_context *ctdb = ctdb_db->ctdb;
57         
58         c = talloc(ctdb, struct ctdb_call_info);
59         CTDB_NO_MEMORY(ctdb, c);
60
61         c->key = call->key;
62         c->call_data = &call->call_data;
63         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
64         c->record_data.dsize = data->dsize;
65         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
66         c->new_data = NULL;
67         c->reply_data = NULL;
68         c->status = 0;
69
70         for (fn=ctdb_db->calls;fn;fn=fn->next) {
71                 if (fn->id == call->call_id) break;
72         }
73         if (fn == NULL) {
74                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
75                 talloc_free(c);
76                 return -1;
77         }
78
79         if (fn->fn(c) != 0) {
80                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
81                 talloc_free(c);
82                 return -1;
83         }
84
85         if (header->laccessor != caller) {
86                 header->lacount = 0;
87         }
88         header->laccessor = caller;
89         header->lacount++;
90
91         /* we need to force the record to be written out if this was a remote access,
92            so that the lacount is updated */
93         if (c->new_data == NULL && header->laccessor != ctdb->vnn) {
94                 c->new_data = &c->record_data;
95         }
96
97         if (c->new_data) {
98                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
99                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
100                         talloc_free(c);
101                         return -1;
102                 }
103         }
104
105         if (c->reply_data) {
106                 call->reply_data = *c->reply_data;
107                 talloc_steal(ctdb, call->reply_data.dptr);
108                 talloc_set_name_const(call->reply_data.dptr, __location__);
109         } else {
110                 call->reply_data.dptr = NULL;
111                 call->reply_data.dsize = 0;
112         }
113         call->status = c->status;
114
115         talloc_free(c);
116
117         return 0;
118 }
119
120 /*
121   send an error reply
122 */
123 static void ctdb_send_error(struct ctdb_context *ctdb, 
124                             struct ctdb_req_header *hdr, uint32_t status,
125                             const char *fmt, ...) PRINTF_ATTRIBUTE(4,5);
126 static void ctdb_send_error(struct ctdb_context *ctdb, 
127                             struct ctdb_req_header *hdr, uint32_t status,
128                             const char *fmt, ...)
129 {
130         va_list ap;
131         struct ctdb_reply_error *r;
132         char *msg;
133         int msglen, len;
134
135         va_start(ap, fmt);
136         msg = talloc_vasprintf(ctdb, fmt, ap);
137         if (msg == NULL) {
138                 ctdb_fatal(ctdb, "Unable to allocate error in ctdb_send_error\n");
139         }
140         va_end(ap);
141
142         msglen = strlen(msg)+1;
143         len = offsetof(struct ctdb_reply_error, msg);
144         r = ctdb->methods->allocate_pkt(msg, len + msglen);
145         CTDB_NO_MEMORY_FATAL(ctdb, r);
146         talloc_set_name_const(r, "send_error packet");
147
148         r->hdr.length    = len + msglen;
149         r->hdr.ctdb_magic = CTDB_MAGIC;
150         r->hdr.ctdb_version = CTDB_VERSION;
151         r->hdr.operation = CTDB_REPLY_ERROR;
152         r->hdr.destnode  = hdr->srcnode;
153         r->hdr.srcnode   = ctdb->vnn;
154         r->hdr.reqid     = hdr->reqid;
155         r->status        = status;
156         r->msglen        = msglen;
157         memcpy(&r->msg[0], msg, msglen);
158
159         ctdb_queue_packet(ctdb, &r->hdr);
160
161         talloc_free(msg);
162 }
163
164
165 /*
166   send a redirect reply
167 */
168 static void ctdb_call_send_redirect(struct ctdb_context *ctdb, 
169                                     struct ctdb_req_call *c, 
170                                     struct ctdb_ltdb_header *header)
171 {
172         struct ctdb_reply_redirect *r;
173
174         r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r));
175         CTDB_NO_MEMORY_FATAL(ctdb, r);
176         talloc_set_name_const(r, "send_redirect packet");
177         r->hdr.length = sizeof(*r);
178         r->hdr.ctdb_magic = CTDB_MAGIC;
179         r->hdr.ctdb_version = CTDB_VERSION;
180         r->hdr.operation = CTDB_REPLY_REDIRECT;
181         r->hdr.destnode  = c->hdr.srcnode;
182         r->hdr.srcnode   = ctdb->vnn;
183         r->hdr.reqid     = c->hdr.reqid;
184         r->dmaster       = header->dmaster;
185
186         ctdb_queue_packet(ctdb, &r->hdr);
187
188         talloc_free(r);
189 }
190
191 /*
192   send a dmaster request (give another node the dmaster for a record)
193
194   This is always sent to the lmaster, which ensures that the lmaster
195   always knows who the dmaster is. The lmaster will then send a
196   CTDB_REPLY_DMASTER to the new dmaster
197 */
198 static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db, 
199                                    struct ctdb_req_call *c, 
200                                    struct ctdb_ltdb_header *header,
201                                    TDB_DATA *key, TDB_DATA *data)
202 {
203         struct ctdb_req_dmaster *r;
204         struct ctdb_context *ctdb = ctdb_db->ctdb;
205         int len;
206         
207         len = offsetof(struct ctdb_req_dmaster, data) + key->dsize + data->dsize;
208         r = ctdb->methods->allocate_pkt(ctdb, len);
209         CTDB_NO_MEMORY_FATAL(ctdb, r);
210         talloc_set_name_const(r, "send_dmaster packet");
211         r->hdr.length    = len;
212         r->hdr.ctdb_magic = CTDB_MAGIC;
213         r->hdr.ctdb_version = CTDB_VERSION;
214         r->hdr.operation = CTDB_REQ_DMASTER;
215         r->hdr.destnode  = ctdb_lmaster(ctdb, key);
216         r->hdr.srcnode   = ctdb->vnn;
217         r->hdr.reqid     = c->hdr.reqid;
218         r->db_id         = c->db_id;
219         r->dmaster       = c->hdr.srcnode;
220         r->keylen        = key->dsize;
221         r->datalen       = data->dsize;
222         memcpy(&r->data[0], key->dptr, key->dsize);
223         memcpy(&r->data[key->dsize], data->dptr, data->dsize);
224
225         /* XXX - probably not necessary when lmaster==dmaster
226            update the ltdb to record the new dmaster */
227         header->dmaster = r->hdr.destnode;
228         ctdb_ltdb_store(ctdb_db, *key, header, *data);
229         
230         ctdb_queue_packet(ctdb, &r->hdr);
231
232         talloc_free(r);
233 }
234
235
236 /*
237   called when a CTDB_REQ_DMASTER packet comes in
238
239   this comes into the lmaster for a record when the current dmaster
240   wants to give up the dmaster role and give it to someone else
241 */
242 void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
243 {
244         struct ctdb_req_dmaster *c = (struct ctdb_req_dmaster *)hdr;
245         struct ctdb_reply_dmaster *r;
246         TDB_DATA key, data, data2;
247         struct ctdb_ltdb_header header;
248         struct ctdb_db_context *ctdb_db;
249         int ret, len;
250         TALLOC_CTX *tmp_ctx;
251
252         key.dptr = c->data;
253         key.dsize = c->keylen;
254         data.dptr = c->data + c->keylen;
255         data.dsize = c->datalen;
256
257         ctdb_db = find_ctdb_db(ctdb, c->db_id);
258         if (!ctdb_db) {
259                 ctdb_send_error(ctdb, hdr, -1,
260                                 "Unknown database in request. db_id==0x%08x",
261                                 c->db_id);
262                 return;
263         }
264         
265         /* fetch the current record */
266         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
267                                            ctdb_recv_raw_pkt, ctdb);
268         if (ret == -1) {
269                 ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
270                 return;
271         }
272         if (ret == -2) {
273                 DEBUG(2,(__location__ " deferring ctdb_request_dmaster\n"));
274                 return;
275         }
276         
277         /* its a protocol error if the sending node is not the current dmaster */
278         if (header.dmaster != hdr->srcnode && 
279             hdr->srcnode != ctdb_lmaster(ctdb_db->ctdb, &key)) {
280                 ctdb_fatal(ctdb, "dmaster request from non-master");
281                 return;
282         }
283         
284         header.dmaster = c->dmaster;
285         ret = ctdb_ltdb_store(ctdb_db, key, &header, data);
286         ctdb_ltdb_unlock(ctdb_db, key);
287         if (ret != 0) {
288                 ctdb_fatal(ctdb, "ctdb_req_dmaster unable to update dmaster");
289                 return;
290         }
291
292         /* put the packet on a temporary context, allowing us to safely free
293            it below even if ctdb_reply_dmaster() has freed it already */
294         tmp_ctx = talloc_new(ctdb);
295
296         /* send the CTDB_REPLY_DMASTER */
297         len = offsetof(struct ctdb_reply_dmaster, data) + data.dsize;
298         r = ctdb->methods->allocate_pkt(tmp_ctx, len);
299         CTDB_NO_MEMORY_FATAL(ctdb, r);
300
301         talloc_set_name_const(r, "reply_dmaster packet");
302         r->hdr.length    = len;
303         r->hdr.ctdb_magic = CTDB_MAGIC;
304         r->hdr.ctdb_version = CTDB_VERSION;
305         r->hdr.operation = CTDB_REPLY_DMASTER;
306         r->hdr.destnode  = c->dmaster;
307         r->hdr.srcnode   = ctdb->vnn;
308         r->hdr.reqid     = hdr->reqid;
309         r->datalen       = data.dsize;
310         memcpy(&r->data[0], data.dptr, data.dsize);
311
312         ctdb_queue_packet(ctdb, &r->hdr);
313
314         talloc_free(tmp_ctx);
315 }
316
317
318 /*
319   called when a CTDB_REQ_CALL packet comes in
320 */
321 void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
322 {
323         struct ctdb_req_call *c = (struct ctdb_req_call *)hdr;
324         TDB_DATA data;
325         struct ctdb_reply_call *r;
326         int ret, len;
327         struct ctdb_ltdb_header header;
328         struct ctdb_call call;
329         struct ctdb_db_context *ctdb_db;
330
331         ctdb_db = find_ctdb_db(ctdb, c->db_id);
332         if (!ctdb_db) {
333                 ctdb_send_error(ctdb, hdr, -1,
334                                 "Unknown database in request. db_id==0x%08x",
335                                 c->db_id);
336                 return;
337         }
338
339         call.call_id  = c->callid;
340         call.key.dptr = c->data;
341         call.key.dsize = c->keylen;
342         call.call_data.dptr = c->data + c->keylen;
343         call.call_data.dsize = c->calldatalen;
344
345         /* determine if we are the dmaster for this key. This also
346            fetches the record data (if any), thus avoiding a 2nd fetch of the data 
347            if the call will be answered locally */
348
349         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call.key, &header, hdr, &data,
350                                            ctdb_recv_raw_pkt, ctdb);
351         if (ret == -1) {
352                 ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
353                 return;
354         }
355         if (ret == -2) {
356                 DEBUG(2,(__location__ " deferred ctdb_request_call\n"));
357                 return;
358         }
359
360         /* if we are not the dmaster, then send a redirect to the
361            requesting node */
362         if (header.dmaster != ctdb->vnn) {
363                 ctdb_call_send_redirect(ctdb, c, &header);
364                 talloc_free(data.dptr);
365                 ctdb_ltdb_unlock(ctdb_db, call.key);
366                 return;
367         }
368
369         /* if this nodes has done enough consecutive calls on the same record
370            then give them the record
371            or if the node requested an immediate migration
372         */
373         if ( (header.laccessor == c->hdr.srcnode
374               && header.lacount >= ctdb->max_lacount)
375            || c->flags&CTDB_IMMEDIATE_MIGRATION ) {
376                 ctdb_call_send_dmaster(ctdb_db, c, &header, &call.key, &data);
377                 talloc_free(data.dptr);
378                 ctdb_ltdb_unlock(ctdb_db, call.key);
379                 return;
380         }
381
382         ctdb_call_local(ctdb_db, &call, &header, &data, c->hdr.srcnode);
383
384         ctdb_ltdb_unlock(ctdb_db, call.key);
385
386         len = offsetof(struct ctdb_reply_call, data) + call.reply_data.dsize;
387         r = ctdb->methods->allocate_pkt(ctdb, len);
388         CTDB_NO_MEMORY_FATAL(ctdb, r);
389         talloc_set_name_const(r, "reply_call packet");
390         r->hdr.length    = len;
391         r->hdr.ctdb_magic = CTDB_MAGIC;
392         r->hdr.ctdb_version = CTDB_VERSION;
393         r->hdr.operation = CTDB_REPLY_CALL;
394         r->hdr.destnode  = hdr->srcnode;
395         r->hdr.srcnode   = hdr->destnode;
396         r->hdr.reqid     = hdr->reqid;
397         r->status        = call.status;
398         r->datalen       = call.reply_data.dsize;
399         if (call.reply_data.dsize) {
400                 memcpy(&r->data[0], call.reply_data.dptr, call.reply_data.dsize);
401                 talloc_free(call.reply_data.dptr);
402         }
403
404         ctdb_queue_packet(ctdb, &r->hdr);
405
406         talloc_free(r);
407 }
408
409 /*
410   called when a CTDB_REPLY_CALL packet comes in
411
412   This packet comes in response to a CTDB_REQ_CALL request packet. It
413   contains any reply data from the call
414 */
415 void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
416 {
417         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
418         struct ctdb_call_state *state;
419
420         state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_call_state);
421         if (state == NULL) {
422                 DEBUG(0, ("reqid %d not found\n", hdr->reqid));
423                 return;
424         }
425
426         state->call.reply_data.dptr = c->data;
427         state->call.reply_data.dsize = c->datalen;
428         state->call.status = c->status;
429
430         talloc_steal(state, c);
431
432         state->state = CTDB_CALL_DONE;
433         if (state->async.fn) {
434                 state->async.fn(state);
435         }
436 }
437
438 /*
439   called when a CTDB_REPLY_DMASTER packet comes in
440
441   This packet comes in from the lmaster response to a CTDB_REQ_CALL
442   request packet. It means that the current dmaster wants to give us
443   the dmaster role
444 */
445 void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
446 {
447         struct ctdb_reply_dmaster *c = (struct ctdb_reply_dmaster *)hdr;
448         struct ctdb_call_state *state;
449         struct ctdb_db_context *ctdb_db;
450         TDB_DATA data;
451         int ret;
452
453         state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_call_state);
454         if (state == NULL) {
455                 return;
456         }
457
458         ctdb_db = state->ctdb_db;
459
460         ret = ctdb_ltdb_lock_requeue(ctdb_db, state->call.key, hdr,
461                                      ctdb_recv_raw_pkt, ctdb);
462         if (ret == -2) {
463                 return;
464         }
465         if (ret != 0) {
466                 DEBUG(0,(__location__ " Failed to get lock in ctdb_reply_dmaster\n"));
467                 return;
468         }
469
470         data.dptr = c->data;
471         data.dsize = c->datalen;
472
473         talloc_steal(state, c);
474
475         /* we're now the dmaster - update our local ltdb with new header
476            and data */
477         state->header.dmaster = ctdb->vnn;
478
479         if (ctdb_ltdb_store(ctdb_db, state->call.key, &state->header, data) != 0) {
480                 ctdb_ltdb_unlock(ctdb_db, state->call.key);
481                 ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
482                 return;
483         }
484
485         ctdb_call_local(ctdb_db, &state->call, &state->header, &data, ctdb->vnn);
486
487         ctdb_ltdb_unlock(ctdb_db, state->call.key);
488
489         talloc_steal(state, state->call.reply_data.dptr);
490
491         state->state = CTDB_CALL_DONE;
492         if (state->async.fn) {
493                 state->async.fn(state);
494         }
495 }
496
497
498 /*
499   called when a CTDB_REPLY_ERROR packet comes in
500 */
501 void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
502 {
503         struct ctdb_reply_error *c = (struct ctdb_reply_error *)hdr;
504         struct ctdb_call_state *state;
505
506         state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_call_state);
507         if (state == NULL) return;
508
509         talloc_steal(state, c);
510
511         state->state  = CTDB_CALL_ERROR;
512         state->errmsg = (char *)c->msg;
513         if (state->async.fn) {
514                 state->async.fn(state);
515         }
516 }
517
518
519 /*
520   called when a CTDB_REPLY_REDIRECT packet comes in
521
522   This packet arrives when we have sent a CTDB_REQ_CALL request and
523   the node that received it is not the dmaster for the given key. We
524   are given a hint as to what node to try next.
525 */
526 void ctdb_reply_redirect(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
527 {
528         struct ctdb_reply_redirect *c = (struct ctdb_reply_redirect *)hdr;
529         struct ctdb_call_state *state;
530
531         state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_call_state);
532         if (state == NULL) return;
533
534         talloc_steal(state, c);
535         
536         /* don't allow for too many redirects */
537         if (state->redirect_count++ == CTDB_MAX_REDIRECT) {
538                 c->dmaster = ctdb_lmaster(ctdb, &state->call.key);
539         }
540
541         /* send it off again */
542         state->node = ctdb->nodes[c->dmaster];
543         state->c->hdr.destnode = c->dmaster;
544
545         ctdb_queue_packet(ctdb, &state->c->hdr);
546 }
547
548 /*
549   destroy a ctdb_call
550 */
551 static int ctdb_call_destructor(struct ctdb_call_state *state)
552 {
553         idr_remove(state->node->ctdb->idr, state->c->hdr.reqid);
554         return 0;
555 }
556
557
558 /*
559   called when a ctdb_call times out
560 */
561 void ctdb_call_timeout(struct event_context *ev, struct timed_event *te, 
562                        struct timeval t, void *private_data)
563 {
564         struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state);
565         state->state = CTDB_CALL_ERROR;
566         ctdb_set_error(state->node->ctdb, "ctdb_call %u timed out",
567                        state->c->hdr.reqid);
568         if (state->async.fn) {
569                 state->async.fn(state);
570         }
571 }
572
573 /*
574   this allows the caller to setup a async.fn 
575 */
576 static void call_local_trigger(struct event_context *ev, struct timed_event *te, 
577                        struct timeval t, void *private_data)
578 {
579         struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state);
580         if (state->async.fn) {
581                 state->async.fn(state);
582         }
583 }       
584
585
586 /*
587   construct an event driven local ctdb_call
588
589   this is used so that locally processed ctdb_call requests are processed
590   in an event driven manner
591 */
592 struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db, 
593                                              struct ctdb_call *call,
594                                              struct ctdb_ltdb_header *header,
595                                              TDB_DATA *data)
596 {
597         struct ctdb_call_state *state;
598         struct ctdb_context *ctdb = ctdb_db->ctdb;
599         int ret;
600
601         state = talloc_zero(ctdb_db, struct ctdb_call_state);
602         CTDB_NO_MEMORY_NULL(ctdb, state);
603
604         talloc_steal(state, data->dptr);
605
606         state->state = CTDB_CALL_DONE;
607         state->node = ctdb->nodes[ctdb->vnn];
608         state->call = *call;
609         state->ctdb_db = ctdb_db;
610
611         ret = ctdb_call_local(ctdb_db, &state->call, header, data, ctdb->vnn);
612         talloc_steal(state, state->call.reply_data.dptr);
613
614         event_add_timed(ctdb->ev, state, timeval_zero(), call_local_trigger, state);
615
616         return state;
617 }
618
619
620 /*
621   make a remote ctdb call - async send. Called in daemon context.
622
623   This constructs a ctdb_call request and queues it for processing. 
624   This call never blocks.
625 */
626 struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctdb_db, 
627                                                      struct ctdb_call *call, 
628                                                      struct ctdb_ltdb_header *header)
629 {
630         uint32_t len;
631         struct ctdb_call_state *state;
632         struct ctdb_context *ctdb = ctdb_db->ctdb;
633
634         state = talloc_zero(ctdb_db, struct ctdb_call_state);
635         CTDB_NO_MEMORY_NULL(ctdb, state);
636
637         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
638         state->c = ctdb->methods->allocate_pkt(state, len);
639         CTDB_NO_MEMORY_NULL(ctdb, state->c);
640         talloc_set_name_const(state->c, "req_call packet");
641
642         state->c->hdr.length    = len;
643         state->c->hdr.ctdb_magic = CTDB_MAGIC;
644         state->c->hdr.ctdb_version = CTDB_VERSION;
645         state->c->hdr.operation = CTDB_REQ_CALL;
646         state->c->hdr.destnode  = header->dmaster;
647         state->c->hdr.srcnode   = ctdb->vnn;
648         /* this limits us to 16k outstanding messages - not unreasonable */
649         state->c->hdr.reqid     = idr_get_new(ctdb->idr, state, 0xFFFF);
650         state->c->flags         = call->flags;
651         state->c->db_id         = ctdb_db->db_id;
652         state->c->callid        = call->call_id;
653         state->c->keylen        = call->key.dsize;
654         state->c->calldatalen   = call->call_data.dsize;
655         memcpy(&state->c->data[0], call->key.dptr, call->key.dsize);
656         memcpy(&state->c->data[call->key.dsize], 
657                call->call_data.dptr, call->call_data.dsize);
658         state->call                = *call;
659         state->call.call_data.dptr = &state->c->data[call->key.dsize];
660         state->call.key.dptr       = &state->c->data[0];
661
662         state->node   = ctdb->nodes[header->dmaster];
663         state->state  = CTDB_CALL_WAIT;
664         state->header = *header;
665         state->ctdb_db = ctdb_db;
666
667         talloc_set_destructor(state, ctdb_call_destructor);
668
669         ctdb_queue_packet(ctdb, &state->c->hdr);
670
671         event_add_timed(ctdb->ev, state, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0), 
672                         ctdb_call_timeout, state);
673         return state;
674 }
675
676 /*
677   make a remote ctdb call - async recv - called in daemon context
678
679   This is called when the program wants to wait for a ctdb_call to complete and get the 
680   results. This call will block unless the call has already completed.
681 */
682 int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
683 {
684         while (state->state < CTDB_CALL_DONE) {
685                 event_loop_once(state->node->ctdb->ev);
686         }
687         if (state->state != CTDB_CALL_DONE) {
688                 ctdb_set_error(state->node->ctdb, "%s", state->errmsg);
689                 talloc_free(state);
690                 return -1;
691         }
692
693         if (state->call.reply_data.dsize) {
694                 call->reply_data.dptr = talloc_memdup(state->node->ctdb,
695                                                       state->call.reply_data.dptr,
696                                                       state->call.reply_data.dsize);
697                 call->reply_data.dsize = state->call.reply_data.dsize;
698         } else {
699                 call->reply_data.dptr = NULL;
700                 call->reply_data.dsize = 0;
701         }
702         call->status = state->call.status;
703         talloc_free(state);
704         return 0;
705 }
706
707