ctdb-daemon: Rename struct ctdb_tcp_connection to ctdb_connection
[gd/samba-autobuild/.git] / ctdb / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "replace.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24 #include "system/locale.h"
25
26 #include <talloc.h>
27 /* Allow use of deprecated function tevent_loop_allow_nesting() */
28 #define TEVENT_DEPRECATED
29 #include <tevent.h>
30 #include <tdb.h>
31
32 #include "lib/tdb_wrap/tdb_wrap.h"
33 #include "lib/util/dlinklist.h"
34 #include "lib/util/time.h"
35 #include "lib/util/debug.h"
36 #include "lib/util/samba_util.h"
37
38 #include "ctdb_logging.h"
39 #include "ctdb_private.h"
40 #include "ctdb_client.h"
41
42 #include "common/reqid.h"
43 #include "common/system.h"
44 #include "common/common.h"
45
46 /*
47   allocate a packet for use in client<->daemon communication
48  */
49 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
50                                             TALLOC_CTX *mem_ctx, 
51                                             enum ctdb_operation operation, 
52                                             size_t length, size_t slength,
53                                             const char *type)
54 {
55         int size;
56         struct ctdb_req_header *hdr;
57
58         length = MAX(length, slength);
59         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
60
61         hdr = (struct ctdb_req_header *)talloc_zero_size(mem_ctx, size);
62         if (hdr == NULL) {
63                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
64                          operation, (unsigned)length));
65                 return NULL;
66         }
67         talloc_set_name_const(hdr, type);
68         hdr->length       = length;
69         hdr->operation    = operation;
70         hdr->ctdb_magic   = CTDB_MAGIC;
71         hdr->ctdb_version = CTDB_PROTOCOL;
72         hdr->srcnode      = ctdb->pnn;
73         if (ctdb->vnn_map) {
74                 hdr->generation = ctdb->vnn_map->generation;
75         }
76
77         return hdr;
78 }
79
80 /*
81   local version of ctdb_call
82 */
83 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
84                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
85                     TDB_DATA *data, bool updatetdb)
86 {
87         struct ctdb_call_info *c;
88         struct ctdb_registered_call *fn;
89         struct ctdb_context *ctdb = ctdb_db->ctdb;
90         
91         c = talloc(ctdb, struct ctdb_call_info);
92         CTDB_NO_MEMORY(ctdb, c);
93
94         c->key = call->key;
95         c->call_data = &call->call_data;
96         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
97         c->record_data.dsize = data->dsize;
98         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
99         c->new_data = NULL;
100         c->reply_data = NULL;
101         c->status = 0;
102         c->header = header;
103
104         for (fn=ctdb_db->calls;fn;fn=fn->next) {
105                 if (fn->id == call->call_id) break;
106         }
107         if (fn == NULL) {
108                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
109                 talloc_free(c);
110                 return -1;
111         }
112
113         if (fn->fn(c) != 0) {
114                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
115                 talloc_free(c);
116                 return -1;
117         }
118
119         /* we need to force the record to be written out if this was a remote access */
120         if (c->new_data == NULL) {
121                 c->new_data = &c->record_data;
122         }
123
124         if (c->new_data && updatetdb) {
125                 /* XXX check that we always have the lock here? */
126                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
127                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
128                         talloc_free(c);
129                         return -1;
130                 }
131         }
132
133         if (c->reply_data) {
134                 call->reply_data = *c->reply_data;
135
136                 talloc_steal(call, call->reply_data.dptr);
137                 talloc_set_name_const(call->reply_data.dptr, __location__);
138         } else {
139                 call->reply_data.dptr = NULL;
140                 call->reply_data.dsize = 0;
141         }
142         call->status = c->status;
143
144         talloc_free(c);
145
146         return 0;
147 }
148
149
150 /*
151   queue a packet for sending from client to daemon
152 */
153 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
154 {
155         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
156 }
157
158
159 /*
160   called when a CTDB_REPLY_CALL packet comes in in the client
161
162   This packet comes in response to a CTDB_REQ_CALL request packet. It
163   contains any reply data from the call
164 */
165 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
166 {
167         struct ctdb_reply_call_old *c = (struct ctdb_reply_call_old *)hdr;
168         struct ctdb_client_call_state *state;
169
170         state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_client_call_state);
171         if (state == NULL) {
172                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
173                 return;
174         }
175
176         if (hdr->reqid != state->reqid) {
177                 /* we found a record  but it was the wrong one */
178                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
179                 return;
180         }
181
182         state->call->reply_data.dptr = c->data;
183         state->call->reply_data.dsize = c->datalen;
184         state->call->status = c->status;
185
186         talloc_steal(state, c);
187
188         state->state = CTDB_CALL_DONE;
189
190         if (state->async.fn) {
191                 state->async.fn(state);
192         }
193 }
194
195 void ctdb_request_message(struct ctdb_context *ctdb,
196                           struct ctdb_req_header *hdr)
197 {
198         struct ctdb_req_message_old *c = (struct ctdb_req_message_old *)hdr;
199         TDB_DATA data;
200
201         data.dsize = c->datalen;
202         data.dptr = talloc_memdup(c, &c->data[0], c->datalen);
203         if (data.dptr == NULL) {
204                 DEBUG(DEBUG_ERR, (__location__ " Memory allocation failure\n"));
205                 return;
206         }
207
208         srvid_dispatch(ctdb->srv, c->srvid, CTDB_SRVID_ALL, data);
209 }
210
211 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
212
213 /*
214   this is called in the client, when data comes in from the daemon
215  */
216 void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
217 {
218         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
219         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
220         TALLOC_CTX *tmp_ctx;
221
222         /* place the packet as a child of a tmp_ctx. We then use
223            talloc_free() below to free it. If any of the calls want
224            to keep it, then they will steal it somewhere else, and the
225            talloc_free() will be a no-op */
226         tmp_ctx = talloc_new(ctdb);
227         talloc_steal(tmp_ctx, hdr);
228
229         if (cnt == 0) {
230                 DEBUG(DEBUG_CRIT,("Daemon has exited - shutting down client\n"));
231                 exit(1);
232         }
233
234         if (cnt < sizeof(*hdr)) {
235                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
236                 goto done;
237         }
238         if (cnt != hdr->length) {
239                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
240                                (unsigned)hdr->length, (unsigned)cnt);
241                 goto done;
242         }
243
244         if (hdr->ctdb_magic != CTDB_MAGIC) {
245                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
246                 goto done;
247         }
248
249         if (hdr->ctdb_version != CTDB_PROTOCOL) {
250                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
251                 goto done;
252         }
253
254         switch (hdr->operation) {
255         case CTDB_REPLY_CALL:
256                 ctdb_client_reply_call(ctdb, hdr);
257                 break;
258
259         case CTDB_REQ_MESSAGE:
260                 ctdb_request_message(ctdb, hdr);
261                 break;
262
263         case CTDB_REPLY_CONTROL:
264                 ctdb_client_reply_control(ctdb, hdr);
265                 break;
266
267         default:
268                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
269         }
270
271 done:
272         talloc_free(tmp_ctx);
273 }
274
275 /*
276   connect to a unix domain socket
277 */
278 int ctdb_socket_connect(struct ctdb_context *ctdb)
279 {
280         struct sockaddr_un addr;
281
282         memset(&addr, 0, sizeof(addr));
283         addr.sun_family = AF_UNIX;
284         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
285
286         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
287         if (ctdb->daemon.sd == -1) {
288                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
289                 return -1;
290         }
291
292         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
293                 close(ctdb->daemon.sd);
294                 ctdb->daemon.sd = -1;
295                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
296                 return -1;
297         }
298
299         set_nonblocking(ctdb->daemon.sd);
300         set_close_on_exec(ctdb->daemon.sd);
301         
302         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
303                                               CTDB_DS_ALIGNMENT, 
304                                               ctdb_client_read_cb, ctdb, "to-ctdbd");
305         return 0;
306 }
307
308
309 struct ctdb_record_handle {
310         struct ctdb_db_context *ctdb_db;
311         TDB_DATA key;
312         TDB_DATA *data;
313         struct ctdb_ltdb_header header;
314 };
315
316
317 /*
318   make a recv call to the local ctdb daemon - called from client context
319
320   This is called when the program wants to wait for a ctdb_call to complete and get the 
321   results. This call will block unless the call has already completed.
322 */
323 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
324 {
325         if (state == NULL) {
326                 return -1;
327         }
328
329         while (state->state < CTDB_CALL_DONE) {
330                 tevent_loop_once(state->ctdb_db->ctdb->ev);
331         }
332         if (state->state != CTDB_CALL_DONE) {
333                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
334                 talloc_free(state);
335                 return -1;
336         }
337
338         if (state->call->reply_data.dsize) {
339                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
340                                                       state->call->reply_data.dptr,
341                                                       state->call->reply_data.dsize);
342                 call->reply_data.dsize = state->call->reply_data.dsize;
343         } else {
344                 call->reply_data.dptr = NULL;
345                 call->reply_data.dsize = 0;
346         }
347         call->status = state->call->status;
348         talloc_free(state);
349
350         return call->status;
351 }
352
353
354
355
356 /*
357   destroy a ctdb_call in client
358 */
359 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
360 {
361         reqid_remove(state->ctdb_db->ctdb->idr, state->reqid);
362         return 0;
363 }
364
365 /*
366   construct an event driven local ctdb_call
367
368   this is used so that locally processed ctdb_call requests are processed
369   in an event driven manner
370 */
371 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
372                                                                   struct ctdb_call *call,
373                                                                   struct ctdb_ltdb_header *header,
374                                                                   TDB_DATA *data)
375 {
376         struct ctdb_client_call_state *state;
377         struct ctdb_context *ctdb = ctdb_db->ctdb;
378         int ret;
379
380         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
381         CTDB_NO_MEMORY_NULL(ctdb, state);
382         state->call = talloc_zero(state, struct ctdb_call);
383         CTDB_NO_MEMORY_NULL(ctdb, state->call);
384
385         talloc_steal(state, data->dptr);
386
387         state->state   = CTDB_CALL_DONE;
388         *(state->call) = *call;
389         state->ctdb_db = ctdb_db;
390
391         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
392         if (ret != 0) {
393                 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
394         }
395
396         return state;
397 }
398
399 /*
400   make a ctdb call to the local daemon - async send. Called from client context.
401
402   This constructs a ctdb_call request and queues it for processing. 
403   This call never blocks.
404 */
405 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
406                                               struct ctdb_call *call)
407 {
408         struct ctdb_client_call_state *state;
409         struct ctdb_context *ctdb = ctdb_db->ctdb;
410         struct ctdb_ltdb_header header;
411         TDB_DATA data;
412         int ret;
413         size_t len;
414         struct ctdb_req_call_old *c;
415
416         /* if the domain socket is not yet open, open it */
417         if (ctdb->daemon.sd==-1) {
418                 ctdb_socket_connect(ctdb);
419         }
420
421         ret = ctdb_ltdb_lock(ctdb_db, call->key);
422         if (ret != 0) {
423                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
424                 return NULL;
425         }
426
427         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
428
429         if ((call->flags & CTDB_IMMEDIATE_MIGRATION) && (header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
430                 ret = -1;
431         }
432
433         if (ret == 0 && header.dmaster == ctdb->pnn) {
434                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
435                 talloc_free(data.dptr);
436                 ctdb_ltdb_unlock(ctdb_db, call->key);
437                 return state;
438         }
439
440         ctdb_ltdb_unlock(ctdb_db, call->key);
441         talloc_free(data.dptr);
442
443         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
444         if (state == NULL) {
445                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
446                 return NULL;
447         }
448         state->call = talloc_zero(state, struct ctdb_call);
449         if (state->call == NULL) {
450                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
451                 return NULL;
452         }
453
454         len = offsetof(struct ctdb_req_call_old, data) + call->key.dsize + call->call_data.dsize;
455         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call_old);
456         if (c == NULL) {
457                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
458                 return NULL;
459         }
460
461         state->reqid     = reqid_new(ctdb->idr, state);
462         state->ctdb_db = ctdb_db;
463         talloc_set_destructor(state, ctdb_client_call_destructor);
464
465         c->hdr.reqid     = state->reqid;
466         c->flags         = call->flags;
467         c->db_id         = ctdb_db->db_id;
468         c->callid        = call->call_id;
469         c->hopcount      = 0;
470         c->keylen        = call->key.dsize;
471         c->calldatalen   = call->call_data.dsize;
472         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
473         memcpy(&c->data[call->key.dsize], 
474                call->call_data.dptr, call->call_data.dsize);
475         *(state->call)              = *call;
476         state->call->call_data.dptr = &c->data[call->key.dsize];
477         state->call->key.dptr       = &c->data[0];
478
479         state->state  = CTDB_CALL_WAIT;
480
481
482         ctdb_client_queue_pkt(ctdb, &c->hdr);
483
484         return state;
485 }
486
487
488 /*
489   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
490 */
491 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
492 {
493         struct ctdb_client_call_state *state;
494
495         state = ctdb_call_send(ctdb_db, call);
496         return ctdb_call_recv(state, call);
497 }
498
499
500 /*
501   tell the daemon what messaging srvid we will use, and register the message
502   handler function in the client
503 */
504 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
505                                     srvid_handler_fn handler,
506                                     void *private_data)
507 {
508         int res;
509         int32_t status;
510
511         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid,
512                            CTDB_CONTROL_REGISTER_SRVID, 0,
513                            tdb_null, NULL, NULL, &status, NULL, NULL);
514         if (res != 0 || status != 0) {
515                 DEBUG(DEBUG_ERR,
516                       ("Failed to register srvid %llu\n",
517                        (unsigned long long)srvid));
518                 return -1;
519         }
520
521         /* also need to register the handler with our own ctdb structure */
522         return srvid_register(ctdb->srv, ctdb, srvid, handler, private_data);
523 }
524
525 /*
526   tell the daemon we no longer want a srvid
527 */
528 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb,
529                                        uint64_t srvid, void *private_data)
530 {
531         int res;
532         int32_t status;
533
534         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid,
535                            CTDB_CONTROL_DEREGISTER_SRVID, 0,
536                            tdb_null, NULL, NULL, &status, NULL, NULL);
537         if (res != 0 || status != 0) {
538                 DEBUG(DEBUG_ERR,
539                       ("Failed to deregister srvid %llu\n",
540                        (unsigned long long)srvid));
541                 return -1;
542         }
543
544         /* also need to register the handler with our own ctdb structure */
545         srvid_deregister(ctdb->srv, srvid, private_data);
546         return 0;
547 }
548
549 /*
550  * check server ids
551  */
552 int ctdb_client_check_message_handlers(struct ctdb_context *ctdb, uint64_t *ids, uint32_t num,
553                                        uint8_t *result)
554 {
555         TDB_DATA indata, outdata;
556         int res;
557         int32_t status;
558         int i;
559
560         indata.dptr = (uint8_t *)ids;
561         indata.dsize = num * sizeof(*ids);
562
563         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_CHECK_SRVIDS, 0,
564                            indata, ctdb, &outdata, &status, NULL, NULL);
565         if (res != 0 || status != 0) {
566                 DEBUG(DEBUG_ERR, (__location__ " failed to check srvids\n"));
567                 return -1;
568         }
569
570         if (outdata.dsize != num*sizeof(uint8_t)) {
571                 DEBUG(DEBUG_ERR, (__location__ " expected %lu bytes, received %zi bytes\n",
572                                   (long unsigned int)num*sizeof(uint8_t),
573                                   outdata.dsize));
574                 talloc_free(outdata.dptr);
575                 return -1;
576         }
577
578         for (i=0; i<num; i++) {
579                 result[i] = outdata.dptr[i];
580         }
581
582         talloc_free(outdata.dptr);
583         return 0;
584 }
585
586 /*
587   send a message - from client context
588  */
589 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
590                       uint64_t srvid, TDB_DATA data)
591 {
592         struct ctdb_req_message_old *r;
593         int len, res;
594
595         len = offsetof(struct ctdb_req_message_old, data) + data.dsize;
596         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
597                                len, struct ctdb_req_message_old);
598         CTDB_NO_MEMORY(ctdb, r);
599
600         r->hdr.destnode  = pnn;
601         r->srvid         = srvid;
602         r->datalen       = data.dsize;
603         memcpy(&r->data[0], data.dptr, data.dsize);
604         
605         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
606         talloc_free(r);
607         return res;
608 }
609
610
611 /*
612   cancel a ctdb_fetch_lock operation, releasing the lock
613  */
614 static int fetch_lock_destructor(struct ctdb_record_handle *h)
615 {
616         ctdb_ltdb_unlock(h->ctdb_db, h->key);
617         return 0;
618 }
619
620 /*
621   force the migration of a record to this node
622  */
623 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
624 {
625         struct ctdb_call call;
626         ZERO_STRUCT(call);
627         call.call_id = CTDB_NULL_FUNC;
628         call.key = key;
629         call.flags = CTDB_IMMEDIATE_MIGRATION;
630         return ctdb_call(ctdb_db, &call);
631 }
632
633 /*
634   try to fetch a readonly copy of a record
635  */
636 static int
637 ctdb_client_fetch_readonly(struct ctdb_db_context *ctdb_db, TDB_DATA key, TALLOC_CTX *mem_ctx, struct ctdb_ltdb_header **hdr, TDB_DATA *data)
638 {
639         int ret;
640
641         struct ctdb_call call;
642         ZERO_STRUCT(call);
643
644         call.call_id = CTDB_FETCH_WITH_HEADER_FUNC;
645         call.call_data.dptr = NULL;
646         call.call_data.dsize = 0;
647         call.key = key;
648         call.flags = CTDB_WANT_READONLY;
649         ret = ctdb_call(ctdb_db, &call);
650
651         if (ret != 0) {
652                 return -1;
653         }
654         if (call.reply_data.dsize < sizeof(struct ctdb_ltdb_header)) {
655                 return -1;
656         }
657
658         *hdr = talloc_memdup(mem_ctx, &call.reply_data.dptr[0], sizeof(struct ctdb_ltdb_header));
659         if (*hdr == NULL) {
660                 talloc_free(call.reply_data.dptr);
661                 return -1;
662         }
663
664         data->dsize = call.reply_data.dsize - sizeof(struct ctdb_ltdb_header);
665         data->dptr  = talloc_memdup(mem_ctx, &call.reply_data.dptr[sizeof(struct ctdb_ltdb_header)], data->dsize);
666         if (data->dptr == NULL) {
667                 talloc_free(call.reply_data.dptr);
668                 talloc_free(hdr);
669                 return -1;
670         }
671
672         return 0;
673 }
674
675 /*
676   get a lock on a record, and return the records data. Blocks until it gets the lock
677  */
678 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
679                                            TDB_DATA key, TDB_DATA *data)
680 {
681         int ret;
682         struct ctdb_record_handle *h;
683
684         /*
685           procedure is as follows:
686
687           1) get the chain lock. 
688           2) check if we are dmaster
689           3) if we are the dmaster then return handle 
690           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
691              reply from ctdbd
692           5) when we get the reply, goto (1)
693          */
694
695         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
696         if (h == NULL) {
697                 return NULL;
698         }
699
700         h->ctdb_db = ctdb_db;
701         h->key     = key;
702         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
703         if (h->key.dptr == NULL) {
704                 talloc_free(h);
705                 return NULL;
706         }
707         h->data    = data;
708
709         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
710                  (const char *)key.dptr));
711
712 again:
713         /* step 1 - get the chain lock */
714         ret = ctdb_ltdb_lock(ctdb_db, key);
715         if (ret != 0) {
716                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
717                 talloc_free(h);
718                 return NULL;
719         }
720
721         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
722
723         talloc_set_destructor(h, fetch_lock_destructor);
724
725         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
726
727         /* when torturing, ensure we test the remote path */
728         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
729             random() % 5 == 0) {
730                 h->header.dmaster = (uint32_t)-1;
731         }
732
733
734         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
735
736         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
737                 ctdb_ltdb_unlock(ctdb_db, key);
738                 ret = ctdb_client_force_migration(ctdb_db, key);
739                 if (ret != 0) {
740                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
741                         talloc_free(h);
742                         return NULL;
743                 }
744                 goto again;
745         }
746
747         /* if this is a request for read/write and we have delegations
748            we have to revoke all delegations first
749         */
750         if ((h->header.dmaster == ctdb_db->ctdb->pnn) &&
751             (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
752                 ctdb_ltdb_unlock(ctdb_db, key);
753                 ret = ctdb_client_force_migration(ctdb_db, key);
754                 if (ret != 0) {
755                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
756                         talloc_free(h);
757                         return NULL;
758                 }
759                 goto again;
760         }
761
762         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
763         return h;
764 }
765
766 /*
767   get a readonly lock on a record, and return the records data. Blocks until it gets the lock
768  */
769 struct ctdb_record_handle *
770 ctdb_fetch_readonly_lock(
771         struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
772         TDB_DATA key, TDB_DATA *data,
773         int read_only)
774 {
775         int ret;
776         struct ctdb_record_handle *h;
777         struct ctdb_ltdb_header *roheader = NULL;
778
779         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
780         if (h == NULL) {
781                 return NULL;
782         }
783
784         h->ctdb_db = ctdb_db;
785         h->key     = key;
786         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
787         if (h->key.dptr == NULL) {
788                 talloc_free(h);
789                 return NULL;
790         }
791         h->data    = data;
792
793         data->dptr = NULL;
794         data->dsize = 0;
795
796
797 again:
798         talloc_free(roheader);
799         roheader = NULL;
800
801         talloc_free(data->dptr);
802         data->dptr = NULL;
803         data->dsize = 0;
804
805         /* Lock the record/chain */
806         ret = ctdb_ltdb_lock(ctdb_db, key);
807         if (ret != 0) {
808                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
809                 talloc_free(h);
810                 return NULL;
811         }
812
813         talloc_set_destructor(h, fetch_lock_destructor);
814
815         /* Check if record exists yet in the TDB */
816         ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
817         if (ret != 0) {
818                 ctdb_ltdb_unlock(ctdb_db, key);
819                 ret = ctdb_client_force_migration(ctdb_db, key);
820                 if (ret != 0) {
821                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
822                         talloc_free(h);
823                         return NULL;
824                 }
825                 goto again;
826         }
827
828         /* if this is a request for read/write and we have delegations
829            we have to revoke all delegations first
830         */
831         if ((read_only == 0) 
832         &&  (h->header.dmaster == ctdb_db->ctdb->pnn)
833         &&  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
834                 ctdb_ltdb_unlock(ctdb_db, key);
835                 ret = ctdb_client_force_migration(ctdb_db, key);
836                 if (ret != 0) {
837                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
838                         talloc_free(h);
839                         return NULL;
840                 }
841                 goto again;
842         }
843
844         /* if we are dmaster, just return the handle */
845         if (h->header.dmaster == ctdb_db->ctdb->pnn) {
846                 return h;
847         }
848
849         if (read_only != 0) {
850                 TDB_DATA rodata = {NULL, 0};
851
852                 if ((h->header.flags & CTDB_REC_RO_HAVE_READONLY)
853                 ||  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
854                         return h;
855                 }
856
857                 ctdb_ltdb_unlock(ctdb_db, key);
858                 ret = ctdb_client_fetch_readonly(ctdb_db, key, h, &roheader, &rodata);
859                 if (ret != 0) {
860                         DEBUG(DEBUG_ERR,("ctdb_fetch_readonly_lock:  failed. force migration and try again\n"));
861                         ret = ctdb_client_force_migration(ctdb_db, key);
862                         if (ret != 0) {
863                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
864                                 talloc_free(h);
865                                 return NULL;
866                         }
867
868                         goto again;
869                 }
870
871                 if (!(roheader->flags&CTDB_REC_RO_HAVE_READONLY)) {
872                         ret = ctdb_client_force_migration(ctdb_db, key);
873                         if (ret != 0) {
874                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
875                                 talloc_free(h);
876                                 return NULL;
877                         }
878
879                         goto again;
880                 }
881
882                 ret = ctdb_ltdb_lock(ctdb_db, key);
883                 if (ret != 0) {
884                         DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
885                         talloc_free(h);
886                         return NULL;
887                 }
888
889                 ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
890                 if (ret != 0) {
891                         ctdb_ltdb_unlock(ctdb_db, key);
892
893                         ret = ctdb_client_force_migration(ctdb_db, key);
894                         if (ret != 0) {
895                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
896                                 talloc_free(h);
897                                 return NULL;
898                         }
899
900                         goto again;
901                 }
902
903                 return h;
904         }
905
906         /* we are not dmaster and this was not a request for a readonly lock
907          * so unlock the record, migrate it and try again
908          */
909         ctdb_ltdb_unlock(ctdb_db, key);
910         ret = ctdb_client_force_migration(ctdb_db, key);
911         if (ret != 0) {
912                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
913                 talloc_free(h);
914                 return NULL;
915         }
916         goto again;
917 }
918
919 /*
920   store some data to the record that was locked with ctdb_fetch_lock()
921 */
922 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
923 {
924         if (h->ctdb_db->persistent) {
925                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
926                 return -1;
927         }
928
929         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
930 }
931
932 /*
933   non-locking fetch of a record
934  */
935 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
936                TDB_DATA key, TDB_DATA *data)
937 {
938         struct ctdb_call call;
939         int ret;
940
941         call.call_id = CTDB_FETCH_FUNC;
942         call.call_data.dptr = NULL;
943         call.call_data.dsize = 0;
944         call.key = key;
945
946         ret = ctdb_call(ctdb_db, &call);
947
948         if (ret == 0) {
949                 *data = call.reply_data;
950                 talloc_steal(mem_ctx, data->dptr);
951         }
952
953         return ret;
954 }
955
956
957
958 /*
959    called when a control completes or timesout to invoke the callback
960    function the user provided
961 */
962 static void invoke_control_callback(struct tevent_context *ev,
963                                     struct tevent_timer *te,
964                                     struct timeval t, void *private_data)
965 {
966         struct ctdb_client_control_state *state;
967         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
968         int ret;
969
970         state = talloc_get_type(private_data, struct ctdb_client_control_state);
971         talloc_steal(tmp_ctx, state);
972
973         ret = ctdb_control_recv(state->ctdb, state, state,
974                         NULL, 
975                         NULL, 
976                         NULL);
977         if (ret != 0) {
978                 DEBUG(DEBUG_DEBUG,("ctdb_control_recv() failed, ignoring return code %d\n", ret));
979         }
980
981         talloc_free(tmp_ctx);
982 }
983
984 /*
985   called when a CTDB_REPLY_CONTROL packet comes in in the client
986
987   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
988   contains any reply data from the control
989 */
990 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
991                                       struct ctdb_req_header *hdr)
992 {
993         struct ctdb_reply_control_old *c = (struct ctdb_reply_control_old *)hdr;
994         struct ctdb_client_control_state *state;
995
996         state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_client_control_state);
997         if (state == NULL) {
998                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
999                 return;
1000         }
1001
1002         if (hdr->reqid != state->reqid) {
1003                 /* we found a record  but it was the wrong one */
1004                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
1005                 return;
1006         }
1007
1008         state->outdata.dptr = c->data;
1009         state->outdata.dsize = c->datalen;
1010         state->status = c->status;
1011         if (c->errorlen) {
1012                 state->errormsg = talloc_strndup(state, 
1013                                                  (char *)&c->data[c->datalen], 
1014                                                  c->errorlen);
1015         }
1016
1017         /* state->outdata now uses resources from c so we dont want c
1018            to just dissappear from under us while state is still alive
1019         */
1020         talloc_steal(state, c);
1021
1022         state->state = CTDB_CONTROL_DONE;
1023
1024         /* if we had a callback registered for this control, pull the response
1025            and call the callback.
1026         */
1027         if (state->async.fn) {
1028                 tevent_add_timer(ctdb->ev, state, timeval_zero(),
1029                                  invoke_control_callback, state);
1030         }
1031 }
1032
1033
1034 /*
1035   destroy a ctdb_control in client
1036 */
1037 static int ctdb_client_control_destructor(struct ctdb_client_control_state *state)
1038 {
1039         reqid_remove(state->ctdb->idr, state->reqid);
1040         return 0;
1041 }
1042
1043
1044 /* time out handler for ctdb_control */
1045 static void control_timeout_func(struct tevent_context *ev,
1046                                  struct tevent_timer *te,
1047                                  struct timeval t, void *private_data)
1048 {
1049         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
1050
1051         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
1052                          "dstnode:%u\n", state->reqid, state->c->opcode,
1053                          state->c->hdr.destnode));
1054
1055         state->state = CTDB_CONTROL_TIMEOUT;
1056
1057         /* if we had a callback registered for this control, pull the response
1058            and call the callback.
1059         */
1060         if (state->async.fn) {
1061                 tevent_add_timer(state->ctdb->ev, state, timeval_zero(),
1062                                  invoke_control_callback, state);
1063         }
1064 }
1065
1066 /* async version of send control request */
1067 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
1068                 uint32_t destnode, uint64_t srvid, 
1069                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
1070                 TALLOC_CTX *mem_ctx,
1071                 struct timeval *timeout,
1072                 char **errormsg)
1073 {
1074         struct ctdb_client_control_state *state;
1075         size_t len;
1076         struct ctdb_req_control_old *c;
1077         int ret;
1078
1079         if (errormsg) {
1080                 *errormsg = NULL;
1081         }
1082
1083         /* if the domain socket is not yet open, open it */
1084         if (ctdb->daemon.sd==-1) {
1085                 ctdb_socket_connect(ctdb);
1086         }
1087
1088         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
1089         CTDB_NO_MEMORY_NULL(ctdb, state);
1090
1091         state->ctdb       = ctdb;
1092         state->reqid      = reqid_new(ctdb->idr, state);
1093         state->state      = CTDB_CONTROL_WAIT;
1094         state->errormsg   = NULL;
1095
1096         talloc_set_destructor(state, ctdb_client_control_destructor);
1097
1098         len = offsetof(struct ctdb_req_control_old, data) + data.dsize;
1099         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
1100                                len, struct ctdb_req_control_old);
1101         state->c            = c;        
1102         CTDB_NO_MEMORY_NULL(ctdb, c);
1103         c->hdr.reqid        = state->reqid;
1104         c->hdr.destnode     = destnode;
1105         c->opcode           = opcode;
1106         c->client_id        = 0;
1107         c->flags            = flags;
1108         c->srvid            = srvid;
1109         c->datalen          = data.dsize;
1110         if (data.dsize) {
1111                 memcpy(&c->data[0], data.dptr, data.dsize);
1112         }
1113
1114         /* timeout */
1115         if (timeout && !timeval_is_zero(timeout)) {
1116                 tevent_add_timer(ctdb->ev, state, *timeout,
1117                                  control_timeout_func, state);
1118         }
1119
1120         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
1121         if (ret != 0) {
1122                 talloc_free(state);
1123                 return NULL;
1124         }
1125
1126         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1127                 talloc_free(state);
1128                 return NULL;
1129         }
1130
1131         return state;
1132 }
1133
1134
1135 /* async version of receive control reply */
1136 int ctdb_control_recv(struct ctdb_context *ctdb, 
1137                 struct ctdb_client_control_state *state, 
1138                 TALLOC_CTX *mem_ctx,
1139                 TDB_DATA *outdata, int32_t *status, char **errormsg)
1140 {
1141         TALLOC_CTX *tmp_ctx;
1142
1143         if (status != NULL) {
1144                 *status = -1;
1145         }
1146         if (errormsg != NULL) {
1147                 *errormsg = NULL;
1148         }
1149
1150         if (state == NULL) {
1151                 return -1;
1152         }
1153
1154         /* prevent double free of state */
1155         tmp_ctx = talloc_new(ctdb);
1156         talloc_steal(tmp_ctx, state);
1157
1158         /* loop one event at a time until we either timeout or the control
1159            completes.
1160         */
1161         while (state->state == CTDB_CONTROL_WAIT) {
1162                 tevent_loop_once(ctdb->ev);
1163         }
1164
1165         if (state->state != CTDB_CONTROL_DONE) {
1166                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
1167                 if (state->async.fn) {
1168                         state->async.fn(state);
1169                 }
1170                 talloc_free(tmp_ctx);
1171                 return -1;
1172         }
1173
1174         if (state->errormsg) {
1175                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
1176                 if (errormsg) {
1177                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
1178                 }
1179                 if (state->async.fn) {
1180                         state->async.fn(state);
1181                 }
1182                 talloc_free(tmp_ctx);
1183                 return (status == 0 ? -1 : state->status);
1184         }
1185
1186         if (outdata) {
1187                 *outdata = state->outdata;
1188                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
1189         }
1190
1191         if (status) {
1192                 *status = state->status;
1193         }
1194
1195         if (state->async.fn) {
1196                 state->async.fn(state);
1197         }
1198
1199         talloc_free(tmp_ctx);
1200         return 0;
1201 }
1202
1203
1204
1205 /*
1206   send a ctdb control message
1207   timeout specifies how long we should wait for a reply.
1208   if timeout is NULL we wait indefinitely
1209  */
1210 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
1211                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
1212                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
1213                  struct timeval *timeout,
1214                  char **errormsg)
1215 {
1216         struct ctdb_client_control_state *state;
1217
1218         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
1219                         flags, data, mem_ctx,
1220                         timeout, errormsg);
1221
1222         /* FIXME: Error conditions in ctdb_control_send return NULL without
1223          * setting errormsg.  So, there is no way to distinguish between sucess
1224          * and failure when CTDB_CTRL_FLAG_NOREPLY is set */
1225         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1226                 if (status != NULL) {
1227                         *status = 0;
1228                 }
1229                 return 0;
1230         }
1231
1232         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
1233                         errormsg);
1234 }
1235
1236
1237
1238
1239 /*
1240   a process exists call. Returns 0 if process exists, -1 otherwise
1241  */
1242 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
1243 {
1244         int ret;
1245         TDB_DATA data;
1246         int32_t status;
1247
1248         data.dptr = (uint8_t*)&pid;
1249         data.dsize = sizeof(pid);
1250
1251         ret = ctdb_control(ctdb, destnode, 0, 
1252                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
1253                            NULL, NULL, &status, NULL, NULL);
1254         if (ret != 0) {
1255                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
1256                 return -1;
1257         }
1258
1259         return status;
1260 }
1261
1262 /*
1263   get remote statistics
1264  */
1265 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
1266 {
1267         int ret;
1268         TDB_DATA data;
1269         int32_t res;
1270
1271         ret = ctdb_control(ctdb, destnode, 0, 
1272                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
1273                            ctdb, &data, &res, NULL, NULL);
1274         if (ret != 0 || res != 0) {
1275                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
1276                 return -1;
1277         }
1278
1279         if (data.dsize != sizeof(struct ctdb_statistics)) {
1280                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1281                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1282                       return -1;
1283         }
1284
1285         *status = *(struct ctdb_statistics *)data.dptr;
1286         talloc_free(data.dptr);
1287                         
1288         return 0;
1289 }
1290
1291 /*
1292  * get db statistics
1293  */
1294 int ctdb_ctrl_dbstatistics(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1295                            TALLOC_CTX *mem_ctx, struct ctdb_db_statistics_old **dbstat)
1296 {
1297         int ret;
1298         TDB_DATA indata, outdata;
1299         int32_t res;
1300         struct ctdb_db_statistics_old *wire, *s;
1301         char *ptr;
1302         int i;
1303
1304         indata.dptr = (uint8_t *)&dbid;
1305         indata.dsize = sizeof(dbid);
1306
1307         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DB_STATISTICS,
1308                            0, indata, ctdb, &outdata, &res, NULL, NULL);
1309         if (ret != 0 || res != 0) {
1310                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for dbstatistics failed\n"));
1311                 return -1;
1312         }
1313
1314         if (outdata.dsize < offsetof(struct ctdb_db_statistics_old, hot_keys_wire)) {
1315                 DEBUG(DEBUG_ERR,(__location__ " Wrong dbstatistics size %zi - expected >= %lu\n",
1316                                  outdata.dsize,
1317                                  (long unsigned int)sizeof(struct ctdb_statistics)));
1318                 return -1;
1319         }
1320
1321         s = talloc_zero(mem_ctx, struct ctdb_db_statistics_old);
1322         if (s == NULL) {
1323                 talloc_free(outdata.dptr);
1324                 CTDB_NO_MEMORY(ctdb, s);
1325         }
1326
1327         wire = (struct ctdb_db_statistics_old *)outdata.dptr;
1328         memcpy(s, wire, offsetof(struct ctdb_db_statistics_old, hot_keys_wire));
1329         ptr = &wire->hot_keys_wire[0];
1330         for (i=0; i<wire->num_hot_keys; i++) {
1331                 s->hot_keys[i].key.dptr = talloc_size(mem_ctx, s->hot_keys[i].key.dsize);
1332                 if (s->hot_keys[i].key.dptr == NULL) {
1333                         talloc_free(outdata.dptr);
1334                         CTDB_NO_MEMORY(ctdb, s->hot_keys[i].key.dptr);
1335                 }
1336
1337                 memcpy(s->hot_keys[i].key.dptr, ptr, s->hot_keys[i].key.dsize);
1338                 ptr += wire->hot_keys[i].key.dsize;
1339         }
1340
1341         talloc_free(outdata.dptr);
1342         *dbstat = s;
1343         return 0;
1344 }
1345
1346 /*
1347   shutdown a remote ctdb node
1348  */
1349 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1350 {
1351         struct ctdb_client_control_state *state;
1352
1353         state = ctdb_control_send(ctdb, destnode, 0, 
1354                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1355                            NULL, &timeout, NULL);
1356         if (state == NULL) {
1357                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1358                 return -1;
1359         }
1360
1361         return 0;
1362 }
1363
1364 /*
1365   get vnn map from a remote node
1366  */
1367 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1368 {
1369         int ret;
1370         TDB_DATA outdata;
1371         int32_t res;
1372         struct ctdb_vnn_map_wire *map;
1373
1374         ret = ctdb_control(ctdb, destnode, 0, 
1375                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1376                            mem_ctx, &outdata, &res, &timeout, NULL);
1377         if (ret != 0 || res != 0) {
1378                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1379                 return -1;
1380         }
1381         
1382         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1383         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1384             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1385                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1386                 return -1;
1387         }
1388
1389         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1390         CTDB_NO_MEMORY(ctdb, *vnnmap);
1391         (*vnnmap)->generation = map->generation;
1392         (*vnnmap)->size       = map->size;
1393         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1394
1395         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1396         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1397         talloc_free(outdata.dptr);
1398                     
1399         return 0;
1400 }
1401
1402
1403 /*
1404   get the recovery mode of a remote node
1405  */
1406 struct ctdb_client_control_state *
1407 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1408 {
1409         return ctdb_control_send(ctdb, destnode, 0, 
1410                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1411                            mem_ctx, &timeout, NULL);
1412 }
1413
1414 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1415 {
1416         int ret;
1417         int32_t res;
1418
1419         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1420         if (ret != 0) {
1421                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1422                 return -1;
1423         }
1424
1425         if (recmode) {
1426                 *recmode = (uint32_t)res;
1427         }
1428
1429         return 0;
1430 }
1431
1432 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1433 {
1434         struct ctdb_client_control_state *state;
1435
1436         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1437         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1438 }
1439
1440
1441
1442
1443 /*
1444   set the recovery mode of a remote node
1445  */
1446 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1447 {
1448         int ret;
1449         TDB_DATA data;
1450         int32_t res;
1451
1452         data.dsize = sizeof(uint32_t);
1453         data.dptr = (unsigned char *)&recmode;
1454
1455         ret = ctdb_control(ctdb, destnode, 0, 
1456                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1457                            NULL, NULL, &res, &timeout, NULL);
1458         if (ret != 0 || res != 0) {
1459                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1460                 return -1;
1461         }
1462
1463         return 0;
1464 }
1465
1466
1467
1468 /*
1469   get the recovery master of a remote node
1470  */
1471 struct ctdb_client_control_state *
1472 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1473                         struct timeval timeout, uint32_t destnode)
1474 {
1475         return ctdb_control_send(ctdb, destnode, 0, 
1476                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1477                            mem_ctx, &timeout, NULL);
1478 }
1479
1480 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1481 {
1482         int ret;
1483         int32_t res;
1484
1485         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1486         if (ret != 0) {
1487                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1488                 return -1;
1489         }
1490
1491         if (recmaster) {
1492                 *recmaster = (uint32_t)res;
1493         }
1494
1495         return 0;
1496 }
1497
1498 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1499 {
1500         struct ctdb_client_control_state *state;
1501
1502         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1503         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1504 }
1505
1506
1507 /*
1508   set the recovery master of a remote node
1509  */
1510 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1511 {
1512         int ret;
1513         TDB_DATA data;
1514         int32_t res;
1515
1516         ZERO_STRUCT(data);
1517         data.dsize = sizeof(uint32_t);
1518         data.dptr = (unsigned char *)&recmaster;
1519
1520         ret = ctdb_control(ctdb, destnode, 0, 
1521                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1522                            NULL, NULL, &res, &timeout, NULL);
1523         if (ret != 0 || res != 0) {
1524                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1525                 return -1;
1526         }
1527
1528         return 0;
1529 }
1530
1531
1532 /*
1533   get a list of databases off a remote node
1534  */
1535 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1536                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map_old **dbmap)
1537 {
1538         int ret;
1539         TDB_DATA outdata;
1540         int32_t res;
1541
1542         ret = ctdb_control(ctdb, destnode, 0, 
1543                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1544                            mem_ctx, &outdata, &res, &timeout, NULL);
1545         if (ret != 0 || res != 0) {
1546                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1547                 return -1;
1548         }
1549
1550         *dbmap = (struct ctdb_dbid_map_old *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1551         talloc_free(outdata.dptr);
1552                     
1553         return 0;
1554 }
1555
1556 /*
1557   get a list of nodes (vnn and flags ) from a remote node
1558  */
1559 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1560                 struct timeval timeout, uint32_t destnode, 
1561                 TALLOC_CTX *mem_ctx, struct ctdb_node_map_old **nodemap)
1562 {
1563         int ret;
1564         TDB_DATA outdata;
1565         int32_t res;
1566
1567         ret = ctdb_control(ctdb, destnode, 0,
1568                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null,
1569                            mem_ctx, &outdata, &res, &timeout, NULL);
1570         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1571                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1572                 return -1;
1573         }
1574
1575         *nodemap = (struct ctdb_node_map_old *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1576         talloc_free(outdata.dptr);
1577         return 0;
1578 }
1579
1580 /*
1581   load nodes file on a remote node and return as a node map
1582  */
1583 int ctdb_ctrl_getnodesfile(struct ctdb_context *ctdb,
1584                            struct timeval timeout, uint32_t destnode,
1585                            TALLOC_CTX *mem_ctx, struct ctdb_node_map_old **nodemap)
1586 {
1587         int ret;
1588         TDB_DATA outdata;
1589         int32_t res;
1590
1591         ret = ctdb_control(ctdb, destnode, 0,
1592                            CTDB_CONTROL_GET_NODES_FILE, 0, tdb_null,
1593                            mem_ctx, &outdata, &res, &timeout, NULL);
1594         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1595                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1596                 return -1;
1597         }
1598
1599         *nodemap = (struct ctdb_node_map_old *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1600         talloc_free(outdata.dptr);
1601
1602         return 0;
1603 }
1604
1605 /*
1606   drop the transport, reload the nodes file and restart the transport
1607  */
1608 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1609                     struct timeval timeout, uint32_t destnode)
1610 {
1611         int ret;
1612         int32_t res;
1613
1614         ret = ctdb_control(ctdb, destnode, 0, 
1615                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1616                            NULL, NULL, &res, &timeout, NULL);
1617         if (ret != 0 || res != 0) {
1618                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1619                 return -1;
1620         }
1621
1622         return 0;
1623 }
1624
1625
1626 /*
1627   set vnn map on a node
1628  */
1629 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1630                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1631 {
1632         int ret;
1633         TDB_DATA data;
1634         int32_t res;
1635         struct ctdb_vnn_map_wire *map;
1636         size_t len;
1637
1638         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1639         map = talloc_size(mem_ctx, len);
1640         CTDB_NO_MEMORY(ctdb, map);
1641
1642         map->generation = vnnmap->generation;
1643         map->size = vnnmap->size;
1644         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1645         
1646         data.dsize = len;
1647         data.dptr  = (uint8_t *)map;
1648
1649         ret = ctdb_control(ctdb, destnode, 0, 
1650                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1651                            NULL, NULL, &res, &timeout, NULL);
1652         if (ret != 0 || res != 0) {
1653                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1654                 return -1;
1655         }
1656
1657         talloc_free(map);
1658
1659         return 0;
1660 }
1661
1662
1663 /*
1664   async send for pull database
1665  */
1666 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1667         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1668         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1669 {
1670         TDB_DATA indata;
1671         struct ctdb_control_pulldb *pull;
1672         struct ctdb_client_control_state *state;
1673
1674         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1675         CTDB_NO_MEMORY_NULL(ctdb, pull);
1676
1677         pull->db_id   = dbid;
1678         pull->lmaster = lmaster;
1679
1680         indata.dsize = sizeof(struct ctdb_control_pulldb);
1681         indata.dptr  = (unsigned char *)pull;
1682
1683         state = ctdb_control_send(ctdb, destnode, 0, 
1684                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1685                                   mem_ctx, &timeout, NULL);
1686         talloc_free(pull);
1687
1688         return state;
1689 }
1690
1691 /*
1692   async recv for pull database
1693  */
1694 int ctdb_ctrl_pulldb_recv(
1695         struct ctdb_context *ctdb, 
1696         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1697         TDB_DATA *outdata)
1698 {
1699         int ret;
1700         int32_t res;
1701
1702         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1703         if ( (ret != 0) || (res != 0) ){
1704                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1705                 return -1;
1706         }
1707
1708         return 0;
1709 }
1710
1711 /*
1712   pull all keys and records for a specific database on a node
1713  */
1714 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1715                 uint32_t dbid, uint32_t lmaster, 
1716                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1717                 TDB_DATA *outdata)
1718 {
1719         struct ctdb_client_control_state *state;
1720
1721         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1722                                       timeout);
1723         
1724         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1725 }
1726
1727
1728 /*
1729   change dmaster for all keys in the database to the new value
1730  */
1731 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1732                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1733 {
1734         int ret;
1735         TDB_DATA indata;
1736         int32_t res;
1737
1738         indata.dsize = 2*sizeof(uint32_t);
1739         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1740
1741         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1742         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1743
1744         ret = ctdb_control(ctdb, destnode, 0, 
1745                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1746                            NULL, NULL, &res, &timeout, NULL);
1747         if (ret != 0 || res != 0) {
1748                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1749                 return -1;
1750         }
1751
1752         return 0;
1753 }
1754
1755 /*
1756   ping a node, return number of clients connected
1757  */
1758 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1759 {
1760         int ret;
1761         int32_t res;
1762
1763         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1764                            tdb_null, NULL, NULL, &res, NULL, NULL);
1765         if (ret != 0) {
1766                 return -1;
1767         }
1768         return res;
1769 }
1770
1771 int ctdb_ctrl_get_runstate(struct ctdb_context *ctdb, 
1772                            struct timeval timeout, 
1773                            uint32_t destnode,
1774                            uint32_t *runstate)
1775 {
1776         TDB_DATA outdata;
1777         int32_t res;
1778         int ret;
1779
1780         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_RUNSTATE, 0,
1781                            tdb_null, ctdb, &outdata, &res, &timeout, NULL);
1782         if (ret != 0 || res != 0) {
1783                 DEBUG(DEBUG_ERR,("ctdb_control for get_runstate failed\n"));
1784                 return ret != 0 ? ret : res;
1785         }
1786
1787         if (outdata.dsize != sizeof(uint32_t)) {
1788                 DEBUG(DEBUG_ERR,("Invalid return data in get_runstate\n"));
1789                 talloc_free(outdata.dptr);
1790                 return -1;
1791         }
1792
1793         if (runstate != NULL) {
1794                 *runstate = *(uint32_t *)outdata.dptr;
1795         }
1796         talloc_free(outdata.dptr);
1797
1798         return 0;
1799 }
1800
1801 /*
1802   find the real path to a ltdb 
1803  */
1804 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1805                    const char **path)
1806 {
1807         int ret;
1808         int32_t res;
1809         TDB_DATA data;
1810
1811         data.dptr = (uint8_t *)&dbid;
1812         data.dsize = sizeof(dbid);
1813
1814         ret = ctdb_control(ctdb, destnode, 0, 
1815                            CTDB_CONTROL_GETDBPATH, 0, data, 
1816                            mem_ctx, &data, &res, &timeout, NULL);
1817         if (ret != 0 || res != 0) {
1818                 return -1;
1819         }
1820
1821         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1822         if ((*path) == NULL) {
1823                 return -1;
1824         }
1825
1826         talloc_free(data.dptr);
1827
1828         return 0;
1829 }
1830
1831 /*
1832   find the name of a db 
1833  */
1834 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1835                    const char **name)
1836 {
1837         int ret;
1838         int32_t res;
1839         TDB_DATA data;
1840
1841         data.dptr = (uint8_t *)&dbid;
1842         data.dsize = sizeof(dbid);
1843
1844         ret = ctdb_control(ctdb, destnode, 0, 
1845                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1846                            mem_ctx, &data, &res, &timeout, NULL);
1847         if (ret != 0 || res != 0) {
1848                 return -1;
1849         }
1850
1851         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1852         if ((*name) == NULL) {
1853                 return -1;
1854         }
1855
1856         talloc_free(data.dptr);
1857
1858         return 0;
1859 }
1860
1861 /*
1862   get the health status of a db
1863  */
1864 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1865                           struct timeval timeout,
1866                           uint32_t destnode,
1867                           uint32_t dbid, TALLOC_CTX *mem_ctx,
1868                           const char **reason)
1869 {
1870         int ret;
1871         int32_t res;
1872         TDB_DATA data;
1873
1874         data.dptr = (uint8_t *)&dbid;
1875         data.dsize = sizeof(dbid);
1876
1877         ret = ctdb_control(ctdb, destnode, 0,
1878                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1879                            mem_ctx, &data, &res, &timeout, NULL);
1880         if (ret != 0 || res != 0) {
1881                 return -1;
1882         }
1883
1884         if (data.dsize == 0) {
1885                 (*reason) = NULL;
1886                 return 0;
1887         }
1888
1889         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1890         if ((*reason) == NULL) {
1891                 return -1;
1892         }
1893
1894         talloc_free(data.dptr);
1895
1896         return 0;
1897 }
1898
1899 /*
1900  * get db sequence number
1901  */
1902 int ctdb_ctrl_getdbseqnum(struct ctdb_context *ctdb, struct timeval timeout,
1903                           uint32_t destnode, uint32_t dbid, uint64_t *seqnum)
1904 {
1905         int ret;
1906         int32_t res;
1907         TDB_DATA data, outdata;
1908
1909         data.dptr = (uint8_t *)&dbid;
1910         data.dsize = sizeof(uint64_t);  /* This is just wrong */
1911
1912         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DB_SEQNUM,
1913                            0, data, ctdb, &outdata, &res, &timeout, NULL);
1914         if (ret != 0 || res != 0) {
1915                 DEBUG(DEBUG_ERR,("ctdb_control for getdbesqnum failed\n"));
1916                 return -1;
1917         }
1918
1919         if (outdata.dsize != sizeof(uint64_t)) {
1920                 DEBUG(DEBUG_ERR,("Invalid return data in get_dbseqnum\n"));
1921                 talloc_free(outdata.dptr);
1922                 return -1;
1923         }
1924
1925         if (seqnum != NULL) {
1926                 *seqnum = *(uint64_t *)outdata.dptr;
1927         }
1928         talloc_free(outdata.dptr);
1929
1930         return 0;
1931 }
1932
1933 /*
1934   create a database
1935  */
1936 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1937                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1938 {
1939         int ret;
1940         int32_t res;
1941         TDB_DATA data;
1942         uint64_t tdb_flags = 0;
1943
1944         data.dptr = discard_const(name);
1945         data.dsize = strlen(name)+1;
1946
1947         /* Make sure that volatile databases use jenkins hash */
1948         if (!persistent) {
1949                 tdb_flags = TDB_INCOMPATIBLE_HASH;
1950         }
1951
1952 #ifdef TDB_MUTEX_LOCKING
1953         if (!persistent && ctdb->tunable.mutex_enabled == 1) {
1954                 tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
1955         }
1956 #endif
1957
1958         ret = ctdb_control(ctdb, destnode, tdb_flags,
1959                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1960                            0, data, 
1961                            mem_ctx, &data, &res, &timeout, NULL);
1962
1963         if (ret != 0 || res != 0) {
1964                 return -1;
1965         }
1966
1967         return 0;
1968 }
1969
1970 /*
1971   get debug level on a node
1972  */
1973 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1974 {
1975         int ret;
1976         int32_t res;
1977         TDB_DATA data;
1978
1979         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1980                            ctdb, &data, &res, NULL, NULL);
1981         if (ret != 0 || res != 0) {
1982                 return -1;
1983         }
1984         if (data.dsize != sizeof(int32_t)) {
1985                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1986                          (unsigned)data.dsize));
1987                 return -1;
1988         }
1989         *level = *(int32_t *)data.dptr;
1990         talloc_free(data.dptr);
1991         return 0;
1992 }
1993
1994 /*
1995   set debug level on a node
1996  */
1997 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1998 {
1999         int ret;
2000         int32_t res;
2001         TDB_DATA data;
2002
2003         data.dptr = (uint8_t *)&level;
2004         data.dsize = sizeof(level);
2005
2006         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
2007                            NULL, NULL, &res, NULL, NULL);
2008         if (ret != 0 || res != 0) {
2009                 return -1;
2010         }
2011         return 0;
2012 }
2013
2014
2015 /*
2016   get a list of connected nodes
2017  */
2018 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
2019                                 struct timeval timeout,
2020                                 TALLOC_CTX *mem_ctx,
2021                                 uint32_t *num_nodes)
2022 {
2023         struct ctdb_node_map_old *map=NULL;
2024         int ret, i;
2025         uint32_t *nodes;
2026
2027         *num_nodes = 0;
2028
2029         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
2030         if (ret != 0) {
2031                 return NULL;
2032         }
2033
2034         nodes = talloc_array(mem_ctx, uint32_t, map->num);
2035         if (nodes == NULL) {
2036                 return NULL;
2037         }
2038
2039         for (i=0;i<map->num;i++) {
2040                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2041                         nodes[*num_nodes] = map->nodes[i].pnn;
2042                         (*num_nodes)++;
2043                 }
2044         }
2045
2046         return nodes;
2047 }
2048
2049
2050 /*
2051   reset remote status
2052  */
2053 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
2054 {
2055         int ret;
2056         int32_t res;
2057
2058         ret = ctdb_control(ctdb, destnode, 0, 
2059                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
2060                            NULL, NULL, &res, NULL, NULL);
2061         if (ret != 0 || res != 0) {
2062                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
2063                 return -1;
2064         }
2065         return 0;
2066 }
2067
2068 /*
2069   attach to a specific database - client call
2070 */
2071 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
2072                                     struct timeval timeout,
2073                                     const char *name,
2074                                     bool persistent,
2075                                     uint32_t tdb_flags)
2076 {
2077         struct ctdb_db_context *ctdb_db;
2078         TDB_DATA data;
2079         int ret;
2080         int32_t res;
2081 #ifdef TDB_MUTEX_LOCKING
2082         uint32_t mutex_enabled = 0;
2083 #endif
2084
2085         ctdb_db = ctdb_db_handle(ctdb, name);
2086         if (ctdb_db) {
2087                 return ctdb_db;
2088         }
2089
2090         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
2091         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
2092
2093         ctdb_db->ctdb = ctdb;
2094         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
2095         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
2096
2097         data.dptr = discard_const(name);
2098         data.dsize = strlen(name)+1;
2099
2100         /* CTDB has switched to using jenkins hash for volatile databases.
2101          * Even if tdb_flags do not explicitly mention TDB_INCOMPATIBLE_HASH,
2102          * always set it.
2103          */
2104         if (!persistent) {
2105                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
2106         }
2107
2108 #ifdef TDB_MUTEX_LOCKING
2109         if (!persistent) {
2110                 ret = ctdb_ctrl_get_tunable(ctdb, timeval_current_ofs(3,0),
2111                                             CTDB_CURRENT_NODE,
2112                                             "TDBMutexEnabled",
2113                                             &mutex_enabled);
2114                 if (ret != 0) {
2115                         DEBUG(DEBUG_WARNING, ("Assuming no mutex support.\n"));
2116                 }
2117
2118                 if (mutex_enabled == 1) {
2119                         tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
2120                 }
2121         }
2122 #endif
2123
2124         /* tell ctdb daemon to attach */
2125         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
2126                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
2127                            0, data, ctdb_db, &data, &res, NULL, NULL);
2128         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
2129                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
2130                 talloc_free(ctdb_db);
2131                 return NULL;
2132         }
2133         
2134         ctdb_db->db_id = *(uint32_t *)data.dptr;
2135         talloc_free(data.dptr);
2136
2137         ret = ctdb_ctrl_getdbpath(ctdb, timeout, CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
2138         if (ret != 0) {
2139                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
2140                 talloc_free(ctdb_db);
2141                 return NULL;
2142         }
2143
2144         if (persistent) {
2145                 tdb_flags = TDB_DEFAULT;
2146         } else {
2147                 tdb_flags = TDB_NOSYNC;
2148 #ifdef TDB_MUTEX_LOCKING
2149                 if (mutex_enabled) {
2150                         tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
2151                 }
2152 #endif
2153         }
2154         if (ctdb->valgrinding) {
2155                 tdb_flags |= TDB_NOMMAP;
2156         }
2157         tdb_flags |= TDB_DISALLOW_NESTING;
2158
2159         ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path, 0, tdb_flags,
2160                                       O_RDWR, 0);
2161         if (ctdb_db->ltdb == NULL) {
2162                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
2163                 talloc_free(ctdb_db);
2164                 return NULL;
2165         }
2166
2167         ctdb_db->persistent = persistent;
2168
2169         DLIST_ADD(ctdb->db_list, ctdb_db);
2170
2171         /* add well known functions */
2172         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
2173         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
2174         ctdb_set_call(ctdb_db, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
2175
2176         return ctdb_db;
2177 }
2178
2179 /*
2180  * detach from a specific database - client call
2181  */
2182 int ctdb_detach(struct ctdb_context *ctdb, uint32_t db_id)
2183 {
2184         int ret;
2185         int32_t status;
2186         TDB_DATA data;
2187
2188         data.dsize = sizeof(db_id);
2189         data.dptr = (uint8_t *)&db_id;
2190
2191         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_DB_DETACH,
2192                            0, data, NULL, NULL, &status, NULL, NULL);
2193         if (ret != 0 || status != 0) {
2194                 return -1;
2195         }
2196         return 0;
2197 }
2198
2199 /*
2200   setup a call for a database
2201  */
2202 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
2203 {
2204         struct ctdb_registered_call *call;
2205
2206         /* register locally */
2207         call = talloc(ctdb_db, struct ctdb_registered_call);
2208         call->fn = fn;
2209         call->id = id;
2210
2211         DLIST_ADD(ctdb_db->calls, call);
2212         return 0;
2213 }
2214
2215
2216 struct traverse_state {
2217         bool done;
2218         uint32_t count;
2219         ctdb_traverse_func fn;
2220         void *private_data;
2221         bool listemptyrecords;
2222 };
2223
2224 /*
2225   called on each key during a ctdb_traverse
2226  */
2227 static void traverse_handler(uint64_t srvid, TDB_DATA data, void *p)
2228 {
2229         struct traverse_state *state = (struct traverse_state *)p;
2230         struct ctdb_rec_data_old *d = (struct ctdb_rec_data_old *)data.dptr;
2231         TDB_DATA key;
2232
2233         if (data.dsize < sizeof(uint32_t) || d->length != data.dsize) {
2234                 DEBUG(DEBUG_ERR, ("Bad data size %u in traverse_handler\n",
2235                                   (unsigned)data.dsize));
2236                 state->done = true;
2237                 return;
2238         }
2239
2240         key.dsize = d->keylen;
2241         key.dptr  = &d->data[0];
2242         data.dsize = d->datalen;
2243         data.dptr = &d->data[d->keylen];
2244
2245         if (key.dsize == 0 && data.dsize == 0) {
2246                 /* end of traverse */
2247                 state->done = true;
2248                 return;
2249         }
2250
2251         if (!state->listemptyrecords &&
2252             data.dsize == sizeof(struct ctdb_ltdb_header))
2253         {
2254                 /* empty records are deleted records in ctdb */
2255                 return;
2256         }
2257
2258         if (state->fn(key, data, state->private_data) != 0) {
2259                 state->done = true;
2260         }
2261
2262         state->count++;
2263 }
2264
2265 /**
2266  * start a cluster wide traverse, calling the supplied fn on each record
2267  * return the number of records traversed, or -1 on error
2268  *
2269  * Extendet variant with a flag to signal whether empty records should
2270  * be listed.
2271  */
2272 static int ctdb_traverse_ext(struct ctdb_db_context *ctdb_db,
2273                              ctdb_traverse_func fn,
2274                              bool withemptyrecords,
2275                              void *private_data)
2276 {
2277         TDB_DATA data;
2278         struct ctdb_traverse_start_ext t;
2279         int32_t status;
2280         int ret;
2281         uint64_t srvid = (getpid() | 0xFLL<<60);
2282         struct traverse_state state;
2283
2284         state.done = false;
2285         state.count = 0;
2286         state.private_data = private_data;
2287         state.fn = fn;
2288         state.listemptyrecords = withemptyrecords;
2289
2290         ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
2291         if (ret != 0) {
2292                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
2293                 return -1;
2294         }
2295
2296         t.db_id = ctdb_db->db_id;
2297         t.srvid = srvid;
2298         t.reqid = 0;
2299         t.withemptyrecords = withemptyrecords;
2300
2301         data.dptr = (uint8_t *)&t;
2302         data.dsize = sizeof(t);
2303
2304         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START_EXT, 0,
2305                            data, NULL, NULL, &status, NULL, NULL);
2306         if (ret != 0 || status != 0) {
2307                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
2308                 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2309                 return -1;
2310         }
2311
2312         while (!state.done) {
2313                 tevent_loop_once(ctdb_db->ctdb->ev);
2314         }
2315
2316         ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2317         if (ret != 0) {
2318                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
2319                 return -1;
2320         }
2321
2322         return state.count;
2323 }
2324
2325 /**
2326  * start a cluster wide traverse, calling the supplied fn on each record
2327  * return the number of records traversed, or -1 on error
2328  *
2329  * Standard version which does not list the empty records:
2330  * These are considered deleted.
2331  */
2332 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
2333 {
2334         return ctdb_traverse_ext(ctdb_db, fn, false, private_data);
2335 }
2336
2337 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
2338 /*
2339   called on each key during a catdb
2340  */
2341 int ctdb_dumpdb_record(TDB_DATA key, TDB_DATA data, void *p)
2342 {
2343         int i;
2344         struct ctdb_dump_db_context *c = (struct ctdb_dump_db_context *)p;
2345         FILE *f = c->f;
2346         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
2347
2348         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
2349         for (i=0;i<key.dsize;i++) {
2350                 if (ISASCII(key.dptr[i])) {
2351                         fprintf(f, "%c", key.dptr[i]);
2352                 } else {
2353                         fprintf(f, "\\%02X", key.dptr[i]);
2354                 }
2355         }
2356         fprintf(f, "\"\n");
2357
2358         fprintf(f, "dmaster: %u\n", h->dmaster);
2359         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
2360
2361         if (c->printlmaster && c->ctdb->vnn_map != NULL) {
2362                 fprintf(f, "lmaster: %u\n", ctdb_lmaster(c->ctdb, &key));
2363         }
2364
2365         if (c->printhash) {
2366                 fprintf(f, "hash: 0x%08x\n", ctdb_hash(&key));
2367         }
2368
2369         if (c->printrecordflags) {
2370                 fprintf(f, "flags: 0x%08x", h->flags);
2371                 if (h->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) printf(" MIGRATED_WITH_DATA");
2372                 if (h->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) printf(" VACUUM_MIGRATED");
2373                 if (h->flags & CTDB_REC_FLAG_AUTOMATIC) printf(" AUTOMATIC");
2374                 if (h->flags & CTDB_REC_RO_HAVE_DELEGATIONS) printf(" RO_HAVE_DELEGATIONS");
2375                 if (h->flags & CTDB_REC_RO_HAVE_READONLY) printf(" RO_HAVE_READONLY");
2376                 if (h->flags & CTDB_REC_RO_REVOKING_READONLY) printf(" RO_REVOKING_READONLY");
2377                 if (h->flags & CTDB_REC_RO_REVOKE_COMPLETE) printf(" RO_REVOKE_COMPLETE");
2378                 fprintf(f, "\n");
2379         }
2380
2381         if (c->printdatasize) {
2382                 fprintf(f, "data size: %u\n", (unsigned)data.dsize);
2383         } else {
2384                 fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
2385                 for (i=sizeof(*h);i<data.dsize;i++) {
2386                         if (ISASCII(data.dptr[i])) {
2387                                 fprintf(f, "%c", data.dptr[i]);
2388                         } else {
2389                                 fprintf(f, "\\%02X", data.dptr[i]);
2390                         }
2391                 }
2392                 fprintf(f, "\"\n");
2393         }
2394
2395         fprintf(f, "\n");
2396
2397         return 0;
2398 }
2399
2400 /*
2401   convenience function to list all keys to stdout
2402  */
2403 int ctdb_dump_db(struct ctdb_db_context *ctdb_db,
2404                  struct ctdb_dump_db_context *ctx)
2405 {
2406         return ctdb_traverse_ext(ctdb_db, ctdb_dumpdb_record,
2407                                  ctx->printemptyrecords, ctx);
2408 }
2409
2410 /*
2411   get the pid of a ctdb daemon
2412  */
2413 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
2414 {
2415         int ret;
2416         int32_t res;
2417
2418         ret = ctdb_control(ctdb, destnode, 0, 
2419                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
2420                            NULL, NULL, &res, &timeout, NULL);
2421         if (ret != 0) {
2422                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
2423                 return -1;
2424         }
2425
2426         *pid = res;
2427
2428         return 0;
2429 }
2430
2431
2432 /*
2433   async freeze send control
2434  */
2435 struct ctdb_client_control_state *
2436 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
2437 {
2438         return ctdb_control_send(ctdb, destnode, priority, 
2439                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
2440                            mem_ctx, &timeout, NULL);
2441 }
2442
2443 /* 
2444    async freeze recv control
2445 */
2446 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
2447 {
2448         int ret;
2449         int32_t res;
2450
2451         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
2452         if ( (ret != 0) || (res != 0) ){
2453                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
2454                 return -1;
2455         }
2456
2457         return 0;
2458 }
2459
2460 /*
2461   freeze databases of a certain priority
2462  */
2463 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2464 {
2465         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2466         struct ctdb_client_control_state *state;
2467         int ret;
2468
2469         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
2470         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
2471         talloc_free(tmp_ctx);
2472
2473         return ret;
2474 }
2475
2476 /* Freeze all databases */
2477 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2478 {
2479         int i;
2480
2481         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2482                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2483                         return -1;
2484                 }
2485         }
2486         return 0;
2487 }
2488
2489 /*
2490   thaw databases of a certain priority
2491  */
2492 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2493 {
2494         int ret;
2495         int32_t res;
2496
2497         ret = ctdb_control(ctdb, destnode, priority, 
2498                            CTDB_CONTROL_THAW, 0, tdb_null, 
2499                            NULL, NULL, &res, &timeout, NULL);
2500         if (ret != 0 || res != 0) {
2501                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2502                 return -1;
2503         }
2504
2505         return 0;
2506 }
2507
2508 /* thaw all databases */
2509 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2510 {
2511         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2512 }
2513
2514 /*
2515   get pnn of a node, or -1
2516  */
2517 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2518 {
2519         int ret;
2520         int32_t res;
2521
2522         ret = ctdb_control(ctdb, destnode, 0, 
2523                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2524                            NULL, NULL, &res, &timeout, NULL);
2525         if (ret != 0) {
2526                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2527                 return -1;
2528         }
2529
2530         return res;
2531 }
2532
2533 /*
2534   get the monitoring mode of a remote node
2535  */
2536 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2537 {
2538         int ret;
2539         int32_t res;
2540
2541         ret = ctdb_control(ctdb, destnode, 0, 
2542                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2543                            NULL, NULL, &res, &timeout, NULL);
2544         if (ret != 0) {
2545                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2546                 return -1;
2547         }
2548
2549         *monmode = res;
2550
2551         return 0;
2552 }
2553
2554
2555 /*
2556  set the monitoring mode of a remote node to active
2557  */
2558 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2559 {
2560         int ret;
2561         
2562
2563         ret = ctdb_control(ctdb, destnode, 0, 
2564                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2565                            NULL, NULL,NULL, &timeout, NULL);
2566         if (ret != 0) {
2567                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2568                 return -1;
2569         }
2570
2571         
2572
2573         return 0;
2574 }
2575
2576 /*
2577   set the monitoring mode of a remote node to disable
2578  */
2579 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2580 {
2581         int ret;
2582         
2583
2584         ret = ctdb_control(ctdb, destnode, 0, 
2585                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2586                            NULL, NULL, NULL, &timeout, NULL);
2587         if (ret != 0) {
2588                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2589                 return -1;
2590         }
2591
2592         
2593
2594         return 0;
2595 }
2596
2597
2598
2599 /*
2600   sent to a node to make it take over an ip address
2601 */
2602 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout,
2603                           uint32_t destnode, struct ctdb_public_ip *ip)
2604 {
2605         TDB_DATA data;
2606         int ret;
2607         int32_t res;
2608
2609         data.dsize = sizeof(*ip);
2610         data.dptr  = (uint8_t *)ip;
2611
2612         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0,
2613                            data, NULL, NULL, &res, &timeout, NULL);
2614         if (ret != 0 || res != 0) {
2615                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2616                 return -1;
2617         }
2618
2619         return 0;
2620 }
2621
2622
2623 /*
2624   sent to a node to make it release an ip address
2625 */
2626 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout,
2627                          uint32_t destnode, struct ctdb_public_ip *ip)
2628 {
2629         TDB_DATA data;
2630         int ret;
2631         int32_t res;
2632
2633         data.dsize = sizeof(*ip);
2634         data.dptr  = (uint8_t *)ip;
2635
2636         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0,
2637                            data, NULL, NULL, &res, &timeout, NULL);
2638         if (ret != 0 || res != 0) {
2639                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2640                 return -1;
2641         }
2642
2643         return 0;
2644 }
2645
2646
2647 /*
2648   get a tunable
2649  */
2650 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2651                           struct timeval timeout, 
2652                           uint32_t destnode,
2653                           const char *name, uint32_t *value)
2654 {
2655         struct ctdb_control_get_tunable *t;
2656         TDB_DATA data, outdata;
2657         int32_t res;
2658         int ret;
2659
2660         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2661         data.dptr  = talloc_size(ctdb, data.dsize);
2662         CTDB_NO_MEMORY(ctdb, data.dptr);
2663
2664         t = (struct ctdb_control_get_tunable *)data.dptr;
2665         t->length = strlen(name)+1;
2666         memcpy(t->name, name, t->length);
2667
2668         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2669                            &outdata, &res, &timeout, NULL);
2670         talloc_free(data.dptr);
2671         if (ret != 0 || res != 0) {
2672                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2673                 return ret != 0 ? ret : res;
2674         }
2675
2676         if (outdata.dsize != sizeof(uint32_t)) {
2677                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2678                 talloc_free(outdata.dptr);
2679                 return -1;
2680         }
2681         
2682         *value = *(uint32_t *)outdata.dptr;
2683         talloc_free(outdata.dptr);
2684
2685         return 0;
2686 }
2687
2688 /*
2689   set a tunable
2690  */
2691 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2692                           struct timeval timeout, 
2693                           uint32_t destnode,
2694                           const char *name, uint32_t value)
2695 {
2696         struct ctdb_control_set_tunable *t;
2697         TDB_DATA data;
2698         int32_t res;
2699         int ret;
2700
2701         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2702         data.dptr  = talloc_size(ctdb, data.dsize);
2703         CTDB_NO_MEMORY(ctdb, data.dptr);
2704
2705         t = (struct ctdb_control_set_tunable *)data.dptr;
2706         t->length = strlen(name)+1;
2707         memcpy(t->name, name, t->length);
2708         t->value = value;
2709
2710         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2711                            NULL, &res, &timeout, NULL);
2712         talloc_free(data.dptr);
2713         if ((ret != 0) || (res == -1)) {
2714                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2715                 return -1;
2716         }
2717
2718         return res;
2719 }
2720
2721 /*
2722   list tunables
2723  */
2724 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2725                             struct timeval timeout, 
2726                             uint32_t destnode,
2727                             TALLOC_CTX *mem_ctx,
2728                             const char ***list, uint32_t *count)
2729 {
2730         TDB_DATA outdata;
2731         int32_t res;
2732         int ret;
2733         struct ctdb_control_list_tunable *t;
2734         char *p, *s, *ptr;
2735
2736         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2737                            mem_ctx, &outdata, &res, &timeout, NULL);
2738         if (ret != 0 || res != 0) {
2739                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2740                 return -1;
2741         }
2742
2743         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2744         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2745             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2746                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2747                 talloc_free(outdata.dptr);
2748                 return -1;              
2749         }
2750         
2751         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2752         CTDB_NO_MEMORY(ctdb, p);
2753
2754         talloc_free(outdata.dptr);
2755         
2756         (*list) = NULL;
2757         (*count) = 0;
2758
2759         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2760                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2761                 CTDB_NO_MEMORY(ctdb, *list);
2762                 (*list)[*count] = talloc_strdup(*list, s);
2763                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2764                 (*count)++;
2765         }
2766
2767         talloc_free(p);
2768
2769         return 0;
2770 }
2771
2772
2773 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2774                                    struct timeval timeout, uint32_t destnode,
2775                                    TALLOC_CTX *mem_ctx,
2776                                    uint32_t flags,
2777                                    struct ctdb_public_ip_list_old **ips)
2778 {
2779         int ret;
2780         TDB_DATA outdata;
2781         int32_t res;
2782
2783         ret = ctdb_control(ctdb, destnode, 0,
2784                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2785                            mem_ctx, &outdata, &res, &timeout, NULL);
2786         if (ret != 0 || res != 0) {
2787                 DEBUG(DEBUG_ERR,(__location__
2788                                  " ctdb_control for getpublicips failed ret:%d res:%d\n",
2789                                  ret, res));
2790                 return -1;
2791         }
2792
2793         *ips = (struct ctdb_public_ip_list_old *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2794         talloc_free(outdata.dptr);
2795
2796         return 0;
2797 }
2798
2799 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2800                              struct timeval timeout, uint32_t destnode,
2801                              TALLOC_CTX *mem_ctx,
2802                              struct ctdb_public_ip_list_old **ips)
2803 {
2804         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2805                                               destnode, mem_ctx,
2806                                               0, ips);
2807 }
2808
2809 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2810                                  struct timeval timeout, uint32_t destnode,
2811                                  TALLOC_CTX *mem_ctx,
2812                                  const ctdb_sock_addr *addr,
2813                                  struct ctdb_control_public_ip_info **_info)
2814 {
2815         int ret;
2816         TDB_DATA indata;
2817         TDB_DATA outdata;
2818         int32_t res;
2819         struct ctdb_control_public_ip_info *info;
2820         uint32_t len;
2821         uint32_t i;
2822
2823         indata.dptr = discard_const_p(uint8_t, addr);
2824         indata.dsize = sizeof(*addr);
2825
2826         ret = ctdb_control(ctdb, destnode, 0,
2827                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2828                            mem_ctx, &outdata, &res, &timeout, NULL);
2829         if (ret != 0 || res != 0) {
2830                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2831                                 "failed ret:%d res:%d\n",
2832                                 ret, res));
2833                 return -1;
2834         }
2835
2836         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2837         if (len > outdata.dsize) {
2838                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2839                                 "returned invalid data with size %u > %u\n",
2840                                 (unsigned int)outdata.dsize,
2841                                 (unsigned int)len));
2842                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2843                 return -1;
2844         }
2845
2846         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2847         len += info->num*sizeof(struct ctdb_control_iface_info);
2848
2849         if (len > outdata.dsize) {
2850                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2851                                 "returned invalid data with size %u > %u\n",
2852                                 (unsigned int)outdata.dsize,
2853                                 (unsigned int)len));
2854                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2855                 return -1;
2856         }
2857
2858         /* make sure we null terminate the returned strings */
2859         for (i=0; i < info->num; i++) {
2860                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2861         }
2862
2863         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2864                                                                 outdata.dptr,
2865                                                                 outdata.dsize);
2866         talloc_free(outdata.dptr);
2867         if (*_info == NULL) {
2868                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2869                                 "talloc_memdup size %u failed\n",
2870                                 (unsigned int)outdata.dsize));
2871                 return -1;
2872         }
2873
2874         return 0;
2875 }
2876
2877 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2878                          struct timeval timeout, uint32_t destnode,
2879                          TALLOC_CTX *mem_ctx,
2880                          struct ctdb_control_get_ifaces **_ifaces)
2881 {
2882         int ret;
2883         TDB_DATA outdata;
2884         int32_t res;
2885         struct ctdb_control_get_ifaces *ifaces;
2886         uint32_t len;
2887         uint32_t i;
2888
2889         ret = ctdb_control(ctdb, destnode, 0,
2890                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2891                            mem_ctx, &outdata, &res, &timeout, NULL);
2892         if (ret != 0 || res != 0) {
2893                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2894                                 "failed ret:%d res:%d\n",
2895                                 ret, res));
2896                 return -1;
2897         }
2898
2899         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2900         if (len > outdata.dsize) {
2901                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2902                                 "returned invalid data with size %u > %u\n",
2903                                 (unsigned int)outdata.dsize,
2904                                 (unsigned int)len));
2905                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2906                 return -1;
2907         }
2908
2909         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2910         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2911
2912         if (len > outdata.dsize) {
2913                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2914                                 "returned invalid data with size %u > %u\n",
2915                                 (unsigned int)outdata.dsize,
2916                                 (unsigned int)len));
2917                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2918                 return -1;
2919         }
2920
2921         /* make sure we null terminate the returned strings */
2922         for (i=0; i < ifaces->num; i++) {
2923                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2924         }
2925
2926         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2927                                                                   outdata.dptr,
2928                                                                   outdata.dsize);
2929         talloc_free(outdata.dptr);
2930         if (*_ifaces == NULL) {
2931                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2932                                 "talloc_memdup size %u failed\n",
2933                                 (unsigned int)outdata.dsize));
2934                 return -1;
2935         }
2936
2937         return 0;
2938 }
2939
2940 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2941                              struct timeval timeout, uint32_t destnode,
2942                              TALLOC_CTX *mem_ctx,
2943                              const struct ctdb_control_iface_info *info)
2944 {
2945         int ret;
2946         TDB_DATA indata;
2947         int32_t res;
2948
2949         indata.dptr = discard_const_p(uint8_t, info);
2950         indata.dsize = sizeof(*info);
2951
2952         ret = ctdb_control(ctdb, destnode, 0,
2953                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2954                            mem_ctx, NULL, &res, &timeout, NULL);
2955         if (ret != 0 || res != 0) {
2956                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2957                                 "failed ret:%d res:%d\n",
2958                                 ret, res));
2959                 return -1;
2960         }
2961
2962         return 0;
2963 }
2964
2965 /*
2966   set/clear the permanent disabled bit on a remote node
2967  */
2968 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2969                        uint32_t set, uint32_t clear)
2970 {
2971         int ret;
2972         TDB_DATA data;
2973         struct ctdb_node_map_old *nodemap=NULL;
2974         struct ctdb_node_flag_change c;
2975         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2976         uint32_t recmaster;
2977         uint32_t *nodes;
2978
2979
2980         /* find the recovery master */
2981         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2982         if (ret != 0) {
2983                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2984                 talloc_free(tmp_ctx);
2985                 return ret;
2986         }
2987
2988
2989         /* read the node flags from the recmaster */
2990         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2991         if (ret != 0) {
2992                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2993                 talloc_free(tmp_ctx);
2994                 return -1;
2995         }
2996         if (destnode >= nodemap->num) {
2997                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2998                 talloc_free(tmp_ctx);
2999                 return -1;
3000         }
3001
3002         c.pnn       = destnode;
3003         c.old_flags = nodemap->nodes[destnode].flags;
3004         c.new_flags = c.old_flags;
3005         c.new_flags |= set;
3006         c.new_flags &= ~clear;
3007
3008         data.dsize = sizeof(c);
3009         data.dptr = (unsigned char *)&c;
3010
3011         /* send the flags update to all connected nodes */
3012         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
3013
3014         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
3015                                         nodes, 0,
3016                                         timeout, false, data,
3017                                         NULL, NULL,
3018                                         NULL) != 0) {
3019                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
3020
3021                 talloc_free(tmp_ctx);
3022                 return -1;
3023         }
3024
3025         talloc_free(tmp_ctx);
3026         return 0;
3027 }
3028
3029
3030 /*
3031   get all tunables
3032  */
3033 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
3034                                struct timeval timeout, 
3035                                uint32_t destnode,
3036                                struct ctdb_tunable *tunables)
3037 {
3038         TDB_DATA outdata;
3039         int ret;
3040         int32_t res;
3041
3042         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
3043                            &outdata, &res, &timeout, NULL);
3044         if (ret != 0 || res != 0) {
3045                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
3046                 return -1;
3047         }
3048
3049         if (outdata.dsize != sizeof(*tunables)) {
3050                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
3051                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
3052                 return -1;              
3053         }
3054
3055         *tunables = *(struct ctdb_tunable *)outdata.dptr;
3056         talloc_free(outdata.dptr);
3057         return 0;
3058 }
3059
3060 /*
3061   add a public address to a node
3062  */
3063 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
3064                       struct timeval timeout, 
3065                       uint32_t destnode,
3066                       struct ctdb_control_ip_iface *pub)
3067 {
3068         TDB_DATA data;
3069         int32_t res;
3070         int ret;
3071
3072         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
3073         data.dptr  = (unsigned char *)pub;
3074
3075         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
3076                            NULL, &res, &timeout, NULL);
3077         if (ret != 0 || res != 0) {
3078                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
3079                 return -1;
3080         }
3081
3082         return 0;
3083 }
3084
3085 /*
3086   delete a public address from a node
3087  */
3088 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
3089                       struct timeval timeout, 
3090                       uint32_t destnode,
3091                       struct ctdb_control_ip_iface *pub)
3092 {
3093         TDB_DATA data;
3094         int32_t res;
3095         int ret;
3096
3097         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
3098         data.dptr  = (unsigned char *)pub;
3099
3100         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
3101                            NULL, &res, &timeout, NULL);
3102         if (ret != 0 || res != 0) {
3103                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
3104                 return -1;
3105         }
3106
3107         return 0;
3108 }
3109
3110 /*
3111   kill a tcp connection
3112  */
3113 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
3114                       struct timeval timeout, 
3115                       uint32_t destnode,
3116                       struct ctdb_connection *killtcp)
3117 {
3118         TDB_DATA data;
3119         int32_t res;
3120         int ret;
3121
3122         data.dsize = sizeof(struct ctdb_connection);
3123         data.dptr  = (unsigned char *)killtcp;
3124
3125         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
3126                            NULL, &res, &timeout, NULL);
3127         if (ret != 0 || res != 0) {
3128                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
3129                 return -1;
3130         }
3131
3132         return 0;
3133 }
3134
3135 /*
3136   send a gratious arp
3137  */
3138 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
3139                       struct timeval timeout, 
3140                       uint32_t destnode,
3141                       ctdb_sock_addr *addr,
3142                       const char *ifname)
3143 {
3144         TDB_DATA data;
3145         int32_t res;
3146         int ret, len;
3147         struct ctdb_control_gratious_arp *gratious_arp;
3148         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3149
3150
3151         len = strlen(ifname)+1;
3152         gratious_arp = talloc_size(tmp_ctx, 
3153                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
3154         CTDB_NO_MEMORY(ctdb, gratious_arp);
3155
3156         gratious_arp->addr = *addr;
3157         gratious_arp->len = len;
3158         memcpy(&gratious_arp->iface[0], ifname, len);
3159
3160
3161         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
3162         data.dptr  = (unsigned char *)gratious_arp;
3163
3164         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
3165                            NULL, &res, &timeout, NULL);
3166         if (ret != 0 || res != 0) {
3167                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
3168                 talloc_free(tmp_ctx);
3169                 return -1;
3170         }
3171
3172         talloc_free(tmp_ctx);
3173         return 0;
3174 }
3175
3176 /*
3177   get a list of all tcp tickles that a node knows about for a particular vnn
3178  */
3179 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
3180                               struct timeval timeout, uint32_t destnode, 
3181                               TALLOC_CTX *mem_ctx, 
3182                               ctdb_sock_addr *addr,
3183                               struct ctdb_control_tcp_tickle_list **list)
3184 {
3185         int ret;
3186         TDB_DATA data, outdata;
3187         int32_t status;
3188
3189         data.dptr = (uint8_t*)addr;
3190         data.dsize = sizeof(ctdb_sock_addr);
3191
3192         ret = ctdb_control(ctdb, destnode, 0, 
3193                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
3194                            mem_ctx, &outdata, &status, NULL, NULL);
3195         if (ret != 0 || status != 0) {
3196                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
3197                 return -1;
3198         }
3199
3200         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
3201
3202         return status;
3203 }
3204
3205 /*
3206   register a server id
3207  */
3208 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb,
3209                                  struct timeval timeout,
3210                                  struct ctdb_client_id *id)
3211 {
3212         TDB_DATA data;
3213         int32_t res;
3214         int ret;
3215
3216         data.dsize = sizeof(struct ctdb_client_id);
3217         data.dptr  = (unsigned char *)id;
3218
3219         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3220                         CTDB_CONTROL_REGISTER_SERVER_ID, 
3221                         0, data, NULL,
3222                         NULL, &res, &timeout, NULL);
3223         if (ret != 0 || res != 0) {
3224                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
3225                 return -1;
3226         }
3227
3228         return 0;
3229 }
3230
3231 /*
3232   unregister a server id
3233  */
3234 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb,
3235                                    struct timeval timeout,
3236                                    struct ctdb_client_id *id)
3237 {
3238         TDB_DATA data;
3239         int32_t res;
3240         int ret;
3241
3242         data.dsize = sizeof(struct ctdb_client_id);
3243         data.dptr  = (unsigned char *)id;
3244
3245         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3246                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
3247                         0, data, NULL,
3248                         NULL, &res, &timeout, NULL);
3249         if (ret != 0 || res != 0) {
3250                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
3251                 return -1;
3252         }
3253
3254         return 0;
3255 }
3256
3257
3258 /*
3259   check if a server id exists
3260
3261   if a server id does exist, return *status == 1, otherwise *status == 0
3262  */
3263 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb,
3264                               struct timeval timeout, uint32_t destnode,
3265                               struct ctdb_client_id *id, uint32_t *status)
3266 {
3267         TDB_DATA data;
3268         int32_t res;
3269         int ret;
3270
3271         data.dsize = sizeof(struct ctdb_client_id);
3272         data.dptr  = (unsigned char *)id;
3273
3274         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
3275                         0, data, NULL,
3276                         NULL, &res, &timeout, NULL);
3277         if (ret != 0) {
3278                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
3279                 return -1;
3280         }
3281
3282         if (res) {
3283                 *status = 1;
3284         } else {
3285                 *status = 0;
3286         }
3287
3288         return 0;
3289 }
3290
3291 /*
3292    get the list of server ids that are registered on a node
3293 */
3294 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
3295                                  TALLOC_CTX *mem_ctx,
3296                                  struct timeval timeout, uint32_t destnode,
3297                                  struct ctdb_client_id_list_old **svid_list)
3298 {
3299         int ret;
3300         TDB_DATA outdata;
3301         int32_t res;
3302
3303         ret = ctdb_control(ctdb, destnode, 0, 
3304                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
3305                            mem_ctx, &outdata, &res, &timeout, NULL);
3306         if (ret != 0 || res != 0) {
3307                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
3308                 return -1;
3309         }
3310
3311         *svid_list = (struct ctdb_client_id_list_old *)talloc_steal(mem_ctx, outdata.dptr);
3312
3313         return 0;
3314 }
3315
3316 /*
3317   initialise the ctdb daemon for client applications
3318
3319   NOTE: In current code the daemon does not fork. This is for testing purposes only
3320   and to simplify the code.
3321 */
3322 struct ctdb_context *ctdb_init(struct tevent_context *ev)
3323 {
3324         int ret;
3325         struct ctdb_context *ctdb;
3326
3327         ctdb = talloc_zero(ev, struct ctdb_context);
3328         if (ctdb == NULL) {
3329                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
3330                 return NULL;
3331         }
3332         ctdb->ev  = ev;
3333         /* Wrap early to exercise code. */
3334         ret = reqid_init(ctdb, INT_MAX-200, &ctdb->idr);
3335         if (ret != 0) {
3336                 DEBUG(DEBUG_ERR, ("reqid_init failed (%s)\n", strerror(ret)));
3337                 talloc_free(ctdb);
3338                 return NULL;
3339         }
3340
3341         ret = srvid_init(ctdb, &ctdb->srv);
3342         if (ret != 0) {
3343                 DEBUG(DEBUG_ERR, ("srvid_init failed (%s)\n", strerror(ret)));
3344                 talloc_free(ctdb);
3345                 return NULL;
3346         }
3347
3348         ret = ctdb_set_socketname(ctdb, CTDB_SOCKET);
3349         if (ret != 0) {
3350                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
3351                 talloc_free(ctdb);
3352                 return NULL;
3353         }
3354
3355         ctdb->statistics.statistics_start_time = timeval_current();
3356
3357         return ctdb;
3358 }
3359
3360
3361 /*
3362   set some ctdb flags
3363 */
3364 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
3365 {
3366         ctdb->flags |= flags;
3367 }
3368
3369 /*
3370   setup the local socket name
3371 */
3372 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
3373 {
3374         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
3375         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
3376
3377         return 0;
3378 }
3379
3380 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
3381 {
3382         return ctdb->daemon.name;
3383 }
3384
3385 /*
3386   return the pnn of this node
3387 */
3388 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
3389 {
3390         return ctdb->pnn;
3391 }
3392
3393
3394 /*
3395   get the uptime of a remote node
3396  */
3397 struct ctdb_client_control_state *
3398 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3399 {
3400         return ctdb_control_send(ctdb, destnode, 0, 
3401                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
3402                            mem_ctx, &timeout, NULL);
3403 }
3404
3405 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
3406 {
3407         int ret;
3408         int32_t res;
3409         TDB_DATA outdata;
3410
3411         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3412         if (ret != 0 || res != 0) {
3413                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
3414                 return -1;
3415         }
3416
3417         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
3418
3419         return 0;
3420 }
3421
3422 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3423 {
3424         struct ctdb_client_control_state *state;
3425
3426         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3427         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3428 }
3429
3430 /*
3431   send a control to execute the "recovered" event script on a node
3432  */
3433 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3434 {
3435         int ret;
3436         int32_t status;
3437
3438         ret = ctdb_control(ctdb, destnode, 0, 
3439                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
3440                            NULL, NULL, &status, &timeout, NULL);
3441         if (ret != 0 || status != 0) {
3442                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3443                 return -1;
3444         }
3445
3446         return 0;
3447 }
3448
3449 /* 
3450   callback for the async helpers used when sending the same control
3451   to multiple nodes in parallell.
3452 */
3453 static void async_callback(struct ctdb_client_control_state *state)
3454 {
3455         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3456         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3457         int ret;
3458         TDB_DATA outdata;
3459         int32_t res = -1;
3460         uint32_t destnode = state->c->hdr.destnode;
3461
3462         outdata.dsize = 0;
3463         outdata.dptr = NULL;
3464
3465         /* one more node has responded with recmode data */
3466         data->count--;
3467
3468         /* if we failed to push the db, then return an error and let
3469            the main loop try again.
3470         */
3471         if (state->state != CTDB_CONTROL_DONE) {
3472                 if ( !data->dont_log_errors) {
3473                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3474                 }
3475                 data->fail_count++;
3476                 if (state->state == CTDB_CONTROL_TIMEOUT) {
3477                         res = -ETIME;
3478                 } else {
3479                         res = -1;
3480                 }
3481                 if (data->fail_callback) {
3482                         data->fail_callback(ctdb, destnode, res, outdata,
3483                                         data->callback_data);
3484                 }
3485                 return;
3486         }
3487         
3488         state->async.fn = NULL;
3489
3490         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3491         if ((ret != 0) || (res != 0)) {
3492                 if ( !data->dont_log_errors) {
3493                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3494                 }
3495                 data->fail_count++;
3496                 if (data->fail_callback) {
3497                         data->fail_callback(ctdb, destnode, res, outdata,
3498                                         data->callback_data);
3499                 }
3500         }
3501         if ((ret == 0) && (data->callback != NULL)) {
3502                 data->callback(ctdb, destnode, res, outdata,
3503                                         data->callback_data);
3504         }
3505 }
3506
3507
3508 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3509 {
3510         /* set up the callback functions */
3511         state->async.fn = async_callback;
3512         state->async.private_data = data;
3513         
3514         /* one more control to wait for to complete */
3515         data->count++;
3516 }
3517
3518
3519 /* wait for up to the maximum number of seconds allowed
3520    or until all nodes we expect a response from has replied
3521 */
3522 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3523 {
3524         while (data->count > 0) {
3525                 tevent_loop_once(ctdb->ev);
3526         }
3527         if (data->fail_count != 0) {
3528                 if (!data->dont_log_errors) {
3529                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
3530                                  data->fail_count));
3531                 }
3532                 return -1;
3533         }
3534         return 0;
3535 }
3536
3537
3538 /* 
3539    perform a simple control on the listed nodes
3540    The control cannot return data
3541  */
3542 int ctdb_client_async_control(struct ctdb_context *ctdb,
3543                                 enum ctdb_controls opcode,
3544                                 uint32_t *nodes,
3545                                 uint64_t srvid,
3546                                 struct timeval timeout,
3547                                 bool dont_log_errors,
3548                                 TDB_DATA data,
3549                                 client_async_callback client_callback,
3550                                 client_async_callback fail_callback,
3551                                 void *callback_data)
3552 {
3553         struct client_async_data *async_data;
3554         struct ctdb_client_control_state *state;
3555         int j, num_nodes;
3556
3557         async_data = talloc_zero(ctdb, struct client_async_data);
3558         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3559         async_data->dont_log_errors = dont_log_errors;
3560         async_data->callback = client_callback;
3561         async_data->fail_callback = fail_callback;
3562         async_data->callback_data = callback_data;
3563         async_data->opcode        = opcode;
3564
3565         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3566
3567         /* loop over all nodes and send an async control to each of them */
3568         for (j=0; j<num_nodes; j++) {
3569                 uint32_t pnn = nodes[j];
3570
3571                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
3572                                           0, data, async_data, &timeout, NULL);
3573                 if (state == NULL) {
3574                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3575                         talloc_free(async_data);
3576                         return -1;
3577                 }
3578                 
3579                 ctdb_client_async_add(async_data, state);
3580         }
3581
3582         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3583                 talloc_free(async_data);
3584                 return -1;
3585         }
3586
3587         talloc_free(async_data);
3588         return 0;
3589 }
3590
3591 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3592                                 struct ctdb_vnn_map *vnn_map,
3593                                 TALLOC_CTX *mem_ctx,
3594                                 bool include_self)
3595 {
3596         int i, j, num_nodes;
3597         uint32_t *nodes;
3598
3599         for (i=num_nodes=0;i<vnn_map->size;i++) {
3600                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3601                         continue;
3602                 }
3603                 num_nodes++;
3604         } 
3605
3606         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3607         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3608
3609         for (i=j=0;i<vnn_map->size;i++) {
3610                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3611                         continue;
3612                 }
3613                 nodes[j++] = vnn_map->map[i];
3614         } 
3615
3616         return nodes;
3617 }
3618
3619 /* Get list of nodes not including those with flags specified by mask.
3620  * If exclude_pnn is not -1 then exclude that pnn from the list.
3621  */
3622 uint32_t *list_of_nodes(struct ctdb_context *ctdb,
3623                         struct ctdb_node_map_old *node_map,
3624                         TALLOC_CTX *mem_ctx,
3625                         uint32_t mask,
3626                         int exclude_pnn)
3627 {
3628         int i, j, num_nodes;
3629         uint32_t *nodes;
3630
3631         for (i=num_nodes=0;i<node_map->num;i++) {
3632                 if (node_map->nodes[i].flags & mask) {
3633                         continue;
3634                 }
3635                 if (node_map->nodes[i].pnn == exclude_pnn) {
3636                         continue;
3637                 }
3638                 num_nodes++;
3639         } 
3640
3641         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3642         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3643
3644         for (i=j=0;i<node_map->num;i++) {
3645                 if (node_map->nodes[i].flags & mask) {
3646                         continue;
3647                 }
3648                 if (node_map->nodes[i].pnn == exclude_pnn) {
3649                         continue;
3650                 }
3651                 nodes[j++] = node_map->nodes[i].pnn;
3652         } 
3653
3654         return nodes;
3655 }
3656
3657 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3658                                 struct ctdb_node_map_old *node_map,
3659                                 TALLOC_CTX *mem_ctx,
3660                                 bool include_self)
3661 {
3662         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_INACTIVE,
3663                              include_self ? -1 : ctdb->pnn);
3664 }
3665
3666 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3667                                 struct ctdb_node_map_old *node_map,
3668                                 TALLOC_CTX *mem_ctx,
3669                                 bool include_self)
3670 {
3671         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_DISCONNECTED,
3672                              include_self ? -1 : ctdb->pnn);
3673 }
3674
3675 /* 
3676   this is used to test if a pnn lock exists and if it exists will return
3677   the number of connections that pnn has reported or -1 if that recovery
3678   daemon is not running.
3679 */
3680 int
3681 ctdb_read_pnn_lock(int fd, int32_t pnn)
3682 {
3683         struct flock lock;
3684         char c;
3685
3686         lock.l_type = F_WRLCK;
3687         lock.l_whence = SEEK_SET;
3688         lock.l_start = pnn;
3689         lock.l_len = 1;
3690         lock.l_pid = 0;
3691
3692         if (fcntl(fd, F_GETLK, &lock) != 0) {
3693                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3694                 return -1;
3695         }
3696
3697         if (lock.l_type == F_UNLCK) {
3698                 return -1;
3699         }
3700
3701         if (pread(fd, &c, 1, pnn) == -1) {
3702                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3703                 return -1;
3704         }
3705
3706         return c;
3707 }
3708
3709 /*
3710   get capabilities of a remote node
3711  */
3712 struct ctdb_client_control_state *
3713 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3714 {
3715         return ctdb_control_send(ctdb, destnode, 0, 
3716                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3717                            mem_ctx, &timeout, NULL);
3718 }
3719
3720 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3721 {
3722         int ret;
3723         int32_t res;
3724         TDB_DATA outdata;
3725
3726         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3727         if ( (ret != 0) || (res != 0) ) {
3728                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3729                 return -1;
3730         }
3731
3732         if (capabilities) {
3733                 *capabilities = *((uint32_t *)outdata.dptr);
3734         }
3735
3736         return 0;
3737 }
3738
3739 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3740 {
3741         struct ctdb_client_control_state *state;
3742         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3743         int ret;
3744
3745         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3746         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3747         talloc_free(tmp_ctx);
3748         return ret;
3749 }
3750
3751 static void get_capabilities_callback(struct ctdb_context *ctdb,
3752                                       uint32_t node_pnn, int32_t res,
3753                                       TDB_DATA outdata, void *callback_data)
3754 {
3755         struct ctdb_node_capabilities *caps =
3756                 talloc_get_type(callback_data,
3757                                 struct ctdb_node_capabilities);
3758
3759         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
3760                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
3761                 return;
3762         }
3763
3764         if (node_pnn >= talloc_array_length(caps)) {
3765                 DEBUG(DEBUG_ERR,
3766                       (__location__ " unexpected PNN %u\n", node_pnn));
3767                 return;
3768         }
3769
3770         caps[node_pnn].retrieved = true;
3771         caps[node_pnn].capabilities = *((uint32_t *)outdata.dptr);
3772 }
3773
3774 struct ctdb_node_capabilities *
3775 ctdb_get_capabilities(struct ctdb_context *ctdb,
3776                       TALLOC_CTX *mem_ctx,
3777                       struct timeval timeout,
3778                       struct ctdb_node_map_old *nodemap)
3779 {
3780         uint32_t *nodes;
3781         uint32_t i, res;
3782         struct ctdb_node_capabilities *ret;
3783
3784         nodes = list_of_connected_nodes(ctdb, nodemap, mem_ctx, true);
3785
3786         ret = talloc_array(mem_ctx, struct ctdb_node_capabilities,
3787                            nodemap->num);
3788         CTDB_NO_MEMORY_NULL(ctdb, ret);
3789         /* Prepopulate the expected PNNs */
3790         for (i = 0; i < talloc_array_length(ret); i++) {
3791                 ret[i].retrieved = false;
3792         }
3793
3794         res = ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
3795                                         nodes, 0, timeout,
3796                                         false, tdb_null,
3797                                         get_capabilities_callback, NULL,
3798                                         ret);
3799         if (res != 0) {
3800                 DEBUG(DEBUG_ERR,
3801                       (__location__ " Failed to read node capabilities.\n"));
3802                 TALLOC_FREE(ret);
3803         }
3804
3805         return ret;
3806 }
3807
3808 uint32_t *
3809 ctdb_get_node_capabilities(struct ctdb_node_capabilities *caps,
3810                            uint32_t pnn)
3811 {
3812         if (pnn < talloc_array_length(caps) && caps[pnn].retrieved) {
3813                 return &caps[pnn].capabilities;
3814         }
3815
3816         return NULL;
3817 }
3818
3819 bool ctdb_node_has_capabilities(struct ctdb_node_capabilities *caps,
3820                                 uint32_t pnn,
3821                                 uint32_t capabilities_required)
3822 {
3823         uint32_t *capp = ctdb_get_node_capabilities(caps, pnn);
3824         return (capp != NULL) &&
3825                 ((*capp & capabilities_required) == capabilities_required);
3826 }
3827
3828
3829 struct server_id {
3830         uint64_t pid;
3831         uint32_t task_id;
3832         uint32_t vnn;
3833         uint64_t unique_id;
3834 };
3835
3836 static struct server_id server_id_fetch(struct ctdb_context *ctdb, uint32_t reqid)
3837 {
3838         struct server_id id;
3839
3840         id.pid = getpid();
3841         id.task_id = reqid;
3842         id.vnn = ctdb_get_pnn(ctdb);
3843         id.unique_id = id.vnn;
3844         id.unique_id = (id.unique_id << 32) | reqid;
3845
3846         return id;
3847 }
3848
3849 /* This is basically a copy from Samba's server_id.*.  However, a
3850  * dependency chain stops us from using Samba's version, so use a
3851  * renamed copy until a better solution is found. */
3852 static bool ctdb_server_id_equal(struct server_id *id1, struct server_id *id2)
3853 {
3854         if (id1->pid != id2->pid) {
3855                 return false;
3856         }
3857
3858         if (id1->task_id != id2->task_id) {
3859                 return false;
3860         }
3861
3862         if (id1->vnn != id2->vnn) {
3863                 return false;
3864         }
3865
3866         if (id1->unique_id != id2->unique_id) {
3867                 return false;
3868         }
3869
3870         return true;
3871 }
3872
3873 static bool server_id_exists(struct ctdb_context *ctdb, struct server_id *id)
3874 {
3875         struct ctdb_client_id sid;
3876         int ret;
3877         uint32_t result = 0;
3878
3879         sid.type = SERVER_TYPE_SAMBA;
3880         sid.pnn = id->vnn;
3881         sid.server_id = id->pid;
3882
3883         ret = ctdb_ctrl_check_server_id(ctdb, timeval_current_ofs(3,0),
3884                                         id->vnn, &sid, &result);
3885         if (ret != 0) {
3886                 /* If control times out, assume server_id exists. */
3887                 return true;
3888         }
3889
3890         if (result) {
3891                 return true;
3892         }
3893
3894         return false;
3895 }
3896
3897
3898 enum g_lock_type {
3899         G_LOCK_READ = 0,
3900         G_LOCK_WRITE = 1,
3901 };
3902
3903 struct g_lock_rec {
3904         enum g_lock_type type;
3905         struct server_id id;
3906 };
3907
3908 struct g_lock_recs {
3909         unsigned int num;
3910         struct g_lock_rec *lock;
3911 };
3912
3913 static bool g_lock_parse(TALLOC_CTX *mem_ctx, TDB_DATA data,
3914                          struct g_lock_recs **locks)
3915 {
3916         struct g_lock_recs *recs;
3917
3918         recs = talloc_zero(mem_ctx, struct g_lock_recs);
3919         if (recs == NULL) {
3920                 return false;
3921         }
3922
3923         if (data.dsize == 0) {
3924                 goto done;
3925         }
3926
3927         if (data.dsize % sizeof(struct g_lock_rec) != 0) {
3928                 DEBUG(DEBUG_ERR, (__location__ "invalid data size %lu in g_lock record\n",
3929                                   (unsigned long)data.dsize));
3930                 talloc_free(recs);
3931                 return false;
3932         }
3933
3934         recs->num = data.dsize / sizeof(struct g_lock_rec);
3935         recs->lock = talloc_memdup(mem_ctx, data.dptr, data.dsize);
3936         if (recs->lock == NULL) {
3937                 talloc_free(recs);
3938                 return false;
3939         }
3940
3941 done:
3942         if (locks != NULL) {
3943                 *locks = recs;
3944         }
3945
3946         return true;
3947 }
3948
3949
3950 static bool g_lock_lock(TALLOC_CTX *mem_ctx,
3951                         struct ctdb_db_context *ctdb_db,
3952                         const char *keyname, uint32_t reqid)
3953 {
3954         TDB_DATA key, data;
3955         struct ctdb_record_handle *h;
3956         struct g_lock_recs *locks;
3957         struct server_id id;
3958         struct timeval t_start;
3959         int i;
3960
3961         key.dptr = (uint8_t *)discard_const(keyname);
3962         key.dsize = strlen(keyname) + 1;
3963
3964         t_start = timeval_current();
3965
3966 again:
3967         /* Keep trying for an hour. */
3968         if (timeval_elapsed(&t_start) > 3600) {
3969                 return false;
3970         }
3971
3972         h = ctdb_fetch_lock(ctdb_db, mem_ctx, key, &data);
3973         if (h == NULL) {
3974                 return false;
3975         }
3976
3977         if (!g_lock_parse(h, data, &locks)) {
3978                 DEBUG(DEBUG_ERR, ("g_lock: error parsing locks\n"));
3979                 talloc_free(data.dptr);
3980                 talloc_free(h);
3981                 return false;
3982         }
3983
3984         talloc_free(data.dptr);
3985
3986         id = server_id_fetch(ctdb_db->ctdb, reqid);
3987
3988         i = 0;
3989         while (i < locks->num) {
3990                 if (ctdb_server_id_equal(&locks->lock[i].id, &id)) {
3991                         /* Internal error */
3992                         talloc_free(h);
3993                         return false;
3994                 }
3995
3996                 if (!server_id_exists(ctdb_db->ctdb, &locks->lock[i].id)) {
3997                         if (i < locks->num-1) {
3998                                 locks->lock[i] = locks->lock[locks->num-1];
3999                         }
4000                         locks->num--;
4001                         continue;
4002                 }
4003
4004                 /* This entry is locked. */
4005                 DEBUG(DEBUG_INFO, ("g_lock: lock already granted for "
4006                                    "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
4007                                    (unsigned long long)id.pid,
4008                                    id.task_id, id.vnn,
4009                                    (unsigned long long)id.unique_id));
4010                 talloc_free(h);
4011                 goto again;
4012         }
4013
4014         locks->lock = talloc_realloc(locks, locks->lock, struct g_lock_rec,
4015                                      locks->num+1);
4016         if (locks->lock == NULL) {
4017                 talloc_free(h);
4018                 return false;
4019         }
4020
4021         locks->lock[locks->num].type = G_LOCK_WRITE;
4022         locks->lock[locks->num].id = id;
4023         locks->num++;
4024
4025         data.dptr = (uint8_t *)locks->lock;
4026         data.dsize = locks->num * sizeof(struct g_lock_rec);
4027
4028         if (ctdb_record_store(h, data) != 0) {
4029                 DEBUG(DEBUG_ERR, ("g_lock: failed to write transaction lock for "
4030                                   "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
4031                                   (unsigned long long)id.pid,
4032                                   id.task_id, id.vnn,
4033                                   (unsigned long long)id.unique_id));
4034                 talloc_free(h);
4035                 return false;
4036         }
4037
4038         DEBUG(DEBUG_INFO, ("g_lock: lock granted for "
4039                            "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
4040                            (unsigned long long)id.pid,
4041                            id.task_id, id.vnn,
4042                            (unsigned long long)id.unique_id));
4043
4044         talloc_free(h);
4045         return true;
4046 }
4047
4048 static bool g_lock_unlock(TALLOC_CTX *mem_ctx,
4049                           struct ctdb_db_context *ctdb_db,
4050                           const char *keyname, uint32_t reqid)
4051 {
4052         TDB_DATA key, data;
4053         struct ctdb_record_handle *h;
4054         struct g_lock_recs *locks;
4055         struct server_id id;
4056         int i;
4057         bool found = false;
4058
4059         key.dptr = (uint8_t *)discard_const(keyname);
4060         key.dsize = strlen(keyname) + 1;
4061         h = ctdb_fetch_lock(ctdb_db, mem_ctx, key, &data);
4062         if (h == NULL) {
4063                 return false;
4064         }
4065
4066         if (!g_lock_parse(h, data, &locks)) {
4067                 DEBUG(DEBUG_ERR, ("g_lock: error parsing locks\n"));
4068                 talloc_free(data.dptr);
4069                 talloc_free(h);
4070                 return false;
4071         }
4072
4073         talloc_free(data.dptr);
4074
4075         id = server_id_fetch(ctdb_db->ctdb, reqid);
4076
4077         for (i=0; i<locks->num; i++) {
4078                 if (ctdb_server_id_equal(&locks->lock[i].id, &id)) {
4079                         if (i < locks->num-1) {
4080                                 locks->lock[i] = locks->lock[locks->num-1];
4081                         }
4082                         locks->num--;
4083                         found = true;
4084                         break;
4085                 }
4086         }
4087
4088         if (!found) {
4089                 DEBUG(DEBUG_ERR, ("g_lock: lock not found\n"));
4090                 talloc_free(h);
4091                 return false;
4092         }
4093
4094         data.dptr = (uint8_t *)locks->lock;
4095         data.dsize = locks->num * sizeof(struct g_lock_rec);
4096
4097         if (ctdb_record_store(h, data) != 0) {
4098                 talloc_free(h);
4099                 return false;
4100         }
4101
4102         talloc_free(h);
4103         return true;
4104 }
4105
4106
4107 struct ctdb_transaction_handle {
4108         struct ctdb_db_context *ctdb_db;
4109         struct ctdb_db_context *g_lock_db;
4110         char *lock_name;
4111         uint32_t reqid;
4112         /*
4113          * we store reads and writes done under a transaction:
4114          * - one list stores both reads and writes (m_all)
4115          * - the other just writes (m_write)
4116          */
4117         struct ctdb_marshall_buffer *m_all;
4118         struct ctdb_marshall_buffer *m_write;
4119 };
4120
4121 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
4122 {
4123         g_lock_unlock(h, h->g_lock_db, h->lock_name, h->reqid);
4124         reqid_remove(h->ctdb_db->ctdb->idr, h->reqid);
4125         return 0;
4126 }
4127
4128
4129 /**
4130  * start a transaction on a database
4131  */
4132 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
4133                                                        TALLOC_CTX *mem_ctx)
4134 {
4135         struct ctdb_transaction_handle *h;
4136         struct ctdb_client_id id;
4137
4138         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
4139         if (h == NULL) {
4140                 DEBUG(DEBUG_ERR, (__location__ " memory allocation error\n"));
4141                 return NULL;
4142         }
4143
4144         h->ctdb_db = ctdb_db;
4145         h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x",
4146                                        (unsigned int)ctdb_db->db_id);
4147         if (h->lock_name == NULL) {
4148                 DEBUG(DEBUG_ERR, (__location__ " talloc asprintf failed\n"));
4149                 talloc_free(h);
4150                 return NULL;
4151         }
4152
4153         h->g_lock_db = ctdb_attach(h->ctdb_db->ctdb, timeval_current_ofs(3,0),
4154                                    "g_lock.tdb", false, 0);
4155         if (!h->g_lock_db) {
4156                 DEBUG(DEBUG_ERR, (__location__ " unable to attach to g_lock.tdb\n"));
4157                 talloc_free(h);
4158                 return NULL;
4159         }
4160
4161         id.type = SERVER_TYPE_SAMBA;
4162         id.pnn = ctdb_get_pnn(ctdb_db->ctdb);
4163         id.server_id = getpid();
4164
4165         if (ctdb_ctrl_register_server_id(ctdb_db->ctdb, timeval_current_ofs(3,0),
4166                                          &id) != 0) {
4167                 DEBUG(DEBUG_ERR, (__location__ " unable to register server id\n"));
4168                 talloc_free(h);
4169                 return NULL;
4170         }
4171
4172         h->reqid = reqid_new(h->ctdb_db->ctdb->idr, h);
4173
4174         if (!g_lock_lock(h, h->g_lock_db, h->lock_name, h->reqid)) {
4175                 DEBUG(DEBUG_ERR, (__location__ " Error locking g_lock.tdb\n"));
4176                 talloc_free(h);
4177                 return NULL;
4178         }
4179
4180         talloc_set_destructor(h, ctdb_transaction_destructor);
4181         return h;
4182 }
4183
4184 /**
4185  * fetch a record inside a transaction
4186  */
4187 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h,
4188                            TALLOC_CTX *mem_ctx,
4189                            TDB_DATA key, TDB_DATA *data)
4190 {
4191         struct ctdb_ltdb_header header;
4192         int ret;
4193
4194         ZERO_STRUCT(header);
4195
4196         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
4197         if (ret == -1 && header.dmaster == (uint32_t)-1) {
4198                 /* record doesn't exist yet */
4199                 *data = tdb_null;
4200                 ret = 0;
4201         }
4202
4203         if (ret != 0) {
4204                 return ret;
4205         }
4206
4207         h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
4208         if (h->m_all == NULL) {
4209                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4210                 return -1;
4211         }
4212
4213         return 0;
4214 }
4215
4216 /**
4217  * stores a record inside a transaction
4218  */
4219 int ctdb_transaction_store(struct ctdb_transaction_handle *h,
4220                            TDB_DATA key, TDB_DATA data)
4221 {
4222         TALLOC_CTX *tmp_ctx = talloc_new(h);
4223         struct ctdb_ltdb_header header;
4224         TDB_DATA olddata;
4225         int ret;
4226
4227         /* we need the header so we can update the RSN */
4228         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
4229         if (ret == -1 && header.dmaster == (uint32_t)-1) {
4230                 /* the record doesn't exist - create one with us as dmaster.
4231                    This is only safe because we are in a transaction and this
4232                    is a persistent database */
4233                 ZERO_STRUCT(header);
4234         } else if (ret != 0) {
4235                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
4236                 talloc_free(tmp_ctx);
4237                 return ret;
4238         }
4239
4240         if (data.dsize == olddata.dsize &&
4241             memcmp(data.dptr, olddata.dptr, data.dsize) == 0 &&
4242             header.rsn != 0) {
4243                 /* save writing the same data */
4244                 talloc_free(tmp_ctx);
4245                 return 0;
4246         }
4247
4248         header.dmaster = h->ctdb_db->ctdb->pnn;
4249         header.rsn++;
4250
4251         h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
4252         if (h->m_all == NULL) {
4253                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4254                 talloc_free(tmp_ctx);
4255                 return -1;
4256         }
4257
4258         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
4259         if (h->m_write == NULL) {
4260                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4261                 talloc_free(tmp_ctx);
4262                 return -1;
4263         }
4264
4265         talloc_free(tmp_ctx);
4266         return 0;
4267 }
4268
4269 static int ctdb_fetch_db_seqnum(struct ctdb_db_context *ctdb_db, uint64_t *seqnum)
4270 {
4271         const char *keyname = CTDB_DB_SEQNUM_KEY;
4272         TDB_DATA key, data;
4273         struct ctdb_ltdb_header header;
4274         int ret;
4275
4276         key.dptr = (uint8_t *)discard_const(keyname);
4277         key.dsize = strlen(keyname) + 1;
4278
4279         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, ctdb_db, &data);
4280         if (ret != 0) {
4281                 *seqnum = 0;
4282                 return 0;
4283         }
4284
4285         if (data.dsize == 0) {
4286                 *seqnum = 0;
4287                 return 0;
4288         }
4289
4290         if (data.dsize != sizeof(*seqnum)) {
4291                 DEBUG(DEBUG_ERR, (__location__ " Invalid data recived len=%zi\n",
4292                                   data.dsize));
4293                 talloc_free(data.dptr);
4294                 return -1;
4295         }
4296
4297         *seqnum = *(uint64_t *)data.dptr;
4298         talloc_free(data.dptr);
4299
4300         return 0;
4301 }
4302
4303
4304 static int ctdb_store_db_seqnum(struct ctdb_transaction_handle *h,
4305                                 uint64_t seqnum)
4306 {
4307         const char *keyname = CTDB_DB_SEQNUM_KEY;
4308         TDB_DATA key, data;
4309
4310         key.dptr = (uint8_t *)discard_const(keyname);
4311         key.dsize = strlen(keyname) + 1;
4312
4313         data.dptr = (uint8_t *)&seqnum;
4314         data.dsize = sizeof(seqnum);
4315
4316         return ctdb_transaction_store(h, key, data);
4317 }
4318
4319
4320 /**
4321  * commit a transaction
4322  */
4323 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
4324 {
4325         int ret;
4326         uint64_t old_seqnum, new_seqnum;
4327         int32_t status;
4328         struct timeval timeout;
4329
4330         if (h->m_write == NULL) {
4331                 /* no changes were made */
4332                 talloc_free(h);
4333                 return 0;
4334         }
4335
4336         ret = ctdb_fetch_db_seqnum(h->ctdb_db, &old_seqnum);
4337         if (ret != 0) {
4338                 DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n"));
4339                 ret = -1;
4340                 goto done;
4341         }
4342
4343         new_seqnum = old_seqnum + 1;
4344         ret = ctdb_store_db_seqnum(h, new_seqnum);
4345         if (ret != 0) {
4346                 DEBUG(DEBUG_ERR, (__location__ " failed to store db sequence number\n"));
4347                 ret = -1;
4348                 goto done;
4349         }
4350
4351 again:
4352         timeout = timeval_current_ofs(3,0);
4353         ret = ctdb_control(h->ctdb_db->ctdb, CTDB_CURRENT_NODE,
4354                            h->ctdb_db->db_id,
4355                            CTDB_CONTROL_TRANS3_COMMIT, 0,
4356                            ctdb_marshall_finish(h->m_write), NULL, NULL,
4357                            &status, &timeout, NULL);
4358         if (ret != 0 || status != 0) {
4359                 /*
4360                  * TRANS3_COMMIT control will only fail if recovery has been
4361                  * triggered.  Check if the database has been updated or not.
4362                  */
4363                 ret = ctdb_fetch_db_seqnum(h->ctdb_db, &new_seqnum);
4364                 if (ret != 0) {
4365                         DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n"));
4366                         goto done;
4367                 }
4368
4369                 if (new_seqnum == old_seqnum) {
4370                         /* Database not yet updated, try again */
4371                         goto again;
4372                 }
4373
4374                 if (new_seqnum != (old_seqnum + 1)) {
4375                         DEBUG(DEBUG_ERR, (__location__ " new seqnum [%llu] != old seqnum [%llu] + 1\n",
4376                                           (long long unsigned)new_seqnum,
4377                                           (long long unsigned)old_seqnum));
4378                         ret = -1;
4379                         goto done;
4380                 }
4381         }
4382
4383         ret = 0;
4384
4385 done:
4386         talloc_free(h);
4387         return ret;
4388 }
4389
4390 /**
4391  * cancel a transaction
4392  */
4393 int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
4394 {
4395         talloc_free(h);
4396         return 0;
4397 }
4398
4399
4400 /*
4401   recovery daemon ping to main daemon
4402  */
4403 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
4404 {
4405         int ret;
4406         int32_t res;
4407
4408         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
4409                            ctdb, NULL, &res, NULL, NULL);
4410         if (ret != 0 || res != 0) {
4411                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
4412                 return -1;
4413         }
4414
4415         return 0;
4416 }
4417
4418 /* When forking the main daemon and the child process needs to connect
4419  * back to the daemon as a client process, this function can be used
4420  * to change the ctdb context from daemon into client mode.  The child
4421  * process must be created using ctdb_fork() and not fork() -
4422  * ctdb_fork() does some necessary housekeeping.
4423  */
4424 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
4425 {
4426         int ret;
4427         va_list ap;
4428
4429         /* Add extra information so we can identify this in the logs */
4430         va_start(ap, fmt);
4431         debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
4432         va_end(ap);
4433
4434         /* get a new event context */
4435         ctdb->ev = tevent_context_init(ctdb);
4436         tevent_loop_allow_nesting(ctdb->ev);
4437
4438         /* Connect to main CTDB daemon */
4439         ret = ctdb_socket_connect(ctdb);
4440         if (ret != 0) {
4441                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
4442                 return -1;
4443         }
4444
4445         ctdb->can_send_controls = true;
4446
4447         return 0;
4448 }
4449
4450 /*
4451   get the status of running the monitor eventscripts: NULL means never run.
4452  */
4453 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb,
4454                               struct timeval timeout, uint32_t destnode,
4455                               TALLOC_CTX *mem_ctx,
4456                               enum ctdb_event type,
4457                               struct ctdb_script_list_old **scripts)
4458 {
4459         int ret;
4460         TDB_DATA outdata, indata;
4461         int32_t res;
4462         uint32_t uinttype = type;
4463
4464         indata.dptr = (uint8_t *)&uinttype;
4465         indata.dsize = sizeof(uinttype);
4466
4467         ret = ctdb_control(ctdb, destnode, 0, 
4468                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
4469                            mem_ctx, &outdata, &res, &timeout, NULL);
4470         if (ret != 0 || res != 0) {
4471                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
4472                 return -1;
4473         }
4474
4475         if (outdata.dsize == 0) {
4476                 *scripts = NULL;
4477         } else {
4478                 *scripts = (struct ctdb_script_list_old *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4479                 talloc_free(outdata.dptr);
4480         }
4481
4482         return 0;
4483 }
4484
4485 /*
4486   tell the main daemon how long it took to lock the reclock file
4487  */
4488 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
4489 {
4490         int ret;
4491         int32_t res;
4492         TDB_DATA data;
4493
4494         data.dptr = (uint8_t *)&latency;
4495         data.dsize = sizeof(latency);
4496
4497         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
4498                            ctdb, NULL, &res, NULL, NULL);
4499         if (ret != 0 || res != 0) {
4500                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
4501                 return -1;
4502         }
4503
4504         return 0;
4505 }
4506
4507 /*
4508   get the name of the reclock file
4509  */
4510 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
4511                          uint32_t destnode, TALLOC_CTX *mem_ctx,
4512                          const char **name)
4513 {
4514         int ret;
4515         int32_t res;
4516         TDB_DATA data;
4517
4518         ret = ctdb_control(ctdb, destnode, 0, 
4519                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
4520                            mem_ctx, &data, &res, &timeout, NULL);
4521         if (ret != 0 || res != 0) {
4522                 return -1;
4523         }
4524
4525         if (data.dsize == 0) {
4526                 *name = NULL;
4527         } else {
4528                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
4529         }
4530         talloc_free(data.dptr);
4531
4532         return 0;
4533 }
4534
4535 /*
4536   set the reclock filename for a node
4537  */
4538 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
4539 {
4540         int ret;
4541         TDB_DATA data;
4542         int32_t res;
4543
4544         if (reclock == NULL) {
4545                 data.dsize = 0;
4546                 data.dptr  = NULL;
4547         } else {
4548                 data.dsize = strlen(reclock) + 1;
4549                 data.dptr  = discard_const(reclock);
4550         }
4551
4552         ret = ctdb_control(ctdb, destnode, 0, 
4553                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
4554                            NULL, NULL, &res, &timeout, NULL);
4555         if (ret != 0 || res != 0) {
4556                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
4557                 return -1;
4558         }
4559
4560         return 0;
4561 }
4562
4563 /*
4564   stop a node
4565  */
4566 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4567 {
4568         int ret;
4569         int32_t res;
4570
4571         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
4572                            ctdb, NULL, &res, &timeout, NULL);
4573         if (ret != 0 || res != 0) {
4574                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
4575                 return -1;
4576         }
4577
4578         return 0;
4579 }
4580
4581 /*
4582   continue a node
4583  */
4584 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4585 {
4586         int ret;
4587
4588         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
4589                            ctdb, NULL, NULL, &timeout, NULL);
4590         if (ret != 0) {
4591                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4592                 return -1;
4593         }
4594
4595         return 0;
4596 }
4597
4598 /*
4599   set the natgw state for a node
4600  */
4601 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4602 {
4603         int ret;
4604         TDB_DATA data;
4605         int32_t res;
4606
4607         data.dsize = sizeof(natgwstate);
4608         data.dptr  = (uint8_t *)&natgwstate;
4609
4610         ret = ctdb_control(ctdb, destnode, 0, 
4611                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
4612                            NULL, NULL, &res, &timeout, NULL);
4613         if (ret != 0 || res != 0) {
4614                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4615                 return -1;
4616         }
4617
4618         return 0;
4619 }
4620
4621 /*
4622   set the lmaster role for a node
4623  */
4624 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4625 {
4626         int ret;
4627         TDB_DATA data;
4628         int32_t res;
4629
4630         data.dsize = sizeof(lmasterrole);
4631         data.dptr  = (uint8_t *)&lmasterrole;
4632
4633         ret = ctdb_control(ctdb, destnode, 0, 
4634                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
4635                            NULL, NULL, &res, &timeout, NULL);
4636         if (ret != 0 || res != 0) {
4637                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4638                 return -1;
4639         }
4640
4641         return 0;
4642 }
4643
4644 /*
4645   set the recmaster role for a node
4646  */
4647 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4648 {
4649         int ret;
4650         TDB_DATA data;
4651         int32_t res;
4652
4653         data.dsize = sizeof(recmasterrole);
4654         data.dptr  = (uint8_t *)&recmasterrole;
4655
4656         ret = ctdb_control(ctdb, destnode, 0, 
4657                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
4658                            NULL, NULL, &res, &timeout, NULL);
4659         if (ret != 0 || res != 0) {
4660                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4661                 return -1;
4662         }
4663
4664         return 0;
4665 }
4666
4667 /* enable an eventscript
4668  */
4669 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4670 {
4671         int ret;
4672         TDB_DATA data;
4673         int32_t res;
4674
4675         data.dsize = strlen(script) + 1;
4676         data.dptr  = discard_const(script);
4677
4678         ret = ctdb_control(ctdb, destnode, 0, 
4679                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
4680                            NULL, NULL, &res, &timeout, NULL);
4681         if (ret != 0 || res != 0) {
4682                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4683                 return -1;
4684         }
4685
4686         return 0;
4687 }
4688
4689 /* disable an eventscript
4690  */
4691 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4692 {
4693         int ret;
4694         TDB_DATA data;
4695         int32_t res;
4696
4697         data.dsize = strlen(script) + 1;
4698         data.dptr  = discard_const(script);
4699
4700         ret = ctdb_control(ctdb, destnode, 0, 
4701                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
4702                            NULL, NULL, &res, &timeout, NULL);
4703         if (ret != 0 || res != 0) {
4704                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
4705                 return -1;
4706         }
4707
4708         return 0;
4709 }
4710
4711
4712 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
4713 {
4714         int ret;
4715         TDB_DATA data;
4716         int32_t res;
4717
4718         data.dsize = sizeof(*bantime);
4719         data.dptr  = (uint8_t *)bantime;
4720
4721         ret = ctdb_control(ctdb, destnode, 0, 
4722                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
4723                            NULL, NULL, &res, &timeout, NULL);
4724         if (ret != 0 || res != 0) {
4725                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4726                 return -1;
4727         }
4728
4729         return 0;
4730 }
4731
4732
4733 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4734 {
4735         int ret;
4736         TDB_DATA outdata;
4737         int32_t res;
4738         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4739
4740         ret = ctdb_control(ctdb, destnode, 0, 
4741                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4742                            tmp_ctx, &outdata, &res, &timeout, NULL);
4743         if (ret != 0 || res != 0) {
4744                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4745                 talloc_free(tmp_ctx);
4746                 return -1;
4747         }
4748
4749         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4750         talloc_free(tmp_ctx);
4751
4752         return 0;
4753 }
4754
4755
4756 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4757 {
4758         int ret;
4759         int32_t res;
4760         TDB_DATA data;
4761         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4762
4763         data.dptr = (uint8_t*)db_prio;
4764         data.dsize = sizeof(*db_prio);
4765
4766         ret = ctdb_control(ctdb, destnode, 0, 
4767                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4768                            tmp_ctx, NULL, &res, &timeout, NULL);
4769         if (ret != 0 || res != 0) {
4770                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4771                 talloc_free(tmp_ctx);
4772                 return -1;
4773         }
4774
4775         talloc_free(tmp_ctx);
4776
4777         return 0;
4778 }
4779
4780 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4781 {
4782         int ret;
4783         int32_t res;
4784         TDB_DATA data;
4785         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4786
4787         data.dptr = (uint8_t*)&db_id;
4788         data.dsize = sizeof(db_id);
4789
4790         ret = ctdb_control(ctdb, destnode, 0, 
4791                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4792                            tmp_ctx, NULL, &res, &timeout, NULL);
4793         if (ret != 0 || res < 0) {
4794                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_db_priority failed\n"));
4795                 talloc_free(tmp_ctx);
4796                 return -1;
4797         }
4798
4799         if (priority) {
4800                 *priority = res;
4801         }
4802
4803         talloc_free(tmp_ctx);
4804
4805         return 0;
4806 }
4807
4808 int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb,
4809                              struct timeval timeout, uint32_t destnode,
4810                              TALLOC_CTX *mem_ctx,
4811                              struct ctdb_statistics_list_old **stats)
4812 {
4813         int ret;
4814         TDB_DATA outdata;
4815         int32_t res;
4816
4817         ret = ctdb_control(ctdb, destnode, 0, 
4818                            CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null, 
4819                            mem_ctx, &outdata, &res, &timeout, NULL);
4820         if (ret != 0 || res != 0 || outdata.dsize == 0) {
4821                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
4822                 return -1;
4823         }
4824
4825         *stats = (struct ctdb_statistics_list_old *)talloc_memdup(mem_ctx,
4826                                                                   outdata.dptr,
4827                                                                   outdata.dsize);
4828         talloc_free(outdata.dptr);
4829
4830         return 0;
4831 }
4832
4833 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)
4834 {
4835         if (h == NULL) {
4836                 return NULL;
4837         }
4838
4839         return &h->header;
4840 }
4841
4842
4843 struct ctdb_client_control_state *
4844 ctdb_ctrl_updaterecord_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4845 {
4846         struct ctdb_client_control_state *handle;
4847         struct ctdb_marshall_buffer *m;
4848         struct ctdb_rec_data_old *rec;
4849         TDB_DATA outdata;
4850
4851         m = talloc_zero(mem_ctx, struct ctdb_marshall_buffer);
4852         if (m == NULL) {
4853                 DEBUG(DEBUG_ERR, ("Failed to allocate marshall buffer for update record\n"));
4854                 return NULL;
4855         }
4856
4857         m->db_id = ctdb_db->db_id;
4858
4859         rec = ctdb_marshall_record(m, 0, key, header, data);
4860         if (rec == NULL) {
4861                 DEBUG(DEBUG_ERR,("Failed to marshall record for update record\n"));
4862                 talloc_free(m);
4863                 return NULL;
4864         }
4865         m = talloc_realloc_size(mem_ctx, m, rec->length + offsetof(struct ctdb_marshall_buffer, data));
4866         if (m == NULL) {
4867                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata\n"));
4868                 talloc_free(m);
4869                 return NULL;
4870         }
4871         m->count++;
4872         memcpy((uint8_t *)m + offsetof(struct ctdb_marshall_buffer, data), rec, rec->length);
4873
4874
4875         outdata.dptr = (uint8_t *)m;
4876         outdata.dsize = talloc_get_size(m);
4877
4878         handle = ctdb_control_send(ctdb, destnode, 0, 
4879                            CTDB_CONTROL_UPDATE_RECORD, 0, outdata,
4880                            mem_ctx, &timeout, NULL);
4881         talloc_free(m);
4882         return handle;
4883 }
4884
4885 int ctdb_ctrl_updaterecord_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4886 {
4887         int ret;
4888         int32_t res;
4889
4890         ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL);
4891         if ( (ret != 0) || (res != 0) ){
4892                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_update_record_recv failed\n"));
4893                 return -1;
4894         }
4895
4896         return 0;
4897 }
4898
4899 int
4900 ctdb_ctrl_updaterecord(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4901 {
4902         struct ctdb_client_control_state *state;
4903
4904         state = ctdb_ctrl_updaterecord_send(ctdb, mem_ctx, timeout, destnode, ctdb_db, key, header, data);
4905         return ctdb_ctrl_updaterecord_recv(ctdb, state);
4906 }
4907
4908
4909
4910
4911
4912
4913 /*
4914   set a database to be readonly
4915  */
4916 struct ctdb_client_control_state *
4917 ctdb_ctrl_set_db_readonly_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4918 {
4919         TDB_DATA data;
4920
4921         data.dptr = (uint8_t *)&dbid;
4922         data.dsize = sizeof(dbid);
4923
4924         return ctdb_control_send(ctdb, destnode, 0, 
4925                            CTDB_CONTROL_SET_DB_READONLY, 0, data, 
4926                            ctdb, NULL, NULL);
4927 }
4928
4929 int ctdb_ctrl_set_db_readonly_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4930 {
4931         int ret;
4932         int32_t res;
4933
4934         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4935         if (ret != 0 || res != 0) {
4936           DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_readonly_recv failed  ret:%d res:%d\n", ret, res));
4937                 return -1;
4938         }
4939
4940         return 0;
4941 }
4942
4943 int ctdb_ctrl_set_db_readonly(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4944 {
4945         struct ctdb_client_control_state *state;
4946
4947         state = ctdb_ctrl_set_db_readonly_send(ctdb, destnode, dbid);
4948         return ctdb_ctrl_set_db_readonly_recv(ctdb, state);
4949 }
4950
4951 /*
4952   set a database to be sticky
4953  */
4954 struct ctdb_client_control_state *
4955 ctdb_ctrl_set_db_sticky_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4956 {
4957         TDB_DATA data;
4958
4959         data.dptr = (uint8_t *)&dbid;
4960         data.dsize = sizeof(dbid);
4961
4962         return ctdb_control_send(ctdb, destnode, 0, 
4963                            CTDB_CONTROL_SET_DB_STICKY, 0, data, 
4964                            ctdb, NULL, NULL);
4965 }
4966
4967 int ctdb_ctrl_set_db_sticky_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4968 {
4969         int ret;
4970         int32_t res;
4971
4972         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4973         if (ret != 0 || res != 0) {
4974                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_sticky_recv failed  ret:%d res:%d\n", ret, res));
4975                 return -1;
4976         }
4977
4978         return 0;
4979 }
4980
4981 int ctdb_ctrl_set_db_sticky(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4982 {
4983         struct ctdb_client_control_state *state;
4984
4985         state = ctdb_ctrl_set_db_sticky_send(ctdb, destnode, dbid);
4986         return ctdb_ctrl_set_db_sticky_recv(ctdb, state);
4987 }