update lib/replace from samba4
[vlendec/samba-autobuild/.git] / ctdb / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/events/events.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "../include/ctdb_private.h"
29 #include "lib/util/dlinklist.h"
30
31 /*
32   allocate a packet for use in client<->daemon communication
33  */
34 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
35                                             TALLOC_CTX *mem_ctx, 
36                                             enum ctdb_operation operation, 
37                                             size_t length, size_t slength,
38                                             const char *type)
39 {
40         int size;
41         struct ctdb_req_header *hdr;
42
43         length = MAX(length, slength);
44         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
45
46         hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
47         if (hdr == NULL) {
48                 DEBUG(0,("Unable to allocate packet for operation %u of length %u\n",
49                          operation, (unsigned)length));
50                 return NULL;
51         }
52         talloc_set_name_const(hdr, type);
53         memset(hdr, 0, slength);
54         hdr->length       = length;
55         hdr->operation    = operation;
56         hdr->ctdb_magic   = CTDB_MAGIC;
57         hdr->ctdb_version = CTDB_VERSION;
58         hdr->srcnode      = ctdb->vnn;
59         if (ctdb->vnn_map) {
60                 hdr->generation = ctdb->vnn_map->generation;
61         }
62
63         return hdr;
64 }
65
66 /*
67   local version of ctdb_call
68 */
69 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
70                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
71                     TDB_DATA *data, uint32_t caller)
72 {
73         struct ctdb_call_info *c;
74         struct ctdb_registered_call *fn;
75         struct ctdb_context *ctdb = ctdb_db->ctdb;
76         
77         c = talloc(ctdb, struct ctdb_call_info);
78         CTDB_NO_MEMORY(ctdb, c);
79
80         c->key = call->key;
81         c->call_data = &call->call_data;
82         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
83         c->record_data.dsize = data->dsize;
84         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
85         c->new_data = NULL;
86         c->reply_data = NULL;
87         c->status = 0;
88
89         for (fn=ctdb_db->calls;fn;fn=fn->next) {
90                 if (fn->id == call->call_id) break;
91         }
92         if (fn == NULL) {
93                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
94                 talloc_free(c);
95                 return -1;
96         }
97
98         if (fn->fn(c) != 0) {
99                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
100                 talloc_free(c);
101                 return -1;
102         }
103
104         if (header->laccessor != caller) {
105                 header->lacount = 0;
106         }
107         header->laccessor = caller;
108         header->lacount++;
109
110         /* we need to force the record to be written out if this was a remote access,
111            so that the lacount is updated */
112         if (c->new_data == NULL && header->laccessor != ctdb->vnn) {
113                 c->new_data = &c->record_data;
114         }
115
116         if (c->new_data) {
117                 /* XXX check that we always have the lock here? */
118                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
119                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
120                         talloc_free(c);
121                         return -1;
122                 }
123         }
124
125         if (c->reply_data) {
126                 call->reply_data = *c->reply_data;
127                 talloc_steal(ctdb, call->reply_data.dptr);
128                 talloc_set_name_const(call->reply_data.dptr, __location__);
129         } else {
130                 call->reply_data.dptr = NULL;
131                 call->reply_data.dsize = 0;
132         }
133         call->status = c->status;
134
135         talloc_free(c);
136
137         return 0;
138 }
139
140
141 /*
142   queue a packet for sending from client to daemon
143 */
144 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
145 {
146         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
147 }
148
149
150 /*
151   state of a in-progress ctdb call in client
152 */
153 struct ctdb_client_call_state {
154         enum call_state state;
155         uint32_t reqid;
156         struct ctdb_db_context *ctdb_db;
157         struct ctdb_call call;
158 };
159
160 /*
161   called when a CTDB_REPLY_CALL packet comes in in the client
162
163   This packet comes in response to a CTDB_REQ_CALL request packet. It
164   contains any reply data from the call
165 */
166 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
167 {
168         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
169         struct ctdb_client_call_state *state;
170
171         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
172         if (state == NULL) {
173                 DEBUG(0,(__location__ " reqid %u not found\n", hdr->reqid));
174                 return;
175         }
176
177         if (hdr->reqid != state->reqid) {
178                 /* we found a record  but it was the wrong one */
179                 DEBUG(0, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
180                 return;
181         }
182
183         state->call.reply_data.dptr = c->data;
184         state->call.reply_data.dsize = c->datalen;
185         state->call.status = c->status;
186
187         talloc_steal(state, c);
188
189         state->state = CTDB_CALL_DONE;
190 }
191
192 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
193
194 /*
195   this is called in the client, when data comes in from the daemon
196  */
197 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
198 {
199         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
200         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
201         TALLOC_CTX *tmp_ctx;
202
203         /* place the packet as a child of a tmp_ctx. We then use
204            talloc_free() below to free it. If any of the calls want
205            to keep it, then they will steal it somewhere else, and the
206            talloc_free() will be a no-op */
207         tmp_ctx = talloc_new(ctdb);
208         talloc_steal(tmp_ctx, hdr);
209
210         if (cnt == 0) {
211                 DEBUG(2,("Daemon has exited - shutting down client\n"));
212                 exit(0);
213         }
214
215         if (cnt < sizeof(*hdr)) {
216                 DEBUG(0,("Bad packet length %u in client\n", (unsigned)cnt));
217                 goto done;
218         }
219         if (cnt != hdr->length) {
220                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
221                                (unsigned)hdr->length, (unsigned)cnt);
222                 goto done;
223         }
224
225         if (hdr->ctdb_magic != CTDB_MAGIC) {
226                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
227                 goto done;
228         }
229
230         if (hdr->ctdb_version != CTDB_VERSION) {
231                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
232                 goto done;
233         }
234
235         switch (hdr->operation) {
236         case CTDB_REPLY_CALL:
237                 ctdb_client_reply_call(ctdb, hdr);
238                 break;
239
240         case CTDB_REQ_MESSAGE:
241                 ctdb_request_message(ctdb, hdr);
242                 break;
243
244         case CTDB_REPLY_CONTROL:
245                 ctdb_client_reply_control(ctdb, hdr);
246                 break;
247
248         default:
249                 DEBUG(0,("bogus operation code:%u\n",hdr->operation));
250         }
251
252 done:
253         talloc_free(tmp_ctx);
254 }
255
256 /*
257   connect to a unix domain socket
258 */
259 int ctdb_socket_connect(struct ctdb_context *ctdb)
260 {
261         struct sockaddr_un addr;
262
263         memset(&addr, 0, sizeof(addr));
264         addr.sun_family = AF_UNIX;
265         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
266
267         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
268         if (ctdb->daemon.sd == -1) {
269                 return -1;
270         }
271
272         set_nonblocking(ctdb->daemon.sd);
273         set_close_on_exec(ctdb->daemon.sd);
274         
275         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
276                 close(ctdb->daemon.sd);
277                 ctdb->daemon.sd = -1;
278                 return -1;
279         }
280
281         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
282                                               CTDB_DS_ALIGNMENT, 
283                                               ctdb_client_read_cb, ctdb);
284         return 0;
285 }
286
287
288 struct ctdb_record_handle {
289         struct ctdb_db_context *ctdb_db;
290         TDB_DATA key;
291         TDB_DATA *data;
292         struct ctdb_ltdb_header header;
293 };
294
295
296 /*
297   make a recv call to the local ctdb daemon - called from client context
298
299   This is called when the program wants to wait for a ctdb_call to complete and get the 
300   results. This call will block unless the call has already completed.
301 */
302 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
303 {
304         while (state->state < CTDB_CALL_DONE) {
305                 event_loop_once(state->ctdb_db->ctdb->ev);
306         }
307         if (state->state != CTDB_CALL_DONE) {
308                 DEBUG(0,(__location__ " ctdb_call_recv failed\n"));
309                 talloc_free(state);
310                 return -1;
311         }
312
313         if (state->call.reply_data.dsize) {
314                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
315                                                       state->call.reply_data.dptr,
316                                                       state->call.reply_data.dsize);
317                 call->reply_data.dsize = state->call.reply_data.dsize;
318         } else {
319                 call->reply_data.dptr = NULL;
320                 call->reply_data.dsize = 0;
321         }
322         call->status = state->call.status;
323         talloc_free(state);
324
325         return 0;
326 }
327
328
329
330
331 /*
332   destroy a ctdb_call in client
333 */
334 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
335 {
336         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
337         return 0;
338 }
339
340 /*
341   construct an event driven local ctdb_call
342
343   this is used so that locally processed ctdb_call requests are processed
344   in an event driven manner
345 */
346 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
347                                                                   struct ctdb_call *call,
348                                                                   struct ctdb_ltdb_header *header,
349                                                                   TDB_DATA *data)
350 {
351         struct ctdb_client_call_state *state;
352         struct ctdb_context *ctdb = ctdb_db->ctdb;
353         int ret;
354
355         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
356         CTDB_NO_MEMORY_NULL(ctdb, state);
357
358         talloc_steal(state, data->dptr);
359
360         state->state = CTDB_CALL_DONE;
361         state->call = *call;
362         state->ctdb_db = ctdb_db;
363
364         ret = ctdb_call_local(ctdb_db, &state->call, header, state, data, ctdb->vnn);
365
366         return state;
367 }
368
369 /*
370   make a ctdb call to the local daemon - async send. Called from client context.
371
372   This constructs a ctdb_call request and queues it for processing. 
373   This call never blocks.
374 */
375 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
376                                        struct ctdb_call *call)
377 {
378         struct ctdb_client_call_state *state;
379         struct ctdb_context *ctdb = ctdb_db->ctdb;
380         struct ctdb_ltdb_header header;
381         TDB_DATA data;
382         int ret;
383         size_t len;
384         struct ctdb_req_call *c;
385
386         /* if the domain socket is not yet open, open it */
387         if (ctdb->daemon.sd==-1) {
388                 ctdb_socket_connect(ctdb);
389         }
390
391         ret = ctdb_ltdb_lock(ctdb_db, call->key);
392         if (ret != 0) {
393                 DEBUG(0,(__location__ " Failed to get chainlock\n"));
394                 return NULL;
395         }
396
397         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
398
399         if (ret == 0 && header.dmaster == ctdb->vnn) {
400                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
401                 talloc_free(data.dptr);
402                 ctdb_ltdb_unlock(ctdb_db, call->key);
403                 return state;
404         }
405
406         ctdb_ltdb_unlock(ctdb_db, call->key);
407         talloc_free(data.dptr);
408
409         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
410         if (state == NULL) {
411                 DEBUG(0, (__location__ " failed to allocate state\n"));
412                 return NULL;
413         }
414
415         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
416         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
417         if (c == NULL) {
418                 DEBUG(0, (__location__ " failed to allocate packet\n"));
419                 return NULL;
420         }
421
422         state->reqid     = ctdb_reqid_new(ctdb, state);
423         state->ctdb_db = ctdb_db;
424         talloc_set_destructor(state, ctdb_client_call_destructor);
425
426         c->hdr.reqid     = state->reqid;
427         c->flags         = call->flags;
428         c->db_id         = ctdb_db->db_id;
429         c->callid        = call->call_id;
430         c->hopcount      = 0;
431         c->keylen        = call->key.dsize;
432         c->calldatalen   = call->call_data.dsize;
433         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
434         memcpy(&c->data[call->key.dsize], 
435                call->call_data.dptr, call->call_data.dsize);
436         state->call                = *call;
437         state->call.call_data.dptr = &c->data[call->key.dsize];
438         state->call.key.dptr       = &c->data[0];
439
440         state->state  = CTDB_CALL_WAIT;
441
442
443         ctdb_client_queue_pkt(ctdb, &c->hdr);
444
445         return state;
446 }
447
448
449 /*
450   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
451 */
452 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
453 {
454         struct ctdb_client_call_state *state;
455
456         state = ctdb_call_send(ctdb_db, call);
457         return ctdb_call_recv(state, call);
458 }
459
460
461 /*
462   tell the daemon what messaging srvid we will use, and register the message
463   handler function in the client
464 */
465 int ctdb_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
466                              ctdb_message_fn_t handler,
467                              void *private_data)
468                                     
469 {
470         int res;
471         int32_t status;
472         
473         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
474                            tdb_null, NULL, NULL, &status, NULL, NULL);
475         if (res != 0 || status != 0) {
476                 DEBUG(0,("Failed to register srvid %llu\n", (unsigned long long)srvid));
477                 return -1;
478         }
479
480         /* also need to register the handler with our own ctdb structure */
481         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
482 }
483
484 /*
485   tell the daemon we no longer want a srvid
486 */
487 int ctdb_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
488 {
489         int res;
490         int32_t status;
491         
492         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
493                            tdb_null, NULL, NULL, &status, NULL, NULL);
494         if (res != 0 || status != 0) {
495                 DEBUG(0,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
496                 return -1;
497         }
498
499         /* also need to register the handler with our own ctdb structure */
500         ctdb_deregister_message_handler(ctdb, srvid, private_data);
501         return 0;
502 }
503
504
505 /*
506   send a message - from client context
507  */
508 int ctdb_send_message(struct ctdb_context *ctdb, uint32_t vnn,
509                       uint64_t srvid, TDB_DATA data)
510 {
511         struct ctdb_req_message *r;
512         int len, res;
513
514         len = offsetof(struct ctdb_req_message, data) + data.dsize;
515         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
516                                len, struct ctdb_req_message);
517         CTDB_NO_MEMORY(ctdb, r);
518
519         r->hdr.destnode  = vnn;
520         r->srvid         = srvid;
521         r->datalen       = data.dsize;
522         memcpy(&r->data[0], data.dptr, data.dsize);
523         
524         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
525         if (res != 0) {
526                 return res;
527         }
528
529         talloc_free(r);
530         return 0;
531 }
532
533
534 /*
535   cancel a ctdb_fetch_lock operation, releasing the lock
536  */
537 static int fetch_lock_destructor(struct ctdb_record_handle *h)
538 {
539         ctdb_ltdb_unlock(h->ctdb_db, h->key);
540         return 0;
541 }
542
543 /*
544   force the migration of a record to this node
545  */
546 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
547 {
548         struct ctdb_call call;
549         ZERO_STRUCT(call);
550         call.call_id = CTDB_NULL_FUNC;
551         call.key = key;
552         call.flags = CTDB_IMMEDIATE_MIGRATION;
553         return ctdb_call(ctdb_db, &call);
554 }
555
556 /*
557   get a lock on a record, and return the records data. Blocks until it gets the lock
558  */
559 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
560                                            TDB_DATA key, TDB_DATA *data)
561 {
562         int ret;
563         struct ctdb_record_handle *h;
564
565         /*
566           procedure is as follows:
567
568           1) get the chain lock. 
569           2) check if we are dmaster
570           3) if we are the dmaster then return handle 
571           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
572              reply from ctdbd
573           5) when we get the reply, goto (1)
574          */
575
576         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
577         if (h == NULL) {
578                 return NULL;
579         }
580
581         h->ctdb_db = ctdb_db;
582         h->key     = key;
583         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
584         if (h->key.dptr == NULL) {
585                 talloc_free(h);
586                 return NULL;
587         }
588         h->data    = data;
589
590         DEBUG(3,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
591                  (const char *)key.dptr));
592
593 again:
594         /* step 1 - get the chain lock */
595         ret = ctdb_ltdb_lock(ctdb_db, key);
596         if (ret != 0) {
597                 DEBUG(0, (__location__ " failed to lock ltdb record\n"));
598                 talloc_free(h);
599                 return NULL;
600         }
601
602         DEBUG(4,("ctdb_fetch_lock: got chain lock\n"));
603
604         talloc_set_destructor(h, fetch_lock_destructor);
605
606         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
607
608         /* when torturing, ensure we test the remote path */
609         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
610             random() % 5 == 0) {
611                 h->header.dmaster = (uint32_t)-1;
612         }
613
614
615         DEBUG(4,("ctdb_fetch_lock: done local fetch\n"));
616
617         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->vnn) {
618                 ctdb_ltdb_unlock(ctdb_db, key);
619                 ret = ctdb_client_force_migration(ctdb_db, key);
620                 if (ret != 0) {
621                         DEBUG(4,("ctdb_fetch_lock: force_migration failed\n"));
622                         talloc_free(h);
623                         return NULL;
624                 }
625                 goto again;
626         }
627
628         DEBUG(4,("ctdb_fetch_lock: we are dmaster - done\n"));
629         return h;
630 }
631
632 /*
633   store some data to the record that was locked with ctdb_fetch_lock()
634 */
635 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
636 {
637         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
638 }
639
640 struct ctdb_client_control_state {
641         struct ctdb_context *ctdb;
642         uint32_t reqid;
643         int32_t status;
644         TDB_DATA outdata;
645         enum call_state state;
646         char *errormsg;
647 };
648
649 /*
650   called when a CTDB_REPLY_CONTROL packet comes in in the client
651
652   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
653   contains any reply data from the control
654 */
655 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
656                                       struct ctdb_req_header *hdr)
657 {
658         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
659         struct ctdb_client_control_state *state;
660
661         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
662         if (state == NULL) {
663                 DEBUG(0,(__location__ " reqid %u not found\n", hdr->reqid));
664                 return;
665         }
666
667         if (hdr->reqid != state->reqid) {
668                 /* we found a record  but it was the wrong one */
669                 DEBUG(0, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
670                 return;
671         }
672
673         state->outdata.dptr = c->data;
674         state->outdata.dsize = c->datalen;
675         state->status = c->status;
676         if (c->errorlen) {
677                 state->errormsg = talloc_strndup(state, 
678                                                  (char *)&c->data[c->datalen], 
679                                                  c->errorlen);
680         }
681
682         talloc_steal(state, c);
683
684         state->state = CTDB_CALL_DONE;
685 }
686
687
688 /* time out handler for ctdb_control */
689 static void timeout_func(struct event_context *ev, struct timed_event *te, 
690         struct timeval t, void *private_data)
691 {
692         uint32_t *timed_out = (uint32_t *)private_data;
693
694         *timed_out = 1;
695 }
696
697 /*
698   destroy a ctdb_control in client
699 */
700 static int ctdb_control_destructor(struct ctdb_client_control_state *state)     
701 {
702         ctdb_reqid_remove(state->ctdb, state->reqid);
703         return 0;
704 }
705
706 /*
707   send a ctdb control message
708   timeout specifies how long we should wait for a reply.
709   if timeout is NULL we wait indefinitely
710  */
711 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
712                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
713                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
714                  struct timeval *timeout,
715                  char **errormsg)
716 {
717         struct ctdb_client_control_state *state;
718         struct ctdb_req_control *c;
719         size_t len;
720         int ret;
721         uint32_t timed_out;
722
723         if (errormsg) {
724                 *errormsg = NULL;
725         }
726
727         /* if the domain socket is not yet open, open it */
728         if (ctdb->daemon.sd==-1) {
729                 ctdb_socket_connect(ctdb);
730         }
731
732         state = talloc_zero(ctdb, struct ctdb_client_control_state);
733         CTDB_NO_MEMORY(ctdb, state);
734
735         state->ctdb  = ctdb;
736         state->reqid = ctdb_reqid_new(ctdb, state);
737         state->state = CTDB_CALL_WAIT;
738         state->errormsg = NULL;
739
740         talloc_set_destructor(state, ctdb_control_destructor);
741
742         len = offsetof(struct ctdb_req_control, data) + data.dsize;
743         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
744                                len, struct ctdb_req_control);
745         CTDB_NO_MEMORY(ctdb, c);
746         
747         c->hdr.reqid        = state->reqid;
748         c->hdr.destnode     = destnode;
749         c->hdr.reqid        = state->reqid;
750         c->opcode           = opcode;
751         c->client_id        = 0;
752         c->flags            = flags;
753         c->srvid            = srvid;
754         c->datalen          = data.dsize;
755         if (data.dsize) {
756                 memcpy(&c->data[0], data.dptr, data.dsize);
757         }
758
759         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
760         if (ret != 0) {
761                 talloc_free(state);
762                 return -1;
763         }
764
765         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
766                 talloc_free(state);
767                 return 0;
768         }
769
770         /* semi-async operation */
771         timed_out = 0;
772         if (timeout && !timeval_is_zero(timeout)) {
773                 event_add_timed(ctdb->ev, state, *timeout, timeout_func, &timed_out);
774         }
775         while ((state->state == CTDB_CALL_WAIT)
776         &&      (timed_out == 0) ){
777                 event_loop_once(ctdb->ev);
778         }
779         if (timed_out) {
780                 talloc_free(state);
781                 if (errormsg) {
782                         (*errormsg) = talloc_strdup(mem_ctx, "control timed out");
783                 } else {
784                         DEBUG(0,("ctdb_control timed out\n"));
785                 }
786                 return -1;
787         }
788
789         if (outdata) {
790                 *outdata = state->outdata;
791                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
792         }
793
794         *status = state->status;
795
796         if (!errormsg && state->errormsg) {
797                 DEBUG(0,("ctdb_control error: '%s'\n", state->errormsg));
798         }
799
800         if (errormsg && state->errormsg) {
801                 (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
802         }
803
804         talloc_free(state);
805
806         return 0;       
807 }
808
809
810
811 /*
812   a process exists call. Returns 0 if process exists, -1 otherwise
813  */
814 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
815 {
816         int ret;
817         TDB_DATA data;
818         int32_t status;
819
820         data.dptr = (uint8_t*)&pid;
821         data.dsize = sizeof(pid);
822
823         ret = ctdb_control(ctdb, destnode, 0, 
824                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
825                            NULL, NULL, &status, NULL, NULL);
826         if (ret != 0) {
827                 DEBUG(0,(__location__ " ctdb_control for process_exists failed\n"));
828                 return -1;
829         }
830
831         return status;
832 }
833
834 /*
835   get remote statistics
836  */
837 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
838 {
839         int ret;
840         TDB_DATA data;
841         int32_t res;
842
843         ret = ctdb_control(ctdb, destnode, 0, 
844                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
845                            ctdb, &data, &res, NULL, NULL);
846         if (ret != 0 || res != 0) {
847                 DEBUG(0,(__location__ " ctdb_control for statistics failed\n"));
848                 return -1;
849         }
850
851         if (data.dsize != sizeof(struct ctdb_statistics)) {
852                 DEBUG(0,(__location__ " Wrong statistics size %u - expected %u\n",
853                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
854                       return -1;
855         }
856
857         *status = *(struct ctdb_statistics *)data.dptr;
858         talloc_free(data.dptr);
859                         
860         return 0;
861 }
862
863 /*
864   shutdown a remote ctdb node
865  */
866 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
867 {
868         int ret;
869         int32_t res;
870
871         ret = ctdb_control(ctdb, destnode, 0, 
872                            CTDB_CONTROL_SHUTDOWN, CTDB_CTRL_FLAG_NOREPLY, tdb_null, 
873                            NULL, NULL, &res, &timeout, NULL);
874         if (ret != 0) {
875                 DEBUG(0,(__location__ " ctdb_control for shutdown failed\n"));
876                 return -1;
877         }
878
879         return 0;
880 }
881
882 /*
883   get vnn map from a remote node
884  */
885 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
886 {
887         int ret;
888         TDB_DATA outdata;
889         int32_t res;
890         struct ctdb_vnn_map_wire *map;
891
892         ret = ctdb_control(ctdb, destnode, 0, 
893                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
894                            mem_ctx, &outdata, &res, &timeout, NULL);
895         if (ret != 0 || res != 0) {
896                 DEBUG(0,(__location__ " ctdb_control for getvnnmap failed\n"));
897                 return -1;
898         }
899         
900         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
901         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
902             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
903                 DEBUG(0,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
904                 return -1;
905         }
906
907         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
908         CTDB_NO_MEMORY(ctdb, *vnnmap);
909         (*vnnmap)->generation = map->generation;
910         (*vnnmap)->size       = map->size;
911         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
912
913         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
914         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
915         talloc_free(outdata.dptr);
916                     
917         return 0;
918 }
919
920 /*
921   get the recovery mode of a remote node
922  */
923 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
924 {
925         int ret;
926         int32_t res;
927
928         ret = ctdb_control(ctdb, destnode, 0, 
929                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
930                            NULL, NULL, &res, &timeout, NULL);
931         if (ret != 0) {
932                 DEBUG(0,(__location__ " ctdb_control for getrecmode failed\n"));
933                 return -1;
934         }
935
936         *recmode = res;
937
938         return 0;
939 }
940
941 /*
942   set the recovery mode of a remote node
943  */
944 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
945 {
946         int ret;
947         TDB_DATA data;
948         int32_t res;
949
950         data.dsize = sizeof(uint32_t);
951         data.dptr = (unsigned char *)&recmode;
952
953         ret = ctdb_control(ctdb, destnode, 0, 
954                            CTDB_CONTROL_SET_RECMODE, 0, data, 
955                            NULL, NULL, &res, &timeout, NULL);
956         if (ret != 0 || res != 0) {
957                 DEBUG(0,(__location__ " ctdb_control for setrecmode failed\n"));
958                 return -1;
959         }
960
961         return 0;
962 }
963
964 /*
965   get the recovery master of a remote node
966  */
967 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
968 {
969         int ret;
970         int32_t res;
971
972         ret = ctdb_control(ctdb, destnode, 0, 
973                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
974                            NULL, NULL, &res, &timeout, NULL);
975         if (ret != 0) {
976                 DEBUG(0,(__location__ " ctdb_control for getrecmaster failed\n"));
977                 return -1;
978         }
979
980         *recmaster = res;
981
982         return 0;
983 }
984
985 /*
986   set the recovery master of a remote node
987  */
988 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
989 {
990         int ret;
991         TDB_DATA data;
992         int32_t res;
993
994         ZERO_STRUCT(data);
995         data.dsize = sizeof(uint32_t);
996         data.dptr = (unsigned char *)&recmaster;
997
998         ret = ctdb_control(ctdb, destnode, 0, 
999                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1000                            NULL, NULL, &res, &timeout, NULL);
1001         if (ret != 0 || res != 0) {
1002                 DEBUG(0,(__location__ " ctdb_control for setrecmaster failed\n"));
1003                 return -1;
1004         }
1005
1006         return 0;
1007 }
1008
1009
1010 /*
1011   get a list of databases off a remote node
1012  */
1013 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1014                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1015 {
1016         int ret;
1017         TDB_DATA outdata;
1018         int32_t res;
1019
1020         ret = ctdb_control(ctdb, destnode, 0, 
1021                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1022                            mem_ctx, &outdata, &res, &timeout, NULL);
1023         if (ret != 0 || res != 0) {
1024                 DEBUG(0,(__location__ " ctdb_control for getdbmap failed\n"));
1025                 return -1;
1026         }
1027
1028         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1029         talloc_free(outdata.dptr);
1030                     
1031         return 0;
1032 }
1033
1034
1035 /*
1036   get a list of nodes (vnn and flags ) from a remote node
1037  */
1038 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1039                 struct timeval timeout, uint32_t destnode, 
1040                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1041 {
1042         int ret;
1043         TDB_DATA outdata;
1044         int32_t res;
1045
1046         ret = ctdb_control(ctdb, destnode, 0, 
1047                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1048                            mem_ctx, &outdata, &res, &timeout, NULL);
1049         if (ret != 0 || res != 0) {
1050                 DEBUG(0,(__location__ " ctdb_control for getnodes failed\n"));
1051                 return -1;
1052         }
1053
1054         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1055         talloc_free(outdata.dptr);
1056                     
1057         return 0;
1058 }
1059
1060 /*
1061   set vnn map on a node
1062  */
1063 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1064                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1065 {
1066         int ret;
1067         TDB_DATA data;
1068         int32_t res;
1069         struct ctdb_vnn_map_wire *map;
1070         size_t len;
1071
1072         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1073         map = talloc_size(mem_ctx, len);
1074         CTDB_NO_MEMORY_VOID(ctdb, map);
1075
1076         map->generation = vnnmap->generation;
1077         map->size = vnnmap->size;
1078         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1079         
1080         data.dsize = len;
1081         data.dptr  = (uint8_t *)map;
1082
1083         ret = ctdb_control(ctdb, destnode, 0, 
1084                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1085                            NULL, NULL, &res, &timeout, NULL);
1086         if (ret != 0 || res != 0) {
1087                 DEBUG(0,(__location__ " ctdb_control for setvnnmap failed\n"));
1088                 return -1;
1089         }
1090
1091         talloc_free(map);
1092
1093         return 0;
1094 }
1095
1096 /*
1097   get all keys and records for a specific database
1098  */
1099 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid, uint32_t lmaster, 
1100                      TALLOC_CTX *mem_ctx, struct ctdb_key_list *keys)
1101 {
1102         int i, ret;
1103         TDB_DATA indata, outdata;
1104         struct ctdb_control_pulldb pull;
1105         struct ctdb_control_pulldb_reply *reply;
1106         struct ctdb_rec_data *rec;
1107         int32_t res;
1108
1109         pull.db_id   = dbid;
1110         pull.lmaster = lmaster;
1111
1112         indata.dsize = sizeof(struct ctdb_control_pulldb);
1113         indata.dptr  = (unsigned char *)&pull;
1114
1115         ret = ctdb_control(ctdb, destnode, 0, 
1116                            CTDB_CONTROL_PULL_DB, 0, indata, 
1117                            mem_ctx, &outdata, &res, NULL, NULL);
1118         if (ret != 0 || res != 0) {
1119                 DEBUG(0,(__location__ " ctdb_control for pulldb failed\n"));
1120                 return -1;
1121         }
1122
1123
1124         reply = (struct ctdb_control_pulldb_reply *)outdata.dptr;
1125         keys->dbid     = reply->db_id;
1126         keys->num      = reply->count;
1127         
1128         keys->keys     = talloc_array(mem_ctx, TDB_DATA, keys->num);
1129         keys->headers  = talloc_array(mem_ctx, struct ctdb_ltdb_header, keys->num);
1130         keys->data     = talloc_array(mem_ctx, TDB_DATA, keys->num);
1131
1132         rec = (struct ctdb_rec_data *)&reply->data[0];
1133
1134         for (i=0;i<reply->count;i++) {
1135                 keys->keys[i].dptr = talloc_memdup(mem_ctx, &rec->data[0], rec->keylen);
1136                 keys->keys[i].dsize = rec->keylen;
1137                 
1138                 keys->data[i].dptr = talloc_memdup(mem_ctx, &rec->data[keys->keys[i].dsize], rec->datalen);
1139                 keys->data[i].dsize = rec->datalen;
1140
1141                 if (keys->data[i].dsize < sizeof(struct ctdb_ltdb_header)) {
1142                         DEBUG(0,(__location__ " bad ltdb record\n"));
1143                         return -1;
1144                 }
1145                 memcpy(&keys->headers[i], keys->data[i].dptr, sizeof(struct ctdb_ltdb_header));
1146                 keys->data[i].dptr += sizeof(struct ctdb_ltdb_header);
1147                 keys->data[i].dsize -= sizeof(struct ctdb_ltdb_header);
1148
1149                 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
1150         }           
1151
1152         talloc_free(outdata.dptr);
1153
1154         return 0;
1155 }
1156
1157 /*
1158   copy a tdb from one node to another node
1159  */
1160 int ctdb_ctrl_copydb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t sourcenode, 
1161                      uint32_t destnode, uint32_t dbid, uint32_t lmaster, TALLOC_CTX *mem_ctx)
1162 {
1163         int ret;
1164         TDB_DATA indata, outdata;
1165         int32_t res;
1166
1167         indata.dsize = 2*sizeof(uint32_t);
1168         indata.dptr  = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1169
1170         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1171         ((uint32_t *)(&indata.dptr[0]))[1] = lmaster;
1172
1173         DEBUG(3,("pulling dbid 0x%x from %u\n", dbid, sourcenode));
1174
1175         ret = ctdb_control(ctdb, sourcenode, 0, 
1176                            CTDB_CONTROL_PULL_DB, 0, indata, 
1177                            mem_ctx, &outdata, &res, &timeout, NULL);
1178         if (ret != 0 || res != 0) {
1179                 DEBUG(0,(__location__ " ctdb_control for pulldb failed\n"));
1180                 return -1;
1181         }
1182
1183         DEBUG(3,("pushing dbid 0x%x to %u\n", dbid, destnode));
1184
1185         ret = ctdb_control(ctdb, destnode, 0, 
1186                            CTDB_CONTROL_PUSH_DB, 0, outdata, 
1187                            mem_ctx, NULL, &res, &timeout, NULL);
1188         talloc_free(outdata.dptr);
1189         if (ret != 0 || res != 0) {
1190                 DEBUG(0,(__location__ " ctdb_control for pushdb failed\n"));
1191                 return -1;
1192         }
1193
1194         DEBUG(3,("copydb for dbid 0x%x done for %u to %u\n", 
1195                  dbid, sourcenode, destnode));
1196
1197         return 0;
1198 }
1199
1200 /*
1201   change dmaster for all keys in the database to the new value
1202  */
1203 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1204                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1205 {
1206         int ret;
1207         TDB_DATA indata;
1208         int32_t res;
1209
1210         indata.dsize = 2*sizeof(uint32_t);
1211         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1212
1213         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1214         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1215
1216         ret = ctdb_control(ctdb, destnode, 0, 
1217                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1218                            NULL, NULL, &res, &timeout, NULL);
1219         if (ret != 0 || res != 0) {
1220                 DEBUG(0,(__location__ " ctdb_control for setdmaster failed\n"));
1221                 return -1;
1222         }
1223
1224         return 0;
1225 }
1226
1227 /*
1228   ping a node, return number of clients connected
1229  */
1230 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1231 {
1232         int ret;
1233         int32_t res;
1234
1235         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1236                            tdb_null, NULL, NULL, &res, NULL, NULL);
1237         if (ret != 0) {
1238                 return -1;
1239         }
1240         return res;
1241 }
1242
1243 /*
1244   find the real path to a ltdb 
1245  */
1246 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1247                    const char **path)
1248 {
1249         int ret;
1250         int32_t res;
1251         TDB_DATA data;
1252
1253         data.dptr = (uint8_t *)&dbid;
1254         data.dsize = sizeof(dbid);
1255
1256         ret = ctdb_control(ctdb, destnode, 0, 
1257                            CTDB_CONTROL_GETDBPATH, 0, data, 
1258                            mem_ctx, &data, &res, &timeout, NULL);
1259         if (ret != 0 || res != 0) {
1260                 return -1;
1261         }
1262
1263         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1264         if ((*path) == NULL) {
1265                 return -1;
1266         }
1267
1268         talloc_free(data.dptr);
1269
1270         return 0;
1271 }
1272
1273 /*
1274   find the name of a db 
1275  */
1276 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1277                    const char **name)
1278 {
1279         int ret;
1280         int32_t res;
1281         TDB_DATA data;
1282
1283         data.dptr = (uint8_t *)&dbid;
1284         data.dsize = sizeof(dbid);
1285
1286         ret = ctdb_control(ctdb, destnode, 0, 
1287                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1288                            mem_ctx, &data, &res, &timeout, NULL);
1289         if (ret != 0 || res != 0) {
1290                 return -1;
1291         }
1292
1293         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1294         if ((*name) == NULL) {
1295                 return -1;
1296         }
1297
1298         talloc_free(data.dptr);
1299
1300         return 0;
1301 }
1302
1303 /*
1304   create a database
1305  */
1306 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, const char *name)
1307 {
1308         int ret;
1309         int32_t res;
1310         TDB_DATA data;
1311
1312         data.dptr = discard_const(name);
1313         data.dsize = strlen(name)+1;
1314
1315         ret = ctdb_control(ctdb, destnode, 0, 
1316                            CTDB_CONTROL_DB_ATTACH, 0, data, 
1317                            mem_ctx, &data, &res, &timeout, NULL);
1318
1319         if (ret != 0 || res != 0) {
1320                 return -1;
1321         }
1322
1323         return 0;
1324 }
1325
1326 /*
1327   get debug level on a node
1328  */
1329 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, uint32_t *level)
1330 {
1331         int ret;
1332         int32_t res;
1333         TDB_DATA data;
1334
1335         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1336                            ctdb, &data, &res, NULL, NULL);
1337         if (ret != 0 || res != 0) {
1338                 return -1;
1339         }
1340         if (data.dsize != sizeof(uint32_t)) {
1341                 DEBUG(0,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1342                          (unsigned)data.dsize));
1343                 return -1;
1344         }
1345         *level = *(uint32_t *)data.dptr;
1346         talloc_free(data.dptr);
1347         return 0;
1348 }
1349
1350 /*
1351   set debug level on a node
1352  */
1353 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, uint32_t level)
1354 {
1355         int ret;
1356         int32_t res;
1357         TDB_DATA data;
1358
1359         data.dptr = (uint8_t *)&level;
1360         data.dsize = sizeof(level);
1361
1362         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1363                            NULL, NULL, &res, NULL, NULL);
1364         if (ret != 0 || res != 0) {
1365                 return -1;
1366         }
1367         return 0;
1368 }
1369
1370
1371 /*
1372   get a list of connected nodes
1373  */
1374 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1375                                 struct timeval timeout,
1376                                 TALLOC_CTX *mem_ctx,
1377                                 uint32_t *num_nodes)
1378 {
1379         struct ctdb_node_map *map=NULL;
1380         int ret, i;
1381         uint32_t *nodes;
1382
1383         *num_nodes = 0;
1384
1385         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1386         if (ret != 0) {
1387                 return NULL;
1388         }
1389
1390         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1391         if (nodes == NULL) {
1392                 return NULL;
1393         }
1394
1395         for (i=0;i<map->num;i++) {
1396                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1397                         nodes[*num_nodes] = map->nodes[i].vnn;
1398                         (*num_nodes)++;
1399                 }
1400         }
1401
1402         return nodes;
1403 }
1404
1405
1406 /*
1407   reset remote status
1408  */
1409 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1410 {
1411         int ret;
1412         int32_t res;
1413
1414         ret = ctdb_control(ctdb, destnode, 0, 
1415                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1416                            NULL, NULL, &res, NULL, NULL);
1417         if (ret != 0 || res != 0) {
1418                 DEBUG(0,(__location__ " ctdb_control for reset statistics failed\n"));
1419                 return -1;
1420         }
1421         return 0;
1422 }
1423
1424
1425 /*
1426   attach to a specific database - client call
1427 */
1428 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name)
1429 {
1430         struct ctdb_db_context *ctdb_db;
1431         TDB_DATA data;
1432         int ret;
1433         int32_t res;
1434
1435         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1436         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1437
1438         ctdb_db->ctdb = ctdb;
1439         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1440         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1441
1442         data.dptr = discard_const(name);
1443         data.dsize = strlen(name)+1;
1444
1445         /* tell ctdb daemon to attach */
1446         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_DB_ATTACH,
1447                            0, data, ctdb_db, &data, &res, NULL, NULL);
1448         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1449                 DEBUG(0,("Failed to attach to database '%s'\n", name));
1450                 talloc_free(ctdb_db);
1451                 return NULL;
1452         }
1453         
1454         ctdb_db->db_id = *(uint32_t *)data.dptr;
1455         talloc_free(data.dptr);
1456
1457         ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1458         if (ret != 0) {
1459                 DEBUG(0,("Failed to get dbpath for database '%s'\n", name));
1460                 talloc_free(ctdb_db);
1461                 return NULL;
1462         }
1463
1464         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, 0, O_RDWR, 0);
1465         if (ctdb_db->ltdb == NULL) {
1466                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1467                 talloc_free(ctdb_db);
1468                 return NULL;
1469         }
1470
1471         DLIST_ADD(ctdb->db_list, ctdb_db);
1472
1473         return ctdb_db;
1474 }
1475
1476
1477 /*
1478   setup a call for a database
1479  */
1480 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1481 {
1482         TDB_DATA data;
1483         int32_t status;
1484         struct ctdb_control_set_call c;
1485         int ret;
1486         struct ctdb_registered_call *call;
1487
1488         c.db_id = ctdb_db->db_id;
1489         c.fn    = fn;
1490         c.id    = id;
1491
1492         data.dptr = (uint8_t *)&c;
1493         data.dsize = sizeof(c);
1494
1495         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1496                            data, NULL, NULL, &status, NULL, NULL);
1497         if (ret != 0 || status != 0) {
1498                 DEBUG(0,("ctdb_set_call failed for call %u\n", id));
1499                 return -1;
1500         }
1501
1502         /* also register locally */
1503         call = talloc(ctdb_db, struct ctdb_registered_call);
1504         call->fn = fn;
1505         call->id = id;
1506
1507         DLIST_ADD(ctdb_db->calls, call);        
1508         return 0;
1509 }
1510
1511
1512 struct traverse_state {
1513         bool done;
1514         uint32_t count;
1515         ctdb_traverse_func fn;
1516         void *private_data;
1517 };
1518
1519 /*
1520   called on each key during a ctdb_traverse
1521  */
1522 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1523 {
1524         struct traverse_state *state = (struct traverse_state *)p;
1525         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
1526         TDB_DATA key;
1527
1528         if (data.dsize < sizeof(uint32_t) ||
1529             d->length != data.dsize) {
1530                 DEBUG(0,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
1531                 state->done = True;
1532                 return;
1533         }
1534
1535         key.dsize = d->keylen;
1536         key.dptr  = &d->data[0];
1537         data.dsize = d->datalen;
1538         data.dptr = &d->data[d->keylen];
1539
1540         if (key.dsize == 0 && data.dsize == 0) {
1541                 /* end of traverse */
1542                 state->done = True;
1543                 return;
1544         }
1545
1546         if (state->fn(ctdb, key, data, state->private_data) != 0) {
1547                 state->done = True;
1548         }
1549
1550         state->count++;
1551 }
1552
1553
1554 /*
1555   start a cluster wide traverse, calling the supplied fn on each record
1556   return the number of records traversed, or -1 on error
1557  */
1558 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
1559 {
1560         TDB_DATA data;
1561         struct ctdb_traverse_start t;
1562         int32_t status;
1563         int ret;
1564         uint64_t srvid = (getpid() | 0xFLL<<60);
1565         struct traverse_state state;
1566
1567         state.done = False;
1568         state.count = 0;
1569         state.private_data = private_data;
1570         state.fn = fn;
1571
1572         ret = ctdb_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
1573         if (ret != 0) {
1574                 DEBUG(0,("Failed to setup traverse handler\n"));
1575                 return -1;
1576         }
1577
1578         t.db_id = ctdb_db->db_id;
1579         t.srvid = srvid;
1580         t.reqid = 0;
1581
1582         data.dptr = (uint8_t *)&t;
1583         data.dsize = sizeof(t);
1584
1585         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
1586                            data, NULL, NULL, &status, NULL, NULL);
1587         if (ret != 0 || status != 0) {
1588                 DEBUG(0,("ctdb_traverse_all failed\n"));
1589                 ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1590                 return -1;
1591         }
1592
1593         while (!state.done) {
1594                 event_loop_once(ctdb_db->ctdb->ev);
1595         }
1596
1597         ret = ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1598         if (ret != 0) {
1599                 DEBUG(0,("Failed to remove ctdb_traverse handler\n"));
1600                 return -1;
1601         }
1602
1603         return state.count;
1604 }
1605
1606 /*
1607   called on each key during a catdb
1608  */
1609 static int dumpdb_fn(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
1610 {
1611         FILE *f = (FILE *)p;
1612         char *keystr, *datastr;
1613         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
1614
1615         keystr  = hex_encode(ctdb, key.dptr, key.dsize);
1616         datastr = hex_encode(ctdb, data.dptr+sizeof(*h), data.dsize-sizeof(*h));
1617
1618         fprintf(f, "dmaster: %u\n", h->dmaster);
1619         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
1620         fprintf(f, "key: %s\ndata: %s\n", keystr, datastr);
1621
1622         talloc_free(keystr);
1623         talloc_free(datastr);
1624         return 0;
1625 }
1626
1627 /*
1628   convenience function to list all keys to stdout
1629  */
1630 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
1631 {
1632         return ctdb_traverse(ctdb_db, dumpdb_fn, f);
1633 }
1634
1635 /*
1636   get the pid of a ctdb daemon
1637  */
1638 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
1639 {
1640         int ret;
1641         int32_t res;
1642
1643         ret = ctdb_control(ctdb, destnode, 0, 
1644                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
1645                            NULL, NULL, &res, &timeout, NULL);
1646         if (ret != 0) {
1647                 DEBUG(0,(__location__ " ctdb_control for getpid failed\n"));
1648                 return -1;
1649         }
1650
1651         *pid = res;
1652
1653         return 0;
1654 }
1655
1656
1657 /*
1658   freeze a node
1659  */
1660 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1661 {
1662         int ret;
1663         int32_t res;
1664
1665         ret = ctdb_control(ctdb, destnode, 0, 
1666                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
1667                            NULL, NULL, &res, &timeout, NULL);
1668         if (ret != 0 || res != 0) {
1669                 DEBUG(0,(__location__ " ctdb_control freeze failed\n"));
1670                 return -1;
1671         }
1672
1673         return 0;
1674 }
1675
1676 /*
1677   thaw a node
1678  */
1679 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1680 {
1681         int ret;
1682         int32_t res;
1683
1684         ret = ctdb_control(ctdb, destnode, 0, 
1685                            CTDB_CONTROL_THAW, 0, tdb_null, 
1686                            NULL, NULL, &res, &timeout, NULL);
1687         if (ret != 0 || res != 0) {
1688                 DEBUG(0,(__location__ " ctdb_control thaw failed\n"));
1689                 return -1;
1690         }
1691
1692         return 0;
1693 }
1694
1695 /*
1696   get vnn of a node, or -1
1697  */
1698 int ctdb_ctrl_getvnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1699 {
1700         int ret;
1701         int32_t res;
1702
1703         ret = ctdb_control(ctdb, destnode, 0, 
1704                            CTDB_CONTROL_GET_VNN, 0, tdb_null, 
1705                            NULL, NULL, &res, &timeout, NULL);
1706         if (ret != 0) {
1707                 DEBUG(0,(__location__ " ctdb_control for getvnn failed\n"));
1708                 return -1;
1709         }
1710
1711         return res;
1712 }
1713
1714 /*
1715   set the monitoring mode of a remote node
1716  */
1717 int ctdb_ctrl_setmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t monmode)
1718 {
1719         int ret;
1720         TDB_DATA data;
1721         int32_t res;
1722
1723         data.dsize = sizeof(uint32_t);
1724         data.dptr = (uint8_t *)&monmode;
1725
1726         ret = ctdb_control(ctdb, destnode, 0, 
1727                            CTDB_CONTROL_SET_MONMODE, 0, data, 
1728                            NULL, NULL, &res, &timeout, NULL);
1729         if (ret != 0 || res != 0) {
1730                 DEBUG(0,(__location__ " ctdb_control for setmonmode failed\n"));
1731                 return -1;
1732         }
1733
1734         return 0;
1735 }
1736
1737 /*
1738   get the monitoring mode of a remote node
1739  */
1740 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
1741 {
1742         int ret;
1743         int32_t res;
1744
1745         ret = ctdb_control(ctdb, destnode, 0, 
1746                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
1747                            NULL, NULL, &res, &timeout, NULL);
1748         if (ret != 0) {
1749                 DEBUG(0,(__location__ " ctdb_control for getrecmode failed\n"));
1750                 return -1;
1751         }
1752
1753         *monmode = res;
1754
1755         return 0;
1756 }
1757
1758
1759 /*
1760   get maximum rsn for a db on a node
1761  */
1762 int ctdb_ctrl_get_max_rsn(struct ctdb_context *ctdb, struct timeval timeout, 
1763                           uint32_t destnode, uint32_t db_id, uint64_t *max_rsn)
1764 {
1765         TDB_DATA data, outdata;
1766         int ret;
1767         int32_t res;
1768
1769         data.dptr = (uint8_t *)&db_id;
1770         data.dsize = sizeof(db_id);
1771
1772         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_MAX_RSN, 0, data, ctdb,
1773                            &outdata, &res, &timeout, NULL);
1774         if (ret != 0 || res != 0 || outdata.dsize != sizeof(uint64_t)) {
1775                 DEBUG(0,(__location__ " ctdb_control for get_max_rsn failed\n"));
1776                 return -1;
1777         }
1778
1779         *max_rsn = *(uint64_t *)outdata.dptr;
1780         talloc_free(outdata.dptr);
1781
1782         return 0;       
1783 }
1784
1785 /*
1786   set the rsn on non-empty records to the given rsn
1787  */
1788 int ctdb_ctrl_set_rsn_nonempty(struct ctdb_context *ctdb, struct timeval timeout, 
1789                                uint32_t destnode, uint32_t db_id, uint64_t rsn)
1790 {
1791         TDB_DATA data;
1792         int ret;
1793         int32_t res;
1794         struct ctdb_control_set_rsn_nonempty p;
1795
1796         p.db_id = db_id;
1797         p.rsn = rsn;
1798
1799         data.dptr = (uint8_t *)&p;
1800         data.dsize = sizeof(p);
1801
1802         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_RSN_NONEMPTY, 0, data, NULL,
1803                            NULL, &res, &timeout, NULL);
1804         if (ret != 0 || res != 0) {
1805                 DEBUG(0,(__location__ " ctdb_control for set_rsn_nonempty failed\n"));
1806                 return -1;
1807         }
1808
1809         return 0;       
1810 }
1811
1812 /*
1813   delete records which have a rsn below the given rsn
1814  */
1815 int ctdb_ctrl_delete_low_rsn(struct ctdb_context *ctdb, struct timeval timeout, 
1816                              uint32_t destnode, uint32_t db_id, uint64_t rsn)
1817 {
1818         TDB_DATA data;
1819         int ret;
1820         int32_t res;
1821         struct ctdb_control_delete_low_rsn p;
1822
1823         p.db_id = db_id;
1824         p.rsn = rsn;
1825
1826         data.dptr = (uint8_t *)&p;
1827         data.dsize = sizeof(p);
1828
1829         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DELETE_LOW_RSN, 0, data, NULL,
1830                            NULL, &res, &timeout, NULL);
1831         if (ret != 0 || res != 0) {
1832                 DEBUG(0,(__location__ " ctdb_control for delete_low_rsn failed\n"));
1833                 return -1;
1834         }
1835
1836         return 0;       
1837 }
1838
1839 /* 
1840   sent to a node to make it take over an ip address
1841 */
1842 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
1843                           uint32_t destnode, struct ctdb_public_ip *ip)
1844 {
1845         TDB_DATA data;
1846         int ret;
1847         int32_t res;
1848
1849         data.dsize = sizeof(*ip);
1850         data.dptr  = (uint8_t *)ip;
1851
1852         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
1853                            NULL, &res, &timeout, NULL);
1854
1855         if (ret != 0 || res != 0) {
1856                 DEBUG(0,(__location__ " ctdb_control for takeover_ip failed\n"));
1857                 return -1;
1858         }
1859
1860         return 0;       
1861 }
1862
1863
1864 /* 
1865   sent to a node to make it release an ip address
1866 */
1867 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
1868                          uint32_t destnode, struct ctdb_public_ip *ip)
1869 {
1870         TDB_DATA data;
1871         int ret;
1872         int32_t res;
1873
1874         data.dsize = sizeof(*ip);
1875         data.dptr  = (uint8_t *)ip;
1876
1877         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
1878                            NULL, &res, &timeout, NULL);
1879
1880         if (ret != 0 || res != 0) {
1881                 DEBUG(0,(__location__ " ctdb_control for release_ip failed\n"));
1882                 return -1;
1883         }
1884
1885         return 0;       
1886 }
1887
1888
1889 /*
1890   get a tunable
1891  */
1892 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
1893                           struct timeval timeout, 
1894                           uint32_t destnode,
1895                           const char *name, uint32_t *value)
1896 {
1897         struct ctdb_control_get_tunable *t;
1898         TDB_DATA data, outdata;
1899         int32_t res;
1900         int ret;
1901
1902         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
1903         data.dptr  = talloc_size(ctdb, data.dsize);
1904         CTDB_NO_MEMORY(ctdb, data.dptr);
1905
1906         t = (struct ctdb_control_get_tunable *)data.dptr;
1907         t->length = strlen(name)+1;
1908         memcpy(t->name, name, t->length);
1909
1910         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
1911                            &outdata, &res, &timeout, NULL);
1912         talloc_free(data.dptr);
1913         if (ret != 0 || res != 0) {
1914                 DEBUG(0,(__location__ " ctdb_control for get_tunable failed\n"));
1915                 return -1;
1916         }
1917
1918         if (outdata.dsize != sizeof(uint32_t)) {
1919                 DEBUG(0,("Invalid return data in get_tunable\n"));
1920                 talloc_free(outdata.dptr);
1921                 return -1;
1922         }
1923         
1924         *value = *(uint32_t *)outdata.dptr;
1925         talloc_free(outdata.dptr);
1926
1927         return 0;
1928 }
1929
1930 /*
1931   set a tunable
1932  */
1933 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
1934                           struct timeval timeout, 
1935                           uint32_t destnode,
1936                           const char *name, uint32_t value)
1937 {
1938         struct ctdb_control_set_tunable *t;
1939         TDB_DATA data;
1940         int32_t res;
1941         int ret;
1942
1943         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
1944         data.dptr  = talloc_size(ctdb, data.dsize);
1945         CTDB_NO_MEMORY(ctdb, data.dptr);
1946
1947         t = (struct ctdb_control_set_tunable *)data.dptr;
1948         t->length = strlen(name)+1;
1949         memcpy(t->name, name, t->length);
1950         t->value = value;
1951
1952         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
1953                            NULL, &res, &timeout, NULL);
1954         talloc_free(data.dptr);
1955         if (ret != 0 || res != 0) {
1956                 DEBUG(0,(__location__ " ctdb_control for set_tunable failed\n"));
1957                 return -1;
1958         }
1959
1960         return 0;
1961 }
1962
1963 /*
1964   list tunables
1965  */
1966 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
1967                             struct timeval timeout, 
1968                             uint32_t destnode,
1969                             TALLOC_CTX *mem_ctx,
1970                             const char ***list, uint32_t *count)
1971 {
1972         TDB_DATA outdata;
1973         int32_t res;
1974         int ret;
1975         struct ctdb_control_list_tunable *t;
1976         char *p, *s, *ptr;
1977
1978         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
1979                            mem_ctx, &outdata, &res, &timeout, NULL);
1980         if (ret != 0 || res != 0) {
1981                 DEBUG(0,(__location__ " ctdb_control for list_tunables failed\n"));
1982                 return -1;
1983         }
1984
1985         t = (struct ctdb_control_list_tunable *)outdata.dptr;
1986         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
1987             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
1988                 DEBUG(0,("Invalid data in list_tunables reply\n"));
1989                 talloc_free(outdata.dptr);
1990                 return -1;              
1991         }
1992         
1993         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
1994         CTDB_NO_MEMORY(ctdb, p);
1995
1996         talloc_free(outdata.dptr);
1997         
1998         (*list) = NULL;
1999         (*count) = 0;
2000
2001         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2002                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2003                 CTDB_NO_MEMORY(ctdb, *list);
2004                 (*list)[*count] = talloc_strdup(*list, s);
2005                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2006                 (*count)++;
2007         }
2008
2009         talloc_free(p);
2010
2011         return 0;
2012 }
2013
2014
2015 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb, 
2016                         struct timeval timeout, uint32_t destnode, 
2017                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2018 {
2019         int ret;
2020         TDB_DATA outdata;
2021         int32_t res;
2022
2023         ret = ctdb_control(ctdb, destnode, 0, 
2024                            CTDB_CONTROL_GET_PUBLIC_IPS, 0, tdb_null, 
2025                            mem_ctx, &outdata, &res, &timeout, NULL);
2026         if (ret != 0 || res != 0) {
2027                 DEBUG(0,(__location__ " ctdb_control for getpublicips failed\n"));
2028                 return -1;
2029         }
2030
2031         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2032         talloc_free(outdata.dptr);
2033                     
2034         return 0;
2035 }
2036
2037 /*
2038   set/clear the permanent disabled bit on a remote node
2039  */
2040 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2041                        uint32_t set, uint32_t clear)
2042 {
2043         int ret;
2044         TDB_DATA data;
2045         struct ctdb_node_modflags m;
2046         int32_t res;
2047
2048         m.set = set;
2049         m.clear = clear;
2050
2051         data.dsize = sizeof(m);
2052         data.dptr = (unsigned char *)&m;
2053
2054         ret = ctdb_control(ctdb, destnode, 0, 
2055                            CTDB_CONTROL_MODIFY_FLAGS, 0, data, 
2056                            NULL, NULL, &res, &timeout, NULL);
2057         if (ret != 0 || res != 0) {
2058                 DEBUG(0,(__location__ " ctdb_control for modflags failed\n"));
2059                 return -1;
2060         }
2061
2062         return 0;
2063 }
2064
2065
2066 /*
2067   get all tunables
2068  */
2069 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
2070                                struct timeval timeout, 
2071                                uint32_t destnode,
2072                                struct ctdb_tunable *tunables)
2073 {
2074         TDB_DATA outdata;
2075         int ret;
2076         int32_t res;
2077
2078         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2079                            &outdata, &res, &timeout, NULL);
2080         if (ret != 0 || res != 0) {
2081                 DEBUG(0,(__location__ " ctdb_control for get all tunables failed\n"));
2082                 return -1;
2083         }
2084
2085         if (outdata.dsize != sizeof(*tunables)) {
2086                 DEBUG(0,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2087                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2088                 return -1;              
2089         }
2090
2091         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2092         talloc_free(outdata.dptr);
2093         return 0;
2094 }
2095
2096
2097 /*
2098   initialise the ctdb daemon for client applications
2099
2100   NOTE: In current code the daemon does not fork. This is for testing purposes only
2101   and to simplify the code.
2102 */
2103 struct ctdb_context *ctdb_init(struct event_context *ev)
2104 {
2105         struct ctdb_context *ctdb;
2106
2107         ctdb = talloc_zero(ev, struct ctdb_context);
2108         ctdb->ev  = ev;
2109         ctdb->idr = idr_init(ctdb);
2110         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
2111
2112         return ctdb;
2113 }
2114
2115
2116 /*
2117   set some ctdb flags
2118 */
2119 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
2120 {
2121         ctdb->flags |= flags;
2122 }
2123
2124 /*
2125   setup the local socket name
2126 */
2127 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
2128 {
2129         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
2130         return 0;
2131 }
2132
2133 /*
2134   return the vnn of this node
2135 */
2136 uint32_t ctdb_get_vnn(struct ctdb_context *ctdb)
2137 {
2138         return ctdb->vnn;
2139 }
2140