make srvid 64 bits instead of 32 bits
[metze/ctdb/wip.git] / common / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This library is free software; you can redistribute it and/or
8    modify it under the terms of the GNU Lesser General Public
9    License as published by the Free Software Foundation; either
10    version 2 of the License, or (at your option) any later version.
11
12    This library is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15    Lesser General Public License for more details.
16
17    You should have received a copy of the GNU Lesser General Public
18    License along with this library; if not, write to the Free Software
19    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
20 */
21
22 #include "includes.h"
23 #include "db_wrap.h"
24 #include "lib/tdb/include/tdb.h"
25 #include "lib/events/events.h"
26 #include "lib/util/dlinklist.h"
27 #include "system/network.h"
28 #include "system/filesys.h"
29 #include "../include/ctdb.h"
30 #include "../include/ctdb_private.h"
31
32 /*
33   queue a packet for sending from client to daemon
34 */
35 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
36 {
37         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
38 }
39
40
41 /*
42   handle a connect wait reply packet
43  */
44 static void ctdb_reply_connect_wait(struct ctdb_context *ctdb, 
45                                     struct ctdb_req_header *hdr)
46 {
47         struct ctdb_reply_connect_wait *r = (struct ctdb_reply_connect_wait *)hdr;
48         ctdb->vnn = r->vnn;
49         ctdb->num_connected = r->num_connected;
50 }
51
52 /*
53   state of a in-progress ctdb call in client
54 */
55 struct ctdb_client_call_state {
56         enum call_state state;
57         uint32_t reqid;
58         struct ctdb_db_context *ctdb_db;
59         struct ctdb_call call;
60 };
61
62 /*
63   called when a CTDB_REPLY_CALL packet comes in in the client
64
65   This packet comes in response to a CTDB_REQ_CALL request packet. It
66   contains any reply data from the call
67 */
68 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
69 {
70         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
71         struct ctdb_client_call_state *state;
72
73         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
74         if (state == NULL) {
75                 DEBUG(0,(__location__ " reqid %d not found\n", hdr->reqid));
76                 return;
77         }
78
79         if (hdr->reqid != state->reqid) {
80                 /* we found a record  but it was the wrong one */
81                 DEBUG(0, ("Dropped orphaned reply with reqid:%d\n",hdr->reqid));
82                 return;
83         }
84
85         state->call.reply_data.dptr = c->data;
86         state->call.reply_data.dsize = c->datalen;
87         state->call.status = c->status;
88
89         talloc_steal(state, c);
90
91         state->state = CTDB_CALL_DONE;
92 }
93
94 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
95
96 /*
97   this is called in the client, when data comes in from the daemon
98  */
99 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
100 {
101         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
102         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
103         TALLOC_CTX *tmp_ctx;
104
105         /* place the packet as a child of a tmp_ctx. We then use
106            talloc_free() below to free it. If any of the calls want
107            to keep it, then they will steal it somewhere else, and the
108            talloc_free() will be a no-op */
109         tmp_ctx = talloc_new(ctdb);
110         talloc_steal(tmp_ctx, hdr);
111
112         if (cnt == 0) {
113                 DEBUG(2,("Daemon has exited - shutting down client\n"));
114                 exit(0);
115         }
116
117         if (cnt < sizeof(*hdr)) {
118                 DEBUG(0,("Bad packet length %d in client\n", cnt));
119                 goto done;
120         }
121         if (cnt != hdr->length) {
122                 ctdb_set_error(ctdb, "Bad header length %d expected %d in client\n", 
123                                hdr->length, cnt);
124                 goto done;
125         }
126
127         if (hdr->ctdb_magic != CTDB_MAGIC) {
128                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
129                 goto done;
130         }
131
132         if (hdr->ctdb_version != CTDB_VERSION) {
133                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
134                 goto done;
135         }
136
137         switch (hdr->operation) {
138         case CTDB_REPLY_CALL:
139                 ctdb_client_reply_call(ctdb, hdr);
140                 break;
141
142         case CTDB_REQ_MESSAGE:
143                 ctdb_request_message(ctdb, hdr);
144                 break;
145
146         case CTDB_REPLY_CONNECT_WAIT:
147                 ctdb_reply_connect_wait(ctdb, hdr);
148                 break;
149
150         case CTDB_REPLY_CONTROL:
151                 ctdb_client_reply_control(ctdb, hdr);
152                 break;
153
154         default:
155                 DEBUG(0,("bogus operation code:%d\n",hdr->operation));
156         }
157
158 done:
159         talloc_free(tmp_ctx);
160 }
161
162 /*
163   connect to a unix domain socket
164 */
165 int ctdb_socket_connect(struct ctdb_context *ctdb)
166 {
167         struct sockaddr_un addr;
168
169         memset(&addr, 0, sizeof(addr));
170         addr.sun_family = AF_UNIX;
171         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
172
173         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
174         if (ctdb->daemon.sd == -1) {
175                 return -1;
176         }
177         
178         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
179                 close(ctdb->daemon.sd);
180                 ctdb->daemon.sd = -1;
181                 return -1;
182         }
183
184         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
185                                               CTDB_DS_ALIGNMENT, 
186                                               ctdb_client_read_cb, ctdb);
187         return 0;
188 }
189
190
191 struct ctdb_record_handle {
192         struct ctdb_db_context *ctdb_db;
193         TDB_DATA key;
194         TDB_DATA *data;
195         struct ctdb_ltdb_header header;
196 };
197
198
199 /*
200   make a recv call to the local ctdb daemon - called from client context
201
202   This is called when the program wants to wait for a ctdb_call to complete and get the 
203   results. This call will block unless the call has already completed.
204 */
205 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
206 {
207         while (state->state < CTDB_CALL_DONE) {
208                 event_loop_once(state->ctdb_db->ctdb->ev);
209         }
210         if (state->state != CTDB_CALL_DONE) {
211                 DEBUG(0,(__location__ " ctdb_call_recv failed\n"));
212                 talloc_free(state);
213                 return -1;
214         }
215
216         if (state->call.reply_data.dsize) {
217                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
218                                                       state->call.reply_data.dptr,
219                                                       state->call.reply_data.dsize);
220                 call->reply_data.dsize = state->call.reply_data.dsize;
221         } else {
222                 call->reply_data.dptr = NULL;
223                 call->reply_data.dsize = 0;
224         }
225         call->status = state->call.status;
226         talloc_free(state);
227
228         return 0;
229 }
230
231
232
233
234 /*
235   destroy a ctdb_call in client
236 */
237 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
238 {
239         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
240         return 0;
241 }
242
243 /*
244   construct an event driven local ctdb_call
245
246   this is used so that locally processed ctdb_call requests are processed
247   in an event driven manner
248 */
249 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
250                                                                   struct ctdb_call *call,
251                                                                   struct ctdb_ltdb_header *header,
252                                                                   TDB_DATA *data)
253 {
254         struct ctdb_client_call_state *state;
255         struct ctdb_context *ctdb = ctdb_db->ctdb;
256         int ret;
257
258         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
259         CTDB_NO_MEMORY_NULL(ctdb, state);
260
261         talloc_steal(state, data->dptr);
262
263         state->state = CTDB_CALL_DONE;
264         state->call = *call;
265         state->ctdb_db = ctdb_db;
266
267         ret = ctdb_call_local(ctdb_db, &state->call, header, data, ctdb->vnn);
268         talloc_steal(state, state->call.reply_data.dptr);
269
270         return state;
271 }
272
273 /*
274   make a ctdb call to the local daemon - async send. Called from client context.
275
276   This constructs a ctdb_call request and queues it for processing. 
277   This call never blocks.
278 */
279 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
280                                        struct ctdb_call *call)
281 {
282         struct ctdb_client_call_state *state;
283         struct ctdb_context *ctdb = ctdb_db->ctdb;
284         struct ctdb_ltdb_header header;
285         TDB_DATA data;
286         int ret;
287         size_t len;
288         struct ctdb_req_call *c;
289
290         /* if the domain socket is not yet open, open it */
291         if (ctdb->daemon.sd==-1) {
292                 ctdb_socket_connect(ctdb);
293         }
294
295         ret = ctdb_ltdb_lock(ctdb_db, call->key);
296         if (ret != 0) {
297                 DEBUG(0,(__location__ " Failed to get chainlock\n"));
298                 return NULL;
299         }
300
301         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
302         if (ret != 0) {
303                 ctdb_ltdb_unlock(ctdb_db, call->key);
304                 DEBUG(0,(__location__ " Failed to fetch record\n"));
305                 return NULL;
306         }
307
308         if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) {
309                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
310                 talloc_free(data.dptr);
311                 ctdb_ltdb_unlock(ctdb_db, call->key);
312                 return state;
313         }
314
315         ctdb_ltdb_unlock(ctdb_db, call->key);
316         talloc_free(data.dptr);
317
318         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
319         if (state == NULL) {
320                 DEBUG(0, (__location__ " failed to allocate state\n"));
321                 return NULL;
322         }
323
324         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
325         c = ctdbd_allocate_pkt(state, len);
326         if (c == NULL) {
327                 DEBUG(0, (__location__ " failed to allocate packet\n"));
328                 return NULL;
329         }
330         talloc_set_name_const(c, "ctdb client req_call packet");
331         memset(c, 0, offsetof(struct ctdb_req_call, data));
332
333         c->hdr.length    = len;
334         c->hdr.ctdb_magic = CTDB_MAGIC;
335         c->hdr.ctdb_version = CTDB_VERSION;
336         c->hdr.operation = CTDB_REQ_CALL;
337         /* this limits us to 16k outstanding messages - not unreasonable */
338         c->hdr.reqid     = ctdb_reqid_new(ctdb, state);
339         c->flags         = call->flags;
340         c->db_id         = ctdb_db->db_id;
341         c->callid        = call->call_id;
342         c->keylen        = call->key.dsize;
343         c->calldatalen   = call->call_data.dsize;
344         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
345         memcpy(&c->data[call->key.dsize], 
346                call->call_data.dptr, call->call_data.dsize);
347         state->call                = *call;
348         state->call.call_data.dptr = &c->data[call->key.dsize];
349         state->call.key.dptr       = &c->data[0];
350
351         state->state  = CTDB_CALL_WAIT;
352         state->ctdb_db = ctdb_db;
353         state->reqid = c->hdr.reqid;
354
355         talloc_set_destructor(state, ctdb_client_call_destructor);
356
357         ctdb_client_queue_pkt(ctdb, &c->hdr);
358
359         return state;
360 }
361
362
363 /*
364   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
365 */
366 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
367 {
368         struct ctdb_client_call_state *state;
369
370         state = ctdb_call_send(ctdb_db, call);
371         return ctdb_call_recv(state, call);
372 }
373
374
375 /*
376   tell the daemon what messaging srvid we will use, and register the message
377   handler function in the client
378 */
379 int ctdb_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
380                              ctdb_message_fn_t handler,
381                              void *private_data)
382                                     
383 {
384         struct ctdb_req_register c;
385         int res;
386
387         /* if the domain socket is not yet open, open it */
388         if (ctdb->daemon.sd==-1) {
389                 ctdb_socket_connect(ctdb);
390         }
391
392         ZERO_STRUCT(c);
393
394         c.hdr.length       = sizeof(c);
395         c.hdr.ctdb_magic   = CTDB_MAGIC;
396         c.hdr.ctdb_version = CTDB_VERSION;
397         c.hdr.operation    = CTDB_REQ_REGISTER;
398         c.srvid            = srvid;
399
400         res = ctdb_client_queue_pkt(ctdb, &c.hdr);
401         if (res != 0) {
402                 return res;
403         }
404
405         /* also need to register the handler with our ctdb structure */
406         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
407 }
408
409
410 /*
411   send a message - from client context
412  */
413 int ctdb_send_message(struct ctdb_context *ctdb, uint32_t vnn,
414                       uint64_t srvid, TDB_DATA data)
415 {
416         struct ctdb_req_message *r;
417         int len, res;
418
419         len = offsetof(struct ctdb_req_message, data) + data.dsize;
420         r = ctdbd_allocate_pkt(ctdb, len);
421         CTDB_NO_MEMORY(ctdb, r);
422         talloc_set_name_const(r, "req_message packet");
423
424         r->hdr.length    = len;
425         r->hdr.ctdb_magic = CTDB_MAGIC;
426         r->hdr.ctdb_version = CTDB_VERSION;
427         r->hdr.operation = CTDB_REQ_MESSAGE;
428         r->hdr.destnode  = vnn;
429         r->hdr.srcnode   = ctdb->vnn;
430         r->hdr.reqid     = 0;
431         r->srvid         = srvid;
432         r->datalen       = data.dsize;
433         memcpy(&r->data[0], data.dptr, data.dsize);
434         
435         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
436         if (res != 0) {
437                 return res;
438         }
439
440         talloc_free(r);
441         return 0;
442 }
443
444 /*
445   wait for all nodes to be connected - from client
446  */
447 void ctdb_connect_wait(struct ctdb_context *ctdb)
448 {
449         struct ctdb_req_connect_wait r;
450         int res;
451
452         ZERO_STRUCT(r);
453
454         r.hdr.length     = sizeof(r);
455         r.hdr.ctdb_magic = CTDB_MAGIC;
456         r.hdr.ctdb_version = CTDB_VERSION;
457         r.hdr.operation = CTDB_REQ_CONNECT_WAIT;
458
459         DEBUG(3,("ctdb_connect_wait: sending to ctdbd\n"));
460
461         /* if the domain socket is not yet open, open it */
462         if (ctdb->daemon.sd==-1) {
463                 ctdb_socket_connect(ctdb);
464         }
465         
466         res = ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)&r.hdr, r.hdr.length);
467         if (res != 0) {
468                 DEBUG(0,(__location__ " Failed to queue a connect wait request\n"));
469                 return;
470         }
471
472         DEBUG(3,("ctdb_connect_wait: waiting\n"));
473
474         /* now we can go into the normal wait routine, as the reply packet
475            will update the ctdb->num_connected variable */
476         ctdb_daemon_connect_wait(ctdb);
477
478         /* get other config variables */
479         ctdb_get_config(ctdb);
480 }
481
482 /*
483   cancel a ctdb_fetch_lock operation, releasing the lock
484  */
485 static int fetch_lock_destructor(struct ctdb_record_handle *h)
486 {
487         ctdb_ltdb_unlock(h->ctdb_db, h->key);
488         return 0;
489 }
490
491 /*
492   force the migration of a record to this node
493  */
494 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
495 {
496         struct ctdb_call call;
497         ZERO_STRUCT(call);
498         call.call_id = CTDB_NULL_FUNC;
499         call.key = key;
500         call.flags = CTDB_IMMEDIATE_MIGRATION;
501         return ctdb_call(ctdb_db, &call);
502 }
503
504 /*
505   get a lock on a record, and return the records data. Blocks until it gets the lock
506  */
507 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
508                                            TDB_DATA key, TDB_DATA *data)
509 {
510         int ret;
511         struct ctdb_record_handle *h;
512
513         /*
514           procedure is as follows:
515
516           1) get the chain lock. 
517           2) check if we are dmaster
518           3) if we are the dmaster then return handle 
519           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
520              reply from ctdbd
521           5) when we get the reply, goto (1)
522          */
523
524         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
525         if (h == NULL) {
526                 return NULL;
527         }
528
529         h->ctdb_db = ctdb_db;
530         h->key     = key;
531         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
532         if (h->key.dptr == NULL) {
533                 talloc_free(h);
534                 return NULL;
535         }
536         h->data    = data;
537
538         DEBUG(3,("ctdb_fetch_lock: key=%*.*s\n", key.dsize, key.dsize, 
539                  (const char *)key.dptr));
540
541 again:
542         /* step 1 - get the chain lock */
543         ret = ctdb_ltdb_lock(ctdb_db, key);
544         if (ret != 0) {
545                 DEBUG(0, (__location__ " failed to lock ltdb record\n"));
546                 talloc_free(h);
547                 return NULL;
548         }
549
550         DEBUG(4,("ctdb_fetch_lock: got chain lock\n"));
551
552         talloc_set_destructor(h, fetch_lock_destructor);
553
554         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
555         if (ret != 0) {
556                 ctdb_ltdb_unlock(ctdb_db, key);
557                 talloc_free(h);
558                 return NULL;
559         }
560
561         /* when torturing, ensure we test the remote path */
562         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
563             random() % 5 == 0) {
564                 h->header.dmaster = (uint32_t)-1;
565         }
566
567
568         DEBUG(4,("ctdb_fetch_lock: done local fetch\n"));
569
570         if (h->header.dmaster != ctdb_db->ctdb->vnn) {
571                 ctdb_ltdb_unlock(ctdb_db, key);
572                 ret = ctdb_client_force_migration(ctdb_db, key);
573                 if (ret != 0) {
574                         DEBUG(4,("ctdb_fetch_lock: force_migration failed\n"));
575                         talloc_free(h);
576                         return NULL;
577                 }
578                 goto again;
579         }
580
581         DEBUG(4,("ctdb_fetch_lock: we are dmaster - done\n"));
582         return h;
583 }
584
585 /*
586   store some data to the record that was locked with ctdb_fetch_lock()
587 */
588 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
589 {
590         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
591 }
592
593 /*
594   wait until we're the only node left.
595   this function never returns
596 */
597 void ctdb_shutdown(struct ctdb_context *ctdb)
598 {
599         struct ctdb_req_shutdown r;
600         int len;
601
602         /* if the domain socket is not yet open, open it */
603         if (ctdb->daemon.sd==-1) {
604                 ctdb_socket_connect(ctdb);
605         }
606
607         len = sizeof(struct ctdb_req_shutdown);
608         ZERO_STRUCT(r);
609         r.hdr.length       = len;
610         r.hdr.ctdb_magic   = CTDB_MAGIC;
611         r.hdr.ctdb_version = CTDB_VERSION;
612         r.hdr.operation    = CTDB_REQ_SHUTDOWN;
613         r.hdr.reqid        = 0;
614
615         ctdb_client_queue_pkt(ctdb, &(r.hdr));
616
617         /* this event loop will terminate once we receive the reply */
618         while (1) {
619                 event_loop_once(ctdb->ev);
620         }
621 }
622
623
624 struct ctdb_client_control_state {
625         uint32_t reqid;
626         int32_t status;
627         TDB_DATA outdata;
628         enum call_state state;
629 };
630
631 /*
632   called when a CTDB_REPLY_CONTROL packet comes in in the client
633
634   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
635   contains any reply data from the control
636 */
637 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
638                                       struct ctdb_req_header *hdr)
639 {
640         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
641         struct ctdb_client_control_state *state;
642
643         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
644         if (state == NULL) {
645                 DEBUG(0,(__location__ " reqid %d not found\n", hdr->reqid));
646                 return;
647         }
648
649         if (hdr->reqid != state->reqid) {
650                 /* we found a record  but it was the wrong one */
651                 DEBUG(0, ("Dropped orphaned reply control with reqid:%d\n",hdr->reqid));
652                 return;
653         }
654
655         state->outdata.dptr = c->data;
656         state->outdata.dsize = c->datalen;
657         state->status = c->status;
658
659         talloc_steal(state, c);
660
661         state->state = CTDB_CALL_DONE;
662 }
663
664
665 /*
666   send a ctdb control message
667  */
668 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
669                  uint32_t opcode, TDB_DATA data, 
670                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status)
671 {
672         struct ctdb_client_control_state *state;
673         struct ctdb_req_control *c;
674         size_t len;
675         int ret;
676
677         /* if the domain socket is not yet open, open it */
678         if (ctdb->daemon.sd==-1) {
679                 ctdb_socket_connect(ctdb);
680         }
681
682         state = talloc_zero(ctdb, struct ctdb_client_control_state);
683         CTDB_NO_MEMORY(ctdb, state);
684
685         state->reqid = ctdb_reqid_new(ctdb, state);
686         state->state = CTDB_CALL_WAIT;
687
688         len = offsetof(struct ctdb_req_control, data) + data.dsize;
689         c = ctdbd_allocate_pkt(state, len);
690         
691         memset(c, 0, len);
692         c->hdr.length       = len;
693         c->hdr.ctdb_magic   = CTDB_MAGIC;
694         c->hdr.ctdb_version = CTDB_VERSION;
695         c->hdr.operation    = CTDB_REQ_CONTROL;
696         c->hdr.reqid        = state->reqid;
697         c->hdr.destnode     = destnode;
698         c->hdr.srcnode      = ctdb->vnn;
699         c->hdr.reqid        = state->reqid;
700         c->opcode           = opcode;
701         c->srvid            = srvid;
702         c->datalen          = data.dsize;
703         if (data.dsize) {
704                 memcpy(&c->data[0], data.dptr, data.dsize);
705         }
706
707         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
708         if (ret != 0) {
709                 talloc_free(state);
710                 return -1;
711         }
712
713         /* semi-async operation */
714         while (state->state == CTDB_CALL_WAIT) {
715                 event_loop_once(ctdb->ev);
716         }
717
718         if (outdata) {
719                 *outdata = state->outdata;
720                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
721         }
722
723         *status = state->status;
724
725         talloc_free(state);
726
727         return 0;       
728 }
729
730
731
732 /*
733   a process exists call. Returns 0 if process exists, -1 otherwise
734  */
735 int ctdb_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
736 {
737         int ret;
738         TDB_DATA data;
739         int32_t status;
740
741         data.dptr = (uint8_t*)&pid;
742         data.dsize = sizeof(pid);
743
744         ret = ctdb_control(ctdb, destnode, 0, 
745                            CTDB_CONTROL_PROCESS_EXISTS, data, 
746                            NULL, NULL, &status);
747         if (ret != 0) {
748                 DEBUG(0,(__location__ " ctdb_control for process_exists failed\n"));
749                 return -1;
750         }
751
752         return status;
753 }
754
755 /*
756   get remote status
757  */
758 int ctdb_status(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_status *status)
759 {
760         int ret;
761         TDB_DATA data;
762         int32_t res;
763
764         ZERO_STRUCT(data);
765         ret = ctdb_control(ctdb, destnode, 0, 
766                            CTDB_CONTROL_STATUS, data, 
767                            ctdb, &data, &res);
768         if (ret != 0 || res != 0) {
769                 DEBUG(0,(__location__ " ctdb_control for status failed\n"));
770                 return -1;
771         }
772
773         if (data.dsize != sizeof(struct ctdb_status)) {
774                 DEBUG(0,(__location__ " Wrong status size %u - expected %u\n",
775                          data.dsize, sizeof(struct ctdb_status)));
776                       return -1;
777         }
778
779         *status = *(struct ctdb_status *)data.dptr;
780         talloc_free(data.dptr);
781                         
782         return 0;
783 }
784
785 /*
786   get vnn map from a remote node
787  */
788 int ctdb_getvnnmap(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_vnn_map *vnnmap)
789 {
790         int ret;
791         TDB_DATA data, outdata;
792         int32_t i, res;
793
794         ZERO_STRUCT(data);
795         ret = ctdb_control(ctdb, destnode, 0, 
796                            CTDB_CONTROL_GETVNNMAP, data, 
797                            ctdb, &outdata, &res);
798         if (ret != 0 || res != 0) {
799                 DEBUG(0,(__location__ " ctdb_control for getvnnmap failed\n"));
800                 return -1;
801         }
802
803         vnnmap->generation = ((uint32_t *)outdata.dptr)[0];
804         vnnmap->size = ((uint32_t *)outdata.dptr)[1];
805         if (vnnmap->map) {
806                 talloc_free(vnnmap->map);
807                 vnnmap->map = NULL;
808         }
809         vnnmap->map = talloc_array(vnnmap, uint32_t, vnnmap->size);
810         for (i=0;i<vnnmap->size;i++) {
811                 vnnmap->map[i] = ((uint32_t *)outdata.dptr)[i+2];
812         }
813                     
814         return 0;
815 }
816
817
818 /*
819   set vnn map on a node
820  */
821 int ctdb_setvnnmap(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_vnn_map *vnnmap)
822 {
823         int ret;
824         TDB_DATA *data, outdata;
825         int32_t i, res;
826
827         data = talloc_zero(ctdb, TDB_DATA);
828         data->dsize = (vnnmap->size+2)*sizeof(uint32_t);
829         data->dptr = (unsigned char *)talloc_array(data, uint32_t, vnnmap->size+2);
830
831         ((uint32_t *)&data->dptr[0])[0] = vnnmap->generation;
832         ((uint32_t *)&data->dptr[0])[1] = vnnmap->size;
833         for (i=0;i<vnnmap->size;i++) {
834                 ((uint32_t *)&data->dptr[0])[i+2] = vnnmap->map[i];
835         }
836
837         ret = ctdb_control(ctdb, destnode, 0, 
838                            CTDB_CONTROL_SETVNNMAP, *data, 
839                            ctdb, &outdata, &res);
840         if (ret != 0 || res != 0) {
841                 DEBUG(0,(__location__ " ctdb_control for setvnnmap failed\n"));
842                 return -1;
843         }
844
845         talloc_free(data);                  
846         return 0;
847 }
848
849 /*
850   ping a node
851  */
852 int ctdb_ping(struct ctdb_context *ctdb, uint32_t destnode)
853 {
854         int ret;
855         int32_t res;
856         TDB_DATA data;
857
858         ZERO_STRUCT(data);
859         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, data, NULL, NULL, &res);
860         if (ret != 0 || res != 0) {
861                 return -1;
862         }
863         return 0;
864 }
865
866 /*
867   get ctdb config
868  */
869 int ctdb_get_config(struct ctdb_context *ctdb)
870 {
871         int ret;
872         int32_t res;
873         TDB_DATA data;
874         struct ctdb_context c;
875
876         ZERO_STRUCT(data);
877         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_CONFIG, data, 
878                            ctdb, &data, &res);
879         if (ret != 0 || res != 0) {
880                 return -1;
881         }
882         if (data.dsize != sizeof(c)) {
883                 DEBUG(0,("Bad config size %u - expected %u\n", data.dsize, sizeof(c)));
884                 return -1;
885         }
886
887         c = *(struct ctdb_context *)data.dptr;
888         talloc_free(data.dptr);
889
890         ctdb->num_nodes = c.num_nodes;
891         ctdb->num_connected = c.num_connected;
892         ctdb->vnn = c.vnn;
893         ctdb->max_lacount = c.max_lacount;
894         
895         return 0;
896 }
897
898 /*
899   find the real path to a ltdb 
900  */
901 int ctdb_getdbpath(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
902                    const char **path)
903 {
904         int ret;
905         int32_t res;
906         TDB_DATA data;
907
908         data.dptr = (uint8_t *)&ctdb_db->db_id;
909         data.dsize = sizeof(ctdb_db->db_id);
910
911         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, 
912                            CTDB_CONTROL_GETDBPATH, data, 
913                            ctdb_db, &data, &res);
914         if (ret != 0 || res != 0) {
915                 return -1;
916         }
917
918         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
919         if ((*path) == NULL) {
920                 return -1;
921         }
922
923         talloc_free(data.dptr);
924
925         return 0;
926 }
927
928 /*
929   get debug level on a node
930  */
931 int ctdb_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, uint32_t *level)
932 {
933         int ret;
934         int32_t res;
935         TDB_DATA data;
936
937         ZERO_STRUCT(data);
938         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, data, 
939                            ctdb, &data, &res);
940         if (ret != 0 || res != 0) {
941                 return -1;
942         }
943         if (data.dsize != sizeof(uint32_t)) {
944                 DEBUG(0,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
945                               data.dsize));
946                 return -1;
947         }
948         *level = *(uint32_t *)data.dptr;
949         talloc_free(data.dptr);
950         return 0;
951 }
952
953 /*
954   set debug level on a node
955  */
956 int ctdb_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, uint32_t level)
957 {
958         int ret;
959         int32_t res;
960         TDB_DATA data;
961
962         data.dptr = (uint8_t *)&level;
963         data.dsize = sizeof(level);
964
965         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, data, 
966                            NULL, NULL, &res);
967         if (ret != 0 || res != 0) {
968                 return -1;
969         }
970         return 0;
971 }