b569a69c8d8d3bcce5f241288140751a52b4ce3d
[kai/samba.git] / source4 / cluster / ctdb / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/events/events.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "../include/ctdb_private.h"
29 #include "lib/util/dlinklist.h"
30
31 /*
32   allocate a packet for use in client<->daemon communication
33  */
34 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
35                                             TALLOC_CTX *mem_ctx, 
36                                             enum ctdb_operation operation, 
37                                             size_t length, size_t slength,
38                                             const char *type)
39 {
40         int size;
41         struct ctdb_req_header *hdr;
42
43         length = MAX(length, slength);
44         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
45
46         hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
47         if (hdr == NULL) {
48                 DEBUG(0,("Unable to allocate packet for operation %u of length %u\n",
49                          operation, (unsigned)length));
50                 return NULL;
51         }
52         talloc_set_name_const(hdr, type);
53         memset(hdr, 0, slength);
54         hdr->length       = length;
55         hdr->operation    = operation;
56         hdr->ctdb_magic   = CTDB_MAGIC;
57         hdr->ctdb_version = CTDB_VERSION;
58         hdr->srcnode      = ctdb->vnn;
59         if (ctdb->vnn_map) {
60                 hdr->generation = ctdb->vnn_map->generation;
61         }
62
63         return hdr;
64 }
65
66 /*
67   local version of ctdb_call
68 */
69 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
70                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
71                     TDB_DATA *data, uint32_t caller)
72 {
73         struct ctdb_call_info *c;
74         struct ctdb_registered_call *fn;
75         struct ctdb_context *ctdb = ctdb_db->ctdb;
76         
77         c = talloc(ctdb, struct ctdb_call_info);
78         CTDB_NO_MEMORY(ctdb, c);
79
80         c->key = call->key;
81         c->call_data = &call->call_data;
82         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
83         c->record_data.dsize = data->dsize;
84         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
85         c->new_data = NULL;
86         c->reply_data = NULL;
87         c->status = 0;
88
89         for (fn=ctdb_db->calls;fn;fn=fn->next) {
90                 if (fn->id == call->call_id) break;
91         }
92         if (fn == NULL) {
93                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
94                 talloc_free(c);
95                 return -1;
96         }
97
98         if (fn->fn(c) != 0) {
99                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
100                 talloc_free(c);
101                 return -1;
102         }
103
104         if (header->laccessor != caller) {
105                 header->lacount = 0;
106         }
107         header->laccessor = caller;
108         header->lacount++;
109
110         /* we need to force the record to be written out if this was a remote access,
111            so that the lacount is updated */
112         if (c->new_data == NULL && header->laccessor != ctdb->vnn) {
113                 c->new_data = &c->record_data;
114         }
115
116         if (c->new_data) {
117                 /* XXX check that we always have the lock here? */
118                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
119                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
120                         talloc_free(c);
121                         return -1;
122                 }
123         }
124
125         if (c->reply_data) {
126                 call->reply_data = *c->reply_data;
127                 talloc_steal(ctdb, call->reply_data.dptr);
128                 talloc_set_name_const(call->reply_data.dptr, __location__);
129         } else {
130                 call->reply_data.dptr = NULL;
131                 call->reply_data.dsize = 0;
132         }
133         call->status = c->status;
134
135         talloc_free(c);
136
137         return 0;
138 }
139
140
141 /*
142   queue a packet for sending from client to daemon
143 */
144 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
145 {
146         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
147 }
148
149
150 /*
151   state of a in-progress ctdb call in client
152 */
153 struct ctdb_client_call_state {
154         enum call_state state;
155         uint32_t reqid;
156         struct ctdb_db_context *ctdb_db;
157         struct ctdb_call call;
158 };
159
160 /*
161   called when a CTDB_REPLY_CALL packet comes in in the client
162
163   This packet comes in response to a CTDB_REQ_CALL request packet. It
164   contains any reply data from the call
165 */
166 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
167 {
168         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
169         struct ctdb_client_call_state *state;
170
171         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
172         if (state == NULL) {
173                 DEBUG(0,(__location__ " reqid %u not found\n", hdr->reqid));
174                 return;
175         }
176
177         if (hdr->reqid != state->reqid) {
178                 /* we found a record  but it was the wrong one */
179                 DEBUG(0, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
180                 return;
181         }
182
183         state->call.reply_data.dptr = c->data;
184         state->call.reply_data.dsize = c->datalen;
185         state->call.status = c->status;
186
187         talloc_steal(state, c);
188
189         state->state = CTDB_CALL_DONE;
190 }
191
192 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
193
194 /*
195   this is called in the client, when data comes in from the daemon
196  */
197 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
198 {
199         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
200         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
201         TALLOC_CTX *tmp_ctx;
202
203         /* place the packet as a child of a tmp_ctx. We then use
204            talloc_free() below to free it. If any of the calls want
205            to keep it, then they will steal it somewhere else, and the
206            talloc_free() will be a no-op */
207         tmp_ctx = talloc_new(ctdb);
208         talloc_steal(tmp_ctx, hdr);
209
210         if (cnt == 0) {
211                 DEBUG(2,("Daemon has exited - shutting down client\n"));
212                 exit(0);
213         }
214
215         if (cnt < sizeof(*hdr)) {
216                 DEBUG(0,("Bad packet length %u in client\n", (unsigned)cnt));
217                 goto done;
218         }
219         if (cnt != hdr->length) {
220                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
221                                (unsigned)hdr->length, (unsigned)cnt);
222                 goto done;
223         }
224
225         if (hdr->ctdb_magic != CTDB_MAGIC) {
226                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
227                 goto done;
228         }
229
230         if (hdr->ctdb_version != CTDB_VERSION) {
231                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
232                 goto done;
233         }
234
235         switch (hdr->operation) {
236         case CTDB_REPLY_CALL:
237                 ctdb_client_reply_call(ctdb, hdr);
238                 break;
239
240         case CTDB_REQ_MESSAGE:
241                 ctdb_request_message(ctdb, hdr);
242                 break;
243
244         case CTDB_REPLY_CONTROL:
245                 ctdb_client_reply_control(ctdb, hdr);
246                 break;
247
248         default:
249                 DEBUG(0,("bogus operation code:%u\n",hdr->operation));
250         }
251
252 done:
253         talloc_free(tmp_ctx);
254 }
255
256 /*
257   connect to a unix domain socket
258 */
259 int ctdb_socket_connect(struct ctdb_context *ctdb)
260 {
261         struct sockaddr_un addr;
262
263         memset(&addr, 0, sizeof(addr));
264         addr.sun_family = AF_UNIX;
265         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
266
267         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
268         if (ctdb->daemon.sd == -1) {
269                 return -1;
270         }
271
272         set_nonblocking(ctdb->daemon.sd);
273         set_close_on_exec(ctdb->daemon.sd);
274         
275         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
276                 close(ctdb->daemon.sd);
277                 ctdb->daemon.sd = -1;
278                 return -1;
279         }
280
281         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
282                                               CTDB_DS_ALIGNMENT, 
283                                               ctdb_client_read_cb, ctdb);
284         return 0;
285 }
286
287
288 struct ctdb_record_handle {
289         struct ctdb_db_context *ctdb_db;
290         TDB_DATA key;
291         TDB_DATA *data;
292         struct ctdb_ltdb_header header;
293 };
294
295
296 /*
297   make a recv call to the local ctdb daemon - called from client context
298
299   This is called when the program wants to wait for a ctdb_call to complete and get the 
300   results. This call will block unless the call has already completed.
301 */
302 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
303 {
304         while (state->state < CTDB_CALL_DONE) {
305                 event_loop_once(state->ctdb_db->ctdb->ev);
306         }
307         if (state->state != CTDB_CALL_DONE) {
308                 DEBUG(0,(__location__ " ctdb_call_recv failed\n"));
309                 talloc_free(state);
310                 return -1;
311         }
312
313         if (state->call.reply_data.dsize) {
314                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
315                                                       state->call.reply_data.dptr,
316                                                       state->call.reply_data.dsize);
317                 call->reply_data.dsize = state->call.reply_data.dsize;
318         } else {
319                 call->reply_data.dptr = NULL;
320                 call->reply_data.dsize = 0;
321         }
322         call->status = state->call.status;
323         talloc_free(state);
324
325         return 0;
326 }
327
328
329
330
331 /*
332   destroy a ctdb_call in client
333 */
334 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
335 {
336         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
337         return 0;
338 }
339
340 /*
341   construct an event driven local ctdb_call
342
343   this is used so that locally processed ctdb_call requests are processed
344   in an event driven manner
345 */
346 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
347                                                                   struct ctdb_call *call,
348                                                                   struct ctdb_ltdb_header *header,
349                                                                   TDB_DATA *data)
350 {
351         struct ctdb_client_call_state *state;
352         struct ctdb_context *ctdb = ctdb_db->ctdb;
353         int ret;
354
355         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
356         CTDB_NO_MEMORY_NULL(ctdb, state);
357
358         talloc_steal(state, data->dptr);
359
360         state->state = CTDB_CALL_DONE;
361         state->call = *call;
362         state->ctdb_db = ctdb_db;
363
364         ret = ctdb_call_local(ctdb_db, &state->call, header, state, data, ctdb->vnn);
365
366         return state;
367 }
368
369 /*
370   make a ctdb call to the local daemon - async send. Called from client context.
371
372   This constructs a ctdb_call request and queues it for processing. 
373   This call never blocks.
374 */
375 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
376                                        struct ctdb_call *call)
377 {
378         struct ctdb_client_call_state *state;
379         struct ctdb_context *ctdb = ctdb_db->ctdb;
380         struct ctdb_ltdb_header header;
381         TDB_DATA data;
382         int ret;
383         size_t len;
384         struct ctdb_req_call *c;
385
386         /* if the domain socket is not yet open, open it */
387         if (ctdb->daemon.sd==-1) {
388                 ctdb_socket_connect(ctdb);
389         }
390
391         ret = ctdb_ltdb_lock(ctdb_db, call->key);
392         if (ret != 0) {
393                 DEBUG(0,(__location__ " Failed to get chainlock\n"));
394                 return NULL;
395         }
396
397         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
398
399         if (ret == 0 && header.dmaster == ctdb->vnn) {
400                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
401                 talloc_free(data.dptr);
402                 ctdb_ltdb_unlock(ctdb_db, call->key);
403                 return state;
404         }
405
406         ctdb_ltdb_unlock(ctdb_db, call->key);
407         talloc_free(data.dptr);
408
409         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
410         if (state == NULL) {
411                 DEBUG(0, (__location__ " failed to allocate state\n"));
412                 return NULL;
413         }
414
415         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
416         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
417         if (c == NULL) {
418                 DEBUG(0, (__location__ " failed to allocate packet\n"));
419                 return NULL;
420         }
421
422         state->reqid     = ctdb_reqid_new(ctdb, state);
423         state->ctdb_db = ctdb_db;
424         talloc_set_destructor(state, ctdb_client_call_destructor);
425
426         c->hdr.reqid     = state->reqid;
427         c->flags         = call->flags;
428         c->db_id         = ctdb_db->db_id;
429         c->callid        = call->call_id;
430         c->hopcount      = 0;
431         c->keylen        = call->key.dsize;
432         c->calldatalen   = call->call_data.dsize;
433         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
434         memcpy(&c->data[call->key.dsize], 
435                call->call_data.dptr, call->call_data.dsize);
436         state->call                = *call;
437         state->call.call_data.dptr = &c->data[call->key.dsize];
438         state->call.key.dptr       = &c->data[0];
439
440         state->state  = CTDB_CALL_WAIT;
441
442
443         ctdb_client_queue_pkt(ctdb, &c->hdr);
444
445         return state;
446 }
447
448
449 /*
450   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
451 */
452 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
453 {
454         struct ctdb_client_call_state *state;
455
456         state = ctdb_call_send(ctdb_db, call);
457         return ctdb_call_recv(state, call);
458 }
459
460
461 /*
462   tell the daemon what messaging srvid we will use, and register the message
463   handler function in the client
464 */
465 int ctdb_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
466                              ctdb_message_fn_t handler,
467                              void *private_data)
468                                     
469 {
470         int res;
471         int32_t status;
472         
473         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
474                            tdb_null, NULL, NULL, &status, NULL, NULL);
475         if (res != 0 || status != 0) {
476                 DEBUG(0,("Failed to register srvid %llu\n", (unsigned long long)srvid));
477                 return -1;
478         }
479
480         /* also need to register the handler with our own ctdb structure */
481         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
482 }
483
484 /*
485   tell the daemon we no longer want a srvid
486 */
487 int ctdb_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
488 {
489         int res;
490         int32_t status;
491         
492         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
493                            tdb_null, NULL, NULL, &status, NULL, NULL);
494         if (res != 0 || status != 0) {
495                 DEBUG(0,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
496                 return -1;
497         }
498
499         /* also need to register the handler with our own ctdb structure */
500         ctdb_deregister_message_handler(ctdb, srvid, private_data);
501         return 0;
502 }
503
504
505 /*
506   send a message - from client context
507  */
508 int ctdb_send_message(struct ctdb_context *ctdb, uint32_t vnn,
509                       uint64_t srvid, TDB_DATA data)
510 {
511         struct ctdb_req_message *r;
512         int len, res;
513
514         len = offsetof(struct ctdb_req_message, data) + data.dsize;
515         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
516                                len, struct ctdb_req_message);
517         CTDB_NO_MEMORY(ctdb, r);
518
519         r->hdr.destnode  = vnn;
520         r->srvid         = srvid;
521         r->datalen       = data.dsize;
522         memcpy(&r->data[0], data.dptr, data.dsize);
523         
524         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
525         if (res != 0) {
526                 return res;
527         }
528
529         talloc_free(r);
530         return 0;
531 }
532
533
534 /*
535   cancel a ctdb_fetch_lock operation, releasing the lock
536  */
537 static int fetch_lock_destructor(struct ctdb_record_handle *h)
538 {
539         ctdb_ltdb_unlock(h->ctdb_db, h->key);
540         return 0;
541 }
542
543 /*
544   force the migration of a record to this node
545  */
546 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
547 {
548         struct ctdb_call call;
549         ZERO_STRUCT(call);
550         call.call_id = CTDB_NULL_FUNC;
551         call.key = key;
552         call.flags = CTDB_IMMEDIATE_MIGRATION;
553         return ctdb_call(ctdb_db, &call);
554 }
555
556 /*
557   get a lock on a record, and return the records data. Blocks until it gets the lock
558  */
559 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
560                                            TDB_DATA key, TDB_DATA *data)
561 {
562         int ret;
563         struct ctdb_record_handle *h;
564
565         /*
566           procedure is as follows:
567
568           1) get the chain lock. 
569           2) check if we are dmaster
570           3) if we are the dmaster then return handle 
571           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
572              reply from ctdbd
573           5) when we get the reply, goto (1)
574          */
575
576         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
577         if (h == NULL) {
578                 return NULL;
579         }
580
581         h->ctdb_db = ctdb_db;
582         h->key     = key;
583         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
584         if (h->key.dptr == NULL) {
585                 talloc_free(h);
586                 return NULL;
587         }
588         h->data    = data;
589
590         DEBUG(3,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
591                  (const char *)key.dptr));
592
593 again:
594         /* step 1 - get the chain lock */
595         ret = ctdb_ltdb_lock(ctdb_db, key);
596         if (ret != 0) {
597                 DEBUG(0, (__location__ " failed to lock ltdb record\n"));
598                 talloc_free(h);
599                 return NULL;
600         }
601
602         DEBUG(4,("ctdb_fetch_lock: got chain lock\n"));
603
604         talloc_set_destructor(h, fetch_lock_destructor);
605
606         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
607
608         /* when torturing, ensure we test the remote path */
609         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
610             random() % 5 == 0) {
611                 h->header.dmaster = (uint32_t)-1;
612         }
613
614
615         DEBUG(4,("ctdb_fetch_lock: done local fetch\n"));
616
617         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->vnn) {
618                 ctdb_ltdb_unlock(ctdb_db, key);
619                 ret = ctdb_client_force_migration(ctdb_db, key);
620                 if (ret != 0) {
621                         DEBUG(4,("ctdb_fetch_lock: force_migration failed\n"));
622                         talloc_free(h);
623                         return NULL;
624                 }
625                 goto again;
626         }
627
628         DEBUG(4,("ctdb_fetch_lock: we are dmaster - done\n"));
629         return h;
630 }
631
632 /*
633   store some data to the record that was locked with ctdb_fetch_lock()
634 */
635 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
636 {
637         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
638 }
639
640 /*
641   non-locking fetch of a record
642  */
643 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
644                TDB_DATA key, TDB_DATA *data)
645 {
646         struct ctdb_call call;
647         int ret;
648
649         call.call_id = CTDB_FETCH_FUNC;
650         call.call_data.dptr = NULL;
651         call.call_data.dsize = 0;
652
653         ret = ctdb_call(ctdb_db, &call);
654
655         if (ret == 0) {
656                 *data = call.reply_data;
657                 talloc_steal(mem_ctx, data->dptr);
658         }
659
660         return ret;
661 }
662
663
664 struct ctdb_client_control_state {
665         struct ctdb_context *ctdb;
666         uint32_t reqid;
667         int32_t status;
668         TDB_DATA outdata;
669         enum call_state state;
670         char *errormsg;
671 };
672
673 /*
674   called when a CTDB_REPLY_CONTROL packet comes in in the client
675
676   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
677   contains any reply data from the control
678 */
679 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
680                                       struct ctdb_req_header *hdr)
681 {
682         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
683         struct ctdb_client_control_state *state;
684
685         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
686         if (state == NULL) {
687                 DEBUG(0,(__location__ " reqid %u not found\n", hdr->reqid));
688                 return;
689         }
690
691         if (hdr->reqid != state->reqid) {
692                 /* we found a record  but it was the wrong one */
693                 DEBUG(0, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
694                 return;
695         }
696
697         state->outdata.dptr = c->data;
698         state->outdata.dsize = c->datalen;
699         state->status = c->status;
700         if (c->errorlen) {
701                 state->errormsg = talloc_strndup(state, 
702                                                  (char *)&c->data[c->datalen], 
703                                                  c->errorlen);
704         }
705
706         talloc_steal(state, c);
707
708         state->state = CTDB_CALL_DONE;
709 }
710
711
712 /* time out handler for ctdb_control */
713 static void timeout_func(struct event_context *ev, struct timed_event *te, 
714         struct timeval t, void *private_data)
715 {
716         uint32_t *timed_out = (uint32_t *)private_data;
717
718         *timed_out = 1;
719 }
720
721 /*
722   destroy a ctdb_control in client
723 */
724 static int ctdb_control_destructor(struct ctdb_client_control_state *state)     
725 {
726         ctdb_reqid_remove(state->ctdb, state->reqid);
727         return 0;
728 }
729
730 /*
731   send a ctdb control message
732   timeout specifies how long we should wait for a reply.
733   if timeout is NULL we wait indefinitely
734  */
735 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
736                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
737                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
738                  struct timeval *timeout,
739                  char **errormsg)
740 {
741         struct ctdb_client_control_state *state;
742         struct ctdb_req_control *c;
743         size_t len;
744         int ret;
745         uint32_t timed_out;
746
747         if (errormsg) {
748                 *errormsg = NULL;
749         }
750
751         /* if the domain socket is not yet open, open it */
752         if (ctdb->daemon.sd==-1) {
753                 ctdb_socket_connect(ctdb);
754         }
755
756         state = talloc_zero(ctdb, struct ctdb_client_control_state);
757         CTDB_NO_MEMORY(ctdb, state);
758
759         state->ctdb  = ctdb;
760         state->reqid = ctdb_reqid_new(ctdb, state);
761         state->state = CTDB_CALL_WAIT;
762         state->errormsg = NULL;
763
764         talloc_set_destructor(state, ctdb_control_destructor);
765
766         len = offsetof(struct ctdb_req_control, data) + data.dsize;
767         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
768                                len, struct ctdb_req_control);
769         CTDB_NO_MEMORY(ctdb, c);
770         
771         c->hdr.reqid        = state->reqid;
772         c->hdr.destnode     = destnode;
773         c->hdr.reqid        = state->reqid;
774         c->opcode           = opcode;
775         c->client_id        = 0;
776         c->flags            = flags;
777         c->srvid            = srvid;
778         c->datalen          = data.dsize;
779         if (data.dsize) {
780                 memcpy(&c->data[0], data.dptr, data.dsize);
781         }
782
783         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
784         if (ret != 0) {
785                 talloc_free(state);
786                 return -1;
787         }
788
789         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
790                 talloc_free(state);
791                 return 0;
792         }
793
794         /* semi-async operation */
795         timed_out = 0;
796         if (timeout && !timeval_is_zero(timeout)) {
797                 event_add_timed(ctdb->ev, state, *timeout, timeout_func, &timed_out);
798         }
799         while ((state->state == CTDB_CALL_WAIT)
800         &&      (timed_out == 0) ){
801                 event_loop_once(ctdb->ev);
802         }
803         if (timed_out) {
804                 talloc_free(state);
805                 if (errormsg) {
806                         (*errormsg) = talloc_strdup(mem_ctx, "control timed out");
807                 } else {
808                         DEBUG(0,("ctdb_control timed out\n"));
809                 }
810                 return -1;
811         }
812
813         if (outdata) {
814                 *outdata = state->outdata;
815                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
816         }
817
818         *status = state->status;
819
820         if (!errormsg && state->errormsg) {
821                 DEBUG(0,("ctdb_control error: '%s'\n", state->errormsg));
822         }
823
824         if (errormsg && state->errormsg) {
825                 (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
826         }
827
828         talloc_free(state);
829
830         return 0;       
831 }
832
833
834
835 /*
836   a process exists call. Returns 0 if process exists, -1 otherwise
837  */
838 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
839 {
840         int ret;
841         TDB_DATA data;
842         int32_t status;
843
844         data.dptr = (uint8_t*)&pid;
845         data.dsize = sizeof(pid);
846
847         ret = ctdb_control(ctdb, destnode, 0, 
848                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
849                            NULL, NULL, &status, NULL, NULL);
850         if (ret != 0) {
851                 DEBUG(0,(__location__ " ctdb_control for process_exists failed\n"));
852                 return -1;
853         }
854
855         return status;
856 }
857
858 /*
859   get remote statistics
860  */
861 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
862 {
863         int ret;
864         TDB_DATA data;
865         int32_t res;
866
867         ret = ctdb_control(ctdb, destnode, 0, 
868                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
869                            ctdb, &data, &res, NULL, NULL);
870         if (ret != 0 || res != 0) {
871                 DEBUG(0,(__location__ " ctdb_control for statistics failed\n"));
872                 return -1;
873         }
874
875         if (data.dsize != sizeof(struct ctdb_statistics)) {
876                 DEBUG(0,(__location__ " Wrong statistics size %u - expected %u\n",
877                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
878                       return -1;
879         }
880
881         *status = *(struct ctdb_statistics *)data.dptr;
882         talloc_free(data.dptr);
883                         
884         return 0;
885 }
886
887 /*
888   shutdown a remote ctdb node
889  */
890 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
891 {
892         int ret;
893         int32_t res;
894
895         ret = ctdb_control(ctdb, destnode, 0, 
896                            CTDB_CONTROL_SHUTDOWN, CTDB_CTRL_FLAG_NOREPLY, tdb_null, 
897                            NULL, NULL, &res, &timeout, NULL);
898         if (ret != 0) {
899                 DEBUG(0,(__location__ " ctdb_control for shutdown failed\n"));
900                 return -1;
901         }
902
903         return 0;
904 }
905
906 /*
907   get vnn map from a remote node
908  */
909 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
910 {
911         int ret;
912         TDB_DATA outdata;
913         int32_t res;
914         struct ctdb_vnn_map_wire *map;
915
916         ret = ctdb_control(ctdb, destnode, 0, 
917                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
918                            mem_ctx, &outdata, &res, &timeout, NULL);
919         if (ret != 0 || res != 0) {
920                 DEBUG(0,(__location__ " ctdb_control for getvnnmap failed\n"));
921                 return -1;
922         }
923         
924         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
925         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
926             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
927                 DEBUG(0,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
928                 return -1;
929         }
930
931         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
932         CTDB_NO_MEMORY(ctdb, *vnnmap);
933         (*vnnmap)->generation = map->generation;
934         (*vnnmap)->size       = map->size;
935         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
936
937         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
938         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
939         talloc_free(outdata.dptr);
940                     
941         return 0;
942 }
943
944 /*
945   get the recovery mode of a remote node
946  */
947 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
948 {
949         int ret;
950         int32_t res;
951
952         ret = ctdb_control(ctdb, destnode, 0, 
953                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
954                            NULL, NULL, &res, &timeout, NULL);
955         if (ret != 0) {
956                 DEBUG(0,(__location__ " ctdb_control for getrecmode failed\n"));
957                 return -1;
958         }
959
960         *recmode = res;
961
962         return 0;
963 }
964
965 /*
966   set the recovery mode of a remote node
967  */
968 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
969 {
970         int ret;
971         TDB_DATA data;
972         int32_t res;
973
974         data.dsize = sizeof(uint32_t);
975         data.dptr = (unsigned char *)&recmode;
976
977         ret = ctdb_control(ctdb, destnode, 0, 
978                            CTDB_CONTROL_SET_RECMODE, 0, data, 
979                            NULL, NULL, &res, &timeout, NULL);
980         if (ret != 0 || res != 0) {
981                 DEBUG(0,(__location__ " ctdb_control for setrecmode failed\n"));
982                 return -1;
983         }
984
985         return 0;
986 }
987
988 /*
989   get the recovery master of a remote node
990  */
991 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
992 {
993         int ret;
994         int32_t res;
995
996         ret = ctdb_control(ctdb, destnode, 0, 
997                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
998                            NULL, NULL, &res, &timeout, NULL);
999         if (ret != 0) {
1000                 DEBUG(0,(__location__ " ctdb_control for getrecmaster failed\n"));
1001                 return -1;
1002         }
1003
1004         *recmaster = res;
1005
1006         return 0;
1007 }
1008
1009 /*
1010   set the recovery master of a remote node
1011  */
1012 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1013 {
1014         int ret;
1015         TDB_DATA data;
1016         int32_t res;
1017
1018         ZERO_STRUCT(data);
1019         data.dsize = sizeof(uint32_t);
1020         data.dptr = (unsigned char *)&recmaster;
1021
1022         ret = ctdb_control(ctdb, destnode, 0, 
1023                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1024                            NULL, NULL, &res, &timeout, NULL);
1025         if (ret != 0 || res != 0) {
1026                 DEBUG(0,(__location__ " ctdb_control for setrecmaster failed\n"));
1027                 return -1;
1028         }
1029
1030         return 0;
1031 }
1032
1033
1034 /*
1035   get a list of databases off a remote node
1036  */
1037 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1038                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1039 {
1040         int ret;
1041         TDB_DATA outdata;
1042         int32_t res;
1043
1044         ret = ctdb_control(ctdb, destnode, 0, 
1045                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1046                            mem_ctx, &outdata, &res, &timeout, NULL);
1047         if (ret != 0 || res != 0) {
1048                 DEBUG(0,(__location__ " ctdb_control for getdbmap failed\n"));
1049                 return -1;
1050         }
1051
1052         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1053         talloc_free(outdata.dptr);
1054                     
1055         return 0;
1056 }
1057
1058
1059 /*
1060   get a list of nodes (vnn and flags ) from a remote node
1061  */
1062 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1063                 struct timeval timeout, uint32_t destnode, 
1064                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1065 {
1066         int ret;
1067         TDB_DATA outdata;
1068         int32_t res;
1069
1070         ret = ctdb_control(ctdb, destnode, 0, 
1071                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1072                            mem_ctx, &outdata, &res, &timeout, NULL);
1073         if (ret != 0 || res != 0) {
1074                 DEBUG(0,(__location__ " ctdb_control for getnodes failed\n"));
1075                 return -1;
1076         }
1077
1078         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1079         talloc_free(outdata.dptr);
1080                     
1081         return 0;
1082 }
1083
1084 /*
1085   set vnn map on a node
1086  */
1087 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1088                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1089 {
1090         int ret;
1091         TDB_DATA data;
1092         int32_t res;
1093         struct ctdb_vnn_map_wire *map;
1094         size_t len;
1095
1096         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1097         map = talloc_size(mem_ctx, len);
1098         CTDB_NO_MEMORY_VOID(ctdb, map);
1099
1100         map->generation = vnnmap->generation;
1101         map->size = vnnmap->size;
1102         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1103         
1104         data.dsize = len;
1105         data.dptr  = (uint8_t *)map;
1106
1107         ret = ctdb_control(ctdb, destnode, 0, 
1108                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1109                            NULL, NULL, &res, &timeout, NULL);
1110         if (ret != 0 || res != 0) {
1111                 DEBUG(0,(__location__ " ctdb_control for setvnnmap failed\n"));
1112                 return -1;
1113         }
1114
1115         talloc_free(map);
1116
1117         return 0;
1118 }
1119
1120 /*
1121   get all keys and records for a specific database
1122  */
1123 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid, uint32_t lmaster, 
1124                      TALLOC_CTX *mem_ctx, struct ctdb_key_list *keys)
1125 {
1126         int i, ret;
1127         TDB_DATA indata, outdata;
1128         struct ctdb_control_pulldb pull;
1129         struct ctdb_control_pulldb_reply *reply;
1130         struct ctdb_rec_data *rec;
1131         int32_t res;
1132
1133         pull.db_id   = dbid;
1134         pull.lmaster = lmaster;
1135
1136         indata.dsize = sizeof(struct ctdb_control_pulldb);
1137         indata.dptr  = (unsigned char *)&pull;
1138
1139         ret = ctdb_control(ctdb, destnode, 0, 
1140                            CTDB_CONTROL_PULL_DB, 0, indata, 
1141                            mem_ctx, &outdata, &res, NULL, NULL);
1142         if (ret != 0 || res != 0) {
1143                 DEBUG(0,(__location__ " ctdb_control for pulldb failed\n"));
1144                 return -1;
1145         }
1146
1147
1148         reply = (struct ctdb_control_pulldb_reply *)outdata.dptr;
1149         keys->dbid     = reply->db_id;
1150         keys->num      = reply->count;
1151         
1152         keys->keys     = talloc_array(mem_ctx, TDB_DATA, keys->num);
1153         keys->headers  = talloc_array(mem_ctx, struct ctdb_ltdb_header, keys->num);
1154         keys->data     = talloc_array(mem_ctx, TDB_DATA, keys->num);
1155
1156         rec = (struct ctdb_rec_data *)&reply->data[0];
1157
1158         for (i=0;i<reply->count;i++) {
1159                 keys->keys[i].dptr = talloc_memdup(mem_ctx, &rec->data[0], rec->keylen);
1160                 keys->keys[i].dsize = rec->keylen;
1161                 
1162                 keys->data[i].dptr = talloc_memdup(mem_ctx, &rec->data[keys->keys[i].dsize], rec->datalen);
1163                 keys->data[i].dsize = rec->datalen;
1164
1165                 if (keys->data[i].dsize < sizeof(struct ctdb_ltdb_header)) {
1166                         DEBUG(0,(__location__ " bad ltdb record\n"));
1167                         return -1;
1168                 }
1169                 memcpy(&keys->headers[i], keys->data[i].dptr, sizeof(struct ctdb_ltdb_header));
1170                 keys->data[i].dptr += sizeof(struct ctdb_ltdb_header);
1171                 keys->data[i].dsize -= sizeof(struct ctdb_ltdb_header);
1172
1173                 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
1174         }           
1175
1176         talloc_free(outdata.dptr);
1177
1178         return 0;
1179 }
1180
1181 /*
1182   copy a tdb from one node to another node
1183  */
1184 int ctdb_ctrl_copydb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t sourcenode, 
1185                      uint32_t destnode, uint32_t dbid, uint32_t lmaster, TALLOC_CTX *mem_ctx)
1186 {
1187         int ret;
1188         TDB_DATA indata, outdata;
1189         int32_t res;
1190
1191         indata.dsize = 2*sizeof(uint32_t);
1192         indata.dptr  = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1193
1194         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1195         ((uint32_t *)(&indata.dptr[0]))[1] = lmaster;
1196
1197         DEBUG(3,("pulling dbid 0x%x from %u\n", dbid, sourcenode));
1198
1199         ret = ctdb_control(ctdb, sourcenode, 0, 
1200                            CTDB_CONTROL_PULL_DB, 0, indata, 
1201                            mem_ctx, &outdata, &res, &timeout, NULL);
1202         if (ret != 0 || res != 0) {
1203                 DEBUG(0,(__location__ " ctdb_control for pulldb failed\n"));
1204                 return -1;
1205         }
1206
1207         DEBUG(3,("pushing dbid 0x%x to %u\n", dbid, destnode));
1208
1209         ret = ctdb_control(ctdb, destnode, 0, 
1210                            CTDB_CONTROL_PUSH_DB, 0, outdata, 
1211                            mem_ctx, NULL, &res, &timeout, NULL);
1212         talloc_free(outdata.dptr);
1213         if (ret != 0 || res != 0) {
1214                 DEBUG(0,(__location__ " ctdb_control for pushdb failed\n"));
1215                 return -1;
1216         }
1217
1218         DEBUG(3,("copydb for dbid 0x%x done for %u to %u\n", 
1219                  dbid, sourcenode, destnode));
1220
1221         return 0;
1222 }
1223
1224 /*
1225   change dmaster for all keys in the database to the new value
1226  */
1227 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1228                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1229 {
1230         int ret;
1231         TDB_DATA indata;
1232         int32_t res;
1233
1234         indata.dsize = 2*sizeof(uint32_t);
1235         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1236
1237         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1238         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1239
1240         ret = ctdb_control(ctdb, destnode, 0, 
1241                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1242                            NULL, NULL, &res, &timeout, NULL);
1243         if (ret != 0 || res != 0) {
1244                 DEBUG(0,(__location__ " ctdb_control for setdmaster failed\n"));
1245                 return -1;
1246         }
1247
1248         return 0;
1249 }
1250
1251 /*
1252   ping a node, return number of clients connected
1253  */
1254 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1255 {
1256         int ret;
1257         int32_t res;
1258
1259         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1260                            tdb_null, NULL, NULL, &res, NULL, NULL);
1261         if (ret != 0) {
1262                 return -1;
1263         }
1264         return res;
1265 }
1266
1267 /*
1268   find the real path to a ltdb 
1269  */
1270 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1271                    const char **path)
1272 {
1273         int ret;
1274         int32_t res;
1275         TDB_DATA data;
1276
1277         data.dptr = (uint8_t *)&dbid;
1278         data.dsize = sizeof(dbid);
1279
1280         ret = ctdb_control(ctdb, destnode, 0, 
1281                            CTDB_CONTROL_GETDBPATH, 0, data, 
1282                            mem_ctx, &data, &res, &timeout, NULL);
1283         if (ret != 0 || res != 0) {
1284                 return -1;
1285         }
1286
1287         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1288         if ((*path) == NULL) {
1289                 return -1;
1290         }
1291
1292         talloc_free(data.dptr);
1293
1294         return 0;
1295 }
1296
1297 /*
1298   find the name of a db 
1299  */
1300 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1301                    const char **name)
1302 {
1303         int ret;
1304         int32_t res;
1305         TDB_DATA data;
1306
1307         data.dptr = (uint8_t *)&dbid;
1308         data.dsize = sizeof(dbid);
1309
1310         ret = ctdb_control(ctdb, destnode, 0, 
1311                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1312                            mem_ctx, &data, &res, &timeout, NULL);
1313         if (ret != 0 || res != 0) {
1314                 return -1;
1315         }
1316
1317         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1318         if ((*name) == NULL) {
1319                 return -1;
1320         }
1321
1322         talloc_free(data.dptr);
1323
1324         return 0;
1325 }
1326
1327 /*
1328   create a database
1329  */
1330 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, const char *name)
1331 {
1332         int ret;
1333         int32_t res;
1334         TDB_DATA data;
1335
1336         data.dptr = discard_const(name);
1337         data.dsize = strlen(name)+1;
1338
1339         ret = ctdb_control(ctdb, destnode, 0, 
1340                            CTDB_CONTROL_DB_ATTACH, 0, data, 
1341                            mem_ctx, &data, &res, &timeout, NULL);
1342
1343         if (ret != 0 || res != 0) {
1344                 return -1;
1345         }
1346
1347         return 0;
1348 }
1349
1350 /*
1351   get debug level on a node
1352  */
1353 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, uint32_t *level)
1354 {
1355         int ret;
1356         int32_t res;
1357         TDB_DATA data;
1358
1359         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1360                            ctdb, &data, &res, NULL, NULL);
1361         if (ret != 0 || res != 0) {
1362                 return -1;
1363         }
1364         if (data.dsize != sizeof(uint32_t)) {
1365                 DEBUG(0,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1366                          (unsigned)data.dsize));
1367                 return -1;
1368         }
1369         *level = *(uint32_t *)data.dptr;
1370         talloc_free(data.dptr);
1371         return 0;
1372 }
1373
1374 /*
1375   set debug level on a node
1376  */
1377 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, uint32_t level)
1378 {
1379         int ret;
1380         int32_t res;
1381         TDB_DATA data;
1382
1383         data.dptr = (uint8_t *)&level;
1384         data.dsize = sizeof(level);
1385
1386         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1387                            NULL, NULL, &res, NULL, NULL);
1388         if (ret != 0 || res != 0) {
1389                 return -1;
1390         }
1391         return 0;
1392 }
1393
1394
1395 /*
1396   get a list of connected nodes
1397  */
1398 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1399                                 struct timeval timeout,
1400                                 TALLOC_CTX *mem_ctx,
1401                                 uint32_t *num_nodes)
1402 {
1403         struct ctdb_node_map *map=NULL;
1404         int ret, i;
1405         uint32_t *nodes;
1406
1407         *num_nodes = 0;
1408
1409         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1410         if (ret != 0) {
1411                 return NULL;
1412         }
1413
1414         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1415         if (nodes == NULL) {
1416                 return NULL;
1417         }
1418
1419         for (i=0;i<map->num;i++) {
1420                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1421                         nodes[*num_nodes] = map->nodes[i].vnn;
1422                         (*num_nodes)++;
1423                 }
1424         }
1425
1426         return nodes;
1427 }
1428
1429
1430 /*
1431   reset remote status
1432  */
1433 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1434 {
1435         int ret;
1436         int32_t res;
1437
1438         ret = ctdb_control(ctdb, destnode, 0, 
1439                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1440                            NULL, NULL, &res, NULL, NULL);
1441         if (ret != 0 || res != 0) {
1442                 DEBUG(0,(__location__ " ctdb_control for reset statistics failed\n"));
1443                 return -1;
1444         }
1445         return 0;
1446 }
1447
1448
1449 /*
1450   attach to a specific database - client call
1451 */
1452 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name)
1453 {
1454         struct ctdb_db_context *ctdb_db;
1455         TDB_DATA data;
1456         int ret;
1457         int32_t res;
1458
1459         ctdb_db = ctdb_db_handle(ctdb, name);
1460         if (ctdb_db) {
1461                 return ctdb_db;
1462         }
1463
1464         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1465         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1466
1467         ctdb_db->ctdb = ctdb;
1468         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1469         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1470
1471         data.dptr = discard_const(name);
1472         data.dsize = strlen(name)+1;
1473
1474         /* tell ctdb daemon to attach */
1475         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_DB_ATTACH,
1476                            0, data, ctdb_db, &data, &res, NULL, NULL);
1477         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1478                 DEBUG(0,("Failed to attach to database '%s'\n", name));
1479                 talloc_free(ctdb_db);
1480                 return NULL;
1481         }
1482         
1483         ctdb_db->db_id = *(uint32_t *)data.dptr;
1484         talloc_free(data.dptr);
1485
1486         ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1487         if (ret != 0) {
1488                 DEBUG(0,("Failed to get dbpath for database '%s'\n", name));
1489                 talloc_free(ctdb_db);
1490                 return NULL;
1491         }
1492
1493         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, 0, O_RDWR, 0);
1494         if (ctdb_db->ltdb == NULL) {
1495                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1496                 talloc_free(ctdb_db);
1497                 return NULL;
1498         }
1499
1500         DLIST_ADD(ctdb->db_list, ctdb_db);
1501
1502         return ctdb_db;
1503 }
1504
1505
1506 /*
1507   setup a call for a database
1508  */
1509 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1510 {
1511         TDB_DATA data;
1512         int32_t status;
1513         struct ctdb_control_set_call c;
1514         int ret;
1515         struct ctdb_registered_call *call;
1516
1517         c.db_id = ctdb_db->db_id;
1518         c.fn    = fn;
1519         c.id    = id;
1520
1521         data.dptr = (uint8_t *)&c;
1522         data.dsize = sizeof(c);
1523
1524         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1525                            data, NULL, NULL, &status, NULL, NULL);
1526         if (ret != 0 || status != 0) {
1527                 DEBUG(0,("ctdb_set_call failed for call %u\n", id));
1528                 return -1;
1529         }
1530
1531         /* also register locally */
1532         call = talloc(ctdb_db, struct ctdb_registered_call);
1533         call->fn = fn;
1534         call->id = id;
1535
1536         DLIST_ADD(ctdb_db->calls, call);        
1537         return 0;
1538 }
1539
1540
1541 struct traverse_state {
1542         bool done;
1543         uint32_t count;
1544         ctdb_traverse_func fn;
1545         void *private_data;
1546 };
1547
1548 /*
1549   called on each key during a ctdb_traverse
1550  */
1551 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1552 {
1553         struct traverse_state *state = (struct traverse_state *)p;
1554         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
1555         TDB_DATA key;
1556
1557         if (data.dsize < sizeof(uint32_t) ||
1558             d->length != data.dsize) {
1559                 DEBUG(0,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
1560                 state->done = True;
1561                 return;
1562         }
1563
1564         key.dsize = d->keylen;
1565         key.dptr  = &d->data[0];
1566         data.dsize = d->datalen;
1567         data.dptr = &d->data[d->keylen];
1568
1569         if (key.dsize == 0 && data.dsize == 0) {
1570                 /* end of traverse */
1571                 state->done = True;
1572                 return;
1573         }
1574
1575         if (state->fn(ctdb, key, data, state->private_data) != 0) {
1576                 state->done = True;
1577         }
1578
1579         state->count++;
1580 }
1581
1582
1583 /*
1584   start a cluster wide traverse, calling the supplied fn on each record
1585   return the number of records traversed, or -1 on error
1586  */
1587 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
1588 {
1589         TDB_DATA data;
1590         struct ctdb_traverse_start t;
1591         int32_t status;
1592         int ret;
1593         uint64_t srvid = (getpid() | 0xFLL<<60);
1594         struct traverse_state state;
1595
1596         state.done = False;
1597         state.count = 0;
1598         state.private_data = private_data;
1599         state.fn = fn;
1600
1601         ret = ctdb_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
1602         if (ret != 0) {
1603                 DEBUG(0,("Failed to setup traverse handler\n"));
1604                 return -1;
1605         }
1606
1607         t.db_id = ctdb_db->db_id;
1608         t.srvid = srvid;
1609         t.reqid = 0;
1610
1611         data.dptr = (uint8_t *)&t;
1612         data.dsize = sizeof(t);
1613
1614         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
1615                            data, NULL, NULL, &status, NULL, NULL);
1616         if (ret != 0 || status != 0) {
1617                 DEBUG(0,("ctdb_traverse_all failed\n"));
1618                 ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1619                 return -1;
1620         }
1621
1622         while (!state.done) {
1623                 event_loop_once(ctdb_db->ctdb->ev);
1624         }
1625
1626         ret = ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1627         if (ret != 0) {
1628                 DEBUG(0,("Failed to remove ctdb_traverse handler\n"));
1629                 return -1;
1630         }
1631
1632         return state.count;
1633 }
1634
1635 /*
1636   called on each key during a catdb
1637  */
1638 static int dumpdb_fn(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
1639 {
1640         FILE *f = (FILE *)p;
1641         char *keystr, *datastr;
1642         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
1643
1644         keystr  = hex_encode_talloc(ctdb, key.dptr, key.dsize);
1645         datastr = hex_encode_talloc(ctdb, data.dptr+sizeof(*h), data.dsize-sizeof(*h));
1646
1647         fprintf(f, "dmaster: %u\n", h->dmaster);
1648         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
1649         fprintf(f, "key: %s\ndata: %s\n", keystr, datastr);
1650
1651         talloc_free(keystr);
1652         talloc_free(datastr);
1653         return 0;
1654 }
1655
1656 /*
1657   convenience function to list all keys to stdout
1658  */
1659 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
1660 {
1661         return ctdb_traverse(ctdb_db, dumpdb_fn, f);
1662 }
1663
1664 /*
1665   get the pid of a ctdb daemon
1666  */
1667 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
1668 {
1669         int ret;
1670         int32_t res;
1671
1672         ret = ctdb_control(ctdb, destnode, 0, 
1673                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
1674                            NULL, NULL, &res, &timeout, NULL);
1675         if (ret != 0) {
1676                 DEBUG(0,(__location__ " ctdb_control for getpid failed\n"));
1677                 return -1;
1678         }
1679
1680         *pid = res;
1681
1682         return 0;
1683 }
1684
1685
1686 /*
1687   freeze a node
1688  */
1689 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1690 {
1691         int ret;
1692         int32_t res;
1693
1694         ret = ctdb_control(ctdb, destnode, 0, 
1695                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
1696                            NULL, NULL, &res, &timeout, NULL);
1697         if (ret != 0 || res != 0) {
1698                 DEBUG(0,(__location__ " ctdb_control freeze failed\n"));
1699                 return -1;
1700         }
1701
1702         return 0;
1703 }
1704
1705 /*
1706   thaw a node
1707  */
1708 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1709 {
1710         int ret;
1711         int32_t res;
1712
1713         ret = ctdb_control(ctdb, destnode, 0, 
1714                            CTDB_CONTROL_THAW, 0, tdb_null, 
1715                            NULL, NULL, &res, &timeout, NULL);
1716         if (ret != 0 || res != 0) {
1717                 DEBUG(0,(__location__ " ctdb_control thaw failed\n"));
1718                 return -1;
1719         }
1720
1721         return 0;
1722 }
1723
1724 /*
1725   get vnn of a node, or -1
1726  */
1727 int ctdb_ctrl_getvnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1728 {
1729         int ret;
1730         int32_t res;
1731
1732         ret = ctdb_control(ctdb, destnode, 0, 
1733                            CTDB_CONTROL_GET_VNN, 0, tdb_null, 
1734                            NULL, NULL, &res, &timeout, NULL);
1735         if (ret != 0) {
1736                 DEBUG(0,(__location__ " ctdb_control for getvnn failed\n"));
1737                 return -1;
1738         }
1739
1740         return res;
1741 }
1742
1743 /*
1744   set the monitoring mode of a remote node
1745  */
1746 int ctdb_ctrl_setmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t monmode)
1747 {
1748         int ret;
1749         TDB_DATA data;
1750         int32_t res;
1751
1752         data.dsize = sizeof(uint32_t);
1753         data.dptr = (uint8_t *)&monmode;
1754
1755         ret = ctdb_control(ctdb, destnode, 0, 
1756                            CTDB_CONTROL_SET_MONMODE, 0, data, 
1757                            NULL, NULL, &res, &timeout, NULL);
1758         if (ret != 0 || res != 0) {
1759                 DEBUG(0,(__location__ " ctdb_control for setmonmode failed\n"));
1760                 return -1;
1761         }
1762
1763         return 0;
1764 }
1765
1766 /*
1767   get the monitoring mode of a remote node
1768  */
1769 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
1770 {
1771         int ret;
1772         int32_t res;
1773
1774         ret = ctdb_control(ctdb, destnode, 0, 
1775                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
1776                            NULL, NULL, &res, &timeout, NULL);
1777         if (ret != 0) {
1778                 DEBUG(0,(__location__ " ctdb_control for getrecmode failed\n"));
1779                 return -1;
1780         }
1781
1782         *monmode = res;
1783
1784         return 0;
1785 }
1786
1787
1788 /*
1789   get maximum rsn for a db on a node
1790  */
1791 int ctdb_ctrl_get_max_rsn(struct ctdb_context *ctdb, struct timeval timeout, 
1792                           uint32_t destnode, uint32_t db_id, uint64_t *max_rsn)
1793 {
1794         TDB_DATA data, outdata;
1795         int ret;
1796         int32_t res;
1797
1798         data.dptr = (uint8_t *)&db_id;
1799         data.dsize = sizeof(db_id);
1800
1801         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_MAX_RSN, 0, data, ctdb,
1802                            &outdata, &res, &timeout, NULL);
1803         if (ret != 0 || res != 0 || outdata.dsize != sizeof(uint64_t)) {
1804                 DEBUG(0,(__location__ " ctdb_control for get_max_rsn failed\n"));
1805                 return -1;
1806         }
1807
1808         *max_rsn = *(uint64_t *)outdata.dptr;
1809         talloc_free(outdata.dptr);
1810
1811         return 0;       
1812 }
1813
1814 /*
1815   set the rsn on non-empty records to the given rsn
1816  */
1817 int ctdb_ctrl_set_rsn_nonempty(struct ctdb_context *ctdb, struct timeval timeout, 
1818                                uint32_t destnode, uint32_t db_id, uint64_t rsn)
1819 {
1820         TDB_DATA data;
1821         int ret;
1822         int32_t res;
1823         struct ctdb_control_set_rsn_nonempty p;
1824
1825         p.db_id = db_id;
1826         p.rsn = rsn;
1827
1828         data.dptr = (uint8_t *)&p;
1829         data.dsize = sizeof(p);
1830
1831         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_RSN_NONEMPTY, 0, data, NULL,
1832                            NULL, &res, &timeout, NULL);
1833         if (ret != 0 || res != 0) {
1834                 DEBUG(0,(__location__ " ctdb_control for set_rsn_nonempty failed\n"));
1835                 return -1;
1836         }
1837
1838         return 0;       
1839 }
1840
1841 /*
1842   delete records which have a rsn below the given rsn
1843  */
1844 int ctdb_ctrl_delete_low_rsn(struct ctdb_context *ctdb, struct timeval timeout, 
1845                              uint32_t destnode, uint32_t db_id, uint64_t rsn)
1846 {
1847         TDB_DATA data;
1848         int ret;
1849         int32_t res;
1850         struct ctdb_control_delete_low_rsn p;
1851
1852         p.db_id = db_id;
1853         p.rsn = rsn;
1854
1855         data.dptr = (uint8_t *)&p;
1856         data.dsize = sizeof(p);
1857
1858         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DELETE_LOW_RSN, 0, data, NULL,
1859                            NULL, &res, &timeout, NULL);
1860         if (ret != 0 || res != 0) {
1861                 DEBUG(0,(__location__ " ctdb_control for delete_low_rsn failed\n"));
1862                 return -1;
1863         }
1864
1865         return 0;       
1866 }
1867
1868 /* 
1869   sent to a node to make it take over an ip address
1870 */
1871 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
1872                           uint32_t destnode, struct ctdb_public_ip *ip)
1873 {
1874         TDB_DATA data;
1875         int ret;
1876         int32_t res;
1877
1878         data.dsize = sizeof(*ip);
1879         data.dptr  = (uint8_t *)ip;
1880
1881         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
1882                            NULL, &res, &timeout, NULL);
1883
1884         if (ret != 0 || res != 0) {
1885                 DEBUG(0,(__location__ " ctdb_control for takeover_ip failed\n"));
1886                 return -1;
1887         }
1888
1889         return 0;       
1890 }
1891
1892
1893 /* 
1894   sent to a node to make it release an ip address
1895 */
1896 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
1897                          uint32_t destnode, struct ctdb_public_ip *ip)
1898 {
1899         TDB_DATA data;
1900         int ret;
1901         int32_t res;
1902
1903         data.dsize = sizeof(*ip);
1904         data.dptr  = (uint8_t *)ip;
1905
1906         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
1907                            NULL, &res, &timeout, NULL);
1908
1909         if (ret != 0 || res != 0) {
1910                 DEBUG(0,(__location__ " ctdb_control for release_ip failed\n"));
1911                 return -1;
1912         }
1913
1914         return 0;       
1915 }
1916
1917
1918 /*
1919   get a tunable
1920  */
1921 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
1922                           struct timeval timeout, 
1923                           uint32_t destnode,
1924                           const char *name, uint32_t *value)
1925 {
1926         struct ctdb_control_get_tunable *t;
1927         TDB_DATA data, outdata;
1928         int32_t res;
1929         int ret;
1930
1931         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
1932         data.dptr  = talloc_size(ctdb, data.dsize);
1933         CTDB_NO_MEMORY(ctdb, data.dptr);
1934
1935         t = (struct ctdb_control_get_tunable *)data.dptr;
1936         t->length = strlen(name)+1;
1937         memcpy(t->name, name, t->length);
1938
1939         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
1940                            &outdata, &res, &timeout, NULL);
1941         talloc_free(data.dptr);
1942         if (ret != 0 || res != 0) {
1943                 DEBUG(0,(__location__ " ctdb_control for get_tunable failed\n"));
1944                 return -1;
1945         }
1946
1947         if (outdata.dsize != sizeof(uint32_t)) {
1948                 DEBUG(0,("Invalid return data in get_tunable\n"));
1949                 talloc_free(outdata.dptr);
1950                 return -1;
1951         }
1952         
1953         *value = *(uint32_t *)outdata.dptr;
1954         talloc_free(outdata.dptr);
1955
1956         return 0;
1957 }
1958
1959 /*
1960   set a tunable
1961  */
1962 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
1963                           struct timeval timeout, 
1964                           uint32_t destnode,
1965                           const char *name, uint32_t value)
1966 {
1967         struct ctdb_control_set_tunable *t;
1968         TDB_DATA data;
1969         int32_t res;
1970         int ret;
1971
1972         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
1973         data.dptr  = talloc_size(ctdb, data.dsize);
1974         CTDB_NO_MEMORY(ctdb, data.dptr);
1975
1976         t = (struct ctdb_control_set_tunable *)data.dptr;
1977         t->length = strlen(name)+1;
1978         memcpy(t->name, name, t->length);
1979         t->value = value;
1980
1981         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
1982                            NULL, &res, &timeout, NULL);
1983         talloc_free(data.dptr);
1984         if (ret != 0 || res != 0) {
1985                 DEBUG(0,(__location__ " ctdb_control for set_tunable failed\n"));
1986                 return -1;
1987         }
1988
1989         return 0;
1990 }
1991
1992 /*
1993   list tunables
1994  */
1995 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
1996                             struct timeval timeout, 
1997                             uint32_t destnode,
1998                             TALLOC_CTX *mem_ctx,
1999                             const char ***list, uint32_t *count)
2000 {
2001         TDB_DATA outdata;
2002         int32_t res;
2003         int ret;
2004         struct ctdb_control_list_tunable *t;
2005         char *p, *s, *ptr;
2006
2007         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2008                            mem_ctx, &outdata, &res, &timeout, NULL);
2009         if (ret != 0 || res != 0) {
2010                 DEBUG(0,(__location__ " ctdb_control for list_tunables failed\n"));
2011                 return -1;
2012         }
2013
2014         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2015         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2016             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2017                 DEBUG(0,("Invalid data in list_tunables reply\n"));
2018                 talloc_free(outdata.dptr);
2019                 return -1;              
2020         }
2021         
2022         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2023         CTDB_NO_MEMORY(ctdb, p);
2024
2025         talloc_free(outdata.dptr);
2026         
2027         (*list) = NULL;
2028         (*count) = 0;
2029
2030         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2031                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2032                 CTDB_NO_MEMORY(ctdb, *list);
2033                 (*list)[*count] = talloc_strdup(*list, s);
2034                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2035                 (*count)++;
2036         }
2037
2038         talloc_free(p);
2039
2040         return 0;
2041 }
2042
2043
2044 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb, 
2045                         struct timeval timeout, uint32_t destnode, 
2046                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2047 {
2048         int ret;
2049         TDB_DATA outdata;
2050         int32_t res;
2051
2052         ret = ctdb_control(ctdb, destnode, 0, 
2053                            CTDB_CONTROL_GET_PUBLIC_IPS, 0, tdb_null, 
2054                            mem_ctx, &outdata, &res, &timeout, NULL);
2055         if (ret != 0 || res != 0) {
2056                 DEBUG(0,(__location__ " ctdb_control for getpublicips failed\n"));
2057                 return -1;
2058         }
2059
2060         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2061         talloc_free(outdata.dptr);
2062                     
2063         return 0;
2064 }
2065
2066 /*
2067   set/clear the permanent disabled bit on a remote node
2068  */
2069 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2070                        uint32_t set, uint32_t clear)
2071 {
2072         int ret;
2073         TDB_DATA data;
2074         struct ctdb_node_modflags m;
2075         int32_t res;
2076
2077         m.set = set;
2078         m.clear = clear;
2079
2080         data.dsize = sizeof(m);
2081         data.dptr = (unsigned char *)&m;
2082
2083         ret = ctdb_control(ctdb, destnode, 0, 
2084                            CTDB_CONTROL_MODIFY_FLAGS, 0, data, 
2085                            NULL, NULL, &res, &timeout, NULL);
2086         if (ret != 0 || res != 0) {
2087                 DEBUG(0,(__location__ " ctdb_control for modflags failed\n"));
2088                 return -1;
2089         }
2090
2091         return 0;
2092 }
2093
2094
2095 /*
2096   get all tunables
2097  */
2098 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
2099                                struct timeval timeout, 
2100                                uint32_t destnode,
2101                                struct ctdb_tunable *tunables)
2102 {
2103         TDB_DATA outdata;
2104         int ret;
2105         int32_t res;
2106
2107         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2108                            &outdata, &res, &timeout, NULL);
2109         if (ret != 0 || res != 0) {
2110                 DEBUG(0,(__location__ " ctdb_control for get all tunables failed\n"));
2111                 return -1;
2112         }
2113
2114         if (outdata.dsize != sizeof(*tunables)) {
2115                 DEBUG(0,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2116                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2117                 return -1;              
2118         }
2119
2120         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2121         talloc_free(outdata.dptr);
2122         return 0;
2123 }
2124
2125
2126 /*
2127   kill a tcp connection
2128  */
2129 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2130                       struct timeval timeout, 
2131                       uint32_t destnode,
2132                       struct ctdb_control_killtcp *killtcp)
2133 {
2134         TDB_DATA data;
2135         int32_t res;
2136         int ret;
2137
2138         data.dsize = sizeof(struct ctdb_control_killtcp);
2139         data.dptr  = (unsigned char *)killtcp;
2140
2141         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2142                            NULL, &res, &timeout, NULL);
2143         if (ret != 0 || res != 0) {
2144                 DEBUG(0,(__location__ " ctdb_control for killtcp failed\n"));
2145                 return -1;
2146         }
2147
2148         return 0;
2149 }
2150
2151 /*
2152   get a list of all tcp tickles that a node knows about for a particular vnn
2153  */
2154 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
2155                               struct timeval timeout, uint32_t destnode, 
2156                               TALLOC_CTX *mem_ctx, uint32_t vnn,
2157                               struct ctdb_control_tcp_tickle_list **list)
2158 {
2159         int ret;
2160         TDB_DATA data, outdata;
2161         int32_t status;
2162
2163         data.dptr = (uint8_t*)&vnn;
2164         data.dsize = sizeof(vnn);
2165
2166         ret = ctdb_control(ctdb, destnode, 0, 
2167                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
2168                            mem_ctx, &outdata, &status, NULL, NULL);
2169         if (ret != 0) {
2170                 DEBUG(0,(__location__ " ctdb_control for get tcp tickles failed\n"));
2171                 return -1;
2172         }
2173
2174         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
2175
2176         return status;
2177 }
2178
2179 /*
2180   initialise the ctdb daemon for client applications
2181
2182   NOTE: In current code the daemon does not fork. This is for testing purposes only
2183   and to simplify the code.
2184 */
2185 struct ctdb_context *ctdb_init(struct event_context *ev)
2186 {
2187         struct ctdb_context *ctdb;
2188
2189         ctdb = talloc_zero(ev, struct ctdb_context);
2190         ctdb->ev  = ev;
2191         ctdb->idr = idr_init(ctdb);
2192         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
2193
2194         ctdb_set_socketname(ctdb, CTDB_PATH);
2195
2196         return ctdb;
2197 }
2198
2199
2200 /*
2201   set some ctdb flags
2202 */
2203 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
2204 {
2205         ctdb->flags |= flags;
2206 }
2207
2208 /*
2209   setup the local socket name
2210 */
2211 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
2212 {
2213         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
2214         return 0;
2215 }
2216
2217 /*
2218   return the vnn of this node
2219 */
2220 uint32_t ctdb_get_vnn(struct ctdb_context *ctdb)
2221 {
2222         return ctdb->vnn;
2223 }
2224