r25027: Fix more warnings.
[jelmer/samba4-debian.git] / source / cluster / ctdb / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/events/events.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "../include/ctdb_private.h"
29 #include "lib/util/dlinklist.h"
30
31 /*
32   allocate a packet for use in client<->daemon communication
33  */
34 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
35                                             TALLOC_CTX *mem_ctx, 
36                                             enum ctdb_operation operation, 
37                                             size_t length, size_t slength,
38                                             const char *type)
39 {
40         int size;
41         struct ctdb_req_header *hdr;
42
43         length = MAX(length, slength);
44         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
45
46         hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
47         if (hdr == NULL) {
48                 DEBUG(0,("Unable to allocate packet for operation %u of length %u\n",
49                          operation, (unsigned)length));
50                 return NULL;
51         }
52         talloc_set_name_const(hdr, type);
53         memset(hdr, 0, slength);
54         hdr->length       = length;
55         hdr->operation    = operation;
56         hdr->ctdb_magic   = CTDB_MAGIC;
57         hdr->ctdb_version = CTDB_VERSION;
58         hdr->srcnode      = ctdb->vnn;
59         if (ctdb->vnn_map) {
60                 hdr->generation = ctdb->vnn_map->generation;
61         }
62
63         return hdr;
64 }
65
66 /*
67   local version of ctdb_call
68 */
69 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
70                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
71                     TDB_DATA *data, uint32_t caller)
72 {
73         struct ctdb_call_info *c;
74         struct ctdb_registered_call *fn;
75         struct ctdb_context *ctdb = ctdb_db->ctdb;
76         
77         c = talloc(ctdb, struct ctdb_call_info);
78         CTDB_NO_MEMORY(ctdb, c);
79
80         c->key = call->key;
81         c->call_data = &call->call_data;
82         c->record_data.dptr = (unsigned char *)talloc_memdup(c, data->dptr, data->dsize);
83         c->record_data.dsize = data->dsize;
84         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
85         c->new_data = NULL;
86         c->reply_data = NULL;
87         c->status = 0;
88
89         for (fn=ctdb_db->calls;fn;fn=fn->next) {
90                 if (fn->id == call->call_id) break;
91         }
92         if (fn == NULL) {
93                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
94                 talloc_free(c);
95                 return -1;
96         }
97
98         if (fn->fn(c) != 0) {
99                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
100                 talloc_free(c);
101                 return -1;
102         }
103
104         if (header->laccessor != caller) {
105                 header->lacount = 0;
106         }
107         header->laccessor = caller;
108         header->lacount++;
109
110         /* we need to force the record to be written out if this was a remote access,
111            so that the lacount is updated */
112         if (c->new_data == NULL && header->laccessor != ctdb->vnn) {
113                 c->new_data = &c->record_data;
114         }
115
116         if (c->new_data) {
117                 /* XXX check that we always have the lock here? */
118                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
119                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
120                         talloc_free(c);
121                         return -1;
122                 }
123         }
124
125         if (c->reply_data) {
126                 call->reply_data = *c->reply_data;
127                 talloc_steal(ctdb, call->reply_data.dptr);
128                 talloc_set_name_const(call->reply_data.dptr, __location__);
129         } else {
130                 call->reply_data.dptr = NULL;
131                 call->reply_data.dsize = 0;
132         }
133         call->status = c->status;
134
135         talloc_free(c);
136
137         return 0;
138 }
139
140
141 /*
142   queue a packet for sending from client to daemon
143 */
144 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
145 {
146         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
147 }
148
149
150 /*
151   state of a in-progress ctdb call in client
152 */
153 struct ctdb_client_call_state {
154         enum call_state state;
155         uint32_t reqid;
156         struct ctdb_db_context *ctdb_db;
157         struct ctdb_call call;
158 };
159
160 /*
161   called when a CTDB_REPLY_CALL packet comes in in the client
162
163   This packet comes in response to a CTDB_REQ_CALL request packet. It
164   contains any reply data from the call
165 */
166 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
167 {
168         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
169         struct ctdb_client_call_state *state;
170
171         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
172         if (state == NULL) {
173                 DEBUG(0,(__location__ " reqid %u not found\n", hdr->reqid));
174                 return;
175         }
176
177         if (hdr->reqid != state->reqid) {
178                 /* we found a record  but it was the wrong one */
179                 DEBUG(0, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
180                 return;
181         }
182
183         state->call.reply_data.dptr = c->data;
184         state->call.reply_data.dsize = c->datalen;
185         state->call.status = c->status;
186
187         talloc_steal(state, c);
188
189         state->state = CTDB_CALL_DONE;
190 }
191
192 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
193
194 /*
195   this is called in the client, when data comes in from the daemon
196  */
197 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
198 {
199         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
200         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
201         TALLOC_CTX *tmp_ctx;
202
203         /* place the packet as a child of a tmp_ctx. We then use
204            talloc_free() below to free it. If any of the calls want
205            to keep it, then they will steal it somewhere else, and the
206            talloc_free() will be a no-op */
207         tmp_ctx = talloc_new(ctdb);
208         talloc_steal(tmp_ctx, hdr);
209
210         if (cnt == 0) {
211                 DEBUG(2,("Daemon has exited - shutting down client\n"));
212                 exit(0);
213         }
214
215         if (cnt < sizeof(*hdr)) {
216                 DEBUG(0,("Bad packet length %u in client\n", (unsigned)cnt));
217                 goto done;
218         }
219         if (cnt != hdr->length) {
220                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
221                                (unsigned)hdr->length, (unsigned)cnt);
222                 goto done;
223         }
224
225         if (hdr->ctdb_magic != CTDB_MAGIC) {
226                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
227                 goto done;
228         }
229
230         if (hdr->ctdb_version != CTDB_VERSION) {
231                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
232                 goto done;
233         }
234
235         switch (hdr->operation) {
236         case CTDB_REPLY_CALL:
237                 ctdb_client_reply_call(ctdb, hdr);
238                 break;
239
240         case CTDB_REQ_MESSAGE:
241                 ctdb_request_message(ctdb, hdr);
242                 break;
243
244         case CTDB_REPLY_CONTROL:
245                 ctdb_client_reply_control(ctdb, hdr);
246                 break;
247
248         default:
249                 DEBUG(0,("bogus operation code:%u\n",hdr->operation));
250         }
251
252 done:
253         talloc_free(tmp_ctx);
254 }
255
256 /*
257   connect to a unix domain socket
258 */
259 int ctdb_socket_connect(struct ctdb_context *ctdb)
260 {
261         struct sockaddr_un addr;
262
263         memset(&addr, 0, sizeof(addr));
264         addr.sun_family = AF_UNIX;
265         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
266
267         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
268         if (ctdb->daemon.sd == -1) {
269                 return -1;
270         }
271
272         set_nonblocking(ctdb->daemon.sd);
273         set_close_on_exec(ctdb->daemon.sd);
274         
275         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
276                 close(ctdb->daemon.sd);
277                 ctdb->daemon.sd = -1;
278                 return -1;
279         }
280
281         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
282                                               CTDB_DS_ALIGNMENT, 
283                                               ctdb_client_read_cb, ctdb);
284         return 0;
285 }
286
287
288 struct ctdb_record_handle {
289         struct ctdb_db_context *ctdb_db;
290         TDB_DATA key;
291         TDB_DATA *data;
292         struct ctdb_ltdb_header header;
293 };
294
295
296 /*
297   make a recv call to the local ctdb daemon - called from client context
298
299   This is called when the program wants to wait for a ctdb_call to complete and get the 
300   results. This call will block unless the call has already completed.
301 */
302 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
303 {
304         while (state->state < CTDB_CALL_DONE) {
305                 event_loop_once(state->ctdb_db->ctdb->ev);
306         }
307         if (state->state != CTDB_CALL_DONE) {
308                 DEBUG(0,(__location__ " ctdb_call_recv failed\n"));
309                 talloc_free(state);
310                 return -1;
311         }
312
313         if (state->call.reply_data.dsize) {
314                 call->reply_data.dptr = (unsigned char *)talloc_memdup(
315                                         state->ctdb_db,
316                                         state->call.reply_data.dptr,
317                                         state->call.reply_data.dsize);
318                 call->reply_data.dsize = state->call.reply_data.dsize;
319         } else {
320                 call->reply_data.dptr = NULL;
321                 call->reply_data.dsize = 0;
322         }
323         call->status = state->call.status;
324         talloc_free(state);
325
326         return 0;
327 }
328
329
330
331
332 /*
333   destroy a ctdb_call in client
334 */
335 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
336 {
337         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
338         return 0;
339 }
340
341 /*
342   construct an event driven local ctdb_call
343
344   this is used so that locally processed ctdb_call requests are processed
345   in an event driven manner
346 */
347 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
348                                                                   struct ctdb_call *call,
349                                                                   struct ctdb_ltdb_header *header,
350                                                                   TDB_DATA *data)
351 {
352         struct ctdb_client_call_state *state;
353         struct ctdb_context *ctdb = ctdb_db->ctdb;
354         int ret;
355
356         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
357         CTDB_NO_MEMORY_NULL(ctdb, state);
358
359         talloc_steal(state, data->dptr);
360
361         state->state = CTDB_CALL_DONE;
362         state->call = *call;
363         state->ctdb_db = ctdb_db;
364
365         ret = ctdb_call_local(ctdb_db, &state->call, header, state, data, ctdb->vnn);
366
367         return state;
368 }
369
370 /*
371   make a ctdb call to the local daemon - async send. Called from client context.
372
373   This constructs a ctdb_call request and queues it for processing. 
374   This call never blocks.
375 */
376 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
377                                        struct ctdb_call *call)
378 {
379         struct ctdb_client_call_state *state;
380         struct ctdb_context *ctdb = ctdb_db->ctdb;
381         struct ctdb_ltdb_header header;
382         TDB_DATA data;
383         int ret;
384         size_t len;
385         struct ctdb_req_call *c;
386
387         /* if the domain socket is not yet open, open it */
388         if (ctdb->daemon.sd==-1) {
389                 ctdb_socket_connect(ctdb);
390         }
391
392         ret = ctdb_ltdb_lock(ctdb_db, call->key);
393         if (ret != 0) {
394                 DEBUG(0,(__location__ " Failed to get chainlock\n"));
395                 return NULL;
396         }
397
398         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
399
400         if (ret == 0 && header.dmaster == ctdb->vnn) {
401                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
402                 talloc_free(data.dptr);
403                 ctdb_ltdb_unlock(ctdb_db, call->key);
404                 return state;
405         }
406
407         ctdb_ltdb_unlock(ctdb_db, call->key);
408         talloc_free(data.dptr);
409
410         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
411         if (state == NULL) {
412                 DEBUG(0, (__location__ " failed to allocate state\n"));
413                 return NULL;
414         }
415
416         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
417         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
418         if (c == NULL) {
419                 DEBUG(0, (__location__ " failed to allocate packet\n"));
420                 return NULL;
421         }
422
423         state->reqid     = ctdb_reqid_new(ctdb, state);
424         state->ctdb_db = ctdb_db;
425         talloc_set_destructor(state, ctdb_client_call_destructor);
426
427         c->hdr.reqid     = state->reqid;
428         c->flags         = call->flags;
429         c->db_id         = ctdb_db->db_id;
430         c->callid        = call->call_id;
431         c->hopcount      = 0;
432         c->keylen        = call->key.dsize;
433         c->calldatalen   = call->call_data.dsize;
434         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
435         memcpy(&c->data[call->key.dsize], 
436                call->call_data.dptr, call->call_data.dsize);
437         state->call                = *call;
438         state->call.call_data.dptr = &c->data[call->key.dsize];
439         state->call.key.dptr       = &c->data[0];
440
441         state->state  = CTDB_CALL_WAIT;
442
443
444         ctdb_client_queue_pkt(ctdb, &c->hdr);
445
446         return state;
447 }
448
449
450 /*
451   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
452 */
453 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
454 {
455         struct ctdb_client_call_state *state;
456
457         state = ctdb_call_send(ctdb_db, call);
458         return ctdb_call_recv(state, call);
459 }
460
461
462 /*
463   tell the daemon what messaging srvid we will use, and register the message
464   handler function in the client
465 */
466 int ctdb_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
467                              ctdb_message_fn_t handler,
468                              void *private_data)
469                                     
470 {
471         int res;
472         int32_t status;
473         
474         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
475                            tdb_null, NULL, NULL, &status, NULL, NULL);
476         if (res != 0 || status != 0) {
477                 DEBUG(0,("Failed to register srvid %llu\n", (unsigned long long)srvid));
478                 return -1;
479         }
480
481         /* also need to register the handler with our own ctdb structure */
482         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
483 }
484
485 /*
486   tell the daemon we no longer want a srvid
487 */
488 static int ctdb_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
489 {
490         int res;
491         int32_t status;
492         
493         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
494                            tdb_null, NULL, NULL, &status, NULL, NULL);
495         if (res != 0 || status != 0) {
496                 DEBUG(0,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
497                 return -1;
498         }
499
500         /* also need to register the handler with our own ctdb structure */
501         ctdb_deregister_message_handler(ctdb, srvid, private_data);
502         return 0;
503 }
504
505
506 /*
507   send a message - from client context
508  */
509 int ctdb_send_message(struct ctdb_context *ctdb, uint32_t vnn,
510                       uint64_t srvid, TDB_DATA data)
511 {
512         struct ctdb_req_message *r;
513         int len, res;
514
515         len = offsetof(struct ctdb_req_message, data) + data.dsize;
516         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
517                                len, struct ctdb_req_message);
518         CTDB_NO_MEMORY(ctdb, r);
519
520         r->hdr.destnode  = vnn;
521         r->srvid         = srvid;
522         r->datalen       = data.dsize;
523         memcpy(&r->data[0], data.dptr, data.dsize);
524         
525         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
526         if (res != 0) {
527                 return res;
528         }
529
530         talloc_free(r);
531         return 0;
532 }
533
534
535 /*
536   cancel a ctdb_fetch_lock operation, releasing the lock
537  */
538 static int fetch_lock_destructor(struct ctdb_record_handle *h)
539 {
540         ctdb_ltdb_unlock(h->ctdb_db, h->key);
541         return 0;
542 }
543
544 /*
545   force the migration of a record to this node
546  */
547 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
548 {
549         struct ctdb_call call;
550         ZERO_STRUCT(call);
551         call.call_id = CTDB_NULL_FUNC;
552         call.key = key;
553         call.flags = CTDB_IMMEDIATE_MIGRATION;
554         return ctdb_call(ctdb_db, &call);
555 }
556
557 /*
558   get a lock on a record, and return the records data. Blocks until it gets the lock
559  */
560 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
561                                            TDB_DATA key, TDB_DATA *data)
562 {
563         int ret;
564         struct ctdb_record_handle *h;
565
566         /*
567           procedure is as follows:
568
569           1) get the chain lock. 
570           2) check if we are dmaster
571           3) if we are the dmaster then return handle 
572           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
573              reply from ctdbd
574           5) when we get the reply, goto (1)
575          */
576
577         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
578         if (h == NULL) {
579                 return NULL;
580         }
581
582         h->ctdb_db = ctdb_db;
583         h->key     = key;
584         h->key.dptr = (unsigned char *)talloc_memdup(h, key.dptr, key.dsize);
585         if (h->key.dptr == NULL) {
586                 talloc_free(h);
587                 return NULL;
588         }
589         h->data    = data;
590
591         DEBUG(3,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
592                  (const char *)key.dptr));
593
594 again:
595         /* step 1 - get the chain lock */
596         ret = ctdb_ltdb_lock(ctdb_db, key);
597         if (ret != 0) {
598                 DEBUG(0, (__location__ " failed to lock ltdb record\n"));
599                 talloc_free(h);
600                 return NULL;
601         }
602
603         DEBUG(4,("ctdb_fetch_lock: got chain lock\n"));
604
605         talloc_set_destructor(h, fetch_lock_destructor);
606
607         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
608
609         /* when torturing, ensure we test the remote path */
610         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
611             random() % 5 == 0) {
612                 h->header.dmaster = (uint32_t)-1;
613         }
614
615
616         DEBUG(4,("ctdb_fetch_lock: done local fetch\n"));
617
618         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->vnn) {
619                 ctdb_ltdb_unlock(ctdb_db, key);
620                 ret = ctdb_client_force_migration(ctdb_db, key);
621                 if (ret != 0) {
622                         DEBUG(4,("ctdb_fetch_lock: force_migration failed\n"));
623                         talloc_free(h);
624                         return NULL;
625                 }
626                 goto again;
627         }
628
629         DEBUG(4,("ctdb_fetch_lock: we are dmaster - done\n"));
630         return h;
631 }
632
633 /*
634   store some data to the record that was locked with ctdb_fetch_lock()
635 */
636 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
637 {
638         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
639 }
640
641 /*
642   non-locking fetch of a record
643  */
644 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
645                TDB_DATA key, TDB_DATA *data)
646 {
647         struct ctdb_call call;
648         int ret;
649
650         call.call_id = CTDB_FETCH_FUNC;
651         call.call_data.dptr = NULL;
652         call.call_data.dsize = 0;
653
654         ret = ctdb_call(ctdb_db, &call);
655
656         if (ret == 0) {
657                 *data = call.reply_data;
658                 talloc_steal(mem_ctx, data->dptr);
659         }
660
661         return ret;
662 }
663
664
665 struct ctdb_client_control_state {
666         struct ctdb_context *ctdb;
667         uint32_t reqid;
668         int32_t status;
669         TDB_DATA outdata;
670         enum call_state state;
671         char *errormsg;
672 };
673
674 /*
675   called when a CTDB_REPLY_CONTROL packet comes in in the client
676
677   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
678   contains any reply data from the control
679 */
680 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
681                                       struct ctdb_req_header *hdr)
682 {
683         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
684         struct ctdb_client_control_state *state;
685
686         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
687         if (state == NULL) {
688                 DEBUG(0,(__location__ " reqid %u not found\n", hdr->reqid));
689                 return;
690         }
691
692         if (hdr->reqid != state->reqid) {
693                 /* we found a record  but it was the wrong one */
694                 DEBUG(0, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
695                 return;
696         }
697
698         state->outdata.dptr = c->data;
699         state->outdata.dsize = c->datalen;
700         state->status = c->status;
701         if (c->errorlen) {
702                 state->errormsg = talloc_strndup(state, 
703                                                  (char *)&c->data[c->datalen], 
704                                                  c->errorlen);
705         }
706
707         talloc_steal(state, c);
708
709         state->state = CTDB_CALL_DONE;
710 }
711
712
713 /* time out handler for ctdb_control */
714 static void timeout_func(struct event_context *ev, struct timed_event *te, 
715         struct timeval t, void *private_data)
716 {
717         uint32_t *timed_out = (uint32_t *)private_data;
718
719         *timed_out = 1;
720 }
721
722 /*
723   destroy a ctdb_control in client
724 */
725 static int ctdb_control_destructor(struct ctdb_client_control_state *state)     
726 {
727         ctdb_reqid_remove(state->ctdb, state->reqid);
728         return 0;
729 }
730
731 /*
732   send a ctdb control message
733   timeout specifies how long we should wait for a reply.
734   if timeout is NULL we wait indefinitely
735  */
736 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
737                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
738                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
739                  struct timeval *timeout,
740                  char **errormsg)
741 {
742         struct ctdb_client_control_state *state;
743         struct ctdb_req_control *c;
744         size_t len;
745         int ret;
746         uint32_t timed_out;
747
748         if (errormsg) {
749                 *errormsg = NULL;
750         }
751
752         /* if the domain socket is not yet open, open it */
753         if (ctdb->daemon.sd==-1) {
754                 ctdb_socket_connect(ctdb);
755         }
756
757         state = talloc_zero(ctdb, struct ctdb_client_control_state);
758         CTDB_NO_MEMORY(ctdb, state);
759
760         state->ctdb  = ctdb;
761         state->reqid = ctdb_reqid_new(ctdb, state);
762         state->state = CTDB_CALL_WAIT;
763         state->errormsg = NULL;
764
765         talloc_set_destructor(state, ctdb_control_destructor);
766
767         len = offsetof(struct ctdb_req_control, data) + data.dsize;
768         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
769                                len, struct ctdb_req_control);
770         CTDB_NO_MEMORY(ctdb, c);
771         
772         c->hdr.reqid        = state->reqid;
773         c->hdr.destnode     = destnode;
774         c->hdr.reqid        = state->reqid;
775         c->opcode           = opcode;
776         c->client_id        = 0;
777         c->flags            = flags;
778         c->srvid            = srvid;
779         c->datalen          = data.dsize;
780         if (data.dsize) {
781                 memcpy(&c->data[0], data.dptr, data.dsize);
782         }
783
784         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
785         if (ret != 0) {
786                 talloc_free(state);
787                 return -1;
788         }
789
790         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
791                 talloc_free(state);
792                 return 0;
793         }
794
795         /* semi-async operation */
796         timed_out = 0;
797         if (timeout && !timeval_is_zero(timeout)) {
798                 event_add_timed(ctdb->ev, state, *timeout, timeout_func, &timed_out);
799         }
800         while ((state->state == CTDB_CALL_WAIT)
801         &&      (timed_out == 0) ){
802                 event_loop_once(ctdb->ev);
803         }
804         if (timed_out) {
805                 talloc_free(state);
806                 if (errormsg) {
807                         (*errormsg) = talloc_strdup(mem_ctx, "control timed out");
808                 } else {
809                         DEBUG(0,("ctdb_control timed out\n"));
810                 }
811                 return -1;
812         }
813
814         if (outdata) {
815                 *outdata = state->outdata;
816                 outdata->dptr = (unsigned char *)talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
817         }
818
819         *status = state->status;
820
821         if (!errormsg && state->errormsg) {
822                 DEBUG(0,("ctdb_control error: '%s'\n", state->errormsg));
823         }
824
825         if (errormsg && state->errormsg) {
826                 (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
827         }
828
829         talloc_free(state);
830
831         return 0;       
832 }
833
834
835
836 /*
837   a process exists call. Returns 0 if process exists, -1 otherwise
838  */
839 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
840 {
841         int ret;
842         TDB_DATA data;
843         int32_t status;
844
845         data.dptr = (uint8_t*)&pid;
846         data.dsize = sizeof(pid);
847
848         ret = ctdb_control(ctdb, destnode, 0, 
849                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
850                            NULL, NULL, &status, NULL, NULL);
851         if (ret != 0) {
852                 DEBUG(0,(__location__ " ctdb_control for process_exists failed\n"));
853                 return -1;
854         }
855
856         return status;
857 }
858
859 /*
860   get remote statistics
861  */
862 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
863 {
864         int ret;
865         TDB_DATA data;
866         int32_t res;
867
868         ret = ctdb_control(ctdb, destnode, 0, 
869                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
870                            ctdb, &data, &res, NULL, NULL);
871         if (ret != 0 || res != 0) {
872                 DEBUG(0,(__location__ " ctdb_control for statistics failed\n"));
873                 return -1;
874         }
875
876         if (data.dsize != sizeof(struct ctdb_statistics)) {
877                 DEBUG(0,(__location__ " Wrong statistics size %u - expected %u\n",
878                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
879                       return -1;
880         }
881
882         *status = *(struct ctdb_statistics *)data.dptr;
883         talloc_free(data.dptr);
884                         
885         return 0;
886 }
887
888 /*
889   shutdown a remote ctdb node
890  */
891 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
892 {
893         int ret;
894         int32_t res;
895
896         ret = ctdb_control(ctdb, destnode, 0, 
897                            CTDB_CONTROL_SHUTDOWN, CTDB_CTRL_FLAG_NOREPLY, tdb_null, 
898                            NULL, NULL, &res, &timeout, NULL);
899         if (ret != 0) {
900                 DEBUG(0,(__location__ " ctdb_control for shutdown failed\n"));
901                 return -1;
902         }
903
904         return 0;
905 }
906
907 /*
908   get vnn map from a remote node
909  */
910 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
911 {
912         int ret;
913         TDB_DATA outdata;
914         int32_t res;
915         struct ctdb_vnn_map_wire *map;
916
917         ret = ctdb_control(ctdb, destnode, 0, 
918                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
919                            mem_ctx, &outdata, &res, &timeout, NULL);
920         if (ret != 0 || res != 0) {
921                 DEBUG(0,(__location__ " ctdb_control for getvnnmap failed\n"));
922                 return -1;
923         }
924         
925         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
926         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
927             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
928                 DEBUG(0,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
929                 return -1;
930         }
931
932         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
933         CTDB_NO_MEMORY(ctdb, *vnnmap);
934         (*vnnmap)->generation = map->generation;
935         (*vnnmap)->size       = map->size;
936         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
937
938         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
939         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
940         talloc_free(outdata.dptr);
941                     
942         return 0;
943 }
944
945 /*
946   get the recovery mode of a remote node
947  */
948 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
949 {
950         int ret;
951         int32_t res;
952
953         ret = ctdb_control(ctdb, destnode, 0, 
954                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
955                            NULL, NULL, &res, &timeout, NULL);
956         if (ret != 0) {
957                 DEBUG(0,(__location__ " ctdb_control for getrecmode failed\n"));
958                 return -1;
959         }
960
961         *recmode = res;
962
963         return 0;
964 }
965
966 /*
967   set the recovery mode of a remote node
968  */
969 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
970 {
971         int ret;
972         TDB_DATA data;
973         int32_t res;
974
975         data.dsize = sizeof(uint32_t);
976         data.dptr = (unsigned char *)&recmode;
977
978         ret = ctdb_control(ctdb, destnode, 0, 
979                            CTDB_CONTROL_SET_RECMODE, 0, data, 
980                            NULL, NULL, &res, &timeout, NULL);
981         if (ret != 0 || res != 0) {
982                 DEBUG(0,(__location__ " ctdb_control for setrecmode failed\n"));
983                 return -1;
984         }
985
986         return 0;
987 }
988
989 /*
990   get the recovery master of a remote node
991  */
992 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
993 {
994         int ret;
995         int32_t res;
996
997         ret = ctdb_control(ctdb, destnode, 0, 
998                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
999                            NULL, NULL, &res, &timeout, NULL);
1000         if (ret != 0) {
1001                 DEBUG(0,(__location__ " ctdb_control for getrecmaster failed\n"));
1002                 return -1;
1003         }
1004
1005         *recmaster = res;
1006
1007         return 0;
1008 }
1009
1010 /*
1011   set the recovery master of a remote node
1012  */
1013 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1014 {
1015         int ret;
1016         TDB_DATA data;
1017         int32_t res;
1018
1019         ZERO_STRUCT(data);
1020         data.dsize = sizeof(uint32_t);
1021         data.dptr = (unsigned char *)&recmaster;
1022
1023         ret = ctdb_control(ctdb, destnode, 0, 
1024                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1025                            NULL, NULL, &res, &timeout, NULL);
1026         if (ret != 0 || res != 0) {
1027                 DEBUG(0,(__location__ " ctdb_control for setrecmaster failed\n"));
1028                 return -1;
1029         }
1030
1031         return 0;
1032 }
1033
1034
1035 /*
1036   get a list of databases off a remote node
1037  */
1038 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1039                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1040 {
1041         int ret;
1042         TDB_DATA outdata;
1043         int32_t res;
1044
1045         ret = ctdb_control(ctdb, destnode, 0, 
1046                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1047                            mem_ctx, &outdata, &res, &timeout, NULL);
1048         if (ret != 0 || res != 0) {
1049                 DEBUG(0,(__location__ " ctdb_control for getdbmap failed\n"));
1050                 return -1;
1051         }
1052
1053         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1054         talloc_free(outdata.dptr);
1055                     
1056         return 0;
1057 }
1058
1059
1060 /*
1061   get a list of nodes (vnn and flags ) from a remote node
1062  */
1063 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1064                 struct timeval timeout, uint32_t destnode, 
1065                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1066 {
1067         int ret;
1068         TDB_DATA outdata;
1069         int32_t res;
1070
1071         ret = ctdb_control(ctdb, destnode, 0, 
1072                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1073                            mem_ctx, &outdata, &res, &timeout, NULL);
1074         if (ret != 0 || res != 0) {
1075                 DEBUG(0,(__location__ " ctdb_control for getnodes failed\n"));
1076                 return -1;
1077         }
1078
1079         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1080         talloc_free(outdata.dptr);
1081                     
1082         return 0;
1083 }
1084
1085 /*
1086   set vnn map on a node
1087  */
1088 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1089                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1090 {
1091         int ret;
1092         TDB_DATA data;
1093         int32_t res;
1094         struct ctdb_vnn_map_wire *map;
1095         size_t len;
1096
1097         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1098         map = talloc_size(mem_ctx, len);
1099         CTDB_NO_MEMORY_VOID(ctdb, map);
1100
1101         map->generation = vnnmap->generation;
1102         map->size = vnnmap->size;
1103         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1104         
1105         data.dsize = len;
1106         data.dptr  = (uint8_t *)map;
1107
1108         ret = ctdb_control(ctdb, destnode, 0, 
1109                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1110                            NULL, NULL, &res, &timeout, NULL);
1111         if (ret != 0 || res != 0) {
1112                 DEBUG(0,(__location__ " ctdb_control for setvnnmap failed\n"));
1113                 return -1;
1114         }
1115
1116         talloc_free(map);
1117
1118         return 0;
1119 }
1120
1121 /*
1122   get all keys and records for a specific database
1123  */
1124 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid, uint32_t lmaster, 
1125                      TALLOC_CTX *mem_ctx, struct ctdb_key_list *keys)
1126 {
1127         int i, ret;
1128         TDB_DATA indata, outdata;
1129         struct ctdb_control_pulldb pull;
1130         struct ctdb_control_pulldb_reply *reply;
1131         struct ctdb_rec_data *rec;
1132         int32_t res;
1133
1134         pull.db_id   = dbid;
1135         pull.lmaster = lmaster;
1136
1137         indata.dsize = sizeof(struct ctdb_control_pulldb);
1138         indata.dptr  = (unsigned char *)&pull;
1139
1140         ret = ctdb_control(ctdb, destnode, 0, 
1141                            CTDB_CONTROL_PULL_DB, 0, indata, 
1142                            mem_ctx, &outdata, &res, NULL, NULL);
1143         if (ret != 0 || res != 0) {
1144                 DEBUG(0,(__location__ " ctdb_control for pulldb failed\n"));
1145                 return -1;
1146         }
1147
1148
1149         reply = (struct ctdb_control_pulldb_reply *)outdata.dptr;
1150         keys->dbid     = reply->db_id;
1151         keys->num      = reply->count;
1152         
1153         keys->keys     = talloc_array(mem_ctx, TDB_DATA, keys->num);
1154         keys->headers  = talloc_array(mem_ctx, struct ctdb_ltdb_header, keys->num);
1155         keys->data     = talloc_array(mem_ctx, TDB_DATA, keys->num);
1156
1157         rec = (struct ctdb_rec_data *)&reply->data[0];
1158
1159         for (i=0;i<reply->count;i++) {
1160                 keys->keys[i].dptr = (unsigned char *)talloc_memdup(mem_ctx, &rec->data[0], rec->keylen);
1161                 keys->keys[i].dsize = rec->keylen;
1162                 
1163                 keys->data[i].dptr = (unsigned char *)talloc_memdup(mem_ctx, &rec->data[keys->keys[i].dsize], rec->datalen);
1164                 keys->data[i].dsize = rec->datalen;
1165
1166                 if (keys->data[i].dsize < sizeof(struct ctdb_ltdb_header)) {
1167                         DEBUG(0,(__location__ " bad ltdb record\n"));
1168                         return -1;
1169                 }
1170                 memcpy(&keys->headers[i], keys->data[i].dptr, sizeof(struct ctdb_ltdb_header));
1171                 keys->data[i].dptr += sizeof(struct ctdb_ltdb_header);
1172                 keys->data[i].dsize -= sizeof(struct ctdb_ltdb_header);
1173
1174                 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
1175         }           
1176
1177         talloc_free(outdata.dptr);
1178
1179         return 0;
1180 }
1181
1182 /*
1183   copy a tdb from one node to another node
1184  */
1185 int ctdb_ctrl_copydb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t sourcenode, 
1186                      uint32_t destnode, uint32_t dbid, uint32_t lmaster, TALLOC_CTX *mem_ctx)
1187 {
1188         int ret;
1189         TDB_DATA indata, outdata;
1190         int32_t res;
1191
1192         indata.dsize = 2*sizeof(uint32_t);
1193         indata.dptr  = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1194
1195         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1196         ((uint32_t *)(&indata.dptr[0]))[1] = lmaster;
1197
1198         DEBUG(3,("pulling dbid 0x%x from %u\n", dbid, sourcenode));
1199
1200         ret = ctdb_control(ctdb, sourcenode, 0, 
1201                            CTDB_CONTROL_PULL_DB, 0, indata, 
1202                            mem_ctx, &outdata, &res, &timeout, NULL);
1203         if (ret != 0 || res != 0) {
1204                 DEBUG(0,(__location__ " ctdb_control for pulldb failed\n"));
1205                 return -1;
1206         }
1207
1208         DEBUG(3,("pushing dbid 0x%x to %u\n", dbid, destnode));
1209
1210         ret = ctdb_control(ctdb, destnode, 0, 
1211                            CTDB_CONTROL_PUSH_DB, 0, outdata, 
1212                            mem_ctx, NULL, &res, &timeout, NULL);
1213         talloc_free(outdata.dptr);
1214         if (ret != 0 || res != 0) {
1215                 DEBUG(0,(__location__ " ctdb_control for pushdb failed\n"));
1216                 return -1;
1217         }
1218
1219         DEBUG(3,("copydb for dbid 0x%x done for %u to %u\n", 
1220                  dbid, sourcenode, destnode));
1221
1222         return 0;
1223 }
1224
1225 /*
1226   change dmaster for all keys in the database to the new value
1227  */
1228 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1229                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1230 {
1231         int ret;
1232         TDB_DATA indata;
1233         int32_t res;
1234
1235         indata.dsize = 2*sizeof(uint32_t);
1236         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1237
1238         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1239         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1240
1241         ret = ctdb_control(ctdb, destnode, 0, 
1242                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1243                            NULL, NULL, &res, &timeout, NULL);
1244         if (ret != 0 || res != 0) {
1245                 DEBUG(0,(__location__ " ctdb_control for setdmaster failed\n"));
1246                 return -1;
1247         }
1248
1249         return 0;
1250 }
1251
1252 /*
1253   ping a node, return number of clients connected
1254  */
1255 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1256 {
1257         int ret;
1258         int32_t res;
1259
1260         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1261                            tdb_null, NULL, NULL, &res, NULL, NULL);
1262         if (ret != 0) {
1263                 return -1;
1264         }
1265         return res;
1266 }
1267
1268 /*
1269   find the real path to a ltdb 
1270  */
1271 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1272                    const char **path)
1273 {
1274         int ret;
1275         int32_t res;
1276         TDB_DATA data;
1277
1278         data.dptr = (uint8_t *)&dbid;
1279         data.dsize = sizeof(dbid);
1280
1281         ret = ctdb_control(ctdb, destnode, 0, 
1282                            CTDB_CONTROL_GETDBPATH, 0, data, 
1283                            mem_ctx, &data, &res, &timeout, NULL);
1284         if (ret != 0 || res != 0) {
1285                 return -1;
1286         }
1287
1288         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1289         if ((*path) == NULL) {
1290                 return -1;
1291         }
1292
1293         talloc_free(data.dptr);
1294
1295         return 0;
1296 }
1297
1298 /*
1299   find the name of a db 
1300  */
1301 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1302                    const char **name)
1303 {
1304         int ret;
1305         int32_t res;
1306         TDB_DATA data;
1307
1308         data.dptr = (uint8_t *)&dbid;
1309         data.dsize = sizeof(dbid);
1310
1311         ret = ctdb_control(ctdb, destnode, 0, 
1312                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1313                            mem_ctx, &data, &res, &timeout, NULL);
1314         if (ret != 0 || res != 0) {
1315                 return -1;
1316         }
1317
1318         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1319         if ((*name) == NULL) {
1320                 return -1;
1321         }
1322
1323         talloc_free(data.dptr);
1324
1325         return 0;
1326 }
1327
1328 /*
1329   create a database
1330  */
1331 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, const char *name)
1332 {
1333         int ret;
1334         int32_t res;
1335         TDB_DATA data;
1336
1337         data.dptr = (unsigned char *)discard_const(name);
1338         data.dsize = strlen(name)+1;
1339
1340         ret = ctdb_control(ctdb, destnode, 0, 
1341                            CTDB_CONTROL_DB_ATTACH, 0, data, 
1342                            mem_ctx, &data, &res, &timeout, NULL);
1343
1344         if (ret != 0 || res != 0) {
1345                 return -1;
1346         }
1347
1348         return 0;
1349 }
1350
1351 /*
1352   get debug level on a node
1353  */
1354 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, uint32_t *level)
1355 {
1356         int ret;
1357         int32_t res;
1358         TDB_DATA data;
1359
1360         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1361                            ctdb, &data, &res, NULL, NULL);
1362         if (ret != 0 || res != 0) {
1363                 return -1;
1364         }
1365         if (data.dsize != sizeof(uint32_t)) {
1366                 DEBUG(0,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1367                          (unsigned)data.dsize));
1368                 return -1;
1369         }
1370         *level = *(uint32_t *)data.dptr;
1371         talloc_free(data.dptr);
1372         return 0;
1373 }
1374
1375 /*
1376   set debug level on a node
1377  */
1378 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, uint32_t level)
1379 {
1380         int ret;
1381         int32_t res;
1382         TDB_DATA data;
1383
1384         data.dptr = (uint8_t *)&level;
1385         data.dsize = sizeof(level);
1386
1387         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1388                            NULL, NULL, &res, NULL, NULL);
1389         if (ret != 0 || res != 0) {
1390                 return -1;
1391         }
1392         return 0;
1393 }
1394
1395
1396 /*
1397   get a list of connected nodes
1398  */
1399 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1400                                 struct timeval timeout,
1401                                 TALLOC_CTX *mem_ctx,
1402                                 uint32_t *num_nodes)
1403 {
1404         struct ctdb_node_map *map=NULL;
1405         int ret, i;
1406         uint32_t *nodes;
1407
1408         *num_nodes = 0;
1409
1410         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1411         if (ret != 0) {
1412                 return NULL;
1413         }
1414
1415         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1416         if (nodes == NULL) {
1417                 return NULL;
1418         }
1419
1420         for (i=0;i<map->num;i++) {
1421                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1422                         nodes[*num_nodes] = map->nodes[i].vnn;
1423                         (*num_nodes)++;
1424                 }
1425         }
1426
1427         return nodes;
1428 }
1429
1430
1431 /*
1432   reset remote status
1433  */
1434 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1435 {
1436         int ret;
1437         int32_t res;
1438
1439         ret = ctdb_control(ctdb, destnode, 0, 
1440                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1441                            NULL, NULL, &res, NULL, NULL);
1442         if (ret != 0 || res != 0) {
1443                 DEBUG(0,(__location__ " ctdb_control for reset statistics failed\n"));
1444                 return -1;
1445         }
1446         return 0;
1447 }
1448
1449
1450 /*
1451   attach to a specific database - client call
1452 */
1453 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name)
1454 {
1455         struct ctdb_db_context *ctdb_db;
1456         TDB_DATA data;
1457         int ret;
1458         int32_t res;
1459
1460         ctdb_db = ctdb_db_handle(ctdb, name);
1461         if (ctdb_db) {
1462                 return ctdb_db;
1463         }
1464
1465         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1466         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1467
1468         ctdb_db->ctdb = ctdb;
1469         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1470         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1471
1472         data.dptr = (unsigned char *)discard_const(name);
1473         data.dsize = strlen(name)+1;
1474
1475         /* tell ctdb daemon to attach */
1476         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_DB_ATTACH,
1477                            0, data, ctdb_db, &data, &res, NULL, NULL);
1478         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1479                 DEBUG(0,("Failed to attach to database '%s'\n", name));
1480                 talloc_free(ctdb_db);
1481                 return NULL;
1482         }
1483         
1484         ctdb_db->db_id = *(uint32_t *)data.dptr;
1485         talloc_free(data.dptr);
1486
1487         ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1488         if (ret != 0) {
1489                 DEBUG(0,("Failed to get dbpath for database '%s'\n", name));
1490                 talloc_free(ctdb_db);
1491                 return NULL;
1492         }
1493
1494         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, 0, O_RDWR, 0);
1495         if (ctdb_db->ltdb == NULL) {
1496                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1497                 talloc_free(ctdb_db);
1498                 return NULL;
1499         }
1500
1501         DLIST_ADD(ctdb->db_list, ctdb_db);
1502
1503         return ctdb_db;
1504 }
1505
1506
1507 /*
1508   setup a call for a database
1509  */
1510 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1511 {
1512         TDB_DATA data;
1513         int32_t status;
1514         struct ctdb_control_set_call c;
1515         int ret;
1516         struct ctdb_registered_call *call;
1517
1518         c.db_id = ctdb_db->db_id;
1519         c.fn    = fn;
1520         c.id    = id;
1521
1522         data.dptr = (uint8_t *)&c;
1523         data.dsize = sizeof(c);
1524
1525         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1526                            data, NULL, NULL, &status, NULL, NULL);
1527         if (ret != 0 || status != 0) {
1528                 DEBUG(0,("ctdb_set_call failed for call %u\n", id));
1529                 return -1;
1530         }
1531
1532         /* also register locally */
1533         call = talloc(ctdb_db, struct ctdb_registered_call);
1534         call->fn = fn;
1535         call->id = id;
1536
1537         DLIST_ADD(ctdb_db->calls, call);        
1538         return 0;
1539 }
1540
1541
1542 struct traverse_state {
1543         bool done;
1544         uint32_t count;
1545         ctdb_traverse_func fn;
1546         void *private_data;
1547 };
1548
1549 /*
1550   called on each key during a ctdb_traverse
1551  */
1552 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1553 {
1554         struct traverse_state *state = (struct traverse_state *)p;
1555         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
1556         TDB_DATA key;
1557
1558         if (data.dsize < sizeof(uint32_t) ||
1559             d->length != data.dsize) {
1560                 DEBUG(0,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
1561                 state->done = True;
1562                 return;
1563         }
1564
1565         key.dsize = d->keylen;
1566         key.dptr  = &d->data[0];
1567         data.dsize = d->datalen;
1568         data.dptr = &d->data[d->keylen];
1569
1570         if (key.dsize == 0 && data.dsize == 0) {
1571                 /* end of traverse */
1572                 state->done = True;
1573                 return;
1574         }
1575
1576         if (state->fn(ctdb, key, data, state->private_data) != 0) {
1577                 state->done = True;
1578         }
1579
1580         state->count++;
1581 }
1582
1583
1584 /*
1585   start a cluster wide traverse, calling the supplied fn on each record
1586   return the number of records traversed, or -1 on error
1587  */
1588 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
1589 {
1590         TDB_DATA data;
1591         struct ctdb_traverse_start t;
1592         int32_t status;
1593         int ret;
1594         uint64_t srvid = (getpid() | 0xFLL<<60);
1595         struct traverse_state state;
1596
1597         state.done = False;
1598         state.count = 0;
1599         state.private_data = private_data;
1600         state.fn = fn;
1601
1602         ret = ctdb_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
1603         if (ret != 0) {
1604                 DEBUG(0,("Failed to setup traverse handler\n"));
1605                 return -1;
1606         }
1607
1608         t.db_id = ctdb_db->db_id;
1609         t.srvid = srvid;
1610         t.reqid = 0;
1611
1612         data.dptr = (uint8_t *)&t;
1613         data.dsize = sizeof(t);
1614
1615         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
1616                            data, NULL, NULL, &status, NULL, NULL);
1617         if (ret != 0 || status != 0) {
1618                 DEBUG(0,("ctdb_traverse_all failed\n"));
1619                 ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1620                 return -1;
1621         }
1622
1623         while (!state.done) {
1624                 event_loop_once(ctdb_db->ctdb->ev);
1625         }
1626
1627         ret = ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1628         if (ret != 0) {
1629                 DEBUG(0,("Failed to remove ctdb_traverse handler\n"));
1630                 return -1;
1631         }
1632
1633         return state.count;
1634 }
1635
1636 /*
1637   called on each key during a catdb
1638  */
1639 static int dumpdb_fn(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
1640 {
1641         FILE *f = (FILE *)p;
1642         char *keystr, *datastr;
1643         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
1644
1645         keystr  = hex_encode_talloc(ctdb, key.dptr, key.dsize);
1646         datastr = hex_encode_talloc(ctdb, data.dptr+sizeof(*h), data.dsize-sizeof(*h));
1647
1648         fprintf(f, "dmaster: %u\n", h->dmaster);
1649         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
1650         fprintf(f, "key: %s\ndata: %s\n", keystr, datastr);
1651
1652         talloc_free(keystr);
1653         talloc_free(datastr);
1654         return 0;
1655 }
1656
1657 /*
1658   convenience function to list all keys to stdout
1659  */
1660 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
1661 {
1662         return ctdb_traverse(ctdb_db, dumpdb_fn, f);
1663 }
1664
1665 /*
1666   get the pid of a ctdb daemon
1667  */
1668 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
1669 {
1670         int ret;
1671         int32_t res;
1672
1673         ret = ctdb_control(ctdb, destnode, 0, 
1674                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
1675                            NULL, NULL, &res, &timeout, NULL);
1676         if (ret != 0) {
1677                 DEBUG(0,(__location__ " ctdb_control for getpid failed\n"));
1678                 return -1;
1679         }
1680
1681         *pid = res;
1682
1683         return 0;
1684 }
1685
1686
1687 /*
1688   freeze a node
1689  */
1690 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1691 {
1692         int ret;
1693         int32_t res;
1694
1695         ret = ctdb_control(ctdb, destnode, 0, 
1696                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
1697                            NULL, NULL, &res, &timeout, NULL);
1698         if (ret != 0 || res != 0) {
1699                 DEBUG(0,(__location__ " ctdb_control freeze failed\n"));
1700                 return -1;
1701         }
1702
1703         return 0;
1704 }
1705
1706 /*
1707   thaw a node
1708  */
1709 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1710 {
1711         int ret;
1712         int32_t res;
1713
1714         ret = ctdb_control(ctdb, destnode, 0, 
1715                            CTDB_CONTROL_THAW, 0, tdb_null, 
1716                            NULL, NULL, &res, &timeout, NULL);
1717         if (ret != 0 || res != 0) {
1718                 DEBUG(0,(__location__ " ctdb_control thaw failed\n"));
1719                 return -1;
1720         }
1721
1722         return 0;
1723 }
1724
1725 /*
1726   get vnn of a node, or -1
1727  */
1728 int ctdb_ctrl_getvnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1729 {
1730         int ret;
1731         int32_t res;
1732
1733         ret = ctdb_control(ctdb, destnode, 0, 
1734                            CTDB_CONTROL_GET_VNN, 0, tdb_null, 
1735                            NULL, NULL, &res, &timeout, NULL);
1736         if (ret != 0) {
1737                 DEBUG(0,(__location__ " ctdb_control for getvnn failed\n"));
1738                 return -1;
1739         }
1740
1741         return res;
1742 }
1743
1744 /*
1745   set the monitoring mode of a remote node
1746  */
1747 int ctdb_ctrl_setmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t monmode)
1748 {
1749         int ret;
1750         TDB_DATA data;
1751         int32_t res;
1752
1753         data.dsize = sizeof(uint32_t);
1754         data.dptr = (uint8_t *)&monmode;
1755
1756         ret = ctdb_control(ctdb, destnode, 0, 
1757                            CTDB_CONTROL_SET_MONMODE, 0, data, 
1758                            NULL, NULL, &res, &timeout, NULL);
1759         if (ret != 0 || res != 0) {
1760                 DEBUG(0,(__location__ " ctdb_control for setmonmode failed\n"));
1761                 return -1;
1762         }
1763
1764         return 0;
1765 }
1766
1767 /*
1768   get the monitoring mode of a remote node
1769  */
1770 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
1771 {
1772         int ret;
1773         int32_t res;
1774
1775         ret = ctdb_control(ctdb, destnode, 0, 
1776                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
1777                            NULL, NULL, &res, &timeout, NULL);
1778         if (ret != 0) {
1779                 DEBUG(0,(__location__ " ctdb_control for getrecmode failed\n"));
1780                 return -1;
1781         }
1782
1783         *monmode = res;
1784
1785         return 0;
1786 }
1787
1788
1789 /*
1790   get maximum rsn for a db on a node
1791  */
1792 int ctdb_ctrl_get_max_rsn(struct ctdb_context *ctdb, struct timeval timeout, 
1793                           uint32_t destnode, uint32_t db_id, uint64_t *max_rsn)
1794 {
1795         TDB_DATA data, outdata;
1796         int ret;
1797         int32_t res;
1798
1799         data.dptr = (uint8_t *)&db_id;
1800         data.dsize = sizeof(db_id);
1801
1802         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_MAX_RSN, 0, data, ctdb,
1803                            &outdata, &res, &timeout, NULL);
1804         if (ret != 0 || res != 0 || outdata.dsize != sizeof(uint64_t)) {
1805                 DEBUG(0,(__location__ " ctdb_control for get_max_rsn failed\n"));
1806                 return -1;
1807         }
1808
1809         *max_rsn = *(uint64_t *)outdata.dptr;
1810         talloc_free(outdata.dptr);
1811
1812         return 0;       
1813 }
1814
1815 /*
1816   set the rsn on non-empty records to the given rsn
1817  */
1818 int ctdb_ctrl_set_rsn_nonempty(struct ctdb_context *ctdb, struct timeval timeout, 
1819                                uint32_t destnode, uint32_t db_id, uint64_t rsn)
1820 {
1821         TDB_DATA data;
1822         int ret;
1823         int32_t res;
1824         struct ctdb_control_set_rsn_nonempty p;
1825
1826         p.db_id = db_id;
1827         p.rsn = rsn;
1828
1829         data.dptr = (uint8_t *)&p;
1830         data.dsize = sizeof(p);
1831
1832         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_RSN_NONEMPTY, 0, data, NULL,
1833                            NULL, &res, &timeout, NULL);
1834         if (ret != 0 || res != 0) {
1835                 DEBUG(0,(__location__ " ctdb_control for set_rsn_nonempty failed\n"));
1836                 return -1;
1837         }
1838
1839         return 0;       
1840 }
1841
1842 /*
1843   delete records which have a rsn below the given rsn
1844  */
1845 int ctdb_ctrl_delete_low_rsn(struct ctdb_context *ctdb, struct timeval timeout, 
1846                              uint32_t destnode, uint32_t db_id, uint64_t rsn)
1847 {
1848         TDB_DATA data;
1849         int ret;
1850         int32_t res;
1851         struct ctdb_control_delete_low_rsn p;
1852
1853         p.db_id = db_id;
1854         p.rsn = rsn;
1855
1856         data.dptr = (uint8_t *)&p;
1857         data.dsize = sizeof(p);
1858
1859         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DELETE_LOW_RSN, 0, data, NULL,
1860                            NULL, &res, &timeout, NULL);
1861         if (ret != 0 || res != 0) {
1862                 DEBUG(0,(__location__ " ctdb_control for delete_low_rsn failed\n"));
1863                 return -1;
1864         }
1865
1866         return 0;       
1867 }
1868
1869 /* 
1870   sent to a node to make it take over an ip address
1871 */
1872 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
1873                           uint32_t destnode, struct ctdb_public_ip *ip)
1874 {
1875         TDB_DATA data;
1876         int ret;
1877         int32_t res;
1878
1879         data.dsize = sizeof(*ip);
1880         data.dptr  = (uint8_t *)ip;
1881
1882         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
1883                            NULL, &res, &timeout, NULL);
1884
1885         if (ret != 0 || res != 0) {
1886                 DEBUG(0,(__location__ " ctdb_control for takeover_ip failed\n"));
1887                 return -1;
1888         }
1889
1890         return 0;       
1891 }
1892
1893
1894 /* 
1895   sent to a node to make it release an ip address
1896 */
1897 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
1898                          uint32_t destnode, struct ctdb_public_ip *ip)
1899 {
1900         TDB_DATA data;
1901         int ret;
1902         int32_t res;
1903
1904         data.dsize = sizeof(*ip);
1905         data.dptr  = (uint8_t *)ip;
1906
1907         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
1908                            NULL, &res, &timeout, NULL);
1909
1910         if (ret != 0 || res != 0) {
1911                 DEBUG(0,(__location__ " ctdb_control for release_ip failed\n"));
1912                 return -1;
1913         }
1914
1915         return 0;       
1916 }
1917
1918
1919 /*
1920   get a tunable
1921  */
1922 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
1923                           struct timeval timeout, 
1924                           uint32_t destnode,
1925                           const char *name, uint32_t *value)
1926 {
1927         struct ctdb_control_get_tunable *t;
1928         TDB_DATA data, outdata;
1929         int32_t res;
1930         int ret;
1931
1932         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
1933         data.dptr  = (unsigned char *)talloc_size(ctdb, data.dsize);
1934         CTDB_NO_MEMORY(ctdb, data.dptr);
1935
1936         t = (struct ctdb_control_get_tunable *)data.dptr;
1937         t->length = strlen(name)+1;
1938         memcpy(t->name, name, t->length);
1939
1940         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
1941                            &outdata, &res, &timeout, NULL);
1942         talloc_free(data.dptr);
1943         if (ret != 0 || res != 0) {
1944                 DEBUG(0,(__location__ " ctdb_control for get_tunable failed\n"));
1945                 return -1;
1946         }
1947
1948         if (outdata.dsize != sizeof(uint32_t)) {
1949                 DEBUG(0,("Invalid return data in get_tunable\n"));
1950                 talloc_free(outdata.dptr);
1951                 return -1;
1952         }
1953         
1954         *value = *(uint32_t *)outdata.dptr;
1955         talloc_free(outdata.dptr);
1956
1957         return 0;
1958 }
1959
1960 /*
1961   set a tunable
1962  */
1963 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
1964                           struct timeval timeout, 
1965                           uint32_t destnode,
1966                           const char *name, uint32_t value)
1967 {
1968         struct ctdb_control_set_tunable *t;
1969         TDB_DATA data;
1970         int32_t res;
1971         int ret;
1972
1973         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
1974         data.dptr  = talloc_array(ctdb, unsigned char, data.dsize);
1975         CTDB_NO_MEMORY(ctdb, data.dptr);
1976
1977         t = (struct ctdb_control_set_tunable *)data.dptr;
1978         t->length = strlen(name)+1;
1979         memcpy(t->name, name, t->length);
1980         t->value = value;
1981
1982         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
1983                            NULL, &res, &timeout, NULL);
1984         talloc_free(data.dptr);
1985         if (ret != 0 || res != 0) {
1986                 DEBUG(0,(__location__ " ctdb_control for set_tunable failed\n"));
1987                 return -1;
1988         }
1989
1990         return 0;
1991 }
1992
1993 /*
1994   list tunables
1995  */
1996 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
1997                             struct timeval timeout, 
1998                             uint32_t destnode,
1999                             TALLOC_CTX *mem_ctx,
2000                             const char ***list, uint32_t *count)
2001 {
2002         TDB_DATA outdata;
2003         int32_t res;
2004         int ret;
2005         struct ctdb_control_list_tunable *t;
2006         char *p, *s, *ptr;
2007
2008         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2009                            mem_ctx, &outdata, &res, &timeout, NULL);
2010         if (ret != 0 || res != 0) {
2011                 DEBUG(0,(__location__ " ctdb_control for list_tunables failed\n"));
2012                 return -1;
2013         }
2014
2015         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2016         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2017             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2018                 DEBUG(0,("Invalid data in list_tunables reply\n"));
2019                 talloc_free(outdata.dptr);
2020                 return -1;              
2021         }
2022         
2023         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2024         CTDB_NO_MEMORY(ctdb, p);
2025
2026         talloc_free(outdata.dptr);
2027         
2028         (*list) = NULL;
2029         (*count) = 0;
2030
2031         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2032                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2033                 CTDB_NO_MEMORY(ctdb, *list);
2034                 (*list)[*count] = talloc_strdup(*list, s);
2035                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2036                 (*count)++;
2037         }
2038
2039         talloc_free(p);
2040
2041         return 0;
2042 }
2043
2044
2045 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb, 
2046                         struct timeval timeout, uint32_t destnode, 
2047                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2048 {
2049         int ret;
2050         TDB_DATA outdata;
2051         int32_t res;
2052
2053         ret = ctdb_control(ctdb, destnode, 0, 
2054                            CTDB_CONTROL_GET_PUBLIC_IPS, 0, tdb_null, 
2055                            mem_ctx, &outdata, &res, &timeout, NULL);
2056         if (ret != 0 || res != 0) {
2057                 DEBUG(0,(__location__ " ctdb_control for getpublicips failed\n"));
2058                 return -1;
2059         }
2060
2061         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2062         talloc_free(outdata.dptr);
2063                     
2064         return 0;
2065 }
2066
2067 /*
2068   set/clear the permanent disabled bit on a remote node
2069  */
2070 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2071                        uint32_t set, uint32_t clear)
2072 {
2073         int ret;
2074         TDB_DATA data;
2075         struct ctdb_node_modflags m;
2076         int32_t res;
2077
2078         m.set = set;
2079         m.clear = clear;
2080
2081         data.dsize = sizeof(m);
2082         data.dptr = (unsigned char *)&m;
2083
2084         ret = ctdb_control(ctdb, destnode, 0, 
2085                            CTDB_CONTROL_MODIFY_FLAGS, 0, data, 
2086                            NULL, NULL, &res, &timeout, NULL);
2087         if (ret != 0 || res != 0) {
2088                 DEBUG(0,(__location__ " ctdb_control for modflags failed\n"));
2089                 return -1;
2090         }
2091
2092         return 0;
2093 }
2094
2095
2096 /*
2097   get all tunables
2098  */
2099 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
2100                                struct timeval timeout, 
2101                                uint32_t destnode,
2102                                struct ctdb_tunable *tunables)
2103 {
2104         TDB_DATA outdata;
2105         int ret;
2106         int32_t res;
2107
2108         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2109                            &outdata, &res, &timeout, NULL);
2110         if (ret != 0 || res != 0) {
2111                 DEBUG(0,(__location__ " ctdb_control for get all tunables failed\n"));
2112                 return -1;
2113         }
2114
2115         if (outdata.dsize != sizeof(*tunables)) {
2116                 DEBUG(0,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2117                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2118                 return -1;              
2119         }
2120
2121         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2122         talloc_free(outdata.dptr);
2123         return 0;
2124 }
2125
2126
2127 /*
2128   kill a tcp connection
2129  */
2130 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2131                       struct timeval timeout, 
2132                       uint32_t destnode,
2133                       struct ctdb_control_killtcp *killtcp)
2134 {
2135         TDB_DATA data;
2136         int32_t res;
2137         int ret;
2138
2139         data.dsize = sizeof(struct ctdb_control_killtcp);
2140         data.dptr  = (unsigned char *)killtcp;
2141
2142         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2143                            NULL, &res, &timeout, NULL);
2144         if (ret != 0 || res != 0) {
2145                 DEBUG(0,(__location__ " ctdb_control for killtcp failed\n"));
2146                 return -1;
2147         }
2148
2149         return 0;
2150 }
2151
2152 /*
2153   get a list of all tcp tickles that a node knows about for a particular vnn
2154  */
2155 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
2156                               struct timeval timeout, uint32_t destnode, 
2157                               TALLOC_CTX *mem_ctx, uint32_t vnn,
2158                               struct ctdb_control_tcp_tickle_list **list)
2159 {
2160         int ret;
2161         TDB_DATA data, outdata;
2162         int32_t status;
2163
2164         data.dptr = (uint8_t*)&vnn;
2165         data.dsize = sizeof(vnn);
2166
2167         ret = ctdb_control(ctdb, destnode, 0, 
2168                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
2169                            mem_ctx, &outdata, &status, NULL, NULL);
2170         if (ret != 0) {
2171                 DEBUG(0,(__location__ " ctdb_control for get tcp tickles failed\n"));
2172                 return -1;
2173         }
2174
2175         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
2176
2177         return status;
2178 }
2179
2180 /*
2181   initialise the ctdb daemon for client applications
2182
2183   NOTE: In current code the daemon does not fork. This is for testing purposes only
2184   and to simplify the code.
2185 */
2186 struct ctdb_context *ctdb_init(struct event_context *ev)
2187 {
2188         struct ctdb_context *ctdb;
2189
2190         ctdb = talloc_zero(ev, struct ctdb_context);
2191         ctdb->ev  = ev;
2192         ctdb->idr = idr_init(ctdb);
2193         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
2194
2195         ctdb_set_socketname(ctdb, CTDB_PATH);
2196
2197         return ctdb;
2198 }
2199
2200
2201 /*
2202   set some ctdb flags
2203 */
2204 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
2205 {
2206         ctdb->flags |= flags;
2207 }
2208
2209 /*
2210   setup the local socket name
2211 */
2212 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
2213 {
2214         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
2215         return 0;
2216 }
2217
2218 /*
2219   return the vnn of this node
2220 */
2221 uint32_t ctdb_get_vnn(struct ctdb_context *ctdb)
2222 {
2223         return ctdb->vnn;
2224 }
2225