clean out some more cruft
[vlendec/samba-autobuild/.git] / ctdb / common / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 2 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, write to the Free Software
19    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 */
21
22 #include "includes.h"
23 #include "db_wrap.h"
24 #include "lib/tdb/include/tdb.h"
25 #include "lib/events/events.h"
26 #include "lib/util/dlinklist.h"
27 #include "system/network.h"
28 #include "system/filesys.h"
29 #include "../include/ctdb.h"
30 #include "../include/ctdb_private.h"
31
32 /*
33   queue a packet for sending from client to daemon
34 */
35 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
36 {
37         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
38 }
39
40
41 /*
42   state of a in-progress ctdb call in client
43 */
44 struct ctdb_client_call_state {
45         enum call_state state;
46         uint32_t reqid;
47         struct ctdb_db_context *ctdb_db;
48         struct ctdb_call call;
49 };
50
51 /*
52   called when a CTDB_REPLY_CALL packet comes in in the client
53
54   This packet comes in response to a CTDB_REQ_CALL request packet. It
55   contains any reply data from the call
56 */
57 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
58 {
59         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
60         struct ctdb_client_call_state *state;
61
62         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
63         if (state == NULL) {
64                 DEBUG(0,(__location__ " reqid %u not found\n", hdr->reqid));
65                 return;
66         }
67
68         if (hdr->reqid != state->reqid) {
69                 /* we found a record  but it was the wrong one */
70                 DEBUG(0, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
71                 return;
72         }
73
74         state->call.reply_data.dptr = c->data;
75         state->call.reply_data.dsize = c->datalen;
76         state->call.status = c->status;
77
78         talloc_steal(state, c);
79
80         state->state = CTDB_CALL_DONE;
81 }
82
83 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
84
85 /*
86   this is called in the client, when data comes in from the daemon
87  */
88 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
89 {
90         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
91         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
92         TALLOC_CTX *tmp_ctx;
93
94         /* place the packet as a child of a tmp_ctx. We then use
95            talloc_free() below to free it. If any of the calls want
96            to keep it, then they will steal it somewhere else, and the
97            talloc_free() will be a no-op */
98         tmp_ctx = talloc_new(ctdb);
99         talloc_steal(tmp_ctx, hdr);
100
101         if (cnt == 0) {
102                 DEBUG(2,("Daemon has exited - shutting down client\n"));
103                 exit(0);
104         }
105
106         if (cnt < sizeof(*hdr)) {
107                 DEBUG(0,("Bad packet length %u in client\n", (unsigned)cnt));
108                 goto done;
109         }
110         if (cnt != hdr->length) {
111                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
112                                (unsigned)hdr->length, (unsigned)cnt);
113                 goto done;
114         }
115
116         if (hdr->ctdb_magic != CTDB_MAGIC) {
117                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
118                 goto done;
119         }
120
121         if (hdr->ctdb_version != CTDB_VERSION) {
122                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
123                 goto done;
124         }
125
126         switch (hdr->operation) {
127         case CTDB_REPLY_CALL:
128                 ctdb_client_reply_call(ctdb, hdr);
129                 break;
130
131         case CTDB_REQ_MESSAGE:
132                 ctdb_request_message(ctdb, hdr);
133                 break;
134
135         case CTDB_REPLY_CONTROL:
136                 ctdb_client_reply_control(ctdb, hdr);
137                 break;
138
139         default:
140                 DEBUG(0,("bogus operation code:%u\n",hdr->operation));
141         }
142
143 done:
144         talloc_free(tmp_ctx);
145 }
146
147 /*
148   connect to a unix domain socket
149 */
150 int ctdb_socket_connect(struct ctdb_context *ctdb)
151 {
152         struct sockaddr_un addr;
153
154         memset(&addr, 0, sizeof(addr));
155         addr.sun_family = AF_UNIX;
156         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
157
158         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
159         if (ctdb->daemon.sd == -1) {
160                 return -1;
161         }
162
163         set_nonblocking(ctdb->daemon.sd);
164         set_close_on_exec(ctdb->daemon.sd);
165         
166         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
167                 close(ctdb->daemon.sd);
168                 ctdb->daemon.sd = -1;
169                 return -1;
170         }
171
172         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
173                                               CTDB_DS_ALIGNMENT, 
174                                               ctdb_client_read_cb, ctdb);
175         return 0;
176 }
177
178
179 struct ctdb_record_handle {
180         struct ctdb_db_context *ctdb_db;
181         TDB_DATA key;
182         TDB_DATA *data;
183         struct ctdb_ltdb_header header;
184 };
185
186
187 /*
188   make a recv call to the local ctdb daemon - called from client context
189
190   This is called when the program wants to wait for a ctdb_call to complete and get the 
191   results. This call will block unless the call has already completed.
192 */
193 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
194 {
195         while (state->state < CTDB_CALL_DONE) {
196                 event_loop_once(state->ctdb_db->ctdb->ev);
197         }
198         if (state->state != CTDB_CALL_DONE) {
199                 DEBUG(0,(__location__ " ctdb_call_recv failed\n"));
200                 talloc_free(state);
201                 return -1;
202         }
203
204         if (state->call.reply_data.dsize) {
205                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
206                                                       state->call.reply_data.dptr,
207                                                       state->call.reply_data.dsize);
208                 call->reply_data.dsize = state->call.reply_data.dsize;
209         } else {
210                 call->reply_data.dptr = NULL;
211                 call->reply_data.dsize = 0;
212         }
213         call->status = state->call.status;
214         talloc_free(state);
215
216         return 0;
217 }
218
219
220
221
222 /*
223   destroy a ctdb_call in client
224 */
225 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
226 {
227         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
228         return 0;
229 }
230
231 /*
232   construct an event driven local ctdb_call
233
234   this is used so that locally processed ctdb_call requests are processed
235   in an event driven manner
236 */
237 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
238                                                                   struct ctdb_call *call,
239                                                                   struct ctdb_ltdb_header *header,
240                                                                   TDB_DATA *data)
241 {
242         struct ctdb_client_call_state *state;
243         struct ctdb_context *ctdb = ctdb_db->ctdb;
244         int ret;
245
246         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
247         CTDB_NO_MEMORY_NULL(ctdb, state);
248
249         talloc_steal(state, data->dptr);
250
251         state->state = CTDB_CALL_DONE;
252         state->call = *call;
253         state->ctdb_db = ctdb_db;
254
255         ret = ctdb_call_local(ctdb_db, &state->call, header, state, data, ctdb->vnn);
256
257         return state;
258 }
259
260 /*
261   make a ctdb call to the local daemon - async send. Called from client context.
262
263   This constructs a ctdb_call request and queues it for processing. 
264   This call never blocks.
265 */
266 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
267                                        struct ctdb_call *call)
268 {
269         struct ctdb_client_call_state *state;
270         struct ctdb_context *ctdb = ctdb_db->ctdb;
271         struct ctdb_ltdb_header header;
272         TDB_DATA data;
273         int ret;
274         size_t len;
275         struct ctdb_req_call *c;
276
277         /* if the domain socket is not yet open, open it */
278         if (ctdb->daemon.sd==-1) {
279                 ctdb_socket_connect(ctdb);
280         }
281
282         ret = ctdb_ltdb_lock(ctdb_db, call->key);
283         if (ret != 0) {
284                 DEBUG(0,(__location__ " Failed to get chainlock\n"));
285                 return NULL;
286         }
287
288         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
289
290         if (ret == 0 && header.dmaster == ctdb->vnn) {
291                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
292                 talloc_free(data.dptr);
293                 ctdb_ltdb_unlock(ctdb_db, call->key);
294                 return state;
295         }
296
297         ctdb_ltdb_unlock(ctdb_db, call->key);
298         talloc_free(data.dptr);
299
300         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
301         if (state == NULL) {
302                 DEBUG(0, (__location__ " failed to allocate state\n"));
303                 return NULL;
304         }
305
306         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
307         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
308         if (c == NULL) {
309                 DEBUG(0, (__location__ " failed to allocate packet\n"));
310                 return NULL;
311         }
312
313         state->reqid     = ctdb_reqid_new(ctdb, state);
314         state->ctdb_db = ctdb_db;
315         talloc_set_destructor(state, ctdb_client_call_destructor);
316
317         c->hdr.reqid     = state->reqid;
318         c->flags         = call->flags;
319         c->db_id         = ctdb_db->db_id;
320         c->callid        = call->call_id;
321         c->hopcount      = 0;
322         c->keylen        = call->key.dsize;
323         c->calldatalen   = call->call_data.dsize;
324         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
325         memcpy(&c->data[call->key.dsize], 
326                call->call_data.dptr, call->call_data.dsize);
327         state->call                = *call;
328         state->call.call_data.dptr = &c->data[call->key.dsize];
329         state->call.key.dptr       = &c->data[0];
330
331         state->state  = CTDB_CALL_WAIT;
332
333
334         ctdb_client_queue_pkt(ctdb, &c->hdr);
335
336         return state;
337 }
338
339
340 /*
341   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
342 */
343 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
344 {
345         struct ctdb_client_call_state *state;
346
347         state = ctdb_call_send(ctdb_db, call);
348         return ctdb_call_recv(state, call);
349 }
350
351
352 /*
353   tell the daemon what messaging srvid we will use, and register the message
354   handler function in the client
355 */
356 int ctdb_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
357                              ctdb_message_fn_t handler,
358                              void *private_data)
359                                     
360 {
361         int res;
362         int32_t status;
363         
364         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
365                            tdb_null, NULL, NULL, &status, NULL, NULL);
366         if (res != 0 || status != 0) {
367                 DEBUG(0,("Failed to register srvid %llu\n", (unsigned long long)srvid));
368                 return -1;
369         }
370
371         /* also need to register the handler with our own ctdb structure */
372         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
373 }
374
375 /*
376   tell the daemon we no longer want a srvid
377 */
378 int ctdb_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
379 {
380         int res;
381         int32_t status;
382         
383         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
384                            tdb_null, NULL, NULL, &status, NULL, NULL);
385         if (res != 0 || status != 0) {
386                 DEBUG(0,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
387                 return -1;
388         }
389
390         /* also need to register the handler with our own ctdb structure */
391         ctdb_deregister_message_handler(ctdb, srvid, private_data);
392         return 0;
393 }
394
395
396 /*
397   send a message - from client context
398  */
399 int ctdb_send_message(struct ctdb_context *ctdb, uint32_t vnn,
400                       uint64_t srvid, TDB_DATA data)
401 {
402         struct ctdb_req_message *r;
403         int len, res;
404
405         len = offsetof(struct ctdb_req_message, data) + data.dsize;
406         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
407                                len, struct ctdb_req_message);
408         CTDB_NO_MEMORY(ctdb, r);
409
410         r->hdr.destnode  = vnn;
411         r->srvid         = srvid;
412         r->datalen       = data.dsize;
413         memcpy(&r->data[0], data.dptr, data.dsize);
414         
415         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
416         if (res != 0) {
417                 return res;
418         }
419
420         talloc_free(r);
421         return 0;
422 }
423
424
425 /*
426   cancel a ctdb_fetch_lock operation, releasing the lock
427  */
428 static int fetch_lock_destructor(struct ctdb_record_handle *h)
429 {
430         ctdb_ltdb_unlock(h->ctdb_db, h->key);
431         return 0;
432 }
433
434 /*
435   force the migration of a record to this node
436  */
437 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
438 {
439         struct ctdb_call call;
440         ZERO_STRUCT(call);
441         call.call_id = CTDB_NULL_FUNC;
442         call.key = key;
443         call.flags = CTDB_IMMEDIATE_MIGRATION;
444         return ctdb_call(ctdb_db, &call);
445 }
446
447 /*
448   get a lock on a record, and return the records data. Blocks until it gets the lock
449  */
450 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
451                                            TDB_DATA key, TDB_DATA *data)
452 {
453         int ret;
454         struct ctdb_record_handle *h;
455
456         /*
457           procedure is as follows:
458
459           1) get the chain lock. 
460           2) check if we are dmaster
461           3) if we are the dmaster then return handle 
462           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
463              reply from ctdbd
464           5) when we get the reply, goto (1)
465          */
466
467         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
468         if (h == NULL) {
469                 return NULL;
470         }
471
472         h->ctdb_db = ctdb_db;
473         h->key     = key;
474         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
475         if (h->key.dptr == NULL) {
476                 talloc_free(h);
477                 return NULL;
478         }
479         h->data    = data;
480
481         DEBUG(3,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
482                  (const char *)key.dptr));
483
484 again:
485         /* step 1 - get the chain lock */
486         ret = ctdb_ltdb_lock(ctdb_db, key);
487         if (ret != 0) {
488                 DEBUG(0, (__location__ " failed to lock ltdb record\n"));
489                 talloc_free(h);
490                 return NULL;
491         }
492
493         DEBUG(4,("ctdb_fetch_lock: got chain lock\n"));
494
495         talloc_set_destructor(h, fetch_lock_destructor);
496
497         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
498
499         /* when torturing, ensure we test the remote path */
500         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
501             random() % 5 == 0) {
502                 h->header.dmaster = (uint32_t)-1;
503         }
504
505
506         DEBUG(4,("ctdb_fetch_lock: done local fetch\n"));
507
508         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->vnn) {
509                 ctdb_ltdb_unlock(ctdb_db, key);
510                 ret = ctdb_client_force_migration(ctdb_db, key);
511                 if (ret != 0) {
512                         DEBUG(4,("ctdb_fetch_lock: force_migration failed\n"));
513                         talloc_free(h);
514                         return NULL;
515                 }
516                 goto again;
517         }
518
519         DEBUG(4,("ctdb_fetch_lock: we are dmaster - done\n"));
520         return h;
521 }
522
523 /*
524   store some data to the record that was locked with ctdb_fetch_lock()
525 */
526 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
527 {
528         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
529 }
530
531 struct ctdb_client_control_state {
532         struct ctdb_context *ctdb;
533         uint32_t reqid;
534         int32_t status;
535         TDB_DATA outdata;
536         enum call_state state;
537         char *errormsg;
538 };
539
540 /*
541   called when a CTDB_REPLY_CONTROL packet comes in in the client
542
543   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
544   contains any reply data from the control
545 */
546 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
547                                       struct ctdb_req_header *hdr)
548 {
549         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
550         struct ctdb_client_control_state *state;
551
552         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
553         if (state == NULL) {
554                 DEBUG(0,(__location__ " reqid %u not found\n", hdr->reqid));
555                 return;
556         }
557
558         if (hdr->reqid != state->reqid) {
559                 /* we found a record  but it was the wrong one */
560                 DEBUG(0, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
561                 return;
562         }
563
564         state->outdata.dptr = c->data;
565         state->outdata.dsize = c->datalen;
566         state->status = c->status;
567         if (c->errorlen) {
568                 state->errormsg = talloc_strndup(state, 
569                                                  (char *)&c->data[c->datalen], 
570                                                  c->errorlen);
571         }
572
573         talloc_steal(state, c);
574
575         state->state = CTDB_CALL_DONE;
576 }
577
578
579 /* time out handler for ctdb_control */
580 static void timeout_func(struct event_context *ev, struct timed_event *te, 
581         struct timeval t, void *private_data)
582 {
583         uint32_t *timed_out = (uint32_t *)private_data;
584
585         *timed_out = 1;
586 }
587
588 /*
589   destroy a ctdb_control in client
590 */
591 static int ctdb_control_destructor(struct ctdb_client_control_state *state)     
592 {
593         ctdb_reqid_remove(state->ctdb, state->reqid);
594         return 0;
595 }
596
597 /*
598   send a ctdb control message
599   timeout specifies how long we should wait for a reply.
600   if timeout is NULL we wait indefinitely
601  */
602 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
603                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
604                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
605                  struct timeval *timeout,
606                  char **errormsg)
607 {
608         struct ctdb_client_control_state *state;
609         struct ctdb_req_control *c;
610         size_t len;
611         int ret;
612         uint32_t timed_out;
613
614         if (errormsg) {
615                 *errormsg = NULL;
616         }
617
618         /* if the domain socket is not yet open, open it */
619         if (ctdb->daemon.sd==-1) {
620                 ctdb_socket_connect(ctdb);
621         }
622
623         state = talloc_zero(ctdb, struct ctdb_client_control_state);
624         CTDB_NO_MEMORY(ctdb, state);
625
626         state->ctdb  = ctdb;
627         state->reqid = ctdb_reqid_new(ctdb, state);
628         state->state = CTDB_CALL_WAIT;
629         state->errormsg = NULL;
630
631         talloc_set_destructor(state, ctdb_control_destructor);
632
633         len = offsetof(struct ctdb_req_control, data) + data.dsize;
634         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
635                                len, struct ctdb_req_control);
636         CTDB_NO_MEMORY(ctdb, c);
637         
638         c->hdr.reqid        = state->reqid;
639         c->hdr.destnode     = destnode;
640         c->hdr.reqid        = state->reqid;
641         c->opcode           = opcode;
642         c->client_id        = 0;
643         c->flags            = flags;
644         c->srvid            = srvid;
645         c->datalen          = data.dsize;
646         if (data.dsize) {
647                 memcpy(&c->data[0], data.dptr, data.dsize);
648         }
649
650         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
651         if (ret != 0) {
652                 talloc_free(state);
653                 return -1;
654         }
655
656         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
657                 talloc_free(state);
658                 return 0;
659         }
660
661         /* semi-async operation */
662         timed_out = 0;
663         if (timeout && !timeval_is_zero(timeout)) {
664                 event_add_timed(ctdb->ev, state, *timeout, timeout_func, &timed_out);
665         }
666         while ((state->state == CTDB_CALL_WAIT)
667         &&      (timed_out == 0) ){
668                 event_loop_once(ctdb->ev);
669         }
670         if (timed_out) {
671                 talloc_free(state);
672                 if (errormsg) {
673                         (*errormsg) = talloc_strdup(mem_ctx, "control timed out");
674                 } else {
675                         DEBUG(0,("ctdb_control timed out\n"));
676                 }
677                 return -1;
678         }
679
680         if (outdata) {
681                 *outdata = state->outdata;
682                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
683         }
684
685         *status = state->status;
686
687         if (!errormsg && state->errormsg) {
688                 DEBUG(0,("ctdb_control error: '%s'\n", state->errormsg));
689         }
690
691         if (errormsg && state->errormsg) {
692                 (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
693         }
694
695         talloc_free(state);
696
697         return 0;       
698 }
699
700
701
702 /*
703   a process exists call. Returns 0 if process exists, -1 otherwise
704  */
705 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
706 {
707         int ret;
708         TDB_DATA data;
709         int32_t status;
710
711         data.dptr = (uint8_t*)&pid;
712         data.dsize = sizeof(pid);
713
714         ret = ctdb_control(ctdb, destnode, 0, 
715                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
716                            NULL, NULL, &status, NULL, NULL);
717         if (ret != 0) {
718                 DEBUG(0,(__location__ " ctdb_control for process_exists failed\n"));
719                 return -1;
720         }
721
722         return status;
723 }
724
725 /*
726   get remote statistics
727  */
728 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
729 {
730         int ret;
731         TDB_DATA data;
732         int32_t res;
733
734         ret = ctdb_control(ctdb, destnode, 0, 
735                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
736                            ctdb, &data, &res, NULL, NULL);
737         if (ret != 0 || res != 0) {
738                 DEBUG(0,(__location__ " ctdb_control for statistics failed\n"));
739                 return -1;
740         }
741
742         if (data.dsize != sizeof(struct ctdb_statistics)) {
743                 DEBUG(0,(__location__ " Wrong statistics size %u - expected %u\n",
744                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
745                       return -1;
746         }
747
748         *status = *(struct ctdb_statistics *)data.dptr;
749         talloc_free(data.dptr);
750                         
751         return 0;
752 }
753
754 /*
755   shutdown a remote ctdb node
756  */
757 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
758 {
759         int ret;
760         int32_t res;
761
762         ret = ctdb_control(ctdb, destnode, 0, 
763                            CTDB_CONTROL_SHUTDOWN, CTDB_CTRL_FLAG_NOREPLY, tdb_null, 
764                            NULL, NULL, &res, &timeout, NULL);
765         if (ret != 0) {
766                 DEBUG(0,(__location__ " ctdb_control for shutdown failed\n"));
767                 return -1;
768         }
769
770         return 0;
771 }
772
773 /*
774   get vnn map from a remote node
775  */
776 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
777 {
778         int ret;
779         TDB_DATA outdata;
780         int32_t res;
781         struct ctdb_vnn_map_wire *map;
782
783         ret = ctdb_control(ctdb, destnode, 0, 
784                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
785                            mem_ctx, &outdata, &res, &timeout, NULL);
786         if (ret != 0 || res != 0) {
787                 DEBUG(0,(__location__ " ctdb_control for getvnnmap failed\n"));
788                 return -1;
789         }
790         
791         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
792         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
793             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
794                 DEBUG(0,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
795                 return -1;
796         }
797
798         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
799         CTDB_NO_MEMORY(ctdb, *vnnmap);
800         (*vnnmap)->generation = map->generation;
801         (*vnnmap)->size       = map->size;
802         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
803
804         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
805         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
806         talloc_free(outdata.dptr);
807                     
808         return 0;
809 }
810
811 /*
812   get the recovery mode of a remote node
813  */
814 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
815 {
816         int ret;
817         int32_t res;
818
819         ret = ctdb_control(ctdb, destnode, 0, 
820                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
821                            NULL, NULL, &res, &timeout, NULL);
822         if (ret != 0) {
823                 DEBUG(0,(__location__ " ctdb_control for getrecmode failed\n"));
824                 return -1;
825         }
826
827         *recmode = res;
828
829         return 0;
830 }
831
832 /*
833   set the recovery mode of a remote node
834  */
835 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
836 {
837         int ret;
838         TDB_DATA data;
839         int32_t res;
840
841         data.dsize = sizeof(uint32_t);
842         data.dptr = (unsigned char *)&recmode;
843
844         ret = ctdb_control(ctdb, destnode, 0, 
845                            CTDB_CONTROL_SET_RECMODE, 0, data, 
846                            NULL, NULL, &res, &timeout, NULL);
847         if (ret != 0 || res != 0) {
848                 DEBUG(0,(__location__ " ctdb_control for setrecmode failed\n"));
849                 return -1;
850         }
851
852         return 0;
853 }
854
855 /*
856   get the recovery master of a remote node
857  */
858 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
859 {
860         int ret;
861         int32_t res;
862
863         ret = ctdb_control(ctdb, destnode, 0, 
864                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
865                            NULL, NULL, &res, &timeout, NULL);
866         if (ret != 0) {
867                 DEBUG(0,(__location__ " ctdb_control for getrecmaster failed\n"));
868                 return -1;
869         }
870
871         *recmaster = res;
872
873         return 0;
874 }
875
876 /*
877   set the recovery master of a remote node
878  */
879 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
880 {
881         int ret;
882         TDB_DATA data;
883         int32_t res;
884
885         ZERO_STRUCT(data);
886         data.dsize = sizeof(uint32_t);
887         data.dptr = (unsigned char *)&recmaster;
888
889         ret = ctdb_control(ctdb, destnode, 0, 
890                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
891                            NULL, NULL, &res, &timeout, NULL);
892         if (ret != 0 || res != 0) {
893                 DEBUG(0,(__location__ " ctdb_control for setrecmaster failed\n"));
894                 return -1;
895         }
896
897         return 0;
898 }
899
900
901 /*
902   get a list of databases off a remote node
903  */
904 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
905                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
906 {
907         int ret;
908         TDB_DATA outdata;
909         int32_t res;
910
911         ret = ctdb_control(ctdb, destnode, 0, 
912                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
913                            mem_ctx, &outdata, &res, &timeout, NULL);
914         if (ret != 0 || res != 0) {
915                 DEBUG(0,(__location__ " ctdb_control for getdbmap failed\n"));
916                 return -1;
917         }
918
919         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
920         talloc_free(outdata.dptr);
921                     
922         return 0;
923 }
924
925
926 /*
927   get a list of nodes (vnn and flags ) from a remote node
928  */
929 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
930                 struct timeval timeout, uint32_t destnode, 
931                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
932 {
933         int ret;
934         TDB_DATA outdata;
935         int32_t res;
936
937         ret = ctdb_control(ctdb, destnode, 0, 
938                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
939                            mem_ctx, &outdata, &res, &timeout, NULL);
940         if (ret != 0 || res != 0) {
941                 DEBUG(0,(__location__ " ctdb_control for getnodes failed\n"));
942                 return -1;
943         }
944
945         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
946         talloc_free(outdata.dptr);
947                     
948         return 0;
949 }
950
951 /*
952   set vnn map on a node
953  */
954 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
955                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
956 {
957         int ret;
958         TDB_DATA data;
959         int32_t res;
960         struct ctdb_vnn_map_wire *map;
961         size_t len;
962
963         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
964         map = talloc_size(mem_ctx, len);
965         CTDB_NO_MEMORY_VOID(ctdb, map);
966
967         map->generation = vnnmap->generation;
968         map->size = vnnmap->size;
969         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
970         
971         data.dsize = len;
972         data.dptr  = (uint8_t *)map;
973
974         ret = ctdb_control(ctdb, destnode, 0, 
975                            CTDB_CONTROL_SETVNNMAP, 0, data, 
976                            NULL, NULL, &res, &timeout, NULL);
977         if (ret != 0 || res != 0) {
978                 DEBUG(0,(__location__ " ctdb_control for setvnnmap failed\n"));
979                 return -1;
980         }
981
982         talloc_free(map);
983
984         return 0;
985 }
986
987 /*
988   get all keys and records for a specific database
989  */
990 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid, uint32_t lmaster, 
991                      TALLOC_CTX *mem_ctx, struct ctdb_key_list *keys)
992 {
993         int i, ret;
994         TDB_DATA indata, outdata;
995         struct ctdb_control_pulldb pull;
996         struct ctdb_control_pulldb_reply *reply;
997         struct ctdb_rec_data *rec;
998         int32_t res;
999
1000         pull.db_id   = dbid;
1001         pull.lmaster = lmaster;
1002
1003         indata.dsize = sizeof(struct ctdb_control_pulldb);
1004         indata.dptr  = (unsigned char *)&pull;
1005
1006         ret = ctdb_control(ctdb, destnode, 0, 
1007                            CTDB_CONTROL_PULL_DB, 0, indata, 
1008                            mem_ctx, &outdata, &res, NULL, NULL);
1009         if (ret != 0 || res != 0) {
1010                 DEBUG(0,(__location__ " ctdb_control for pulldb failed\n"));
1011                 return -1;
1012         }
1013
1014
1015         reply = (struct ctdb_control_pulldb_reply *)outdata.dptr;
1016         keys->dbid     = reply->db_id;
1017         keys->num      = reply->count;
1018         
1019         keys->keys     = talloc_array(mem_ctx, TDB_DATA, keys->num);
1020         keys->headers  = talloc_array(mem_ctx, struct ctdb_ltdb_header, keys->num);
1021         keys->data     = talloc_array(mem_ctx, TDB_DATA, keys->num);
1022
1023         rec = (struct ctdb_rec_data *)&reply->data[0];
1024
1025         for (i=0;i<reply->count;i++) {
1026                 keys->keys[i].dptr = talloc_memdup(mem_ctx, &rec->data[0], rec->keylen);
1027                 keys->keys[i].dsize = rec->keylen;
1028                 
1029                 keys->data[i].dptr = talloc_memdup(mem_ctx, &rec->data[keys->keys[i].dsize], rec->datalen);
1030                 keys->data[i].dsize = rec->datalen;
1031
1032                 if (keys->data[i].dsize < sizeof(struct ctdb_ltdb_header)) {
1033                         DEBUG(0,(__location__ " bad ltdb record\n"));
1034                         return -1;
1035                 }
1036                 memcpy(&keys->headers[i], keys->data[i].dptr, sizeof(struct ctdb_ltdb_header));
1037                 keys->data[i].dptr += sizeof(struct ctdb_ltdb_header);
1038                 keys->data[i].dsize -= sizeof(struct ctdb_ltdb_header);
1039
1040                 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
1041         }           
1042
1043         talloc_free(outdata.dptr);
1044
1045         return 0;
1046 }
1047
1048 /*
1049   copy a tdb from one node to another node
1050  */
1051 int ctdb_ctrl_copydb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t sourcenode, 
1052                      uint32_t destnode, uint32_t dbid, uint32_t lmaster, TALLOC_CTX *mem_ctx)
1053 {
1054         int ret;
1055         TDB_DATA indata, outdata;
1056         int32_t res;
1057
1058         indata.dsize = 2*sizeof(uint32_t);
1059         indata.dptr  = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1060
1061         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1062         ((uint32_t *)(&indata.dptr[0]))[1] = lmaster;
1063
1064         ret = ctdb_control(ctdb, sourcenode, 0, 
1065                            CTDB_CONTROL_PULL_DB, 0, indata, 
1066                            mem_ctx, &outdata, &res, &timeout, NULL);
1067         if (ret != 0 || res != 0) {
1068                 DEBUG(0,(__location__ " ctdb_control for pulldb failed\n"));
1069                 return -1;
1070         }
1071
1072         ret = ctdb_control(ctdb, destnode, 0, 
1073                            CTDB_CONTROL_PUSH_DB, 0, outdata, 
1074                            mem_ctx, NULL, &res, &timeout, NULL);
1075         talloc_free(outdata.dptr);
1076         if (ret != 0 || res != 0) {
1077                 DEBUG(0,(__location__ " ctdb_control for pushdb failed\n"));
1078                 return -1;
1079         }
1080
1081         return 0;
1082 }
1083
1084 /*
1085   change dmaster for all keys in the database to the new value
1086  */
1087 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1088                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1089 {
1090         int ret;
1091         TDB_DATA indata;
1092         int32_t res;
1093
1094         indata.dsize = 2*sizeof(uint32_t);
1095         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1096
1097         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1098         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1099
1100         ret = ctdb_control(ctdb, destnode, 0, 
1101                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1102                            NULL, NULL, &res, &timeout, NULL);
1103         if (ret != 0 || res != 0) {
1104                 DEBUG(0,(__location__ " ctdb_control for setdmaster failed\n"));
1105                 return -1;
1106         }
1107
1108         return 0;
1109 }
1110
1111 /*
1112   ping a node, return number of clients connected
1113  */
1114 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1115 {
1116         int ret;
1117         int32_t res;
1118
1119         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1120                            tdb_null, NULL, NULL, &res, NULL, NULL);
1121         if (ret != 0) {
1122                 return -1;
1123         }
1124         return res;
1125 }
1126
1127 /*
1128   find the real path to a ltdb 
1129  */
1130 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1131                    const char **path)
1132 {
1133         int ret;
1134         int32_t res;
1135         TDB_DATA data;
1136
1137         data.dptr = (uint8_t *)&dbid;
1138         data.dsize = sizeof(dbid);
1139
1140         ret = ctdb_control(ctdb, destnode, 0, 
1141                            CTDB_CONTROL_GETDBPATH, 0, data, 
1142                            mem_ctx, &data, &res, &timeout, NULL);
1143         if (ret != 0 || res != 0) {
1144                 return -1;
1145         }
1146
1147         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1148         if ((*path) == NULL) {
1149                 return -1;
1150         }
1151
1152         talloc_free(data.dptr);
1153
1154         return 0;
1155 }
1156
1157 /*
1158   find the name of a db 
1159  */
1160 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1161                    const char **name)
1162 {
1163         int ret;
1164         int32_t res;
1165         TDB_DATA data;
1166
1167         data.dptr = (uint8_t *)&dbid;
1168         data.dsize = sizeof(dbid);
1169
1170         ret = ctdb_control(ctdb, destnode, 0, 
1171                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1172                            mem_ctx, &data, &res, &timeout, NULL);
1173         if (ret != 0 || res != 0) {
1174                 return -1;
1175         }
1176
1177         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1178         if ((*name) == NULL) {
1179                 return -1;
1180         }
1181
1182         talloc_free(data.dptr);
1183
1184         return 0;
1185 }
1186
1187 /*
1188   create a database
1189  */
1190 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, const char *name)
1191 {
1192         int ret;
1193         int32_t res;
1194         TDB_DATA data;
1195
1196         data.dptr = discard_const(name);
1197         data.dsize = strlen(name)+1;
1198
1199         ret = ctdb_control(ctdb, destnode, 0, 
1200                            CTDB_CONTROL_DB_ATTACH, 0, data, 
1201                            mem_ctx, &data, &res, &timeout, NULL);
1202
1203         if (ret != 0 || res != 0) {
1204                 return -1;
1205         }
1206
1207         return 0;
1208 }
1209
1210 /*
1211   get debug level on a node
1212  */
1213 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, uint32_t *level)
1214 {
1215         int ret;
1216         int32_t res;
1217         TDB_DATA data;
1218
1219         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1220                            ctdb, &data, &res, NULL, NULL);
1221         if (ret != 0 || res != 0) {
1222                 return -1;
1223         }
1224         if (data.dsize != sizeof(uint32_t)) {
1225                 DEBUG(0,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1226                          (unsigned)data.dsize));
1227                 return -1;
1228         }
1229         *level = *(uint32_t *)data.dptr;
1230         talloc_free(data.dptr);
1231         return 0;
1232 }
1233
1234 /*
1235   set debug level on a node
1236  */
1237 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, uint32_t level)
1238 {
1239         int ret;
1240         int32_t res;
1241         TDB_DATA data;
1242
1243         data.dptr = (uint8_t *)&level;
1244         data.dsize = sizeof(level);
1245
1246         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1247                            NULL, NULL, &res, NULL, NULL);
1248         if (ret != 0 || res != 0) {
1249                 return -1;
1250         }
1251         return 0;
1252 }
1253
1254
1255 /*
1256   get a list of connected nodes
1257  */
1258 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1259                                 struct timeval timeout,
1260                                 TALLOC_CTX *mem_ctx,
1261                                 uint32_t *num_nodes)
1262 {
1263         struct ctdb_node_map *map=NULL;
1264         int ret, i;
1265         uint32_t *nodes;
1266
1267         *num_nodes = 0;
1268
1269         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1270         if (ret != 0) {
1271                 return NULL;
1272         }
1273
1274         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1275         if (nodes == NULL) {
1276                 return NULL;
1277         }
1278
1279         for (i=0;i<map->num;i++) {
1280                 if (map->nodes[i].flags & NODE_FLAGS_CONNECTED) {
1281                         nodes[*num_nodes] = map->nodes[i].vnn;
1282                         (*num_nodes)++;
1283                 }
1284         }
1285
1286         return nodes;
1287 }
1288
1289
1290 /*
1291   reset remote status
1292  */
1293 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1294 {
1295         int ret;
1296         int32_t res;
1297
1298         ret = ctdb_control(ctdb, destnode, 0, 
1299                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1300                            NULL, NULL, &res, NULL, NULL);
1301         if (ret != 0 || res != 0) {
1302                 DEBUG(0,(__location__ " ctdb_control for reset statistics failed\n"));
1303                 return -1;
1304         }
1305         return 0;
1306 }
1307
1308
1309 /*
1310   attach to a specific database - client call
1311 */
1312 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name)
1313 {
1314         struct ctdb_db_context *ctdb_db;
1315         TDB_DATA data;
1316         int ret;
1317         int32_t res;
1318
1319         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1320         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1321
1322         ctdb_db->ctdb = ctdb;
1323         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1324         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1325
1326         data.dptr = discard_const(name);
1327         data.dsize = strlen(name)+1;
1328
1329         /* tell ctdb daemon to attach */
1330         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_DB_ATTACH,
1331                            0, data, ctdb_db, &data, &res, NULL, NULL);
1332         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1333                 DEBUG(0,("Failed to attach to database '%s'\n", name));
1334                 talloc_free(ctdb_db);
1335                 return NULL;
1336         }
1337         
1338         ctdb_db->db_id = *(uint32_t *)data.dptr;
1339         talloc_free(data.dptr);
1340
1341         ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1342         if (ret != 0) {
1343                 DEBUG(0,("Failed to get dbpath for database '%s'\n", name));
1344                 talloc_free(ctdb_db);
1345                 return NULL;
1346         }
1347
1348         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, 0, O_RDWR, 0);
1349         if (ctdb_db->ltdb == NULL) {
1350                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1351                 talloc_free(ctdb_db);
1352                 return NULL;
1353         }
1354
1355         DLIST_ADD(ctdb->db_list, ctdb_db);
1356
1357         return ctdb_db;
1358 }
1359
1360
1361 /*
1362   setup a call for a database
1363  */
1364 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1365 {
1366         TDB_DATA data;
1367         int32_t status;
1368         struct ctdb_control_set_call c;
1369         int ret;
1370         struct ctdb_registered_call *call;
1371
1372         c.db_id = ctdb_db->db_id;
1373         c.fn    = fn;
1374         c.id    = id;
1375
1376         data.dptr = (uint8_t *)&c;
1377         data.dsize = sizeof(c);
1378
1379         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1380                            data, NULL, NULL, &status, NULL, NULL);
1381         if (ret != 0 || status != 0) {
1382                 DEBUG(0,("ctdb_set_call failed for call %u\n", id));
1383                 return -1;
1384         }
1385
1386         /* also register locally */
1387         call = talloc(ctdb_db, struct ctdb_registered_call);
1388         call->fn = fn;
1389         call->id = id;
1390
1391         DLIST_ADD(ctdb_db->calls, call);        
1392         return 0;
1393 }
1394
1395
1396 struct traverse_state {
1397         bool done;
1398         uint32_t count;
1399         ctdb_traverse_func fn;
1400         void *private_data;
1401 };
1402
1403 /*
1404   called on each key during a ctdb_traverse
1405  */
1406 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1407 {
1408         struct traverse_state *state = (struct traverse_state *)p;
1409         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
1410         TDB_DATA key;
1411
1412         if (data.dsize < sizeof(uint32_t) ||
1413             d->length != data.dsize) {
1414                 DEBUG(0,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
1415                 state->done = True;
1416                 return;
1417         }
1418
1419         key.dsize = d->keylen;
1420         key.dptr  = &d->data[0];
1421         data.dsize = d->datalen;
1422         data.dptr = &d->data[d->keylen];
1423
1424         if (key.dsize == 0 && data.dsize == 0) {
1425                 /* end of traverse */
1426                 state->done = True;
1427                 return;
1428         }
1429
1430         if (state->fn(ctdb, key, data, state->private_data) != 0) {
1431                 state->done = True;
1432         }
1433
1434         state->count++;
1435 }
1436
1437
1438 /*
1439   start a cluster wide traverse, calling the supplied fn on each record
1440   return the number of records traversed, or -1 on error
1441  */
1442 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
1443 {
1444         TDB_DATA data;
1445         struct ctdb_traverse_start t;
1446         int32_t status;
1447         int ret;
1448         uint64_t srvid = (getpid() | 0xFLL<<60);
1449         struct traverse_state state;
1450
1451         state.done = False;
1452         state.count = 0;
1453         state.private_data = private_data;
1454         state.fn = fn;
1455
1456         ret = ctdb_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
1457         if (ret != 0) {
1458                 DEBUG(0,("Failed to setup traverse handler\n"));
1459                 return -1;
1460         }
1461
1462         t.db_id = ctdb_db->db_id;
1463         t.srvid = srvid;
1464         t.reqid = 0;
1465
1466         data.dptr = (uint8_t *)&t;
1467         data.dsize = sizeof(t);
1468
1469         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
1470                            data, NULL, NULL, &status, NULL, NULL);
1471         if (ret != 0 || status != 0) {
1472                 DEBUG(0,("ctdb_traverse_all failed\n"));
1473                 ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1474                 return -1;
1475         }
1476
1477         while (!state.done) {
1478                 event_loop_once(ctdb_db->ctdb->ev);
1479         }
1480
1481         ret = ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1482         if (ret != 0) {
1483                 DEBUG(0,("Failed to remove ctdb_traverse handler\n"));
1484                 return -1;
1485         }
1486
1487         return state.count;
1488 }
1489
1490 /*
1491   called on each key during a catdb
1492  */
1493 static int dumpdb_fn(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
1494 {
1495         FILE *f = (FILE *)p;
1496         char *keystr, *datastr;
1497         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
1498
1499         keystr  = hex_encode(ctdb, key.dptr, key.dsize);
1500         datastr = hex_encode(ctdb, data.dptr+sizeof(*h), data.dsize-sizeof(*h));
1501
1502         fprintf(f, "dmaster: %u\n", h->dmaster);
1503         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
1504         fprintf(f, "key: %s\ndata: %s\n", keystr, datastr);
1505
1506         talloc_free(keystr);
1507         talloc_free(datastr);
1508         return 0;
1509 }
1510
1511 /*
1512   convenience function to list all keys to stdout
1513  */
1514 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
1515 {
1516         return ctdb_traverse(ctdb_db, dumpdb_fn, f);
1517 }
1518
1519 /*
1520   get the pid of a ctdb daemon
1521  */
1522 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
1523 {
1524         int ret;
1525         int32_t res;
1526
1527         ret = ctdb_control(ctdb, destnode, 0, 
1528                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
1529                            NULL, NULL, &res, &timeout, NULL);
1530         if (ret != 0) {
1531                 DEBUG(0,(__location__ " ctdb_control for getpid failed\n"));
1532                 return -1;
1533         }
1534
1535         *pid = res;
1536
1537         return 0;
1538 }
1539
1540
1541 /*
1542   freeze a node
1543  */
1544 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1545 {
1546         int ret;
1547         int32_t res;
1548
1549         ret = ctdb_control(ctdb, destnode, 0, 
1550                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
1551                            NULL, NULL, &res, &timeout, NULL);
1552         if (ret != 0 || res != 0) {
1553                 DEBUG(0,(__location__ " ctdb_control freeze failed\n"));
1554                 return -1;
1555         }
1556
1557         return 0;
1558 }
1559
1560 /*
1561   thaw a node
1562  */
1563 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1564 {
1565         int ret;
1566         int32_t res;
1567
1568         ret = ctdb_control(ctdb, destnode, 0, 
1569                            CTDB_CONTROL_THAW, 0, tdb_null, 
1570                            NULL, NULL, &res, &timeout, NULL);
1571         if (ret != 0 || res != 0) {
1572                 DEBUG(0,(__location__ " ctdb_control thaw failed\n"));
1573                 return -1;
1574         }
1575
1576         return 0;
1577 }
1578
1579 /*
1580   get vnn of a node, or -1
1581  */
1582 int ctdb_ctrl_getvnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1583 {
1584         int ret;
1585         int32_t res;
1586
1587         ret = ctdb_control(ctdb, destnode, 0, 
1588                            CTDB_CONTROL_GET_VNN, 0, tdb_null, 
1589                            NULL, NULL, &res, &timeout, NULL);
1590         if (ret != 0) {
1591                 DEBUG(0,(__location__ " ctdb_control for getvnn failed\n"));
1592                 return -1;
1593         }
1594
1595         return res;
1596 }
1597
1598 /*
1599   set the monitoring mode of a remote node
1600  */
1601 int ctdb_ctrl_setmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t monmode)
1602 {
1603         int ret;
1604         TDB_DATA data;
1605         int32_t res;
1606
1607         data.dsize = sizeof(uint32_t);
1608         data.dptr = (uint8_t *)&monmode;
1609
1610         ret = ctdb_control(ctdb, destnode, 0, 
1611                            CTDB_CONTROL_SET_MONMODE, 0, data, 
1612                            NULL, NULL, &res, &timeout, NULL);
1613         if (ret != 0 || res != 0) {
1614                 DEBUG(0,(__location__ " ctdb_control for setmonmode failed\n"));
1615                 return -1;
1616         }
1617
1618         return 0;
1619 }
1620
1621 /*
1622   get the monitoring mode of a remote node
1623  */
1624 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
1625 {
1626         int ret;
1627         int32_t res;
1628
1629         ret = ctdb_control(ctdb, destnode, 0, 
1630                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
1631                            NULL, NULL, &res, &timeout, NULL);
1632         if (ret != 0) {
1633                 DEBUG(0,(__location__ " ctdb_control for getrecmode failed\n"));
1634                 return -1;
1635         }
1636
1637         *monmode = res;
1638
1639         return 0;
1640 }
1641
1642
1643 /*
1644   get maximum rsn for a db on a node
1645  */
1646 int ctdb_ctrl_get_max_rsn(struct ctdb_context *ctdb, struct timeval timeout, 
1647                           uint32_t destnode, uint32_t db_id, uint64_t *max_rsn)
1648 {
1649         TDB_DATA data, outdata;
1650         int ret;
1651         int32_t res;
1652
1653         data.dptr = (uint8_t *)&db_id;
1654         data.dsize = sizeof(db_id);
1655
1656         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_MAX_RSN, 0, data, ctdb,
1657                            &outdata, &res, &timeout, NULL);
1658         if (ret != 0 || res != 0 || outdata.dsize != sizeof(uint64_t)) {
1659                 DEBUG(0,(__location__ " ctdb_control for get_max_rsn failed\n"));
1660                 return -1;
1661         }
1662
1663         *max_rsn = *(uint64_t *)outdata.dptr;
1664         talloc_free(outdata.dptr);
1665
1666         return 0;       
1667 }
1668
1669 /*
1670   set the rsn on non-empty records to the given rsn
1671  */
1672 int ctdb_ctrl_set_rsn_nonempty(struct ctdb_context *ctdb, struct timeval timeout, 
1673                                uint32_t destnode, uint32_t db_id, uint64_t rsn)
1674 {
1675         TDB_DATA data;
1676         int ret;
1677         int32_t res;
1678         struct ctdb_control_set_rsn_nonempty p;
1679
1680         p.db_id = db_id;
1681         p.rsn = rsn;
1682
1683         data.dptr = (uint8_t *)&p;
1684         data.dsize = sizeof(p);
1685
1686         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_RSN_NONEMPTY, 0, data, NULL,
1687                            NULL, &res, &timeout, NULL);
1688         if (ret != 0 || res != 0) {
1689                 DEBUG(0,(__location__ " ctdb_control for set_rsn_nonempty failed\n"));
1690                 return -1;
1691         }
1692
1693         return 0;       
1694 }
1695
1696 /*
1697   delete records which have a rsn below the given rsn
1698  */
1699 int ctdb_ctrl_delete_low_rsn(struct ctdb_context *ctdb, struct timeval timeout, 
1700                              uint32_t destnode, uint32_t db_id, uint64_t rsn)
1701 {
1702         TDB_DATA data;
1703         int ret;
1704         int32_t res;
1705         struct ctdb_control_delete_low_rsn p;
1706
1707         p.db_id = db_id;
1708         p.rsn = rsn;
1709
1710         data.dptr = (uint8_t *)&p;
1711         data.dsize = sizeof(p);
1712
1713         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DELETE_LOW_RSN, 0, data, NULL,
1714                            NULL, &res, &timeout, NULL);
1715         if (ret != 0 || res != 0) {
1716                 DEBUG(0,(__location__ " ctdb_control for delete_low_rsn failed\n"));
1717                 return -1;
1718         }
1719
1720         return 0;       
1721 }
1722
1723 /* 
1724   sent to a node to make it take over an ip address
1725 */
1726 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
1727                           uint32_t destnode, struct ctdb_public_ip *ip)
1728 {
1729         TDB_DATA data;
1730         int ret;
1731         int32_t res;
1732
1733         data.dsize = sizeof(*ip);
1734         data.dptr  = (uint8_t *)ip;
1735
1736         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
1737                            NULL, &res, &timeout, NULL);
1738
1739         if (ret != 0 || res != 0) {
1740                 DEBUG(0,(__location__ " ctdb_control for takeover_ip failed\n"));
1741                 return -1;
1742         }
1743
1744         return 0;       
1745 }
1746
1747
1748 /* 
1749   sent to a node to make it release an ip address
1750 */
1751 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
1752                          uint32_t destnode, struct ctdb_public_ip *ip)
1753 {
1754         TDB_DATA data;
1755         int ret;
1756         int32_t res;
1757
1758         data.dsize = sizeof(*ip);
1759         data.dptr  = (uint8_t *)ip;
1760
1761         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
1762                            NULL, &res, &timeout, NULL);
1763
1764         if (ret != 0 || res != 0) {
1765                 DEBUG(0,(__location__ " ctdb_control for release_ip failed\n"));
1766                 return -1;
1767         }
1768
1769         return 0;       
1770 }
1771
1772
1773 /*
1774   get a tunable
1775  */
1776 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
1777                           struct timeval timeout, 
1778                           uint32_t destnode,
1779                           const char *name, uint32_t *value)
1780 {
1781         struct ctdb_control_get_tunable *t;
1782         TDB_DATA data, outdata;
1783         int32_t res;
1784         int ret;
1785
1786         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
1787         data.dptr  = talloc_size(ctdb, data.dsize);
1788         CTDB_NO_MEMORY(ctdb, data.dptr);
1789
1790         t = (struct ctdb_control_get_tunable *)data.dptr;
1791         t->length = strlen(name)+1;
1792         memcpy(t->name, name, t->length);
1793
1794         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
1795                            &outdata, &res, &timeout, NULL);
1796         talloc_free(data.dptr);
1797         if (ret != 0 || res != 0) {
1798                 DEBUG(0,(__location__ " ctdb_control for get_tunable failed\n"));
1799                 return -1;
1800         }
1801
1802         if (outdata.dsize != sizeof(uint32_t)) {
1803                 DEBUG(0,("Invalid return data in get_tunable\n"));
1804                 talloc_free(outdata.dptr);
1805                 return -1;
1806         }
1807         
1808         *value = *(uint32_t *)outdata.dptr;
1809         talloc_free(outdata.dptr);
1810
1811         return 0;
1812 }
1813
1814 /*
1815   set a tunable
1816  */
1817 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
1818                           struct timeval timeout, 
1819                           uint32_t destnode,
1820                           const char *name, uint32_t value)
1821 {
1822         struct ctdb_control_set_tunable *t;
1823         TDB_DATA data;
1824         int32_t res;
1825         int ret;
1826
1827         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
1828         data.dptr  = talloc_size(ctdb, data.dsize);
1829         CTDB_NO_MEMORY(ctdb, data.dptr);
1830
1831         t = (struct ctdb_control_set_tunable *)data.dptr;
1832         t->length = strlen(name)+1;
1833         memcpy(t->name, name, t->length);
1834         t->value = value;
1835
1836         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
1837                            NULL, &res, &timeout, NULL);
1838         talloc_free(data.dptr);
1839         if (ret != 0 || res != 0) {
1840                 DEBUG(0,(__location__ " ctdb_control for set_tunable failed\n"));
1841                 return -1;
1842         }
1843
1844         return 0;
1845 }
1846
1847 /*
1848   list tunables
1849  */
1850 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
1851                             struct timeval timeout, 
1852                             uint32_t destnode,
1853                             TALLOC_CTX *mem_ctx,
1854                             const char ***list, uint32_t *count)
1855 {
1856         TDB_DATA outdata;
1857         int32_t res;
1858         int ret;
1859         struct ctdb_control_list_tunable *t;
1860         char *p, *s, *ptr;
1861
1862         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
1863                            mem_ctx, &outdata, &res, &timeout, NULL);
1864         if (ret != 0 || res != 0) {
1865                 DEBUG(0,(__location__ " ctdb_control for list_tunables failed\n"));
1866                 return -1;
1867         }
1868
1869         t = (struct ctdb_control_list_tunable *)outdata.dptr;
1870         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
1871             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
1872                 DEBUG(0,("Invalid data in list_tunables reply\n"));
1873                 talloc_free(outdata.dptr);
1874                 return -1;              
1875         }
1876         
1877         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
1878         CTDB_NO_MEMORY(ctdb, p);
1879
1880         talloc_free(outdata.dptr);
1881         
1882         (*list) = NULL;
1883         (*count) = 0;
1884
1885         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
1886                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
1887                 CTDB_NO_MEMORY(ctdb, *list);
1888                 (*list)[*count] = talloc_strdup(*list, s);
1889                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
1890                 (*count)++;
1891         }
1892
1893         talloc_free(p);
1894
1895         return 0;
1896 }
1897
1898
1899 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb, 
1900                         struct timeval timeout, uint32_t destnode, 
1901                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
1902 {
1903         int ret;
1904         TDB_DATA outdata;
1905         int32_t res;
1906
1907         ret = ctdb_control(ctdb, destnode, 0, 
1908                            CTDB_CONTROL_GET_PUBLIC_IPS, 0, tdb_null, 
1909                            mem_ctx, &outdata, &res, &timeout, NULL);
1910         if (ret != 0 || res != 0) {
1911                 DEBUG(0,(__location__ " ctdb_control for getpublicips failed\n"));
1912                 return -1;
1913         }
1914
1915         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1916         talloc_free(outdata.dptr);
1917                     
1918         return 0;
1919 }
1920