ctdb-client: ctdb_fetch_lock should check for readonly delegations
[amitay/samba.git] / ctdb / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/locale.h"
28 #include <stdlib.h>
29 #include "../include/ctdb_private.h"
30 #include "lib/util/dlinklist.h"
31
32 pid_t ctdbd_pid;
33
34 /*
35   allocate a packet for use in client<->daemon communication
36  */
37 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
38                                             TALLOC_CTX *mem_ctx, 
39                                             enum ctdb_operation operation, 
40                                             size_t length, size_t slength,
41                                             const char *type)
42 {
43         int size;
44         struct ctdb_req_header *hdr;
45
46         length = MAX(length, slength);
47         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
48
49         hdr = (struct ctdb_req_header *)talloc_zero_size(mem_ctx, size);
50         if (hdr == NULL) {
51                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
52                          operation, (unsigned)length));
53                 return NULL;
54         }
55         talloc_set_name_const(hdr, type);
56         hdr->length       = length;
57         hdr->operation    = operation;
58         hdr->ctdb_magic   = CTDB_MAGIC;
59         hdr->ctdb_version = CTDB_VERSION;
60         hdr->srcnode      = ctdb->pnn;
61         if (ctdb->vnn_map) {
62                 hdr->generation = ctdb->vnn_map->generation;
63         }
64
65         return hdr;
66 }
67
68 /*
69   local version of ctdb_call
70 */
71 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
72                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
73                     TDB_DATA *data, bool updatetdb)
74 {
75         struct ctdb_call_info *c;
76         struct ctdb_registered_call *fn;
77         struct ctdb_context *ctdb = ctdb_db->ctdb;
78         
79         c = talloc(ctdb, struct ctdb_call_info);
80         CTDB_NO_MEMORY(ctdb, c);
81
82         c->key = call->key;
83         c->call_data = &call->call_data;
84         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
85         c->record_data.dsize = data->dsize;
86         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
87         c->new_data = NULL;
88         c->reply_data = NULL;
89         c->status = 0;
90         c->header = header;
91
92         for (fn=ctdb_db->calls;fn;fn=fn->next) {
93                 if (fn->id == call->call_id) break;
94         }
95         if (fn == NULL) {
96                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
97                 talloc_free(c);
98                 return -1;
99         }
100
101         if (fn->fn(c) != 0) {
102                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
103                 talloc_free(c);
104                 return -1;
105         }
106
107         /* we need to force the record to be written out if this was a remote access */
108         if (c->new_data == NULL) {
109                 c->new_data = &c->record_data;
110         }
111
112         if (c->new_data && updatetdb) {
113                 /* XXX check that we always have the lock here? */
114                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
115                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
116                         talloc_free(c);
117                         return -1;
118                 }
119         }
120
121         if (c->reply_data) {
122                 call->reply_data = *c->reply_data;
123
124                 talloc_steal(call, call->reply_data.dptr);
125                 talloc_set_name_const(call->reply_data.dptr, __location__);
126         } else {
127                 call->reply_data.dptr = NULL;
128                 call->reply_data.dsize = 0;
129         }
130         call->status = c->status;
131
132         talloc_free(c);
133
134         return 0;
135 }
136
137
138 /*
139   queue a packet for sending from client to daemon
140 */
141 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
142 {
143         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
144 }
145
146
147 /*
148   called when a CTDB_REPLY_CALL packet comes in in the client
149
150   This packet comes in response to a CTDB_REQ_CALL request packet. It
151   contains any reply data from the call
152 */
153 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
154 {
155         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
156         struct ctdb_client_call_state *state;
157
158         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
159         if (state == NULL) {
160                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
161                 return;
162         }
163
164         if (hdr->reqid != state->reqid) {
165                 /* we found a record  but it was the wrong one */
166                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
167                 return;
168         }
169
170         state->call->reply_data.dptr = c->data;
171         state->call->reply_data.dsize = c->datalen;
172         state->call->status = c->status;
173
174         talloc_steal(state, c);
175
176         state->state = CTDB_CALL_DONE;
177
178         if (state->async.fn) {
179                 state->async.fn(state);
180         }
181 }
182
183 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
184
185 /*
186   this is called in the client, when data comes in from the daemon
187  */
188 void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
189 {
190         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
191         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
192         TALLOC_CTX *tmp_ctx;
193
194         /* place the packet as a child of a tmp_ctx. We then use
195            talloc_free() below to free it. If any of the calls want
196            to keep it, then they will steal it somewhere else, and the
197            talloc_free() will be a no-op */
198         tmp_ctx = talloc_new(ctdb);
199         talloc_steal(tmp_ctx, hdr);
200
201         if (cnt == 0) {
202                 DEBUG(DEBUG_CRIT,("Daemon has exited - shutting down client\n"));
203                 exit(1);
204         }
205
206         if (cnt < sizeof(*hdr)) {
207                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
208                 goto done;
209         }
210         if (cnt != hdr->length) {
211                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
212                                (unsigned)hdr->length, (unsigned)cnt);
213                 goto done;
214         }
215
216         if (hdr->ctdb_magic != CTDB_MAGIC) {
217                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
218                 goto done;
219         }
220
221         if (hdr->ctdb_version != CTDB_VERSION) {
222                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
223                 goto done;
224         }
225
226         switch (hdr->operation) {
227         case CTDB_REPLY_CALL:
228                 ctdb_client_reply_call(ctdb, hdr);
229                 break;
230
231         case CTDB_REQ_MESSAGE:
232                 ctdb_request_message(ctdb, hdr);
233                 break;
234
235         case CTDB_REPLY_CONTROL:
236                 ctdb_client_reply_control(ctdb, hdr);
237                 break;
238
239         default:
240                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
241         }
242
243 done:
244         talloc_free(tmp_ctx);
245 }
246
247 /*
248   connect to a unix domain socket
249 */
250 int ctdb_socket_connect(struct ctdb_context *ctdb)
251 {
252         struct sockaddr_un addr;
253
254         memset(&addr, 0, sizeof(addr));
255         addr.sun_family = AF_UNIX;
256         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
257
258         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
259         if (ctdb->daemon.sd == -1) {
260                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
261                 return -1;
262         }
263
264         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
265                 close(ctdb->daemon.sd);
266                 ctdb->daemon.sd = -1;
267                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
268                 return -1;
269         }
270
271         set_nonblocking(ctdb->daemon.sd);
272         set_close_on_exec(ctdb->daemon.sd);
273         
274         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
275                                               CTDB_DS_ALIGNMENT, 
276                                               ctdb_client_read_cb, ctdb, "to-ctdbd");
277         return 0;
278 }
279
280
281 struct ctdb_record_handle {
282         struct ctdb_db_context *ctdb_db;
283         TDB_DATA key;
284         TDB_DATA *data;
285         struct ctdb_ltdb_header header;
286 };
287
288
289 /*
290   make a recv call to the local ctdb daemon - called from client context
291
292   This is called when the program wants to wait for a ctdb_call to complete and get the 
293   results. This call will block unless the call has already completed.
294 */
295 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
296 {
297         if (state == NULL) {
298                 return -1;
299         }
300
301         while (state->state < CTDB_CALL_DONE) {
302                 event_loop_once(state->ctdb_db->ctdb->ev);
303         }
304         if (state->state != CTDB_CALL_DONE) {
305                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
306                 talloc_free(state);
307                 return -1;
308         }
309
310         if (state->call->reply_data.dsize) {
311                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
312                                                       state->call->reply_data.dptr,
313                                                       state->call->reply_data.dsize);
314                 call->reply_data.dsize = state->call->reply_data.dsize;
315         } else {
316                 call->reply_data.dptr = NULL;
317                 call->reply_data.dsize = 0;
318         }
319         call->status = state->call->status;
320         talloc_free(state);
321
322         return call->status;
323 }
324
325
326
327
328 /*
329   destroy a ctdb_call in client
330 */
331 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
332 {
333         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
334         return 0;
335 }
336
337 /*
338   construct an event driven local ctdb_call
339
340   this is used so that locally processed ctdb_call requests are processed
341   in an event driven manner
342 */
343 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
344                                                                   struct ctdb_call *call,
345                                                                   struct ctdb_ltdb_header *header,
346                                                                   TDB_DATA *data)
347 {
348         struct ctdb_client_call_state *state;
349         struct ctdb_context *ctdb = ctdb_db->ctdb;
350         int ret;
351
352         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
353         CTDB_NO_MEMORY_NULL(ctdb, state);
354         state->call = talloc_zero(state, struct ctdb_call);
355         CTDB_NO_MEMORY_NULL(ctdb, state->call);
356
357         talloc_steal(state, data->dptr);
358
359         state->state   = CTDB_CALL_DONE;
360         *(state->call) = *call;
361         state->ctdb_db = ctdb_db;
362
363         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
364         if (ret != 0) {
365                 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
366         }
367
368         return state;
369 }
370
371 /*
372   make a ctdb call to the local daemon - async send. Called from client context.
373
374   This constructs a ctdb_call request and queues it for processing. 
375   This call never blocks.
376 */
377 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
378                                               struct ctdb_call *call)
379 {
380         struct ctdb_client_call_state *state;
381         struct ctdb_context *ctdb = ctdb_db->ctdb;
382         struct ctdb_ltdb_header header;
383         TDB_DATA data;
384         int ret;
385         size_t len;
386         struct ctdb_req_call *c;
387
388         /* if the domain socket is not yet open, open it */
389         if (ctdb->daemon.sd==-1) {
390                 ctdb_socket_connect(ctdb);
391         }
392
393         ret = ctdb_ltdb_lock(ctdb_db, call->key);
394         if (ret != 0) {
395                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
396                 return NULL;
397         }
398
399         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
400
401         if ((call->flags & CTDB_IMMEDIATE_MIGRATION) && (header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
402                 ret = -1;
403         }
404
405         if (ret == 0 && header.dmaster == ctdb->pnn) {
406                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
407                 talloc_free(data.dptr);
408                 ctdb_ltdb_unlock(ctdb_db, call->key);
409                 return state;
410         }
411
412         ctdb_ltdb_unlock(ctdb_db, call->key);
413         talloc_free(data.dptr);
414
415         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
416         if (state == NULL) {
417                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
418                 return NULL;
419         }
420         state->call = talloc_zero(state, struct ctdb_call);
421         if (state->call == NULL) {
422                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
423                 return NULL;
424         }
425
426         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
427         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
428         if (c == NULL) {
429                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
430                 return NULL;
431         }
432
433         state->reqid     = ctdb_reqid_new(ctdb, state);
434         state->ctdb_db = ctdb_db;
435         talloc_set_destructor(state, ctdb_client_call_destructor);
436
437         c->hdr.reqid     = state->reqid;
438         c->flags         = call->flags;
439         c->db_id         = ctdb_db->db_id;
440         c->callid        = call->call_id;
441         c->hopcount      = 0;
442         c->keylen        = call->key.dsize;
443         c->calldatalen   = call->call_data.dsize;
444         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
445         memcpy(&c->data[call->key.dsize], 
446                call->call_data.dptr, call->call_data.dsize);
447         *(state->call)              = *call;
448         state->call->call_data.dptr = &c->data[call->key.dsize];
449         state->call->key.dptr       = &c->data[0];
450
451         state->state  = CTDB_CALL_WAIT;
452
453
454         ctdb_client_queue_pkt(ctdb, &c->hdr);
455
456         return state;
457 }
458
459
460 /*
461   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
462 */
463 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
464 {
465         struct ctdb_client_call_state *state;
466
467         state = ctdb_call_send(ctdb_db, call);
468         return ctdb_call_recv(state, call);
469 }
470
471
472 /*
473   tell the daemon what messaging srvid we will use, and register the message
474   handler function in the client
475 */
476 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
477                              ctdb_msg_fn_t handler,
478                              void *private_data)
479 {
480         int res;
481         int32_t status;
482
483         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
484                            tdb_null, NULL, NULL, &status, NULL, NULL);
485         if (res != 0 || status != 0) {
486                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
487                 return -1;
488         }
489
490         /* also need to register the handler with our own ctdb structure */
491         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
492 }
493
494 /*
495   tell the daemon we no longer want a srvid
496 */
497 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
498 {
499         int res;
500         int32_t status;
501
502         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
503                            tdb_null, NULL, NULL, &status, NULL, NULL);
504         if (res != 0 || status != 0) {
505                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
506                 return -1;
507         }
508
509         /* also need to register the handler with our own ctdb structure */
510         ctdb_deregister_message_handler(ctdb, srvid, private_data);
511         return 0;
512 }
513
514 /*
515  * check server ids
516  */
517 int ctdb_client_check_message_handlers(struct ctdb_context *ctdb, uint64_t *ids, uint32_t num,
518                                        uint8_t *result)
519 {
520         TDB_DATA indata, outdata;
521         int res;
522         int32_t status;
523         int i;
524
525         indata.dptr = (uint8_t *)ids;
526         indata.dsize = num * sizeof(*ids);
527
528         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_CHECK_SRVIDS, 0,
529                            indata, ctdb, &outdata, &status, NULL, NULL);
530         if (res != 0 || status != 0) {
531                 DEBUG(DEBUG_ERR, (__location__ " failed to check srvids\n"));
532                 return -1;
533         }
534
535         if (outdata.dsize != num*sizeof(uint8_t)) {
536                 DEBUG(DEBUG_ERR, (__location__ " expected %lu bytes, received %zi bytes\n",
537                                   (long unsigned int)num*sizeof(uint8_t),
538                                   outdata.dsize));
539                 talloc_free(outdata.dptr);
540                 return -1;
541         }
542
543         for (i=0; i<num; i++) {
544                 result[i] = outdata.dptr[i];
545         }
546
547         talloc_free(outdata.dptr);
548         return 0;
549 }
550
551 /*
552   send a message - from client context
553  */
554 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
555                       uint64_t srvid, TDB_DATA data)
556 {
557         struct ctdb_req_message *r;
558         int len, res;
559
560         len = offsetof(struct ctdb_req_message, data) + data.dsize;
561         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
562                                len, struct ctdb_req_message);
563         CTDB_NO_MEMORY(ctdb, r);
564
565         r->hdr.destnode  = pnn;
566         r->srvid         = srvid;
567         r->datalen       = data.dsize;
568         memcpy(&r->data[0], data.dptr, data.dsize);
569         
570         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
571         talloc_free(r);
572         return res;
573 }
574
575
576 /*
577   cancel a ctdb_fetch_lock operation, releasing the lock
578  */
579 static int fetch_lock_destructor(struct ctdb_record_handle *h)
580 {
581         ctdb_ltdb_unlock(h->ctdb_db, h->key);
582         return 0;
583 }
584
585 /*
586   force the migration of a record to this node
587  */
588 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
589 {
590         struct ctdb_call call;
591         ZERO_STRUCT(call);
592         call.call_id = CTDB_NULL_FUNC;
593         call.key = key;
594         call.flags = CTDB_IMMEDIATE_MIGRATION;
595         return ctdb_call(ctdb_db, &call);
596 }
597
598 /*
599   try to fetch a readonly copy of a record
600  */
601 static int
602 ctdb_client_fetch_readonly(struct ctdb_db_context *ctdb_db, TDB_DATA key, TALLOC_CTX *mem_ctx, struct ctdb_ltdb_header **hdr, TDB_DATA *data)
603 {
604         int ret;
605
606         struct ctdb_call call;
607         ZERO_STRUCT(call);
608
609         call.call_id = CTDB_FETCH_WITH_HEADER_FUNC;
610         call.call_data.dptr = NULL;
611         call.call_data.dsize = 0;
612         call.key = key;
613         call.flags = CTDB_WANT_READONLY;
614         ret = ctdb_call(ctdb_db, &call);
615
616         if (ret != 0) {
617                 return -1;
618         }
619         if (call.reply_data.dsize < sizeof(struct ctdb_ltdb_header)) {
620                 return -1;
621         }
622
623         *hdr = talloc_memdup(mem_ctx, &call.reply_data.dptr[0], sizeof(struct ctdb_ltdb_header));
624         if (*hdr == NULL) {
625                 talloc_free(call.reply_data.dptr);
626                 return -1;
627         }
628
629         data->dsize = call.reply_data.dsize - sizeof(struct ctdb_ltdb_header);
630         data->dptr  = talloc_memdup(mem_ctx, &call.reply_data.dptr[sizeof(struct ctdb_ltdb_header)], data->dsize);
631         if (data->dptr == NULL) {
632                 talloc_free(call.reply_data.dptr);
633                 talloc_free(hdr);
634                 return -1;
635         }
636
637         return 0;
638 }
639
640 /*
641   get a lock on a record, and return the records data. Blocks until it gets the lock
642  */
643 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
644                                            TDB_DATA key, TDB_DATA *data)
645 {
646         int ret;
647         struct ctdb_record_handle *h;
648
649         /*
650           procedure is as follows:
651
652           1) get the chain lock. 
653           2) check if we are dmaster
654           3) if we are the dmaster then return handle 
655           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
656              reply from ctdbd
657           5) when we get the reply, goto (1)
658          */
659
660         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
661         if (h == NULL) {
662                 return NULL;
663         }
664
665         h->ctdb_db = ctdb_db;
666         h->key     = key;
667         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
668         if (h->key.dptr == NULL) {
669                 talloc_free(h);
670                 return NULL;
671         }
672         h->data    = data;
673
674         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
675                  (const char *)key.dptr));
676
677 again:
678         /* step 1 - get the chain lock */
679         ret = ctdb_ltdb_lock(ctdb_db, key);
680         if (ret != 0) {
681                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
682                 talloc_free(h);
683                 return NULL;
684         }
685
686         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
687
688         talloc_set_destructor(h, fetch_lock_destructor);
689
690         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
691
692         /* when torturing, ensure we test the remote path */
693         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
694             random() % 5 == 0) {
695                 h->header.dmaster = (uint32_t)-1;
696         }
697
698
699         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
700
701         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
702                 ctdb_ltdb_unlock(ctdb_db, key);
703                 ret = ctdb_client_force_migration(ctdb_db, key);
704                 if (ret != 0) {
705                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
706                         talloc_free(h);
707                         return NULL;
708                 }
709                 goto again;
710         }
711
712         /* if this is a request for read/write and we have delegations
713            we have to revoke all delegations first
714         */
715         if ((h->header.dmaster == ctdb_db->ctdb->pnn) &&
716             (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
717                 ctdb_ltdb_unlock(ctdb_db, key);
718                 ret = ctdb_client_force_migration(ctdb_db, key);
719                 if (ret != 0) {
720                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
721                         talloc_free(h);
722                         return NULL;
723                 }
724                 goto again;
725         }
726
727         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
728         return h;
729 }
730
731 /*
732   get a readonly lock on a record, and return the records data. Blocks until it gets the lock
733  */
734 struct ctdb_record_handle *
735 ctdb_fetch_readonly_lock(
736         struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
737         TDB_DATA key, TDB_DATA *data,
738         int read_only)
739 {
740         int ret;
741         struct ctdb_record_handle *h;
742         struct ctdb_ltdb_header *roheader = NULL;
743
744         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
745         if (h == NULL) {
746                 return NULL;
747         }
748
749         h->ctdb_db = ctdb_db;
750         h->key     = key;
751         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
752         if (h->key.dptr == NULL) {
753                 talloc_free(h);
754                 return NULL;
755         }
756         h->data    = data;
757
758         data->dptr = NULL;
759         data->dsize = 0;
760
761
762 again:
763         talloc_free(roheader);
764         roheader = NULL;
765
766         talloc_free(data->dptr);
767         data->dptr = NULL;
768         data->dsize = 0;
769
770         /* Lock the record/chain */
771         ret = ctdb_ltdb_lock(ctdb_db, key);
772         if (ret != 0) {
773                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
774                 talloc_free(h);
775                 return NULL;
776         }
777
778         talloc_set_destructor(h, fetch_lock_destructor);
779
780         /* Check if record exists yet in the TDB */
781         ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
782         if (ret != 0) {
783                 ctdb_ltdb_unlock(ctdb_db, key);
784                 ret = ctdb_client_force_migration(ctdb_db, key);
785                 if (ret != 0) {
786                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
787                         talloc_free(h);
788                         return NULL;
789                 }
790                 goto again;
791         }
792
793         /* if this is a request for read/write and we have delegations
794            we have to revoke all delegations first
795         */
796         if ((read_only == 0) 
797         &&  (h->header.dmaster == ctdb_db->ctdb->pnn)
798         &&  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
799                 ctdb_ltdb_unlock(ctdb_db, key);
800                 ret = ctdb_client_force_migration(ctdb_db, key);
801                 if (ret != 0) {
802                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
803                         talloc_free(h);
804                         return NULL;
805                 }
806                 goto again;
807         }
808
809         /* if we are dmaster, just return the handle */
810         if (h->header.dmaster == ctdb_db->ctdb->pnn) {
811                 return h;
812         }
813
814         if (read_only != 0) {
815                 TDB_DATA rodata = {NULL, 0};
816
817                 if ((h->header.flags & CTDB_REC_RO_HAVE_READONLY)
818                 ||  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
819                         return h;
820                 }
821
822                 ctdb_ltdb_unlock(ctdb_db, key);
823                 ret = ctdb_client_fetch_readonly(ctdb_db, key, h, &roheader, &rodata);
824                 if (ret != 0) {
825                         DEBUG(DEBUG_ERR,("ctdb_fetch_readonly_lock:  failed. force migration and try again\n"));
826                         ret = ctdb_client_force_migration(ctdb_db, key);
827                         if (ret != 0) {
828                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
829                                 talloc_free(h);
830                                 return NULL;
831                         }
832
833                         goto again;
834                 }
835
836                 if (!(roheader->flags&CTDB_REC_RO_HAVE_READONLY)) {
837                         ret = ctdb_client_force_migration(ctdb_db, key);
838                         if (ret != 0) {
839                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
840                                 talloc_free(h);
841                                 return NULL;
842                         }
843
844                         goto again;
845                 }
846
847                 ret = ctdb_ltdb_lock(ctdb_db, key);
848                 if (ret != 0) {
849                         DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
850                         talloc_free(h);
851                         return NULL;
852                 }
853
854                 ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
855                 if (ret != 0) {
856                         ctdb_ltdb_unlock(ctdb_db, key);
857
858                         ret = ctdb_client_force_migration(ctdb_db, key);
859                         if (ret != 0) {
860                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
861                                 talloc_free(h);
862                                 return NULL;
863                         }
864
865                         goto again;
866                 }
867
868                 return h;
869         }
870
871         /* we are not dmaster and this was not a request for a readonly lock
872          * so unlock the record, migrate it and try again
873          */
874         ctdb_ltdb_unlock(ctdb_db, key);
875         ret = ctdb_client_force_migration(ctdb_db, key);
876         if (ret != 0) {
877                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
878                 talloc_free(h);
879                 return NULL;
880         }
881         goto again;
882 }
883
884 /*
885   store some data to the record that was locked with ctdb_fetch_lock()
886 */
887 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
888 {
889         if (h->ctdb_db->persistent) {
890                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
891                 return -1;
892         }
893
894         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
895 }
896
897 /*
898   non-locking fetch of a record
899  */
900 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
901                TDB_DATA key, TDB_DATA *data)
902 {
903         struct ctdb_call call;
904         int ret;
905
906         call.call_id = CTDB_FETCH_FUNC;
907         call.call_data.dptr = NULL;
908         call.call_data.dsize = 0;
909         call.key = key;
910
911         ret = ctdb_call(ctdb_db, &call);
912
913         if (ret == 0) {
914                 *data = call.reply_data;
915                 talloc_steal(mem_ctx, data->dptr);
916         }
917
918         return ret;
919 }
920
921
922
923 /*
924    called when a control completes or timesout to invoke the callback
925    function the user provided
926 */
927 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
928         struct timeval t, void *private_data)
929 {
930         struct ctdb_client_control_state *state;
931         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
932         int ret;
933
934         state = talloc_get_type(private_data, struct ctdb_client_control_state);
935         talloc_steal(tmp_ctx, state);
936
937         ret = ctdb_control_recv(state->ctdb, state, state,
938                         NULL, 
939                         NULL, 
940                         NULL);
941         if (ret != 0) {
942                 DEBUG(DEBUG_DEBUG,("ctdb_control_recv() failed, ignoring return code %d\n", ret));
943         }
944
945         talloc_free(tmp_ctx);
946 }
947
948 /*
949   called when a CTDB_REPLY_CONTROL packet comes in in the client
950
951   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
952   contains any reply data from the control
953 */
954 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
955                                       struct ctdb_req_header *hdr)
956 {
957         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
958         struct ctdb_client_control_state *state;
959
960         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
961         if (state == NULL) {
962                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
963                 return;
964         }
965
966         if (hdr->reqid != state->reqid) {
967                 /* we found a record  but it was the wrong one */
968                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
969                 return;
970         }
971
972         state->outdata.dptr = c->data;
973         state->outdata.dsize = c->datalen;
974         state->status = c->status;
975         if (c->errorlen) {
976                 state->errormsg = talloc_strndup(state, 
977                                                  (char *)&c->data[c->datalen], 
978                                                  c->errorlen);
979         }
980
981         /* state->outdata now uses resources from c so we dont want c
982            to just dissappear from under us while state is still alive
983         */
984         talloc_steal(state, c);
985
986         state->state = CTDB_CONTROL_DONE;
987
988         /* if we had a callback registered for this control, pull the response
989            and call the callback.
990         */
991         if (state->async.fn) {
992                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
993         }
994 }
995
996
997 /*
998   destroy a ctdb_control in client
999 */
1000 static int ctdb_client_control_destructor(struct ctdb_client_control_state *state)
1001 {
1002         ctdb_reqid_remove(state->ctdb, state->reqid);
1003         return 0;
1004 }
1005
1006
1007 /* time out handler for ctdb_control */
1008 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
1009         struct timeval t, void *private_data)
1010 {
1011         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
1012
1013         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
1014                          "dstnode:%u\n", state->reqid, state->c->opcode,
1015                          state->c->hdr.destnode));
1016
1017         state->state = CTDB_CONTROL_TIMEOUT;
1018
1019         /* if we had a callback registered for this control, pull the response
1020            and call the callback.
1021         */
1022         if (state->async.fn) {
1023                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
1024         }
1025 }
1026
1027 /* async version of send control request */
1028 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
1029                 uint32_t destnode, uint64_t srvid, 
1030                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
1031                 TALLOC_CTX *mem_ctx,
1032                 struct timeval *timeout,
1033                 char **errormsg)
1034 {
1035         struct ctdb_client_control_state *state;
1036         size_t len;
1037         struct ctdb_req_control *c;
1038         int ret;
1039
1040         if (errormsg) {
1041                 *errormsg = NULL;
1042         }
1043
1044         /* if the domain socket is not yet open, open it */
1045         if (ctdb->daemon.sd==-1) {
1046                 ctdb_socket_connect(ctdb);
1047         }
1048
1049         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
1050         CTDB_NO_MEMORY_NULL(ctdb, state);
1051
1052         state->ctdb       = ctdb;
1053         state->reqid      = ctdb_reqid_new(ctdb, state);
1054         state->state      = CTDB_CONTROL_WAIT;
1055         state->errormsg   = NULL;
1056
1057         talloc_set_destructor(state, ctdb_client_control_destructor);
1058
1059         len = offsetof(struct ctdb_req_control, data) + data.dsize;
1060         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
1061                                len, struct ctdb_req_control);
1062         state->c            = c;        
1063         CTDB_NO_MEMORY_NULL(ctdb, c);
1064         c->hdr.reqid        = state->reqid;
1065         c->hdr.destnode     = destnode;
1066         c->opcode           = opcode;
1067         c->client_id        = 0;
1068         c->flags            = flags;
1069         c->srvid            = srvid;
1070         c->datalen          = data.dsize;
1071         if (data.dsize) {
1072                 memcpy(&c->data[0], data.dptr, data.dsize);
1073         }
1074
1075         /* timeout */
1076         if (timeout && !timeval_is_zero(timeout)) {
1077                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
1078         }
1079
1080         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
1081         if (ret != 0) {
1082                 talloc_free(state);
1083                 return NULL;
1084         }
1085
1086         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1087                 talloc_free(state);
1088                 return NULL;
1089         }
1090
1091         return state;
1092 }
1093
1094
1095 /* async version of receive control reply */
1096 int ctdb_control_recv(struct ctdb_context *ctdb, 
1097                 struct ctdb_client_control_state *state, 
1098                 TALLOC_CTX *mem_ctx,
1099                 TDB_DATA *outdata, int32_t *status, char **errormsg)
1100 {
1101         TALLOC_CTX *tmp_ctx;
1102
1103         if (status != NULL) {
1104                 *status = -1;
1105         }
1106         if (errormsg != NULL) {
1107                 *errormsg = NULL;
1108         }
1109
1110         if (state == NULL) {
1111                 return -1;
1112         }
1113
1114         /* prevent double free of state */
1115         tmp_ctx = talloc_new(ctdb);
1116         talloc_steal(tmp_ctx, state);
1117
1118         /* loop one event at a time until we either timeout or the control
1119            completes.
1120         */
1121         while (state->state == CTDB_CONTROL_WAIT) {
1122                 event_loop_once(ctdb->ev);
1123         }
1124
1125         if (state->state != CTDB_CONTROL_DONE) {
1126                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
1127                 if (state->async.fn) {
1128                         state->async.fn(state);
1129                 }
1130                 talloc_free(tmp_ctx);
1131                 return -1;
1132         }
1133
1134         if (state->errormsg) {
1135                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
1136                 if (errormsg) {
1137                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
1138                 }
1139                 if (state->async.fn) {
1140                         state->async.fn(state);
1141                 }
1142                 talloc_free(tmp_ctx);
1143                 return -1;
1144         }
1145
1146         if (outdata) {
1147                 *outdata = state->outdata;
1148                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
1149         }
1150
1151         if (status) {
1152                 *status = state->status;
1153         }
1154
1155         if (state->async.fn) {
1156                 state->async.fn(state);
1157         }
1158
1159         talloc_free(tmp_ctx);
1160         return 0;
1161 }
1162
1163
1164
1165 /*
1166   send a ctdb control message
1167   timeout specifies how long we should wait for a reply.
1168   if timeout is NULL we wait indefinitely
1169  */
1170 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
1171                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
1172                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
1173                  struct timeval *timeout,
1174                  char **errormsg)
1175 {
1176         struct ctdb_client_control_state *state;
1177
1178         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
1179                         flags, data, mem_ctx,
1180                         timeout, errormsg);
1181
1182         /* FIXME: Error conditions in ctdb_control_send return NULL without
1183          * setting errormsg.  So, there is no way to distinguish between sucess
1184          * and failure when CTDB_CTRL_FLAG_NOREPLY is set */
1185         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1186                 if (status != NULL) {
1187                         *status = 0;
1188                 }
1189                 return 0;
1190         }
1191
1192         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
1193                         errormsg);
1194 }
1195
1196
1197
1198
1199 /*
1200   a process exists call. Returns 0 if process exists, -1 otherwise
1201  */
1202 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
1203 {
1204         int ret;
1205         TDB_DATA data;
1206         int32_t status;
1207
1208         data.dptr = (uint8_t*)&pid;
1209         data.dsize = sizeof(pid);
1210
1211         ret = ctdb_control(ctdb, destnode, 0, 
1212                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
1213                            NULL, NULL, &status, NULL, NULL);
1214         if (ret != 0) {
1215                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
1216                 return -1;
1217         }
1218
1219         return status;
1220 }
1221
1222 /*
1223   get remote statistics
1224  */
1225 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
1226 {
1227         int ret;
1228         TDB_DATA data;
1229         int32_t res;
1230
1231         ret = ctdb_control(ctdb, destnode, 0, 
1232                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
1233                            ctdb, &data, &res, NULL, NULL);
1234         if (ret != 0 || res != 0) {
1235                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
1236                 return -1;
1237         }
1238
1239         if (data.dsize != sizeof(struct ctdb_statistics)) {
1240                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1241                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1242                       return -1;
1243         }
1244
1245         *status = *(struct ctdb_statistics *)data.dptr;
1246         talloc_free(data.dptr);
1247                         
1248         return 0;
1249 }
1250
1251 /*
1252  * get db statistics
1253  */
1254 int ctdb_ctrl_dbstatistics(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1255                            TALLOC_CTX *mem_ctx, struct ctdb_db_statistics **dbstat)
1256 {
1257         int ret;
1258         TDB_DATA indata, outdata;
1259         int32_t res;
1260         struct ctdb_db_statistics *wire, *s;
1261         char *ptr;
1262         int i;
1263
1264         indata.dptr = (uint8_t *)&dbid;
1265         indata.dsize = sizeof(dbid);
1266
1267         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DB_STATISTICS,
1268                            0, indata, ctdb, &outdata, &res, NULL, NULL);
1269         if (ret != 0 || res != 0) {
1270                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for dbstatistics failed\n"));
1271                 return -1;
1272         }
1273
1274         if (outdata.dsize < offsetof(struct ctdb_db_statistics, hot_keys_wire)) {
1275                 DEBUG(DEBUG_ERR,(__location__ " Wrong dbstatistics size %zi - expected >= %lu\n",
1276                                  outdata.dsize,
1277                                  (long unsigned int)sizeof(struct ctdb_statistics)));
1278                 return -1;
1279         }
1280
1281         s = talloc_zero(mem_ctx, struct ctdb_db_statistics);
1282         if (s == NULL) {
1283                 talloc_free(outdata.dptr);
1284                 CTDB_NO_MEMORY(ctdb, s);
1285         }
1286
1287         wire = (struct ctdb_db_statistics *)outdata.dptr;
1288         *s = *wire;
1289         ptr = &wire->hot_keys_wire[0];
1290         for (i=0; i<wire->num_hot_keys; i++) {
1291                 s->hot_keys[i].key.dptr = talloc_size(mem_ctx, s->hot_keys[i].key.dsize);
1292                 if (s->hot_keys[i].key.dptr == NULL) {
1293                         talloc_free(outdata.dptr);
1294                         CTDB_NO_MEMORY(ctdb, s->hot_keys[i].key.dptr);
1295                 }
1296
1297                 memcpy(s->hot_keys[i].key.dptr, ptr, s->hot_keys[i].key.dsize);
1298                 ptr += wire->hot_keys[i].key.dsize;
1299         }
1300
1301         talloc_free(outdata.dptr);
1302         *dbstat = s;
1303         return 0;
1304 }
1305
1306 /*
1307   shutdown a remote ctdb node
1308  */
1309 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1310 {
1311         struct ctdb_client_control_state *state;
1312
1313         state = ctdb_control_send(ctdb, destnode, 0, 
1314                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1315                            NULL, &timeout, NULL);
1316         if (state == NULL) {
1317                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1318                 return -1;
1319         }
1320
1321         return 0;
1322 }
1323
1324 /*
1325   get vnn map from a remote node
1326  */
1327 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1328 {
1329         int ret;
1330         TDB_DATA outdata;
1331         int32_t res;
1332         struct ctdb_vnn_map_wire *map;
1333
1334         ret = ctdb_control(ctdb, destnode, 0, 
1335                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1336                            mem_ctx, &outdata, &res, &timeout, NULL);
1337         if (ret != 0 || res != 0) {
1338                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1339                 return -1;
1340         }
1341         
1342         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1343         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1344             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1345                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1346                 return -1;
1347         }
1348
1349         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1350         CTDB_NO_MEMORY(ctdb, *vnnmap);
1351         (*vnnmap)->generation = map->generation;
1352         (*vnnmap)->size       = map->size;
1353         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1354
1355         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1356         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1357         talloc_free(outdata.dptr);
1358                     
1359         return 0;
1360 }
1361
1362
1363 /*
1364   get the recovery mode of a remote node
1365  */
1366 struct ctdb_client_control_state *
1367 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1368 {
1369         return ctdb_control_send(ctdb, destnode, 0, 
1370                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1371                            mem_ctx, &timeout, NULL);
1372 }
1373
1374 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1375 {
1376         int ret;
1377         int32_t res;
1378
1379         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1380         if (ret != 0) {
1381                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1382                 return -1;
1383         }
1384
1385         if (recmode) {
1386                 *recmode = (uint32_t)res;
1387         }
1388
1389         return 0;
1390 }
1391
1392 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1393 {
1394         struct ctdb_client_control_state *state;
1395
1396         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1397         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1398 }
1399
1400
1401
1402
1403 /*
1404   set the recovery mode of a remote node
1405  */
1406 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1407 {
1408         int ret;
1409         TDB_DATA data;
1410         int32_t res;
1411
1412         data.dsize = sizeof(uint32_t);
1413         data.dptr = (unsigned char *)&recmode;
1414
1415         ret = ctdb_control(ctdb, destnode, 0, 
1416                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1417                            NULL, NULL, &res, &timeout, NULL);
1418         if (ret != 0 || res != 0) {
1419                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1420                 return -1;
1421         }
1422
1423         return 0;
1424 }
1425
1426
1427
1428 /*
1429   get the recovery master of a remote node
1430  */
1431 struct ctdb_client_control_state *
1432 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1433                         struct timeval timeout, uint32_t destnode)
1434 {
1435         return ctdb_control_send(ctdb, destnode, 0, 
1436                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1437                            mem_ctx, &timeout, NULL);
1438 }
1439
1440 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1441 {
1442         int ret;
1443         int32_t res;
1444
1445         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1446         if (ret != 0) {
1447                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1448                 return -1;
1449         }
1450
1451         if (recmaster) {
1452                 *recmaster = (uint32_t)res;
1453         }
1454
1455         return 0;
1456 }
1457
1458 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1459 {
1460         struct ctdb_client_control_state *state;
1461
1462         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1463         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1464 }
1465
1466
1467 /*
1468   set the recovery master of a remote node
1469  */
1470 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1471 {
1472         int ret;
1473         TDB_DATA data;
1474         int32_t res;
1475
1476         ZERO_STRUCT(data);
1477         data.dsize = sizeof(uint32_t);
1478         data.dptr = (unsigned char *)&recmaster;
1479
1480         ret = ctdb_control(ctdb, destnode, 0, 
1481                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1482                            NULL, NULL, &res, &timeout, NULL);
1483         if (ret != 0 || res != 0) {
1484                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1485                 return -1;
1486         }
1487
1488         return 0;
1489 }
1490
1491
1492 /*
1493   get a list of databases off a remote node
1494  */
1495 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1496                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1497 {
1498         int ret;
1499         TDB_DATA outdata;
1500         int32_t res;
1501
1502         ret = ctdb_control(ctdb, destnode, 0, 
1503                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1504                            mem_ctx, &outdata, &res, &timeout, NULL);
1505         if (ret != 0 || res != 0) {
1506                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1507                 return -1;
1508         }
1509
1510         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1511         talloc_free(outdata.dptr);
1512                     
1513         return 0;
1514 }
1515
1516 /*
1517   get a list of nodes (vnn and flags ) from a remote node
1518  */
1519 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1520                 struct timeval timeout, uint32_t destnode, 
1521                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1522 {
1523         int ret;
1524         TDB_DATA outdata;
1525         int32_t res;
1526
1527         ret = ctdb_control(ctdb, destnode, 0, 
1528                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1529                            mem_ctx, &outdata, &res, &timeout, NULL);
1530         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1531                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1532                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1533         }
1534         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1535                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1536                 return -1;
1537         }
1538
1539         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1540         talloc_free(outdata.dptr);
1541                     
1542         return 0;
1543 }
1544
1545 /*
1546   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1547  */
1548 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1549                 struct timeval timeout, uint32_t destnode, 
1550                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1551 {
1552         int ret, i, len;
1553         TDB_DATA outdata;
1554         struct ctdb_node_mapv4 *nodemapv4;
1555         int32_t res;
1556
1557         ret = ctdb_control(ctdb, destnode, 0, 
1558                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1559                            mem_ctx, &outdata, &res, &timeout, NULL);
1560         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1561                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1562                 return -1;
1563         }
1564
1565         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1566
1567         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1568         (*nodemap) = talloc_zero_size(mem_ctx, len);
1569         CTDB_NO_MEMORY(ctdb, (*nodemap));
1570
1571         (*nodemap)->num = nodemapv4->num;
1572         for (i=0; i<nodemapv4->num; i++) {
1573                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1574                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1575                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1576                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1577         }
1578                 
1579         talloc_free(outdata.dptr);
1580                     
1581         return 0;
1582 }
1583
1584 /*
1585   drop the transport, reload the nodes file and restart the transport
1586  */
1587 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1588                     struct timeval timeout, uint32_t destnode)
1589 {
1590         int ret;
1591         int32_t res;
1592
1593         ret = ctdb_control(ctdb, destnode, 0, 
1594                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1595                            NULL, NULL, &res, &timeout, NULL);
1596         if (ret != 0 || res != 0) {
1597                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1598                 return -1;
1599         }
1600
1601         return 0;
1602 }
1603
1604
1605 /*
1606   set vnn map on a node
1607  */
1608 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1609                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1610 {
1611         int ret;
1612         TDB_DATA data;
1613         int32_t res;
1614         struct ctdb_vnn_map_wire *map;
1615         size_t len;
1616
1617         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1618         map = talloc_size(mem_ctx, len);
1619         CTDB_NO_MEMORY(ctdb, map);
1620
1621         map->generation = vnnmap->generation;
1622         map->size = vnnmap->size;
1623         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1624         
1625         data.dsize = len;
1626         data.dptr  = (uint8_t *)map;
1627
1628         ret = ctdb_control(ctdb, destnode, 0, 
1629                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1630                            NULL, NULL, &res, &timeout, NULL);
1631         if (ret != 0 || res != 0) {
1632                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1633                 return -1;
1634         }
1635
1636         talloc_free(map);
1637
1638         return 0;
1639 }
1640
1641
1642 /*
1643   async send for pull database
1644  */
1645 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1646         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1647         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1648 {
1649         TDB_DATA indata;
1650         struct ctdb_control_pulldb *pull;
1651         struct ctdb_client_control_state *state;
1652
1653         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1654         CTDB_NO_MEMORY_NULL(ctdb, pull);
1655
1656         pull->db_id   = dbid;
1657         pull->lmaster = lmaster;
1658
1659         indata.dsize = sizeof(struct ctdb_control_pulldb);
1660         indata.dptr  = (unsigned char *)pull;
1661
1662         state = ctdb_control_send(ctdb, destnode, 0, 
1663                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1664                                   mem_ctx, &timeout, NULL);
1665         talloc_free(pull);
1666
1667         return state;
1668 }
1669
1670 /*
1671   async recv for pull database
1672  */
1673 int ctdb_ctrl_pulldb_recv(
1674         struct ctdb_context *ctdb, 
1675         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1676         TDB_DATA *outdata)
1677 {
1678         int ret;
1679         int32_t res;
1680
1681         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1682         if ( (ret != 0) || (res != 0) ){
1683                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1684                 return -1;
1685         }
1686
1687         return 0;
1688 }
1689
1690 /*
1691   pull all keys and records for a specific database on a node
1692  */
1693 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1694                 uint32_t dbid, uint32_t lmaster, 
1695                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1696                 TDB_DATA *outdata)
1697 {
1698         struct ctdb_client_control_state *state;
1699
1700         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1701                                       timeout);
1702         
1703         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1704 }
1705
1706
1707 /*
1708   change dmaster for all keys in the database to the new value
1709  */
1710 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1711                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1712 {
1713         int ret;
1714         TDB_DATA indata;
1715         int32_t res;
1716
1717         indata.dsize = 2*sizeof(uint32_t);
1718         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1719
1720         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1721         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1722
1723         ret = ctdb_control(ctdb, destnode, 0, 
1724                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1725                            NULL, NULL, &res, &timeout, NULL);
1726         if (ret != 0 || res != 0) {
1727                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1728                 return -1;
1729         }
1730
1731         return 0;
1732 }
1733
1734 /*
1735   ping a node, return number of clients connected
1736  */
1737 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1738 {
1739         int ret;
1740         int32_t res;
1741
1742         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1743                            tdb_null, NULL, NULL, &res, NULL, NULL);
1744         if (ret != 0) {
1745                 return -1;
1746         }
1747         return res;
1748 }
1749
1750 int ctdb_ctrl_get_runstate(struct ctdb_context *ctdb, 
1751                            struct timeval timeout, 
1752                            uint32_t destnode,
1753                            uint32_t *runstate)
1754 {
1755         TDB_DATA outdata;
1756         int32_t res;
1757         int ret;
1758
1759         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_RUNSTATE, 0,
1760                            tdb_null, ctdb, &outdata, &res, &timeout, NULL);
1761         if (ret != 0 || res != 0) {
1762                 DEBUG(DEBUG_ERR,("ctdb_control for get_runstate failed\n"));
1763                 return ret != 0 ? ret : res;
1764         }
1765
1766         if (outdata.dsize != sizeof(uint32_t)) {
1767                 DEBUG(DEBUG_ERR,("Invalid return data in get_runstate\n"));
1768                 talloc_free(outdata.dptr);
1769                 return -1;
1770         }
1771
1772         if (runstate != NULL) {
1773                 *runstate = *(uint32_t *)outdata.dptr;
1774         }
1775         talloc_free(outdata.dptr);
1776
1777         return 0;
1778 }
1779
1780 /*
1781   find the real path to a ltdb 
1782  */
1783 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1784                    const char **path)
1785 {
1786         int ret;
1787         int32_t res;
1788         TDB_DATA data;
1789
1790         data.dptr = (uint8_t *)&dbid;
1791         data.dsize = sizeof(dbid);
1792
1793         ret = ctdb_control(ctdb, destnode, 0, 
1794                            CTDB_CONTROL_GETDBPATH, 0, data, 
1795                            mem_ctx, &data, &res, &timeout, NULL);
1796         if (ret != 0 || res != 0) {
1797                 return -1;
1798         }
1799
1800         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1801         if ((*path) == NULL) {
1802                 return -1;
1803         }
1804
1805         talloc_free(data.dptr);
1806
1807         return 0;
1808 }
1809
1810 /*
1811   find the name of a db 
1812  */
1813 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1814                    const char **name)
1815 {
1816         int ret;
1817         int32_t res;
1818         TDB_DATA data;
1819
1820         data.dptr = (uint8_t *)&dbid;
1821         data.dsize = sizeof(dbid);
1822
1823         ret = ctdb_control(ctdb, destnode, 0, 
1824                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1825                            mem_ctx, &data, &res, &timeout, NULL);
1826         if (ret != 0 || res != 0) {
1827                 return -1;
1828         }
1829
1830         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1831         if ((*name) == NULL) {
1832                 return -1;
1833         }
1834
1835         talloc_free(data.dptr);
1836
1837         return 0;
1838 }
1839
1840 /*
1841   get the health status of a db
1842  */
1843 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1844                           struct timeval timeout,
1845                           uint32_t destnode,
1846                           uint32_t dbid, TALLOC_CTX *mem_ctx,
1847                           const char **reason)
1848 {
1849         int ret;
1850         int32_t res;
1851         TDB_DATA data;
1852
1853         data.dptr = (uint8_t *)&dbid;
1854         data.dsize = sizeof(dbid);
1855
1856         ret = ctdb_control(ctdb, destnode, 0,
1857                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1858                            mem_ctx, &data, &res, &timeout, NULL);
1859         if (ret != 0 || res != 0) {
1860                 return -1;
1861         }
1862
1863         if (data.dsize == 0) {
1864                 (*reason) = NULL;
1865                 return 0;
1866         }
1867
1868         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1869         if ((*reason) == NULL) {
1870                 return -1;
1871         }
1872
1873         talloc_free(data.dptr);
1874
1875         return 0;
1876 }
1877
1878 /*
1879  * get db sequence number
1880  */
1881 int ctdb_ctrl_getdbseqnum(struct ctdb_context *ctdb, struct timeval timeout,
1882                           uint32_t destnode, uint32_t dbid, uint64_t *seqnum)
1883 {
1884         int ret;
1885         int32_t res;
1886         TDB_DATA data, outdata;
1887
1888         data.dptr = (uint8_t *)&dbid;
1889         data.dsize = sizeof(uint64_t);  /* This is just wrong */
1890
1891         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DB_SEQNUM,
1892                            0, data, ctdb, &outdata, &res, &timeout, NULL);
1893         if (ret != 0 || res != 0) {
1894                 DEBUG(DEBUG_ERR,("ctdb_control for getdbesqnum failed\n"));
1895                 return -1;
1896         }
1897
1898         if (outdata.dsize != sizeof(uint64_t)) {
1899                 DEBUG(DEBUG_ERR,("Invalid return data in get_dbseqnum\n"));
1900                 talloc_free(outdata.dptr);
1901                 return -1;
1902         }
1903
1904         if (seqnum != NULL) {
1905                 *seqnum = *(uint64_t *)outdata.dptr;
1906         }
1907         talloc_free(outdata.dptr);
1908
1909         return 0;
1910 }
1911
1912 /*
1913   create a database
1914  */
1915 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1916                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1917 {
1918         int ret;
1919         int32_t res;
1920         TDB_DATA data;
1921         uint64_t tdb_flags = 0;
1922
1923         data.dptr = discard_const(name);
1924         data.dsize = strlen(name)+1;
1925
1926         /* Make sure that volatile databases use jenkins hash */
1927         if (!persistent) {
1928                 tdb_flags = TDB_INCOMPATIBLE_HASH;
1929         }
1930
1931         ret = ctdb_control(ctdb, destnode, tdb_flags,
1932                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1933                            0, data, 
1934                            mem_ctx, &data, &res, &timeout, NULL);
1935
1936         if (ret != 0 || res != 0) {
1937                 return -1;
1938         }
1939
1940         return 0;
1941 }
1942
1943 /*
1944   get debug level on a node
1945  */
1946 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1947 {
1948         int ret;
1949         int32_t res;
1950         TDB_DATA data;
1951
1952         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1953                            ctdb, &data, &res, NULL, NULL);
1954         if (ret != 0 || res != 0) {
1955                 return -1;
1956         }
1957         if (data.dsize != sizeof(int32_t)) {
1958                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1959                          (unsigned)data.dsize));
1960                 return -1;
1961         }
1962         *level = *(int32_t *)data.dptr;
1963         talloc_free(data.dptr);
1964         return 0;
1965 }
1966
1967 /*
1968   set debug level on a node
1969  */
1970 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1971 {
1972         int ret;
1973         int32_t res;
1974         TDB_DATA data;
1975
1976         data.dptr = (uint8_t *)&level;
1977         data.dsize = sizeof(level);
1978
1979         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1980                            NULL, NULL, &res, NULL, NULL);
1981         if (ret != 0 || res != 0) {
1982                 return -1;
1983         }
1984         return 0;
1985 }
1986
1987
1988 /*
1989   get a list of connected nodes
1990  */
1991 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1992                                 struct timeval timeout,
1993                                 TALLOC_CTX *mem_ctx,
1994                                 uint32_t *num_nodes)
1995 {
1996         struct ctdb_node_map *map=NULL;
1997         int ret, i;
1998         uint32_t *nodes;
1999
2000         *num_nodes = 0;
2001
2002         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
2003         if (ret != 0) {
2004                 return NULL;
2005         }
2006
2007         nodes = talloc_array(mem_ctx, uint32_t, map->num);
2008         if (nodes == NULL) {
2009                 return NULL;
2010         }
2011
2012         for (i=0;i<map->num;i++) {
2013                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2014                         nodes[*num_nodes] = map->nodes[i].pnn;
2015                         (*num_nodes)++;
2016                 }
2017         }
2018
2019         return nodes;
2020 }
2021
2022
2023 /*
2024   reset remote status
2025  */
2026 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
2027 {
2028         int ret;
2029         int32_t res;
2030
2031         ret = ctdb_control(ctdb, destnode, 0, 
2032                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
2033                            NULL, NULL, &res, NULL, NULL);
2034         if (ret != 0 || res != 0) {
2035                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
2036                 return -1;
2037         }
2038         return 0;
2039 }
2040
2041 /*
2042   attach to a specific database - client call
2043 */
2044 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
2045                                     struct timeval timeout,
2046                                     const char *name,
2047                                     bool persistent,
2048                                     uint32_t tdb_flags)
2049 {
2050         struct ctdb_db_context *ctdb_db;
2051         TDB_DATA data;
2052         int ret;
2053         int32_t res;
2054
2055         ctdb_db = ctdb_db_handle(ctdb, name);
2056         if (ctdb_db) {
2057                 return ctdb_db;
2058         }
2059
2060         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
2061         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
2062
2063         ctdb_db->ctdb = ctdb;
2064         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
2065         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
2066
2067         data.dptr = discard_const(name);
2068         data.dsize = strlen(name)+1;
2069
2070         /* CTDB has switched to using jenkins hash for volatile databases.
2071          * Even if tdb_flags do not explicitly mention TDB_INCOMPATIBLE_HASH,
2072          * always set it.
2073          */
2074         if (!persistent) {
2075                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
2076         }
2077
2078         /* tell ctdb daemon to attach */
2079         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
2080                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
2081                            0, data, ctdb_db, &data, &res, NULL, NULL);
2082         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
2083                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
2084                 talloc_free(ctdb_db);
2085                 return NULL;
2086         }
2087         
2088         ctdb_db->db_id = *(uint32_t *)data.dptr;
2089         talloc_free(data.dptr);
2090
2091         ret = ctdb_ctrl_getdbpath(ctdb, timeout, CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
2092         if (ret != 0) {
2093                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
2094                 talloc_free(ctdb_db);
2095                 return NULL;
2096         }
2097
2098         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
2099         if (ctdb->valgrinding) {
2100                 tdb_flags |= TDB_NOMMAP;
2101         }
2102         tdb_flags |= TDB_DISALLOW_NESTING;
2103
2104         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
2105         if (ctdb_db->ltdb == NULL) {
2106                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
2107                 talloc_free(ctdb_db);
2108                 return NULL;
2109         }
2110
2111         ctdb_db->persistent = persistent;
2112
2113         DLIST_ADD(ctdb->db_list, ctdb_db);
2114
2115         /* add well known functions */
2116         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
2117         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
2118         ctdb_set_call(ctdb_db, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
2119
2120         return ctdb_db;
2121 }
2122
2123
2124 /*
2125   setup a call for a database
2126  */
2127 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
2128 {
2129         struct ctdb_registered_call *call;
2130
2131 #if 0
2132         TDB_DATA data;
2133         int32_t status;
2134         struct ctdb_control_set_call c;
2135         int ret;
2136
2137         /* this is no longer valid with the separate daemon architecture */
2138         c.db_id = ctdb_db->db_id;
2139         c.fn    = fn;
2140         c.id    = id;
2141
2142         data.dptr = (uint8_t *)&c;
2143         data.dsize = sizeof(c);
2144
2145         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
2146                            data, NULL, NULL, &status, NULL, NULL);
2147         if (ret != 0 || status != 0) {
2148                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
2149                 return -1;
2150         }
2151 #endif
2152
2153         /* also register locally */
2154         call = talloc(ctdb_db, struct ctdb_registered_call);
2155         call->fn = fn;
2156         call->id = id;
2157
2158         DLIST_ADD(ctdb_db->calls, call);        
2159         return 0;
2160 }
2161
2162
2163 struct traverse_state {
2164         bool done;
2165         uint32_t count;
2166         ctdb_traverse_func fn;
2167         void *private_data;
2168         bool listemptyrecords;
2169 };
2170
2171 /*
2172   called on each key during a ctdb_traverse
2173  */
2174 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
2175 {
2176         struct traverse_state *state = (struct traverse_state *)p;
2177         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
2178         TDB_DATA key;
2179
2180         if (data.dsize < sizeof(uint32_t) ||
2181             d->length != data.dsize) {
2182                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
2183                 state->done = true;
2184                 return;
2185         }
2186
2187         key.dsize = d->keylen;
2188         key.dptr  = &d->data[0];
2189         data.dsize = d->datalen;
2190         data.dptr = &d->data[d->keylen];
2191
2192         if (key.dsize == 0 && data.dsize == 0) {
2193                 /* end of traverse */
2194                 state->done = true;
2195                 return;
2196         }
2197
2198         if (!state->listemptyrecords &&
2199             data.dsize == sizeof(struct ctdb_ltdb_header))
2200         {
2201                 /* empty records are deleted records in ctdb */
2202                 return;
2203         }
2204
2205         if (state->fn(ctdb, key, data, state->private_data) != 0) {
2206                 state->done = true;
2207         }
2208
2209         state->count++;
2210 }
2211
2212 /**
2213  * start a cluster wide traverse, calling the supplied fn on each record
2214  * return the number of records traversed, or -1 on error
2215  *
2216  * Extendet variant with a flag to signal whether empty records should
2217  * be listed.
2218  */
2219 static int ctdb_traverse_ext(struct ctdb_db_context *ctdb_db,
2220                              ctdb_traverse_func fn,
2221                              bool withemptyrecords,
2222                              void *private_data)
2223 {
2224         TDB_DATA data;
2225         struct ctdb_traverse_start_ext t;
2226         int32_t status;
2227         int ret;
2228         uint64_t srvid = (getpid() | 0xFLL<<60);
2229         struct traverse_state state;
2230
2231         state.done = false;
2232         state.count = 0;
2233         state.private_data = private_data;
2234         state.fn = fn;
2235         state.listemptyrecords = withemptyrecords;
2236
2237         ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
2238         if (ret != 0) {
2239                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
2240                 return -1;
2241         }
2242
2243         t.db_id = ctdb_db->db_id;
2244         t.srvid = srvid;
2245         t.reqid = 0;
2246         t.withemptyrecords = withemptyrecords;
2247
2248         data.dptr = (uint8_t *)&t;
2249         data.dsize = sizeof(t);
2250
2251         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START_EXT, 0,
2252                            data, NULL, NULL, &status, NULL, NULL);
2253         if (ret != 0 || status != 0) {
2254                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
2255                 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2256                 return -1;
2257         }
2258
2259         while (!state.done) {
2260                 event_loop_once(ctdb_db->ctdb->ev);
2261         }
2262
2263         ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2264         if (ret != 0) {
2265                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
2266                 return -1;
2267         }
2268
2269         return state.count;
2270 }
2271
2272 /**
2273  * start a cluster wide traverse, calling the supplied fn on each record
2274  * return the number of records traversed, or -1 on error
2275  *
2276  * Standard version which does not list the empty records:
2277  * These are considered deleted.
2278  */
2279 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
2280 {
2281         return ctdb_traverse_ext(ctdb_db, fn, false, private_data);
2282 }
2283
2284 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
2285 /*
2286   called on each key during a catdb
2287  */
2288 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
2289 {
2290         int i;
2291         struct ctdb_dump_db_context *c = (struct ctdb_dump_db_context *)p;
2292         FILE *f = c->f;
2293         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
2294
2295         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
2296         for (i=0;i<key.dsize;i++) {
2297                 if (ISASCII(key.dptr[i])) {
2298                         fprintf(f, "%c", key.dptr[i]);
2299                 } else {
2300                         fprintf(f, "\\%02X", key.dptr[i]);
2301                 }
2302         }
2303         fprintf(f, "\"\n");
2304
2305         fprintf(f, "dmaster: %u\n", h->dmaster);
2306         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
2307
2308         if (c->printlmaster && ctdb->vnn_map != NULL) {
2309                 fprintf(f, "lmaster: %u\n", ctdb_lmaster(ctdb, &key));
2310         }
2311
2312         if (c->printhash) {
2313                 fprintf(f, "hash: 0x%08x\n", ctdb_hash(&key));
2314         }
2315
2316         if (c->printrecordflags) {
2317                 fprintf(f, "flags: 0x%08x", h->flags);
2318                 if (h->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) printf(" MIGRATED_WITH_DATA");
2319                 if (h->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) printf(" VACUUM_MIGRATED");
2320                 if (h->flags & CTDB_REC_FLAG_AUTOMATIC) printf(" AUTOMATIC");
2321                 if (h->flags & CTDB_REC_RO_HAVE_DELEGATIONS) printf(" RO_HAVE_DELEGATIONS");
2322                 if (h->flags & CTDB_REC_RO_HAVE_READONLY) printf(" RO_HAVE_READONLY");
2323                 if (h->flags & CTDB_REC_RO_REVOKING_READONLY) printf(" RO_REVOKING_READONLY");
2324                 if (h->flags & CTDB_REC_RO_REVOKE_COMPLETE) printf(" RO_REVOKE_COMPLETE");
2325                 fprintf(f, "\n");
2326         }
2327
2328         if (c->printdatasize) {
2329                 fprintf(f, "data size: %u\n", (unsigned)data.dsize);
2330         } else {
2331                 fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
2332                 for (i=sizeof(*h);i<data.dsize;i++) {
2333                         if (ISASCII(data.dptr[i])) {
2334                                 fprintf(f, "%c", data.dptr[i]);
2335                         } else {
2336                                 fprintf(f, "\\%02X", data.dptr[i]);
2337                         }
2338                 }
2339                 fprintf(f, "\"\n");
2340         }
2341
2342         fprintf(f, "\n");
2343
2344         return 0;
2345 }
2346
2347 /*
2348   convenience function to list all keys to stdout
2349  */
2350 int ctdb_dump_db(struct ctdb_db_context *ctdb_db,
2351                  struct ctdb_dump_db_context *ctx)
2352 {
2353         return ctdb_traverse_ext(ctdb_db, ctdb_dumpdb_record,
2354                                  ctx->printemptyrecords, ctx);
2355 }
2356
2357 /*
2358   get the pid of a ctdb daemon
2359  */
2360 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
2361 {
2362         int ret;
2363         int32_t res;
2364
2365         ret = ctdb_control(ctdb, destnode, 0, 
2366                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
2367                            NULL, NULL, &res, &timeout, NULL);
2368         if (ret != 0) {
2369                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
2370                 return -1;
2371         }
2372
2373         *pid = res;
2374
2375         return 0;
2376 }
2377
2378
2379 /*
2380   async freeze send control
2381  */
2382 struct ctdb_client_control_state *
2383 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
2384 {
2385         return ctdb_control_send(ctdb, destnode, priority, 
2386                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
2387                            mem_ctx, &timeout, NULL);
2388 }
2389
2390 /* 
2391    async freeze recv control
2392 */
2393 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
2394 {
2395         int ret;
2396         int32_t res;
2397
2398         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
2399         if ( (ret != 0) || (res != 0) ){
2400                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
2401                 return -1;
2402         }
2403
2404         return 0;
2405 }
2406
2407 /*
2408   freeze databases of a certain priority
2409  */
2410 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2411 {
2412         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2413         struct ctdb_client_control_state *state;
2414         int ret;
2415
2416         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
2417         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
2418         talloc_free(tmp_ctx);
2419
2420         return ret;
2421 }
2422
2423 /* Freeze all databases */
2424 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2425 {
2426         int i;
2427
2428         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2429                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2430                         return -1;
2431                 }
2432         }
2433         return 0;
2434 }
2435
2436 /*
2437   thaw databases of a certain priority
2438  */
2439 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2440 {
2441         int ret;
2442         int32_t res;
2443
2444         ret = ctdb_control(ctdb, destnode, priority, 
2445                            CTDB_CONTROL_THAW, 0, tdb_null, 
2446                            NULL, NULL, &res, &timeout, NULL);
2447         if (ret != 0 || res != 0) {
2448                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2449                 return -1;
2450         }
2451
2452         return 0;
2453 }
2454
2455 /* thaw all databases */
2456 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2457 {
2458         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2459 }
2460
2461 /*
2462   get pnn of a node, or -1
2463  */
2464 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2465 {
2466         int ret;
2467         int32_t res;
2468
2469         ret = ctdb_control(ctdb, destnode, 0, 
2470                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2471                            NULL, NULL, &res, &timeout, NULL);
2472         if (ret != 0) {
2473                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2474                 return -1;
2475         }
2476
2477         return res;
2478 }
2479
2480 /*
2481   get the monitoring mode of a remote node
2482  */
2483 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2484 {
2485         int ret;
2486         int32_t res;
2487
2488         ret = ctdb_control(ctdb, destnode, 0, 
2489                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2490                            NULL, NULL, &res, &timeout, NULL);
2491         if (ret != 0) {
2492                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2493                 return -1;
2494         }
2495
2496         *monmode = res;
2497
2498         return 0;
2499 }
2500
2501
2502 /*
2503  set the monitoring mode of a remote node to active
2504  */
2505 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2506 {
2507         int ret;
2508         
2509
2510         ret = ctdb_control(ctdb, destnode, 0, 
2511                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2512                            NULL, NULL,NULL, &timeout, NULL);
2513         if (ret != 0) {
2514                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2515                 return -1;
2516         }
2517
2518         
2519
2520         return 0;
2521 }
2522
2523 /*
2524   set the monitoring mode of a remote node to disable
2525  */
2526 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2527 {
2528         int ret;
2529         
2530
2531         ret = ctdb_control(ctdb, destnode, 0, 
2532                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2533                            NULL, NULL, NULL, &timeout, NULL);
2534         if (ret != 0) {
2535                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2536                 return -1;
2537         }
2538
2539         
2540
2541         return 0;
2542 }
2543
2544
2545
2546 /* 
2547   sent to a node to make it take over an ip address
2548 */
2549 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2550                           uint32_t destnode, struct ctdb_public_ip *ip)
2551 {
2552         TDB_DATA data;
2553         struct ctdb_public_ipv4 ipv4;
2554         int ret;
2555         int32_t res;
2556
2557         if (ip->addr.sa.sa_family == AF_INET) {
2558                 ipv4.pnn = ip->pnn;
2559                 ipv4.sin = ip->addr.ip;
2560
2561                 data.dsize = sizeof(ipv4);
2562                 data.dptr  = (uint8_t *)&ipv4;
2563
2564                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2565                            NULL, &res, &timeout, NULL);
2566         } else {
2567                 data.dsize = sizeof(*ip);
2568                 data.dptr  = (uint8_t *)ip;
2569
2570                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2571                            NULL, &res, &timeout, NULL);
2572         }
2573
2574         if (ret != 0 || res != 0) {
2575                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2576                 return -1;
2577         }
2578
2579         return 0;       
2580 }
2581
2582
2583 /* 
2584   sent to a node to make it release an ip address
2585 */
2586 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2587                          uint32_t destnode, struct ctdb_public_ip *ip)
2588 {
2589         TDB_DATA data;
2590         struct ctdb_public_ipv4 ipv4;
2591         int ret;
2592         int32_t res;
2593
2594         if (ip->addr.sa.sa_family == AF_INET) {
2595                 ipv4.pnn = ip->pnn;
2596                 ipv4.sin = ip->addr.ip;
2597
2598                 data.dsize = sizeof(ipv4);
2599                 data.dptr  = (uint8_t *)&ipv4;
2600
2601                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2602                                    NULL, &res, &timeout, NULL);
2603         } else {
2604                 data.dsize = sizeof(*ip);
2605                 data.dptr  = (uint8_t *)ip;
2606
2607                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2608                                    NULL, &res, &timeout, NULL);
2609         }
2610
2611         if (ret != 0 || res != 0) {
2612                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2613                 return -1;
2614         }
2615
2616         return 0;       
2617 }
2618
2619
2620 /*
2621   get a tunable
2622  */
2623 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2624                           struct timeval timeout, 
2625                           uint32_t destnode,
2626                           const char *name, uint32_t *value)
2627 {
2628         struct ctdb_control_get_tunable *t;
2629         TDB_DATA data, outdata;
2630         int32_t res;
2631         int ret;
2632
2633         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2634         data.dptr  = talloc_size(ctdb, data.dsize);
2635         CTDB_NO_MEMORY(ctdb, data.dptr);
2636
2637         t = (struct ctdb_control_get_tunable *)data.dptr;
2638         t->length = strlen(name)+1;
2639         memcpy(t->name, name, t->length);
2640
2641         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2642                            &outdata, &res, &timeout, NULL);
2643         talloc_free(data.dptr);
2644         if (ret != 0 || res != 0) {
2645                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2646                 return ret != 0 ? ret : res;
2647         }
2648
2649         if (outdata.dsize != sizeof(uint32_t)) {
2650                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2651                 talloc_free(outdata.dptr);
2652                 return -1;
2653         }
2654         
2655         *value = *(uint32_t *)outdata.dptr;
2656         talloc_free(outdata.dptr);
2657
2658         return 0;
2659 }
2660
2661 /*
2662   set a tunable
2663  */
2664 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2665                           struct timeval timeout, 
2666                           uint32_t destnode,
2667                           const char *name, uint32_t value)
2668 {
2669         struct ctdb_control_set_tunable *t;
2670         TDB_DATA data;
2671         int32_t res;
2672         int ret;
2673
2674         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2675         data.dptr  = talloc_size(ctdb, data.dsize);
2676         CTDB_NO_MEMORY(ctdb, data.dptr);
2677
2678         t = (struct ctdb_control_set_tunable *)data.dptr;
2679         t->length = strlen(name)+1;
2680         memcpy(t->name, name, t->length);
2681         t->value = value;
2682
2683         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2684                            NULL, &res, &timeout, NULL);
2685         talloc_free(data.dptr);
2686         if (ret != 0 || res != 0) {
2687                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2688                 return -1;
2689         }
2690
2691         return 0;
2692 }
2693
2694 /*
2695   list tunables
2696  */
2697 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2698                             struct timeval timeout, 
2699                             uint32_t destnode,
2700                             TALLOC_CTX *mem_ctx,
2701                             const char ***list, uint32_t *count)
2702 {
2703         TDB_DATA outdata;
2704         int32_t res;
2705         int ret;
2706         struct ctdb_control_list_tunable *t;
2707         char *p, *s, *ptr;
2708
2709         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2710                            mem_ctx, &outdata, &res, &timeout, NULL);
2711         if (ret != 0 || res != 0) {
2712                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2713                 return -1;
2714         }
2715
2716         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2717         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2718             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2719                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2720                 talloc_free(outdata.dptr);
2721                 return -1;              
2722         }
2723         
2724         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2725         CTDB_NO_MEMORY(ctdb, p);
2726
2727         talloc_free(outdata.dptr);
2728         
2729         (*list) = NULL;
2730         (*count) = 0;
2731
2732         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2733                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2734                 CTDB_NO_MEMORY(ctdb, *list);
2735                 (*list)[*count] = talloc_strdup(*list, s);
2736                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2737                 (*count)++;
2738         }
2739
2740         talloc_free(p);
2741
2742         return 0;
2743 }
2744
2745
2746 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2747                                    struct timeval timeout, uint32_t destnode,
2748                                    TALLOC_CTX *mem_ctx,
2749                                    uint32_t flags,
2750                                    struct ctdb_all_public_ips **ips)
2751 {
2752         int ret;
2753         TDB_DATA outdata;
2754         int32_t res;
2755
2756         ret = ctdb_control(ctdb, destnode, 0, 
2757                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2758                            mem_ctx, &outdata, &res, &timeout, NULL);
2759         if (ret == 0 && res == -1) {
2760                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2761                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2762         }
2763         if (ret != 0 || res != 0) {
2764           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2765                 return -1;
2766         }
2767
2768         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2769         talloc_free(outdata.dptr);
2770                     
2771         return 0;
2772 }
2773
2774 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2775                              struct timeval timeout, uint32_t destnode,
2776                              TALLOC_CTX *mem_ctx,
2777                              struct ctdb_all_public_ips **ips)
2778 {
2779         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2780                                               destnode, mem_ctx,
2781                                               0, ips);
2782 }
2783
2784 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2785                         struct timeval timeout, uint32_t destnode, 
2786                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2787 {
2788         int ret, i, len;
2789         TDB_DATA outdata;
2790         int32_t res;
2791         struct ctdb_all_public_ipsv4 *ipsv4;
2792
2793         ret = ctdb_control(ctdb, destnode, 0, 
2794                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2795                            mem_ctx, &outdata, &res, &timeout, NULL);
2796         if (ret != 0 || res != 0) {
2797                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2798                 return -1;
2799         }
2800
2801         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2802         len = offsetof(struct ctdb_all_public_ips, ips) +
2803                 ipsv4->num*sizeof(struct ctdb_public_ip);
2804         *ips = talloc_zero_size(mem_ctx, len);
2805         CTDB_NO_MEMORY(ctdb, *ips);
2806         (*ips)->num = ipsv4->num;
2807         for (i=0; i<ipsv4->num; i++) {
2808                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2809                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2810         }
2811
2812         talloc_free(outdata.dptr);
2813                     
2814         return 0;
2815 }
2816
2817 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2818                                  struct timeval timeout, uint32_t destnode,
2819                                  TALLOC_CTX *mem_ctx,
2820                                  const ctdb_sock_addr *addr,
2821                                  struct ctdb_control_public_ip_info **_info)
2822 {
2823         int ret;
2824         TDB_DATA indata;
2825         TDB_DATA outdata;
2826         int32_t res;
2827         struct ctdb_control_public_ip_info *info;
2828         uint32_t len;
2829         uint32_t i;
2830
2831         indata.dptr = discard_const_p(uint8_t, addr);
2832         indata.dsize = sizeof(*addr);
2833
2834         ret = ctdb_control(ctdb, destnode, 0,
2835                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2836                            mem_ctx, &outdata, &res, &timeout, NULL);
2837         if (ret != 0 || res != 0) {
2838                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2839                                 "failed ret:%d res:%d\n",
2840                                 ret, res));
2841                 return -1;
2842         }
2843
2844         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2845         if (len > outdata.dsize) {
2846                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2847                                 "returned invalid data with size %u > %u\n",
2848                                 (unsigned int)outdata.dsize,
2849                                 (unsigned int)len));
2850                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2851                 return -1;
2852         }
2853
2854         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2855         len += info->num*sizeof(struct ctdb_control_iface_info);
2856
2857         if (len > outdata.dsize) {
2858                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2859                                 "returned invalid data with size %u > %u\n",
2860                                 (unsigned int)outdata.dsize,
2861                                 (unsigned int)len));
2862                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2863                 return -1;
2864         }
2865
2866         /* make sure we null terminate the returned strings */
2867         for (i=0; i < info->num; i++) {
2868                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2869         }
2870
2871         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2872                                                                 outdata.dptr,
2873                                                                 outdata.dsize);
2874         talloc_free(outdata.dptr);
2875         if (*_info == NULL) {
2876                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2877                                 "talloc_memdup size %u failed\n",
2878                                 (unsigned int)outdata.dsize));
2879                 return -1;
2880         }
2881
2882         return 0;
2883 }
2884
2885 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2886                          struct timeval timeout, uint32_t destnode,
2887                          TALLOC_CTX *mem_ctx,
2888                          struct ctdb_control_get_ifaces **_ifaces)
2889 {
2890         int ret;
2891         TDB_DATA outdata;
2892         int32_t res;
2893         struct ctdb_control_get_ifaces *ifaces;
2894         uint32_t len;
2895         uint32_t i;
2896
2897         ret = ctdb_control(ctdb, destnode, 0,
2898                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2899                            mem_ctx, &outdata, &res, &timeout, NULL);
2900         if (ret != 0 || res != 0) {
2901                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2902                                 "failed ret:%d res:%d\n",
2903                                 ret, res));
2904                 return -1;
2905         }
2906
2907         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2908         if (len > outdata.dsize) {
2909                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2910                                 "returned invalid data with size %u > %u\n",
2911                                 (unsigned int)outdata.dsize,
2912                                 (unsigned int)len));
2913                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2914                 return -1;
2915         }
2916
2917         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2918         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2919
2920         if (len > outdata.dsize) {
2921                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2922                                 "returned invalid data with size %u > %u\n",
2923                                 (unsigned int)outdata.dsize,
2924                                 (unsigned int)len));
2925                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2926                 return -1;
2927         }
2928
2929         /* make sure we null terminate the returned strings */
2930         for (i=0; i < ifaces->num; i++) {
2931                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2932         }
2933
2934         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2935                                                                   outdata.dptr,
2936                                                                   outdata.dsize);
2937         talloc_free(outdata.dptr);
2938         if (*_ifaces == NULL) {
2939                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2940                                 "talloc_memdup size %u failed\n",
2941                                 (unsigned int)outdata.dsize));
2942                 return -1;
2943         }
2944
2945         return 0;
2946 }
2947
2948 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2949                              struct timeval timeout, uint32_t destnode,
2950                              TALLOC_CTX *mem_ctx,
2951                              const struct ctdb_control_iface_info *info)
2952 {
2953         int ret;
2954         TDB_DATA indata;
2955         int32_t res;
2956
2957         indata.dptr = discard_const_p(uint8_t, info);
2958         indata.dsize = sizeof(*info);
2959
2960         ret = ctdb_control(ctdb, destnode, 0,
2961                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2962                            mem_ctx, NULL, &res, &timeout, NULL);
2963         if (ret != 0 || res != 0) {
2964                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2965                                 "failed ret:%d res:%d\n",
2966                                 ret, res));
2967                 return -1;
2968         }
2969
2970         return 0;
2971 }
2972
2973 /*
2974   set/clear the permanent disabled bit on a remote node
2975  */
2976 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2977                        uint32_t set, uint32_t clear)
2978 {
2979         int ret;
2980         TDB_DATA data;
2981         struct ctdb_node_map *nodemap=NULL;
2982         struct ctdb_node_flag_change c;
2983         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2984         uint32_t recmaster;
2985         uint32_t *nodes;
2986
2987
2988         /* find the recovery master */
2989         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2990         if (ret != 0) {
2991                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2992                 talloc_free(tmp_ctx);
2993                 return ret;
2994         }
2995
2996
2997         /* read the node flags from the recmaster */
2998         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2999         if (ret != 0) {
3000                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
3001                 talloc_free(tmp_ctx);
3002                 return -1;
3003         }
3004         if (destnode >= nodemap->num) {
3005                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
3006                 talloc_free(tmp_ctx);
3007                 return -1;
3008         }
3009
3010         c.pnn       = destnode;
3011         c.old_flags = nodemap->nodes[destnode].flags;
3012         c.new_flags = c.old_flags;
3013         c.new_flags |= set;
3014         c.new_flags &= ~clear;
3015
3016         data.dsize = sizeof(c);
3017         data.dptr = (unsigned char *)&c;
3018
3019         /* send the flags update to all connected nodes */
3020         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
3021
3022         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
3023                                         nodes, 0,
3024                                         timeout, false, data,
3025                                         NULL, NULL,
3026                                         NULL) != 0) {
3027                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
3028
3029                 talloc_free(tmp_ctx);
3030                 return -1;
3031         }
3032
3033         talloc_free(tmp_ctx);
3034         return 0;
3035 }
3036
3037
3038 /*
3039   get all tunables
3040  */
3041 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
3042                                struct timeval timeout, 
3043                                uint32_t destnode,
3044                                struct ctdb_tunable *tunables)
3045 {
3046         TDB_DATA outdata;
3047         int ret;
3048         int32_t res;
3049
3050         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
3051                            &outdata, &res, &timeout, NULL);
3052         if (ret != 0 || res != 0) {
3053                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
3054                 return -1;
3055         }
3056
3057         if (outdata.dsize != sizeof(*tunables)) {
3058                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
3059                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
3060                 return -1;              
3061         }
3062
3063         *tunables = *(struct ctdb_tunable *)outdata.dptr;
3064         talloc_free(outdata.dptr);
3065         return 0;
3066 }
3067
3068 /*
3069   add a public address to a node
3070  */
3071 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
3072                       struct timeval timeout, 
3073                       uint32_t destnode,
3074                       struct ctdb_control_ip_iface *pub)
3075 {
3076         TDB_DATA data;
3077         int32_t res;
3078         int ret;
3079
3080         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
3081         data.dptr  = (unsigned char *)pub;
3082
3083         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
3084                            NULL, &res, &timeout, NULL);
3085         if (ret != 0 || res != 0) {
3086                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
3087                 return -1;
3088         }
3089
3090         return 0;
3091 }
3092
3093 /*
3094   delete a public address from a node
3095  */
3096 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
3097                       struct timeval timeout, 
3098                       uint32_t destnode,
3099                       struct ctdb_control_ip_iface *pub)
3100 {
3101         TDB_DATA data;
3102         int32_t res;
3103         int ret;
3104
3105         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
3106         data.dptr  = (unsigned char *)pub;
3107
3108         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
3109                            NULL, &res, &timeout, NULL);
3110         if (ret != 0 || res != 0) {
3111                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
3112                 return -1;
3113         }
3114
3115         return 0;
3116 }
3117
3118 /*
3119   kill a tcp connection
3120  */
3121 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
3122                       struct timeval timeout, 
3123                       uint32_t destnode,
3124                       struct ctdb_control_killtcp *killtcp)
3125 {
3126         TDB_DATA data;
3127         int32_t res;
3128         int ret;
3129
3130         data.dsize = sizeof(struct ctdb_control_killtcp);
3131         data.dptr  = (unsigned char *)killtcp;
3132
3133         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
3134                            NULL, &res, &timeout, NULL);
3135         if (ret != 0 || res != 0) {
3136                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
3137                 return -1;
3138         }
3139
3140         return 0;
3141 }
3142
3143 /*
3144   send a gratious arp
3145  */
3146 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
3147                       struct timeval timeout, 
3148                       uint32_t destnode,
3149                       ctdb_sock_addr *addr,
3150                       const char *ifname)
3151 {
3152         TDB_DATA data;
3153         int32_t res;
3154         int ret, len;
3155         struct ctdb_control_gratious_arp *gratious_arp;
3156         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3157
3158
3159         len = strlen(ifname)+1;
3160         gratious_arp = talloc_size(tmp_ctx, 
3161                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
3162         CTDB_NO_MEMORY(ctdb, gratious_arp);
3163
3164         gratious_arp->addr = *addr;
3165         gratious_arp->len = len;
3166         memcpy(&gratious_arp->iface[0], ifname, len);
3167
3168
3169         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
3170         data.dptr  = (unsigned char *)gratious_arp;
3171
3172         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
3173                            NULL, &res, &timeout, NULL);
3174         if (ret != 0 || res != 0) {
3175                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
3176                 talloc_free(tmp_ctx);
3177                 return -1;
3178         }
3179
3180         talloc_free(tmp_ctx);
3181         return 0;
3182 }
3183
3184 /*
3185   get a list of all tcp tickles that a node knows about for a particular vnn
3186  */
3187 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
3188                               struct timeval timeout, uint32_t destnode, 
3189                               TALLOC_CTX *mem_ctx, 
3190                               ctdb_sock_addr *addr,
3191                               struct ctdb_control_tcp_tickle_list **list)
3192 {
3193         int ret;
3194         TDB_DATA data, outdata;
3195         int32_t status;
3196
3197         data.dptr = (uint8_t*)addr;
3198         data.dsize = sizeof(ctdb_sock_addr);
3199
3200         ret = ctdb_control(ctdb, destnode, 0, 
3201                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
3202                            mem_ctx, &outdata, &status, NULL, NULL);
3203         if (ret != 0 || status != 0) {
3204                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
3205                 return -1;
3206         }
3207
3208         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
3209
3210         return status;
3211 }
3212
3213 /*
3214   register a server id
3215  */
3216 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
3217                       struct timeval timeout, 
3218                       struct ctdb_server_id *id)
3219 {
3220         TDB_DATA data;
3221         int32_t res;
3222         int ret;
3223
3224         data.dsize = sizeof(struct ctdb_server_id);
3225         data.dptr  = (unsigned char *)id;
3226
3227         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3228                         CTDB_CONTROL_REGISTER_SERVER_ID, 
3229                         0, data, NULL,
3230                         NULL, &res, &timeout, NULL);
3231         if (ret != 0 || res != 0) {
3232                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
3233                 return -1;
3234         }
3235
3236         return 0;
3237 }
3238
3239 /*
3240   unregister a server id
3241  */
3242 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
3243                       struct timeval timeout, 
3244                       struct ctdb_server_id *id)
3245 {
3246         TDB_DATA data;
3247         int32_t res;
3248         int ret;
3249
3250         data.dsize = sizeof(struct ctdb_server_id);
3251         data.dptr  = (unsigned char *)id;
3252
3253         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3254                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
3255                         0, data, NULL,
3256                         NULL, &res, &timeout, NULL);
3257         if (ret != 0 || res != 0) {
3258                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
3259                 return -1;
3260         }
3261
3262         return 0;
3263 }
3264
3265
3266 /*
3267   check if a server id exists
3268
3269   if a server id does exist, return *status == 1, otherwise *status == 0
3270  */
3271 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
3272                       struct timeval timeout, 
3273                       uint32_t destnode,
3274                       struct ctdb_server_id *id,
3275                       uint32_t *status)
3276 {
3277         TDB_DATA data;
3278         int32_t res;
3279         int ret;
3280
3281         data.dsize = sizeof(struct ctdb_server_id);
3282         data.dptr  = (unsigned char *)id;
3283
3284         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
3285                         0, data, NULL,
3286                         NULL, &res, &timeout, NULL);
3287         if (ret != 0) {
3288                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
3289                 return -1;
3290         }
3291
3292         if (res) {
3293                 *status = 1;
3294         } else {
3295                 *status = 0;
3296         }
3297
3298         return 0;
3299 }
3300
3301 /*
3302    get the list of server ids that are registered on a node
3303 */
3304 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
3305                 TALLOC_CTX *mem_ctx,
3306                 struct timeval timeout, uint32_t destnode, 
3307                 struct ctdb_server_id_list **svid_list)
3308 {
3309         int ret;
3310         TDB_DATA outdata;
3311         int32_t res;
3312
3313         ret = ctdb_control(ctdb, destnode, 0, 
3314                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
3315                            mem_ctx, &outdata, &res, &timeout, NULL);
3316         if (ret != 0 || res != 0) {
3317                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
3318                 return -1;
3319         }
3320
3321         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
3322                     
3323         return 0;
3324 }
3325
3326 /*
3327   initialise the ctdb daemon for client applications
3328
3329   NOTE: In current code the daemon does not fork. This is for testing purposes only
3330   and to simplify the code.
3331 */
3332 struct ctdb_context *ctdb_init(struct event_context *ev)
3333 {
3334         int ret;
3335         struct ctdb_context *ctdb;
3336
3337         ctdb = talloc_zero(ev, struct ctdb_context);
3338         if (ctdb == NULL) {
3339                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
3340                 return NULL;
3341         }
3342         ctdb->ev  = ev;
3343         ctdb->idr = idr_init(ctdb);
3344         /* Wrap early to exercise code. */
3345         ctdb->lastid = INT_MAX-200;
3346         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
3347
3348         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
3349         if (ret != 0) {
3350                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
3351                 talloc_free(ctdb);
3352                 return NULL;
3353         }
3354
3355         ctdb->statistics.statistics_start_time = timeval_current();
3356
3357         return ctdb;
3358 }
3359
3360
3361 /*
3362   set some ctdb flags
3363 */
3364 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
3365 {
3366         ctdb->flags |= flags;
3367 }
3368
3369 /*
3370   setup the local socket name
3371 */
3372 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
3373 {
3374         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
3375         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
3376
3377         return 0;
3378 }
3379
3380 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
3381 {
3382         return ctdb->daemon.name;
3383 }
3384
3385 /*
3386   return the pnn of this node
3387 */
3388 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
3389 {
3390         return ctdb->pnn;
3391 }
3392
3393
3394 /*
3395   get the uptime of a remote node
3396  */
3397 struct ctdb_client_control_state *
3398 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3399 {
3400         return ctdb_control_send(ctdb, destnode, 0, 
3401                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
3402                            mem_ctx, &timeout, NULL);
3403 }
3404
3405 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
3406 {
3407         int ret;
3408         int32_t res;
3409         TDB_DATA outdata;
3410
3411         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3412         if (ret != 0 || res != 0) {
3413                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
3414                 return -1;
3415         }
3416
3417         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
3418
3419         return 0;
3420 }
3421
3422 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3423 {
3424         struct ctdb_client_control_state *state;
3425
3426         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3427         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3428 }
3429
3430 /*
3431   send a control to execute the "recovered" event script on a node
3432  */
3433 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3434 {
3435         int ret;
3436         int32_t status;
3437
3438         ret = ctdb_control(ctdb, destnode, 0, 
3439                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
3440                            NULL, NULL, &status, &timeout, NULL);
3441         if (ret != 0 || status != 0) {
3442                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3443                 return -1;
3444         }
3445
3446         return 0;
3447 }
3448
3449 /* 
3450   callback for the async helpers used when sending the same control
3451   to multiple nodes in parallell.
3452 */
3453 static void async_callback(struct ctdb_client_control_state *state)
3454 {
3455         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3456         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3457         int ret;
3458         TDB_DATA outdata;
3459         int32_t res = -1;
3460         uint32_t destnode = state->c->hdr.destnode;
3461
3462         outdata.dsize = 0;
3463         outdata.dptr = NULL;
3464
3465         /* one more node has responded with recmode data */
3466         data->count--;
3467
3468         /* if we failed to push the db, then return an error and let
3469            the main loop try again.
3470         */
3471         if (state->state != CTDB_CONTROL_DONE) {
3472                 if ( !data->dont_log_errors) {
3473                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3474                 }
3475                 data->fail_count++;
3476                 if (state->state == CTDB_CONTROL_TIMEOUT) {
3477                         res = -ETIME;
3478                 } else {
3479                         res = -1;
3480                 }
3481                 if (data->fail_callback) {
3482                         data->fail_callback(ctdb, destnode, res, outdata,
3483                                         data->callback_data);
3484                 }
3485                 return;
3486         }
3487         
3488         state->async.fn = NULL;
3489
3490         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3491         if ((ret != 0) || (res != 0)) {
3492                 if ( !data->dont_log_errors) {
3493                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3494                 }
3495                 data->fail_count++;
3496                 if (data->fail_callback) {
3497                         data->fail_callback(ctdb, destnode, res, outdata,
3498                                         data->callback_data);
3499                 }
3500         }
3501         if ((ret == 0) && (data->callback != NULL)) {
3502                 data->callback(ctdb, destnode, res, outdata,
3503                                         data->callback_data);
3504         }
3505 }
3506
3507
3508 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3509 {
3510         /* set up the callback functions */
3511         state->async.fn = async_callback;
3512         state->async.private_data = data;
3513         
3514         /* one more control to wait for to complete */
3515         data->count++;
3516 }
3517
3518
3519 /* wait for up to the maximum number of seconds allowed
3520    or until all nodes we expect a response from has replied
3521 */
3522 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3523 {
3524         while (data->count > 0) {
3525                 event_loop_once(ctdb->ev);
3526         }
3527         if (data->fail_count != 0) {
3528                 if (!data->dont_log_errors) {
3529                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
3530                                  data->fail_count));
3531                 }
3532                 return -1;
3533         }
3534         return 0;
3535 }
3536
3537
3538 /* 
3539    perform a simple control on the listed nodes
3540    The control cannot return data
3541  */
3542 int ctdb_client_async_control(struct ctdb_context *ctdb,
3543                                 enum ctdb_controls opcode,
3544                                 uint32_t *nodes,
3545                                 uint64_t srvid,
3546                                 struct timeval timeout,
3547                                 bool dont_log_errors,
3548                                 TDB_DATA data,
3549                                 client_async_callback client_callback,
3550                                 client_async_callback fail_callback,
3551                                 void *callback_data)
3552 {
3553         struct client_async_data *async_data;
3554         struct ctdb_client_control_state *state;
3555         int j, num_nodes;
3556
3557         async_data = talloc_zero(ctdb, struct client_async_data);
3558         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3559         async_data->dont_log_errors = dont_log_errors;
3560         async_data->callback = client_callback;
3561         async_data->fail_callback = fail_callback;
3562         async_data->callback_data = callback_data;
3563         async_data->opcode        = opcode;
3564
3565         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3566
3567         /* loop over all nodes and send an async control to each of them */
3568         for (j=0; j<num_nodes; j++) {
3569                 uint32_t pnn = nodes[j];
3570
3571                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
3572                                           0, data, async_data, &timeout, NULL);
3573                 if (state == NULL) {
3574                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3575                         talloc_free(async_data);
3576                         return -1;
3577                 }
3578                 
3579                 ctdb_client_async_add(async_data, state);
3580         }
3581
3582         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3583                 talloc_free(async_data);
3584                 return -1;
3585         }
3586
3587         talloc_free(async_data);
3588         return 0;
3589 }
3590
3591 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3592                                 struct ctdb_vnn_map *vnn_map,
3593                                 TALLOC_CTX *mem_ctx,
3594                                 bool include_self)
3595 {
3596         int i, j, num_nodes;
3597         uint32_t *nodes;
3598
3599         for (i=num_nodes=0;i<vnn_map->size;i++) {
3600                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3601                         continue;
3602                 }
3603                 num_nodes++;
3604         } 
3605
3606         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3607         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3608
3609         for (i=j=0;i<vnn_map->size;i++) {
3610                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3611                         continue;
3612                 }
3613                 nodes[j++] = vnn_map->map[i];
3614         } 
3615
3616         return nodes;
3617 }
3618
3619 /* Get list of nodes not including those with flags specified by mask.
3620  * If exclude_pnn is not -1 then exclude that pnn from the list.
3621  */
3622 uint32_t *list_of_nodes(struct ctdb_context *ctdb,
3623                         struct ctdb_node_map *node_map,
3624                         TALLOC_CTX *mem_ctx,
3625                         uint32_t mask,
3626                         int exclude_pnn)
3627 {
3628         int i, j, num_nodes;
3629         uint32_t *nodes;
3630
3631         for (i=num_nodes=0;i<node_map->num;i++) {
3632                 if (node_map->nodes[i].flags & mask) {
3633                         continue;
3634                 }
3635                 if (node_map->nodes[i].pnn == exclude_pnn) {
3636                         continue;
3637                 }
3638                 num_nodes++;
3639         } 
3640
3641         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3642         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3643
3644         for (i=j=0;i<node_map->num;i++) {
3645                 if (node_map->nodes[i].flags & mask) {
3646                         continue;
3647                 }
3648                 if (node_map->nodes[i].pnn == exclude_pnn) {
3649                         continue;
3650                 }
3651                 nodes[j++] = node_map->nodes[i].pnn;
3652         } 
3653
3654         return nodes;
3655 }
3656
3657 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3658                                 struct ctdb_node_map *node_map,
3659                                 TALLOC_CTX *mem_ctx,
3660                                 bool include_self)
3661 {
3662         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_INACTIVE,
3663                              include_self ? -1 : ctdb->pnn);
3664 }
3665
3666 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3667                                 struct ctdb_node_map *node_map,
3668                                 TALLOC_CTX *mem_ctx,
3669                                 bool include_self)
3670 {
3671         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_DISCONNECTED,
3672                              include_self ? -1 : ctdb->pnn);
3673 }
3674
3675 /* 
3676   this is used to test if a pnn lock exists and if it exists will return
3677   the number of connections that pnn has reported or -1 if that recovery
3678   daemon is not running.
3679 */
3680 int
3681 ctdb_read_pnn_lock(int fd, int32_t pnn)
3682 {
3683         struct flock lock;
3684         char c;
3685
3686         lock.l_type = F_WRLCK;
3687         lock.l_whence = SEEK_SET;
3688         lock.l_start = pnn;
3689         lock.l_len = 1;
3690         lock.l_pid = 0;
3691
3692         if (fcntl(fd, F_GETLK, &lock) != 0) {
3693                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3694                 return -1;
3695         }
3696
3697         if (lock.l_type == F_UNLCK) {
3698                 return -1;
3699         }
3700
3701         if (pread(fd, &c, 1, pnn) == -1) {
3702                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3703                 return -1;
3704         }
3705
3706         return c;
3707 }
3708
3709 /*
3710   get capabilities of a remote node
3711  */
3712 struct ctdb_client_control_state *
3713 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3714 {
3715         return ctdb_control_send(ctdb, destnode, 0, 
3716                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3717                            mem_ctx, &timeout, NULL);
3718 }
3719
3720 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3721 {
3722         int ret;
3723         int32_t res;
3724         TDB_DATA outdata;
3725
3726         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3727         if ( (ret != 0) || (res != 0) ) {
3728                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3729                 return -1;
3730         }
3731
3732         if (capabilities) {
3733                 *capabilities = *((uint32_t *)outdata.dptr);
3734         }
3735
3736         return 0;
3737 }
3738
3739 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3740 {
3741         struct ctdb_client_control_state *state;
3742         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3743         int ret;
3744
3745         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3746         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3747         talloc_free(tmp_ctx);
3748         return ret;
3749 }
3750
3751 struct server_id {
3752         uint64_t pid;
3753         uint32_t task_id;
3754         uint32_t vnn;
3755         uint64_t unique_id;
3756 };
3757
3758 static struct server_id server_id_get(struct ctdb_context *ctdb, uint32_t reqid)
3759 {
3760         struct server_id id;
3761
3762         id.pid = getpid();
3763         id.task_id = reqid;
3764         id.vnn = ctdb_get_pnn(ctdb);
3765         id.unique_id = id.vnn;
3766         id.unique_id = (id.unique_id << 32) | reqid;
3767
3768         return id;
3769 }
3770
3771 static bool server_id_equal(struct server_id *id1, struct server_id *id2)
3772 {
3773         if (id1->pid != id2->pid) {
3774                 return false;
3775         }
3776
3777         if (id1->task_id != id2->task_id) {
3778                 return false;
3779         }
3780
3781         if (id1->vnn != id2->vnn) {
3782                 return false;
3783         }
3784
3785         if (id1->unique_id != id2->unique_id) {
3786                 return false;
3787         }
3788
3789         return true;
3790 }
3791
3792 static bool server_id_exists(struct ctdb_context *ctdb, struct server_id *id)
3793 {
3794         struct ctdb_server_id sid;
3795         int ret;
3796         uint32_t result;
3797
3798         sid.type = SERVER_TYPE_SAMBA;
3799         sid.pnn = id->vnn;
3800         sid.server_id = id->pid;
3801
3802         ret = ctdb_ctrl_check_server_id(ctdb, timeval_current_ofs(3,0),
3803                                         id->vnn, &sid, &result);
3804         if (ret != 0) {
3805                 /* If control times out, assume server_id exists. */
3806                 return true;
3807         }
3808
3809         if (result) {
3810                 return true;
3811         }
3812
3813         return false;
3814 }
3815
3816
3817 enum g_lock_type {
3818         G_LOCK_READ = 0,
3819         G_LOCK_WRITE = 1,
3820 };
3821
3822 struct g_lock_rec {
3823         enum g_lock_type type;
3824         struct server_id id;
3825 };
3826
3827 struct g_lock_recs {
3828         unsigned int num;
3829         struct g_lock_rec *lock;
3830 };
3831
3832 static bool g_lock_parse(TALLOC_CTX *mem_ctx, TDB_DATA data,
3833                          struct g_lock_recs **locks)
3834 {
3835         struct g_lock_recs *recs;
3836
3837         recs = talloc_zero(mem_ctx, struct g_lock_recs);
3838         if (recs == NULL) {
3839                 return false;
3840         }
3841
3842         if (data.dsize == 0) {
3843                 goto done;
3844         }
3845
3846         if (data.dsize % sizeof(struct g_lock_rec) != 0) {
3847                 DEBUG(DEBUG_ERR, (__location__ "invalid data size %lu in g_lock record\n",
3848                                   (unsigned long)data.dsize));
3849                 talloc_free(recs);
3850                 return false;
3851         }
3852
3853         recs->num = data.dsize / sizeof(struct g_lock_rec);
3854         recs->lock = talloc_memdup(mem_ctx, data.dptr, data.dsize);
3855         if (recs->lock == NULL) {
3856                 talloc_free(recs);
3857                 return false;
3858         }
3859
3860 done:
3861         if (locks != NULL) {
3862                 *locks = recs;
3863         }
3864
3865         return true;
3866 }
3867
3868
3869 static bool g_lock_lock(TALLOC_CTX *mem_ctx,
3870                         struct ctdb_db_context *ctdb_db,
3871                         const char *keyname, uint32_t reqid)
3872 {
3873         TDB_DATA key, data;
3874         struct ctdb_record_handle *h;
3875         struct g_lock_recs *locks;
3876         struct server_id id;
3877         struct timeval t_start;
3878         int i;
3879
3880         key.dptr = (uint8_t *)discard_const(keyname);
3881         key.dsize = strlen(keyname) + 1;
3882
3883         t_start = timeval_current();
3884
3885 again:
3886         /* Keep trying for an hour. */
3887         if (timeval_elapsed(&t_start) > 3600) {
3888                 return false;
3889         }
3890
3891         h = ctdb_fetch_lock(ctdb_db, mem_ctx, key, &data);
3892         if (h == NULL) {
3893                 return false;
3894         }
3895
3896         if (!g_lock_parse(h, data, &locks)) {
3897                 DEBUG(DEBUG_ERR, ("g_lock: error parsing locks\n"));
3898                 talloc_free(data.dptr);
3899                 talloc_free(h);
3900                 return false;
3901         }
3902
3903         talloc_free(data.dptr);
3904
3905         id = server_id_get(ctdb_db->ctdb, reqid);
3906
3907         i = 0;
3908         while (i < locks->num) {
3909                 if (server_id_equal(&locks->lock[i].id, &id)) {
3910                         /* Internal error */
3911                         talloc_free(h);
3912                         return false;
3913                 }
3914
3915                 if (!server_id_exists(ctdb_db->ctdb, &locks->lock[i].id)) {
3916                         if (i < locks->num-1) {
3917                                 locks->lock[i] = locks->lock[locks->num-1];
3918                         }
3919                         locks->num--;
3920                         continue;
3921                 }
3922
3923                 /* This entry is locked. */
3924                 DEBUG(DEBUG_INFO, ("g_lock: lock already granted for "
3925                                    "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
3926                                    (unsigned long long)id.pid,
3927                                    id.task_id, id.vnn,
3928                                    (unsigned long long)id.unique_id));
3929                 talloc_free(h);
3930                 goto again;
3931         }
3932
3933         locks->lock = talloc_realloc(locks, locks->lock, struct g_lock_rec,
3934                                      locks->num+1);
3935         if (locks->lock == NULL) {
3936                 talloc_free(h);
3937                 return false;
3938         }
3939
3940         locks->lock[locks->num].type = G_LOCK_WRITE;
3941         locks->lock[locks->num].id = id;
3942         locks->num++;
3943
3944         data.dptr = (uint8_t *)locks->lock;
3945         data.dsize = locks->num * sizeof(struct g_lock_rec);
3946
3947         if (ctdb_record_store(h, data) != 0) {
3948                 DEBUG(DEBUG_ERR, ("g_lock: failed to write transaction lock for "
3949                                   "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
3950                                   (unsigned long long)id.pid,
3951                                   id.task_id, id.vnn,
3952                                   (unsigned long long)id.unique_id));
3953                 talloc_free(h);
3954                 return false;
3955         }
3956
3957         DEBUG(DEBUG_INFO, ("g_lock: lock granted for "
3958                            "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
3959                            (unsigned long long)id.pid,
3960                            id.task_id, id.vnn,
3961                            (unsigned long long)id.unique_id));
3962
3963         talloc_free(h);
3964         return true;
3965 }
3966
3967 static bool g_lock_unlock(TALLOC_CTX *mem_ctx,
3968                           struct ctdb_db_context *ctdb_db,
3969                           const char *keyname, uint32_t reqid)
3970 {
3971         TDB_DATA key, data;
3972         struct ctdb_record_handle *h;
3973         struct g_lock_recs *locks;
3974         struct server_id id;
3975         int i;
3976         bool found = false;
3977
3978         key.dptr = (uint8_t *)discard_const(keyname);
3979         key.dsize = strlen(keyname) + 1;
3980         h = ctdb_fetch_lock(ctdb_db, mem_ctx, key, &data);
3981         if (h == NULL) {
3982                 return false;
3983         }
3984
3985         if (!g_lock_parse(h, data, &locks)) {
3986                 DEBUG(DEBUG_ERR, ("g_lock: error parsing locks\n"));
3987                 talloc_free(data.dptr);
3988                 talloc_free(h);
3989                 return false;
3990         }
3991
3992         talloc_free(data.dptr);
3993
3994         id = server_id_get(ctdb_db->ctdb, reqid);
3995
3996         for (i=0; i<locks->num; i++) {
3997                 if (server_id_equal(&locks->lock[i].id, &id)) {
3998                         if (i < locks->num-1) {
3999                                 locks->lock[i] = locks->lock[locks->num-1];
4000                         }
4001                         locks->num--;
4002                         found = true;
4003                         break;
4004                 }
4005         }
4006
4007         if (!found) {
4008                 DEBUG(DEBUG_ERR, ("g_lock: lock not found\n"));
4009                 talloc_free(h);
4010                 return false;
4011         }
4012
4013         data.dptr = (uint8_t *)locks->lock;
4014         data.dsize = locks->num * sizeof(struct g_lock_rec);
4015
4016         if (ctdb_record_store(h, data) != 0) {
4017                 talloc_free(h);
4018                 return false;
4019         }
4020
4021         talloc_free(h);
4022         return true;
4023 }
4024
4025
4026 struct ctdb_transaction_handle {
4027         struct ctdb_db_context *ctdb_db;
4028         struct ctdb_db_context *g_lock_db;
4029         char *lock_name;
4030         uint32_t reqid;
4031         /*
4032          * we store reads and writes done under a transaction:
4033          * - one list stores both reads and writes (m_all)
4034          * - the other just writes (m_write)
4035          */
4036         struct ctdb_marshall_buffer *m_all;
4037         struct ctdb_marshall_buffer *m_write;
4038 };
4039
4040 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
4041 {
4042         g_lock_unlock(h, h->g_lock_db, h->lock_name, h->reqid);
4043         ctdb_reqid_remove(h->ctdb_db->ctdb, h->reqid);
4044         return 0;
4045 }
4046
4047
4048 /**
4049  * start a transaction on a database
4050  */
4051 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
4052                                                        TALLOC_CTX *mem_ctx)
4053 {
4054         struct ctdb_transaction_handle *h;
4055         struct ctdb_server_id id;
4056
4057         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
4058         if (h == NULL) {
4059                 DEBUG(DEBUG_ERR, (__location__ " memory allocation error\n"));
4060                 return NULL;
4061         }
4062
4063         h->ctdb_db = ctdb_db;
4064         h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x",
4065                                        (unsigned int)ctdb_db->db_id);
4066         if (h->lock_name == NULL) {
4067                 DEBUG(DEBUG_ERR, (__location__ " talloc asprintf failed\n"));
4068                 talloc_free(h);
4069                 return NULL;
4070         }
4071
4072         h->g_lock_db = ctdb_attach(h->ctdb_db->ctdb, timeval_current_ofs(3,0),
4073                                    "g_lock.tdb", false, 0);
4074         if (!h->g_lock_db) {
4075                 DEBUG(DEBUG_ERR, (__location__ " unable to attach to g_lock.tdb\n"));
4076                 talloc_free(h);
4077                 return NULL;
4078         }
4079
4080         id.type = SERVER_TYPE_SAMBA;
4081         id.pnn = ctdb_get_pnn(ctdb_db->ctdb);
4082         id.server_id = getpid();
4083
4084         if (ctdb_ctrl_register_server_id(ctdb_db->ctdb, timeval_current_ofs(3,0),
4085                                          &id) != 0) {
4086                 DEBUG(DEBUG_ERR, (__location__ " unable to register server id\n"));
4087                 talloc_free(h);
4088                 return NULL;
4089         }
4090
4091         h->reqid = ctdb_reqid_new(h->ctdb_db->ctdb, h);
4092
4093         if (!g_lock_lock(h, h->g_lock_db, h->lock_name, h->reqid)) {
4094                 DEBUG(DEBUG_ERR, (__location__ " Error locking g_lock.tdb\n"));
4095                 talloc_free(h);
4096                 return NULL;
4097         }
4098
4099         talloc_set_destructor(h, ctdb_transaction_destructor);
4100         return h;
4101 }
4102
4103 /**
4104  * fetch a record inside a transaction
4105  */
4106 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h,
4107                            TALLOC_CTX *mem_ctx,
4108                            TDB_DATA key, TDB_DATA *data)
4109 {
4110         struct ctdb_ltdb_header header;
4111         int ret;
4112
4113         ZERO_STRUCT(header);
4114
4115         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
4116         if (ret == -1 && header.dmaster == (uint32_t)-1) {
4117                 /* record doesn't exist yet */
4118                 *data = tdb_null;
4119                 ret = 0;
4120         }
4121
4122         if (ret != 0) {
4123                 return ret;
4124         }
4125
4126         h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
4127         if (h->m_all == NULL) {
4128                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4129                 return -1;
4130         }
4131
4132         return 0;
4133 }
4134
4135 /**
4136  * stores a record inside a transaction
4137  */
4138 int ctdb_transaction_store(struct ctdb_transaction_handle *h,
4139                            TDB_DATA key, TDB_DATA data)
4140 {
4141         TALLOC_CTX *tmp_ctx = talloc_new(h);
4142         struct ctdb_ltdb_header header;
4143         TDB_DATA olddata;
4144         int ret;
4145
4146         /* we need the header so we can update the RSN */
4147         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
4148         if (ret == -1 && header.dmaster == (uint32_t)-1) {
4149                 /* the record doesn't exist - create one with us as dmaster.
4150                    This is only safe because we are in a transaction and this
4151                    is a persistent database */
4152                 ZERO_STRUCT(header);
4153         } else if (ret != 0) {
4154                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
4155                 talloc_free(tmp_ctx);
4156                 return ret;
4157         }
4158
4159         if (data.dsize == olddata.dsize &&
4160             memcmp(data.dptr, olddata.dptr, data.dsize) == 0 &&
4161             header.rsn != 0) {
4162                 /* save writing the same data */
4163                 talloc_free(tmp_ctx);
4164                 return 0;
4165         }
4166
4167         header.dmaster = h->ctdb_db->ctdb->pnn;
4168         header.rsn++;
4169
4170         h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
4171         if (h->m_all == NULL) {
4172                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4173                 talloc_free(tmp_ctx);
4174                 return -1;
4175         }
4176
4177         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
4178         if (h->m_write == NULL) {
4179                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4180                 talloc_free(tmp_ctx);
4181                 return -1;
4182         }
4183
4184         talloc_free(tmp_ctx);
4185         return 0;
4186 }
4187
4188 static int ctdb_fetch_db_seqnum(struct ctdb_db_context *ctdb_db, uint64_t *seqnum)
4189 {
4190         const char *keyname = CTDB_DB_SEQNUM_KEY;
4191         TDB_DATA key, data;
4192         struct ctdb_ltdb_header header;
4193         int ret;
4194
4195         key.dptr = (uint8_t *)discard_const(keyname);
4196         key.dsize = strlen(keyname) + 1;
4197
4198         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, ctdb_db, &data);
4199         if (ret != 0) {
4200                 *seqnum = 0;
4201                 return 0;
4202         }
4203
4204         if (data.dsize == 0) {
4205                 *seqnum = 0;
4206                 return 0;
4207         }
4208
4209         if (data.dsize != sizeof(*seqnum)) {
4210                 DEBUG(DEBUG_ERR, (__location__ " Invalid data recived len=%zi\n",
4211                                   data.dsize));
4212                 talloc_free(data.dptr);
4213                 return -1;
4214         }
4215
4216         *seqnum = *(uint64_t *)data.dptr;
4217         talloc_free(data.dptr);
4218
4219         return 0;
4220 }
4221
4222
4223 static int ctdb_store_db_seqnum(struct ctdb_transaction_handle *h,
4224                                 uint64_t seqnum)
4225 {
4226         const char *keyname = CTDB_DB_SEQNUM_KEY;
4227         TDB_DATA key, data;
4228
4229         key.dptr = (uint8_t *)discard_const(keyname);
4230         key.dsize = strlen(keyname) + 1;
4231
4232         data.dptr = (uint8_t *)&seqnum;
4233         data.dsize = sizeof(seqnum);
4234
4235         return ctdb_transaction_store(h, key, data);
4236 }
4237
4238
4239 /**
4240  * commit a transaction
4241  */
4242 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
4243 {
4244         int ret;
4245         uint64_t old_seqnum, new_seqnum;
4246         int32_t status;
4247         struct timeval timeout;
4248
4249         if (h->m_write == NULL) {
4250                 /* no changes were made */
4251                 talloc_free(h);
4252                 return 0;
4253         }
4254
4255         ret = ctdb_fetch_db_seqnum(h->ctdb_db, &old_seqnum);
4256         if (ret != 0) {
4257                 DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n"));
4258                 ret = -1;
4259                 goto done;
4260         }
4261
4262         new_seqnum = old_seqnum + 1;
4263         ret = ctdb_store_db_seqnum(h, new_seqnum);
4264         if (ret != 0) {
4265                 DEBUG(DEBUG_ERR, (__location__ " failed to store db sequence number\n"));
4266                 ret = -1;
4267                 goto done;
4268         }
4269
4270 again:
4271         timeout = timeval_current_ofs(3,0);
4272         ret = ctdb_control(h->ctdb_db->ctdb, CTDB_CURRENT_NODE,
4273                            h->ctdb_db->db_id,
4274                            CTDB_CONTROL_TRANS3_COMMIT, 0,
4275                            ctdb_marshall_finish(h->m_write), NULL, NULL,
4276                            &status, &timeout, NULL);
4277         if (ret != 0 || status != 0) {
4278                 /*
4279                  * TRANS3_COMMIT control will only fail if recovery has been
4280                  * triggered.  Check if the database has been updated or not.
4281                  */
4282                 ret = ctdb_fetch_db_seqnum(h->ctdb_db, &new_seqnum);
4283                 if (ret != 0) {
4284                         DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n"));
4285                         goto done;
4286                 }
4287
4288                 if (new_seqnum == old_seqnum) {
4289                         /* Database not yet updated, try again */
4290                         goto again;
4291                 }
4292
4293                 if (new_seqnum != (old_seqnum + 1)) {
4294                         DEBUG(DEBUG_ERR, (__location__ " new seqnum [%llu] != old seqnum [%llu] + 1\n",
4295                                           (long long unsigned)new_seqnum,
4296                                           (long long unsigned)old_seqnum));
4297                         ret = -1;
4298                         goto done;
4299                 }
4300         }
4301
4302         ret = 0;
4303
4304 done:
4305         talloc_free(h);
4306         return ret;
4307 }
4308
4309 /**
4310  * cancel a transaction
4311  */
4312 int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
4313 {
4314         talloc_free(h);
4315         return 0;
4316 }
4317
4318
4319 /*
4320   recovery daemon ping to main daemon
4321  */
4322 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
4323 {
4324         int ret;
4325         int32_t res;
4326
4327         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
4328                            ctdb, NULL, &res, NULL, NULL);
4329         if (ret != 0 || res != 0) {
4330                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
4331                 return -1;
4332         }
4333
4334         return 0;
4335 }
4336
4337 /* When forking the main daemon and the child process needs to connect
4338  * back to the daemon as a client process, this function can be used
4339  * to change the ctdb context from daemon into client mode.  The child
4340  * process must be created using ctdb_fork() and not fork() -
4341  * ctdb_fork() does some necessary housekeeping.
4342  */
4343 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
4344 {
4345         int ret;
4346         va_list ap;
4347
4348         /* Add extra information so we can identify this in the logs */
4349         va_start(ap, fmt);
4350         debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
4351         va_end(ap);
4352
4353         /* get a new event context */
4354         ctdb->ev = event_context_init(ctdb);
4355         tevent_loop_allow_nesting(ctdb->ev);
4356
4357         /* Connect to main CTDB daemon */
4358         ret = ctdb_socket_connect(ctdb);
4359         if (ret != 0) {
4360                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
4361                 return -1;
4362         }
4363
4364         ctdb->can_send_controls = true;
4365
4366         return 0;
4367 }
4368
4369 /*
4370   get the status of running the monitor eventscripts: NULL means never run.
4371  */
4372 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
4373                 struct timeval timeout, uint32_t destnode, 
4374                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
4375                 struct ctdb_scripts_wire **scripts)
4376 {
4377         int ret;
4378         TDB_DATA outdata, indata;
4379         int32_t res;
4380         uint32_t uinttype = type;
4381
4382         indata.dptr = (uint8_t *)&uinttype;
4383         indata.dsize = sizeof(uinttype);
4384
4385         ret = ctdb_control(ctdb, destnode, 0, 
4386                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
4387                            mem_ctx, &outdata, &res, &timeout, NULL);
4388         if (ret != 0 || res != 0) {
4389                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
4390                 return -1;
4391         }
4392
4393         if (outdata.dsize == 0) {
4394                 *scripts = NULL;
4395         } else {
4396                 *scripts = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4397                 talloc_free(outdata.dptr);
4398         }
4399                     
4400         return 0;
4401 }
4402
4403 /*
4404   tell the main daemon how long it took to lock the reclock file
4405  */
4406 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
4407 {
4408         int ret;
4409         int32_t res;
4410         TDB_DATA data;
4411
4412         data.dptr = (uint8_t *)&latency;
4413         data.dsize = sizeof(latency);
4414
4415         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
4416                            ctdb, NULL, &res, NULL, NULL);
4417         if (ret != 0 || res != 0) {
4418                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
4419                 return -1;
4420         }
4421
4422         return 0;
4423 }
4424
4425 /*
4426   get the name of the reclock file
4427  */
4428 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
4429                          uint32_t destnode, TALLOC_CTX *mem_ctx,
4430                          const char **name)
4431 {
4432         int ret;
4433         int32_t res;
4434         TDB_DATA data;
4435
4436         ret = ctdb_control(ctdb, destnode, 0, 
4437                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
4438                            mem_ctx, &data, &res, &timeout, NULL);
4439         if (ret != 0 || res != 0) {
4440                 return -1;
4441         }
4442
4443         if (data.dsize == 0) {
4444                 *name = NULL;
4445         } else {
4446                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
4447         }
4448         talloc_free(data.dptr);
4449
4450         return 0;
4451 }
4452
4453 /*
4454   set the reclock filename for a node
4455  */
4456 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
4457 {
4458         int ret;
4459         TDB_DATA data;
4460         int32_t res;
4461
4462         if (reclock == NULL) {
4463                 data.dsize = 0;
4464                 data.dptr  = NULL;
4465         } else {
4466                 data.dsize = strlen(reclock) + 1;
4467                 data.dptr  = discard_const(reclock);
4468         }
4469
4470         ret = ctdb_control(ctdb, destnode, 0, 
4471                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
4472                            NULL, NULL, &res, &timeout, NULL);
4473         if (ret != 0 || res != 0) {
4474                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
4475                 return -1;
4476         }
4477
4478         return 0;
4479 }
4480
4481 /*
4482   stop a node
4483  */
4484 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4485 {
4486         int ret;
4487         int32_t res;
4488
4489         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
4490                            ctdb, NULL, &res, &timeout, NULL);
4491         if (ret != 0 || res != 0) {
4492                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
4493                 return -1;
4494         }
4495
4496         return 0;
4497 }
4498
4499 /*
4500   continue a node
4501  */
4502 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4503 {
4504         int ret;
4505
4506         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
4507                            ctdb, NULL, NULL, &timeout, NULL);
4508         if (ret != 0) {
4509                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4510                 return -1;
4511         }
4512
4513         return 0;
4514 }
4515
4516 /*
4517   set the natgw state for a node
4518  */
4519 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4520 {
4521         int ret;
4522         TDB_DATA data;
4523         int32_t res;
4524
4525         data.dsize = sizeof(natgwstate);
4526         data.dptr  = (uint8_t *)&natgwstate;
4527
4528         ret = ctdb_control(ctdb, destnode, 0, 
4529                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
4530                            NULL, NULL, &res, &timeout, NULL);
4531         if (ret != 0 || res != 0) {
4532                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4533                 return -1;
4534         }
4535
4536         return 0;
4537 }
4538
4539 /*
4540   set the lmaster role for a node
4541  */
4542 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4543 {
4544         int ret;
4545         TDB_DATA data;
4546         int32_t res;
4547
4548         data.dsize = sizeof(lmasterrole);
4549         data.dptr  = (uint8_t *)&lmasterrole;
4550
4551         ret = ctdb_control(ctdb, destnode, 0, 
4552                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
4553                            NULL, NULL, &res, &timeout, NULL);
4554         if (ret != 0 || res != 0) {
4555                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4556                 return -1;
4557         }
4558
4559         return 0;
4560 }
4561
4562 /*
4563   set the recmaster role for a node
4564  */
4565 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4566 {
4567         int ret;
4568         TDB_DATA data;
4569         int32_t res;
4570
4571         data.dsize = sizeof(recmasterrole);
4572         data.dptr  = (uint8_t *)&recmasterrole;
4573
4574         ret = ctdb_control(ctdb, destnode, 0, 
4575                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
4576                            NULL, NULL, &res, &timeout, NULL);
4577         if (ret != 0 || res != 0) {
4578                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4579                 return -1;
4580         }
4581
4582         return 0;
4583 }
4584
4585 /* enable an eventscript
4586  */
4587 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4588 {
4589         int ret;
4590         TDB_DATA data;
4591         int32_t res;
4592
4593         data.dsize = strlen(script) + 1;
4594         data.dptr  = discard_const(script);
4595
4596         ret = ctdb_control(ctdb, destnode, 0, 
4597                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
4598                            NULL, NULL, &res, &timeout, NULL);
4599         if (ret != 0 || res != 0) {
4600                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4601                 return -1;
4602         }
4603
4604         return 0;
4605 }
4606
4607 /* disable an eventscript
4608  */
4609 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4610 {
4611         int ret;
4612         TDB_DATA data;
4613         int32_t res;
4614
4615         data.dsize = strlen(script) + 1;
4616         data.dptr  = discard_const(script);
4617
4618         ret = ctdb_control(ctdb, destnode, 0, 
4619                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
4620                            NULL, NULL, &res, &timeout, NULL);
4621         if (ret != 0 || res != 0) {
4622                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
4623                 return -1;
4624         }
4625
4626         return 0;
4627 }
4628
4629
4630 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
4631 {
4632         int ret;
4633         TDB_DATA data;
4634         int32_t res;
4635
4636         data.dsize = sizeof(*bantime);
4637         data.dptr  = (uint8_t *)bantime;
4638
4639         ret = ctdb_control(ctdb, destnode, 0, 
4640                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
4641                            NULL, NULL, &res, &timeout, NULL);
4642         if (ret != 0 || res != 0) {
4643                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4644                 return -1;
4645         }
4646
4647         return 0;
4648 }
4649
4650
4651 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4652 {
4653         int ret;
4654         TDB_DATA outdata;
4655         int32_t res;
4656         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4657
4658         ret = ctdb_control(ctdb, destnode, 0, 
4659                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4660                            tmp_ctx, &outdata, &res, &timeout, NULL);
4661         if (ret != 0 || res != 0) {
4662                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4663                 talloc_free(tmp_ctx);
4664                 return -1;
4665         }
4666
4667         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4668         talloc_free(tmp_ctx);
4669
4670         return 0;
4671 }
4672
4673
4674 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4675 {
4676         int ret;
4677         int32_t res;
4678         TDB_DATA data;
4679         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4680
4681         data.dptr = (uint8_t*)db_prio;
4682         data.dsize = sizeof(*db_prio);
4683
4684         ret = ctdb_control(ctdb, destnode, 0, 
4685                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4686                            tmp_ctx, NULL, &res, &timeout, NULL);
4687         if (ret != 0 || res != 0) {
4688                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4689                 talloc_free(tmp_ctx);
4690                 return -1;
4691         }
4692
4693         talloc_free(tmp_ctx);
4694
4695         return 0;
4696 }
4697
4698 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4699 {
4700         int ret;
4701         int32_t res;
4702         TDB_DATA data;
4703         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4704
4705         data.dptr = (uint8_t*)&db_id;
4706         data.dsize = sizeof(db_id);
4707
4708         ret = ctdb_control(ctdb, destnode, 0, 
4709                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4710                            tmp_ctx, NULL, &res, &timeout, NULL);
4711         if (ret != 0 || res < 0) {
4712                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_db_priority failed\n"));
4713                 talloc_free(tmp_ctx);
4714                 return -1;
4715         }
4716
4717         if (priority) {
4718                 *priority = res;
4719         }
4720
4721         talloc_free(tmp_ctx);
4722
4723         return 0;
4724 }
4725
4726 int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_statistics_wire **stats)
4727 {
4728         int ret;
4729         TDB_DATA outdata;
4730         int32_t res;
4731
4732         ret = ctdb_control(ctdb, destnode, 0, 
4733                            CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null, 
4734                            mem_ctx, &outdata, &res, &timeout, NULL);
4735         if (ret != 0 || res != 0 || outdata.dsize == 0) {
4736                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
4737                 return -1;
4738         }
4739
4740         *stats = (struct ctdb_statistics_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4741         talloc_free(outdata.dptr);
4742                     
4743         return 0;
4744 }
4745
4746 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)
4747 {
4748         if (h == NULL) {
4749                 return NULL;
4750         }
4751
4752         return &h->header;
4753 }
4754
4755
4756 struct ctdb_client_control_state *
4757 ctdb_ctrl_updaterecord_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4758 {
4759         struct ctdb_client_control_state *handle;
4760         struct ctdb_marshall_buffer *m;
4761         struct ctdb_rec_data *rec;
4762         TDB_DATA outdata;
4763
4764         m = talloc_zero(mem_ctx, struct ctdb_marshall_buffer);
4765         if (m == NULL) {
4766                 DEBUG(DEBUG_ERR, ("Failed to allocate marshall buffer for update record\n"));
4767                 return NULL;
4768         }
4769
4770         m->db_id = ctdb_db->db_id;
4771
4772         rec = ctdb_marshall_record(m, 0, key, header, data);
4773         if (rec == NULL) {
4774                 DEBUG(DEBUG_ERR,("Failed to marshall record for update record\n"));
4775                 talloc_free(m);
4776                 return NULL;
4777         }
4778         m = talloc_realloc_size(mem_ctx, m, rec->length + offsetof(struct ctdb_marshall_buffer, data));
4779         if (m == NULL) {
4780                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata\n"));
4781                 talloc_free(m);
4782                 return NULL;
4783         }
4784         m->count++;
4785         memcpy((uint8_t *)m + offsetof(struct ctdb_marshall_buffer, data), rec, rec->length);
4786
4787
4788         outdata.dptr = (uint8_t *)m;
4789         outdata.dsize = talloc_get_size(m);
4790
4791         handle = ctdb_control_send(ctdb, destnode, 0, 
4792                            CTDB_CONTROL_UPDATE_RECORD, 0, outdata,
4793                            mem_ctx, &timeout, NULL);
4794         talloc_free(m);
4795         return handle;
4796 }
4797
4798 int ctdb_ctrl_updaterecord_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4799 {
4800         int ret;
4801         int32_t res;
4802
4803         ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL);
4804         if ( (ret != 0) || (res != 0) ){
4805                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_update_record_recv failed\n"));
4806                 return -1;
4807         }
4808
4809         return 0;
4810 }
4811
4812 int
4813 ctdb_ctrl_updaterecord(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4814 {
4815         struct ctdb_client_control_state *state;
4816
4817         state = ctdb_ctrl_updaterecord_send(ctdb, mem_ctx, timeout, destnode, ctdb_db, key, header, data);
4818         return ctdb_ctrl_updaterecord_recv(ctdb, state);
4819 }
4820
4821
4822
4823
4824
4825
4826 /*
4827   set a database to be readonly
4828  */
4829 struct ctdb_client_control_state *
4830 ctdb_ctrl_set_db_readonly_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4831 {
4832         TDB_DATA data;
4833
4834         data.dptr = (uint8_t *)&dbid;
4835         data.dsize = sizeof(dbid);
4836
4837         return ctdb_control_send(ctdb, destnode, 0, 
4838                            CTDB_CONTROL_SET_DB_READONLY, 0, data, 
4839                            ctdb, NULL, NULL);
4840 }
4841
4842 int ctdb_ctrl_set_db_readonly_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4843 {
4844         int ret;
4845         int32_t res;
4846
4847         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4848         if (ret != 0 || res != 0) {
4849           DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_readonly_recv failed  ret:%d res:%d\n", ret, res));
4850                 return -1;
4851         }
4852
4853         return 0;
4854 }
4855
4856 int ctdb_ctrl_set_db_readonly(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4857 {
4858         struct ctdb_client_control_state *state;
4859
4860         state = ctdb_ctrl_set_db_readonly_send(ctdb, destnode, dbid);
4861         return ctdb_ctrl_set_db_readonly_recv(ctdb, state);
4862 }
4863
4864 /*
4865   set a database to be sticky
4866  */
4867 struct ctdb_client_control_state *
4868 ctdb_ctrl_set_db_sticky_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4869 {
4870         TDB_DATA data;
4871
4872         data.dptr = (uint8_t *)&dbid;
4873         data.dsize = sizeof(dbid);
4874
4875         return ctdb_control_send(ctdb, destnode, 0, 
4876                            CTDB_CONTROL_SET_DB_STICKY, 0, data, 
4877                            ctdb, NULL, NULL);
4878 }
4879
4880 int ctdb_ctrl_set_db_sticky_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4881 {
4882         int ret;
4883         int32_t res;
4884
4885         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4886         if (ret != 0 || res != 0) {
4887                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_sticky_recv failed  ret:%d res:%d\n", ret, res));
4888                 return -1;
4889         }
4890
4891         return 0;
4892 }
4893
4894 int ctdb_ctrl_set_db_sticky(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4895 {
4896         struct ctdb_client_control_state *state;
4897
4898         state = ctdb_ctrl_set_db_sticky_send(ctdb, destnode, dbid);
4899         return ctdb_ctrl_set_db_sticky_recv(ctdb, state);
4900 }