ctdb-daemon: Support per-node robust mutex feature
[sfrench/samba-autobuild/.git] / ctdb / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/locale.h"
28 #include <stdlib.h>
29 #include "../include/ctdb_private.h"
30 #include "lib/util/dlinklist.h"
31
32 /*
33   allocate a packet for use in client<->daemon communication
34  */
35 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
36                                             TALLOC_CTX *mem_ctx, 
37                                             enum ctdb_operation operation, 
38                                             size_t length, size_t slength,
39                                             const char *type)
40 {
41         int size;
42         struct ctdb_req_header *hdr;
43
44         length = MAX(length, slength);
45         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
46
47         hdr = (struct ctdb_req_header *)talloc_zero_size(mem_ctx, size);
48         if (hdr == NULL) {
49                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
50                          operation, (unsigned)length));
51                 return NULL;
52         }
53         talloc_set_name_const(hdr, type);
54         hdr->length       = length;
55         hdr->operation    = operation;
56         hdr->ctdb_magic   = CTDB_MAGIC;
57         hdr->ctdb_version = CTDB_VERSION;
58         hdr->srcnode      = ctdb->pnn;
59         if (ctdb->vnn_map) {
60                 hdr->generation = ctdb->vnn_map->generation;
61         }
62
63         return hdr;
64 }
65
66 /*
67   local version of ctdb_call
68 */
69 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
70                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
71                     TDB_DATA *data, bool updatetdb)
72 {
73         struct ctdb_call_info *c;
74         struct ctdb_registered_call *fn;
75         struct ctdb_context *ctdb = ctdb_db->ctdb;
76         
77         c = talloc(ctdb, struct ctdb_call_info);
78         CTDB_NO_MEMORY(ctdb, c);
79
80         c->key = call->key;
81         c->call_data = &call->call_data;
82         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
83         c->record_data.dsize = data->dsize;
84         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
85         c->new_data = NULL;
86         c->reply_data = NULL;
87         c->status = 0;
88         c->header = header;
89
90         for (fn=ctdb_db->calls;fn;fn=fn->next) {
91                 if (fn->id == call->call_id) break;
92         }
93         if (fn == NULL) {
94                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
95                 talloc_free(c);
96                 return -1;
97         }
98
99         if (fn->fn(c) != 0) {
100                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
101                 talloc_free(c);
102                 return -1;
103         }
104
105         /* we need to force the record to be written out if this was a remote access */
106         if (c->new_data == NULL) {
107                 c->new_data = &c->record_data;
108         }
109
110         if (c->new_data && updatetdb) {
111                 /* XXX check that we always have the lock here? */
112                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
113                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
114                         talloc_free(c);
115                         return -1;
116                 }
117         }
118
119         if (c->reply_data) {
120                 call->reply_data = *c->reply_data;
121
122                 talloc_steal(call, call->reply_data.dptr);
123                 talloc_set_name_const(call->reply_data.dptr, __location__);
124         } else {
125                 call->reply_data.dptr = NULL;
126                 call->reply_data.dsize = 0;
127         }
128         call->status = c->status;
129
130         talloc_free(c);
131
132         return 0;
133 }
134
135
136 /*
137   queue a packet for sending from client to daemon
138 */
139 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
140 {
141         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
142 }
143
144
145 /*
146   called when a CTDB_REPLY_CALL packet comes in in the client
147
148   This packet comes in response to a CTDB_REQ_CALL request packet. It
149   contains any reply data from the call
150 */
151 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
152 {
153         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
154         struct ctdb_client_call_state *state;
155
156         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
157         if (state == NULL) {
158                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
159                 return;
160         }
161
162         if (hdr->reqid != state->reqid) {
163                 /* we found a record  but it was the wrong one */
164                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
165                 return;
166         }
167
168         state->call->reply_data.dptr = c->data;
169         state->call->reply_data.dsize = c->datalen;
170         state->call->status = c->status;
171
172         talloc_steal(state, c);
173
174         state->state = CTDB_CALL_DONE;
175
176         if (state->async.fn) {
177                 state->async.fn(state);
178         }
179 }
180
181 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
182
183 /*
184   this is called in the client, when data comes in from the daemon
185  */
186 void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
187 {
188         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
189         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
190         TALLOC_CTX *tmp_ctx;
191
192         /* place the packet as a child of a tmp_ctx. We then use
193            talloc_free() below to free it. If any of the calls want
194            to keep it, then they will steal it somewhere else, and the
195            talloc_free() will be a no-op */
196         tmp_ctx = talloc_new(ctdb);
197         talloc_steal(tmp_ctx, hdr);
198
199         if (cnt == 0) {
200                 DEBUG(DEBUG_CRIT,("Daemon has exited - shutting down client\n"));
201                 exit(1);
202         }
203
204         if (cnt < sizeof(*hdr)) {
205                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
206                 goto done;
207         }
208         if (cnt != hdr->length) {
209                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
210                                (unsigned)hdr->length, (unsigned)cnt);
211                 goto done;
212         }
213
214         if (hdr->ctdb_magic != CTDB_MAGIC) {
215                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
216                 goto done;
217         }
218
219         if (hdr->ctdb_version != CTDB_VERSION) {
220                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
221                 goto done;
222         }
223
224         switch (hdr->operation) {
225         case CTDB_REPLY_CALL:
226                 ctdb_client_reply_call(ctdb, hdr);
227                 break;
228
229         case CTDB_REQ_MESSAGE:
230                 ctdb_request_message(ctdb, hdr);
231                 break;
232
233         case CTDB_REPLY_CONTROL:
234                 ctdb_client_reply_control(ctdb, hdr);
235                 break;
236
237         default:
238                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
239         }
240
241 done:
242         talloc_free(tmp_ctx);
243 }
244
245 /*
246   connect to a unix domain socket
247 */
248 int ctdb_socket_connect(struct ctdb_context *ctdb)
249 {
250         struct sockaddr_un addr;
251
252         memset(&addr, 0, sizeof(addr));
253         addr.sun_family = AF_UNIX;
254         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
255
256         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
257         if (ctdb->daemon.sd == -1) {
258                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
259                 return -1;
260         }
261
262         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
263                 close(ctdb->daemon.sd);
264                 ctdb->daemon.sd = -1;
265                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
266                 return -1;
267         }
268
269         set_nonblocking(ctdb->daemon.sd);
270         set_close_on_exec(ctdb->daemon.sd);
271         
272         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
273                                               CTDB_DS_ALIGNMENT, 
274                                               ctdb_client_read_cb, ctdb, "to-ctdbd");
275         return 0;
276 }
277
278
279 struct ctdb_record_handle {
280         struct ctdb_db_context *ctdb_db;
281         TDB_DATA key;
282         TDB_DATA *data;
283         struct ctdb_ltdb_header header;
284 };
285
286
287 /*
288   make a recv call to the local ctdb daemon - called from client context
289
290   This is called when the program wants to wait for a ctdb_call to complete and get the 
291   results. This call will block unless the call has already completed.
292 */
293 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
294 {
295         if (state == NULL) {
296                 return -1;
297         }
298
299         while (state->state < CTDB_CALL_DONE) {
300                 event_loop_once(state->ctdb_db->ctdb->ev);
301         }
302         if (state->state != CTDB_CALL_DONE) {
303                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
304                 talloc_free(state);
305                 return -1;
306         }
307
308         if (state->call->reply_data.dsize) {
309                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
310                                                       state->call->reply_data.dptr,
311                                                       state->call->reply_data.dsize);
312                 call->reply_data.dsize = state->call->reply_data.dsize;
313         } else {
314                 call->reply_data.dptr = NULL;
315                 call->reply_data.dsize = 0;
316         }
317         call->status = state->call->status;
318         talloc_free(state);
319
320         return call->status;
321 }
322
323
324
325
326 /*
327   destroy a ctdb_call in client
328 */
329 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
330 {
331         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
332         return 0;
333 }
334
335 /*
336   construct an event driven local ctdb_call
337
338   this is used so that locally processed ctdb_call requests are processed
339   in an event driven manner
340 */
341 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
342                                                                   struct ctdb_call *call,
343                                                                   struct ctdb_ltdb_header *header,
344                                                                   TDB_DATA *data)
345 {
346         struct ctdb_client_call_state *state;
347         struct ctdb_context *ctdb = ctdb_db->ctdb;
348         int ret;
349
350         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
351         CTDB_NO_MEMORY_NULL(ctdb, state);
352         state->call = talloc_zero(state, struct ctdb_call);
353         CTDB_NO_MEMORY_NULL(ctdb, state->call);
354
355         talloc_steal(state, data->dptr);
356
357         state->state   = CTDB_CALL_DONE;
358         *(state->call) = *call;
359         state->ctdb_db = ctdb_db;
360
361         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
362         if (ret != 0) {
363                 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
364         }
365
366         return state;
367 }
368
369 /*
370   make a ctdb call to the local daemon - async send. Called from client context.
371
372   This constructs a ctdb_call request and queues it for processing. 
373   This call never blocks.
374 */
375 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
376                                               struct ctdb_call *call)
377 {
378         struct ctdb_client_call_state *state;
379         struct ctdb_context *ctdb = ctdb_db->ctdb;
380         struct ctdb_ltdb_header header;
381         TDB_DATA data;
382         int ret;
383         size_t len;
384         struct ctdb_req_call *c;
385
386         /* if the domain socket is not yet open, open it */
387         if (ctdb->daemon.sd==-1) {
388                 ctdb_socket_connect(ctdb);
389         }
390
391         ret = ctdb_ltdb_lock(ctdb_db, call->key);
392         if (ret != 0) {
393                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
394                 return NULL;
395         }
396
397         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
398
399         if ((call->flags & CTDB_IMMEDIATE_MIGRATION) && (header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
400                 ret = -1;
401         }
402
403         if (ret == 0 && header.dmaster == ctdb->pnn) {
404                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
405                 talloc_free(data.dptr);
406                 ctdb_ltdb_unlock(ctdb_db, call->key);
407                 return state;
408         }
409
410         ctdb_ltdb_unlock(ctdb_db, call->key);
411         talloc_free(data.dptr);
412
413         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
414         if (state == NULL) {
415                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
416                 return NULL;
417         }
418         state->call = talloc_zero(state, struct ctdb_call);
419         if (state->call == NULL) {
420                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
421                 return NULL;
422         }
423
424         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
425         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
426         if (c == NULL) {
427                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
428                 return NULL;
429         }
430
431         state->reqid     = ctdb_reqid_new(ctdb, state);
432         state->ctdb_db = ctdb_db;
433         talloc_set_destructor(state, ctdb_client_call_destructor);
434
435         c->hdr.reqid     = state->reqid;
436         c->flags         = call->flags;
437         c->db_id         = ctdb_db->db_id;
438         c->callid        = call->call_id;
439         c->hopcount      = 0;
440         c->keylen        = call->key.dsize;
441         c->calldatalen   = call->call_data.dsize;
442         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
443         memcpy(&c->data[call->key.dsize], 
444                call->call_data.dptr, call->call_data.dsize);
445         *(state->call)              = *call;
446         state->call->call_data.dptr = &c->data[call->key.dsize];
447         state->call->key.dptr       = &c->data[0];
448
449         state->state  = CTDB_CALL_WAIT;
450
451
452         ctdb_client_queue_pkt(ctdb, &c->hdr);
453
454         return state;
455 }
456
457
458 /*
459   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
460 */
461 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
462 {
463         struct ctdb_client_call_state *state;
464
465         state = ctdb_call_send(ctdb_db, call);
466         return ctdb_call_recv(state, call);
467 }
468
469
470 /*
471   tell the daemon what messaging srvid we will use, and register the message
472   handler function in the client
473 */
474 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
475                              ctdb_msg_fn_t handler,
476                              void *private_data)
477 {
478         int res;
479         int32_t status;
480
481         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
482                            tdb_null, NULL, NULL, &status, NULL, NULL);
483         if (res != 0 || status != 0) {
484                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
485                 return -1;
486         }
487
488         /* also need to register the handler with our own ctdb structure */
489         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
490 }
491
492 /*
493   tell the daemon we no longer want a srvid
494 */
495 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
496 {
497         int res;
498         int32_t status;
499
500         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
501                            tdb_null, NULL, NULL, &status, NULL, NULL);
502         if (res != 0 || status != 0) {
503                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
504                 return -1;
505         }
506
507         /* also need to register the handler with our own ctdb structure */
508         ctdb_deregister_message_handler(ctdb, srvid, private_data);
509         return 0;
510 }
511
512 /*
513  * check server ids
514  */
515 int ctdb_client_check_message_handlers(struct ctdb_context *ctdb, uint64_t *ids, uint32_t num,
516                                        uint8_t *result)
517 {
518         TDB_DATA indata, outdata;
519         int res;
520         int32_t status;
521         int i;
522
523         indata.dptr = (uint8_t *)ids;
524         indata.dsize = num * sizeof(*ids);
525
526         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_CHECK_SRVIDS, 0,
527                            indata, ctdb, &outdata, &status, NULL, NULL);
528         if (res != 0 || status != 0) {
529                 DEBUG(DEBUG_ERR, (__location__ " failed to check srvids\n"));
530                 return -1;
531         }
532
533         if (outdata.dsize != num*sizeof(uint8_t)) {
534                 DEBUG(DEBUG_ERR, (__location__ " expected %lu bytes, received %zi bytes\n",
535                                   (long unsigned int)num*sizeof(uint8_t),
536                                   outdata.dsize));
537                 talloc_free(outdata.dptr);
538                 return -1;
539         }
540
541         for (i=0; i<num; i++) {
542                 result[i] = outdata.dptr[i];
543         }
544
545         talloc_free(outdata.dptr);
546         return 0;
547 }
548
549 /*
550   send a message - from client context
551  */
552 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
553                       uint64_t srvid, TDB_DATA data)
554 {
555         struct ctdb_req_message *r;
556         int len, res;
557
558         len = offsetof(struct ctdb_req_message, data) + data.dsize;
559         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
560                                len, struct ctdb_req_message);
561         CTDB_NO_MEMORY(ctdb, r);
562
563         r->hdr.destnode  = pnn;
564         r->srvid         = srvid;
565         r->datalen       = data.dsize;
566         memcpy(&r->data[0], data.dptr, data.dsize);
567         
568         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
569         talloc_free(r);
570         return res;
571 }
572
573
574 /*
575   cancel a ctdb_fetch_lock operation, releasing the lock
576  */
577 static int fetch_lock_destructor(struct ctdb_record_handle *h)
578 {
579         ctdb_ltdb_unlock(h->ctdb_db, h->key);
580         return 0;
581 }
582
583 /*
584   force the migration of a record to this node
585  */
586 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
587 {
588         struct ctdb_call call;
589         ZERO_STRUCT(call);
590         call.call_id = CTDB_NULL_FUNC;
591         call.key = key;
592         call.flags = CTDB_IMMEDIATE_MIGRATION;
593         return ctdb_call(ctdb_db, &call);
594 }
595
596 /*
597   try to fetch a readonly copy of a record
598  */
599 static int
600 ctdb_client_fetch_readonly(struct ctdb_db_context *ctdb_db, TDB_DATA key, TALLOC_CTX *mem_ctx, struct ctdb_ltdb_header **hdr, TDB_DATA *data)
601 {
602         int ret;
603
604         struct ctdb_call call;
605         ZERO_STRUCT(call);
606
607         call.call_id = CTDB_FETCH_WITH_HEADER_FUNC;
608         call.call_data.dptr = NULL;
609         call.call_data.dsize = 0;
610         call.key = key;
611         call.flags = CTDB_WANT_READONLY;
612         ret = ctdb_call(ctdb_db, &call);
613
614         if (ret != 0) {
615                 return -1;
616         }
617         if (call.reply_data.dsize < sizeof(struct ctdb_ltdb_header)) {
618                 return -1;
619         }
620
621         *hdr = talloc_memdup(mem_ctx, &call.reply_data.dptr[0], sizeof(struct ctdb_ltdb_header));
622         if (*hdr == NULL) {
623                 talloc_free(call.reply_data.dptr);
624                 return -1;
625         }
626
627         data->dsize = call.reply_data.dsize - sizeof(struct ctdb_ltdb_header);
628         data->dptr  = talloc_memdup(mem_ctx, &call.reply_data.dptr[sizeof(struct ctdb_ltdb_header)], data->dsize);
629         if (data->dptr == NULL) {
630                 talloc_free(call.reply_data.dptr);
631                 talloc_free(hdr);
632                 return -1;
633         }
634
635         return 0;
636 }
637
638 /*
639   get a lock on a record, and return the records data. Blocks until it gets the lock
640  */
641 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
642                                            TDB_DATA key, TDB_DATA *data)
643 {
644         int ret;
645         struct ctdb_record_handle *h;
646
647         /*
648           procedure is as follows:
649
650           1) get the chain lock. 
651           2) check if we are dmaster
652           3) if we are the dmaster then return handle 
653           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
654              reply from ctdbd
655           5) when we get the reply, goto (1)
656          */
657
658         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
659         if (h == NULL) {
660                 return NULL;
661         }
662
663         h->ctdb_db = ctdb_db;
664         h->key     = key;
665         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
666         if (h->key.dptr == NULL) {
667                 talloc_free(h);
668                 return NULL;
669         }
670         h->data    = data;
671
672         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
673                  (const char *)key.dptr));
674
675 again:
676         /* step 1 - get the chain lock */
677         ret = ctdb_ltdb_lock(ctdb_db, key);
678         if (ret != 0) {
679                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
680                 talloc_free(h);
681                 return NULL;
682         }
683
684         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
685
686         talloc_set_destructor(h, fetch_lock_destructor);
687
688         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
689
690         /* when torturing, ensure we test the remote path */
691         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
692             random() % 5 == 0) {
693                 h->header.dmaster = (uint32_t)-1;
694         }
695
696
697         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
698
699         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
700                 ctdb_ltdb_unlock(ctdb_db, key);
701                 ret = ctdb_client_force_migration(ctdb_db, key);
702                 if (ret != 0) {
703                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
704                         talloc_free(h);
705                         return NULL;
706                 }
707                 goto again;
708         }
709
710         /* if this is a request for read/write and we have delegations
711            we have to revoke all delegations first
712         */
713         if ((h->header.dmaster == ctdb_db->ctdb->pnn) &&
714             (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
715                 ctdb_ltdb_unlock(ctdb_db, key);
716                 ret = ctdb_client_force_migration(ctdb_db, key);
717                 if (ret != 0) {
718                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
719                         talloc_free(h);
720                         return NULL;
721                 }
722                 goto again;
723         }
724
725         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
726         return h;
727 }
728
729 /*
730   get a readonly lock on a record, and return the records data. Blocks until it gets the lock
731  */
732 struct ctdb_record_handle *
733 ctdb_fetch_readonly_lock(
734         struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
735         TDB_DATA key, TDB_DATA *data,
736         int read_only)
737 {
738         int ret;
739         struct ctdb_record_handle *h;
740         struct ctdb_ltdb_header *roheader = NULL;
741
742         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
743         if (h == NULL) {
744                 return NULL;
745         }
746
747         h->ctdb_db = ctdb_db;
748         h->key     = key;
749         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
750         if (h->key.dptr == NULL) {
751                 talloc_free(h);
752                 return NULL;
753         }
754         h->data    = data;
755
756         data->dptr = NULL;
757         data->dsize = 0;
758
759
760 again:
761         talloc_free(roheader);
762         roheader = NULL;
763
764         talloc_free(data->dptr);
765         data->dptr = NULL;
766         data->dsize = 0;
767
768         /* Lock the record/chain */
769         ret = ctdb_ltdb_lock(ctdb_db, key);
770         if (ret != 0) {
771                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
772                 talloc_free(h);
773                 return NULL;
774         }
775
776         talloc_set_destructor(h, fetch_lock_destructor);
777
778         /* Check if record exists yet in the TDB */
779         ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
780         if (ret != 0) {
781                 ctdb_ltdb_unlock(ctdb_db, key);
782                 ret = ctdb_client_force_migration(ctdb_db, key);
783                 if (ret != 0) {
784                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
785                         talloc_free(h);
786                         return NULL;
787                 }
788                 goto again;
789         }
790
791         /* if this is a request for read/write and we have delegations
792            we have to revoke all delegations first
793         */
794         if ((read_only == 0) 
795         &&  (h->header.dmaster == ctdb_db->ctdb->pnn)
796         &&  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
797                 ctdb_ltdb_unlock(ctdb_db, key);
798                 ret = ctdb_client_force_migration(ctdb_db, key);
799                 if (ret != 0) {
800                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
801                         talloc_free(h);
802                         return NULL;
803                 }
804                 goto again;
805         }
806
807         /* if we are dmaster, just return the handle */
808         if (h->header.dmaster == ctdb_db->ctdb->pnn) {
809                 return h;
810         }
811
812         if (read_only != 0) {
813                 TDB_DATA rodata = {NULL, 0};
814
815                 if ((h->header.flags & CTDB_REC_RO_HAVE_READONLY)
816                 ||  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
817                         return h;
818                 }
819
820                 ctdb_ltdb_unlock(ctdb_db, key);
821                 ret = ctdb_client_fetch_readonly(ctdb_db, key, h, &roheader, &rodata);
822                 if (ret != 0) {
823                         DEBUG(DEBUG_ERR,("ctdb_fetch_readonly_lock:  failed. force migration and try again\n"));
824                         ret = ctdb_client_force_migration(ctdb_db, key);
825                         if (ret != 0) {
826                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
827                                 talloc_free(h);
828                                 return NULL;
829                         }
830
831                         goto again;
832                 }
833
834                 if (!(roheader->flags&CTDB_REC_RO_HAVE_READONLY)) {
835                         ret = ctdb_client_force_migration(ctdb_db, key);
836                         if (ret != 0) {
837                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
838                                 talloc_free(h);
839                                 return NULL;
840                         }
841
842                         goto again;
843                 }
844
845                 ret = ctdb_ltdb_lock(ctdb_db, key);
846                 if (ret != 0) {
847                         DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
848                         talloc_free(h);
849                         return NULL;
850                 }
851
852                 ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
853                 if (ret != 0) {
854                         ctdb_ltdb_unlock(ctdb_db, key);
855
856                         ret = ctdb_client_force_migration(ctdb_db, key);
857                         if (ret != 0) {
858                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
859                                 talloc_free(h);
860                                 return NULL;
861                         }
862
863                         goto again;
864                 }
865
866                 return h;
867         }
868
869         /* we are not dmaster and this was not a request for a readonly lock
870          * so unlock the record, migrate it and try again
871          */
872         ctdb_ltdb_unlock(ctdb_db, key);
873         ret = ctdb_client_force_migration(ctdb_db, key);
874         if (ret != 0) {
875                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
876                 talloc_free(h);
877                 return NULL;
878         }
879         goto again;
880 }
881
882 /*
883   store some data to the record that was locked with ctdb_fetch_lock()
884 */
885 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
886 {
887         if (h->ctdb_db->persistent) {
888                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
889                 return -1;
890         }
891
892         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
893 }
894
895 /*
896   non-locking fetch of a record
897  */
898 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
899                TDB_DATA key, TDB_DATA *data)
900 {
901         struct ctdb_call call;
902         int ret;
903
904         call.call_id = CTDB_FETCH_FUNC;
905         call.call_data.dptr = NULL;
906         call.call_data.dsize = 0;
907         call.key = key;
908
909         ret = ctdb_call(ctdb_db, &call);
910
911         if (ret == 0) {
912                 *data = call.reply_data;
913                 talloc_steal(mem_ctx, data->dptr);
914         }
915
916         return ret;
917 }
918
919
920
921 /*
922    called when a control completes or timesout to invoke the callback
923    function the user provided
924 */
925 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
926         struct timeval t, void *private_data)
927 {
928         struct ctdb_client_control_state *state;
929         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
930         int ret;
931
932         state = talloc_get_type(private_data, struct ctdb_client_control_state);
933         talloc_steal(tmp_ctx, state);
934
935         ret = ctdb_control_recv(state->ctdb, state, state,
936                         NULL, 
937                         NULL, 
938                         NULL);
939         if (ret != 0) {
940                 DEBUG(DEBUG_DEBUG,("ctdb_control_recv() failed, ignoring return code %d\n", ret));
941         }
942
943         talloc_free(tmp_ctx);
944 }
945
946 /*
947   called when a CTDB_REPLY_CONTROL packet comes in in the client
948
949   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
950   contains any reply data from the control
951 */
952 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
953                                       struct ctdb_req_header *hdr)
954 {
955         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
956         struct ctdb_client_control_state *state;
957
958         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
959         if (state == NULL) {
960                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
961                 return;
962         }
963
964         if (hdr->reqid != state->reqid) {
965                 /* we found a record  but it was the wrong one */
966                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
967                 return;
968         }
969
970         state->outdata.dptr = c->data;
971         state->outdata.dsize = c->datalen;
972         state->status = c->status;
973         if (c->errorlen) {
974                 state->errormsg = talloc_strndup(state, 
975                                                  (char *)&c->data[c->datalen], 
976                                                  c->errorlen);
977         }
978
979         /* state->outdata now uses resources from c so we dont want c
980            to just dissappear from under us while state is still alive
981         */
982         talloc_steal(state, c);
983
984         state->state = CTDB_CONTROL_DONE;
985
986         /* if we had a callback registered for this control, pull the response
987            and call the callback.
988         */
989         if (state->async.fn) {
990                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
991         }
992 }
993
994
995 /*
996   destroy a ctdb_control in client
997 */
998 static int ctdb_client_control_destructor(struct ctdb_client_control_state *state)
999 {
1000         ctdb_reqid_remove(state->ctdb, state->reqid);
1001         return 0;
1002 }
1003
1004
1005 /* time out handler for ctdb_control */
1006 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
1007         struct timeval t, void *private_data)
1008 {
1009         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
1010
1011         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
1012                          "dstnode:%u\n", state->reqid, state->c->opcode,
1013                          state->c->hdr.destnode));
1014
1015         state->state = CTDB_CONTROL_TIMEOUT;
1016
1017         /* if we had a callback registered for this control, pull the response
1018            and call the callback.
1019         */
1020         if (state->async.fn) {
1021                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
1022         }
1023 }
1024
1025 /* async version of send control request */
1026 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
1027                 uint32_t destnode, uint64_t srvid, 
1028                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
1029                 TALLOC_CTX *mem_ctx,
1030                 struct timeval *timeout,
1031                 char **errormsg)
1032 {
1033         struct ctdb_client_control_state *state;
1034         size_t len;
1035         struct ctdb_req_control *c;
1036         int ret;
1037
1038         if (errormsg) {
1039                 *errormsg = NULL;
1040         }
1041
1042         /* if the domain socket is not yet open, open it */
1043         if (ctdb->daemon.sd==-1) {
1044                 ctdb_socket_connect(ctdb);
1045         }
1046
1047         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
1048         CTDB_NO_MEMORY_NULL(ctdb, state);
1049
1050         state->ctdb       = ctdb;
1051         state->reqid      = ctdb_reqid_new(ctdb, state);
1052         state->state      = CTDB_CONTROL_WAIT;
1053         state->errormsg   = NULL;
1054
1055         talloc_set_destructor(state, ctdb_client_control_destructor);
1056
1057         len = offsetof(struct ctdb_req_control, data) + data.dsize;
1058         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
1059                                len, struct ctdb_req_control);
1060         state->c            = c;        
1061         CTDB_NO_MEMORY_NULL(ctdb, c);
1062         c->hdr.reqid        = state->reqid;
1063         c->hdr.destnode     = destnode;
1064         c->opcode           = opcode;
1065         c->client_id        = 0;
1066         c->flags            = flags;
1067         c->srvid            = srvid;
1068         c->datalen          = data.dsize;
1069         if (data.dsize) {
1070                 memcpy(&c->data[0], data.dptr, data.dsize);
1071         }
1072
1073         /* timeout */
1074         if (timeout && !timeval_is_zero(timeout)) {
1075                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
1076         }
1077
1078         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
1079         if (ret != 0) {
1080                 talloc_free(state);
1081                 return NULL;
1082         }
1083
1084         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1085                 talloc_free(state);
1086                 return NULL;
1087         }
1088
1089         return state;
1090 }
1091
1092
1093 /* async version of receive control reply */
1094 int ctdb_control_recv(struct ctdb_context *ctdb, 
1095                 struct ctdb_client_control_state *state, 
1096                 TALLOC_CTX *mem_ctx,
1097                 TDB_DATA *outdata, int32_t *status, char **errormsg)
1098 {
1099         TALLOC_CTX *tmp_ctx;
1100
1101         if (status != NULL) {
1102                 *status = -1;
1103         }
1104         if (errormsg != NULL) {
1105                 *errormsg = NULL;
1106         }
1107
1108         if (state == NULL) {
1109                 return -1;
1110         }
1111
1112         /* prevent double free of state */
1113         tmp_ctx = talloc_new(ctdb);
1114         talloc_steal(tmp_ctx, state);
1115
1116         /* loop one event at a time until we either timeout or the control
1117            completes.
1118         */
1119         while (state->state == CTDB_CONTROL_WAIT) {
1120                 event_loop_once(ctdb->ev);
1121         }
1122
1123         if (state->state != CTDB_CONTROL_DONE) {
1124                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
1125                 if (state->async.fn) {
1126                         state->async.fn(state);
1127                 }
1128                 talloc_free(tmp_ctx);
1129                 return -1;
1130         }
1131
1132         if (state->errormsg) {
1133                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
1134                 if (errormsg) {
1135                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
1136                 }
1137                 if (state->async.fn) {
1138                         state->async.fn(state);
1139                 }
1140                 talloc_free(tmp_ctx);
1141                 return -1;
1142         }
1143
1144         if (outdata) {
1145                 *outdata = state->outdata;
1146                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
1147         }
1148
1149         if (status) {
1150                 *status = state->status;
1151         }
1152
1153         if (state->async.fn) {
1154                 state->async.fn(state);
1155         }
1156
1157         talloc_free(tmp_ctx);
1158         return 0;
1159 }
1160
1161
1162
1163 /*
1164   send a ctdb control message
1165   timeout specifies how long we should wait for a reply.
1166   if timeout is NULL we wait indefinitely
1167  */
1168 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
1169                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
1170                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
1171                  struct timeval *timeout,
1172                  char **errormsg)
1173 {
1174         struct ctdb_client_control_state *state;
1175
1176         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
1177                         flags, data, mem_ctx,
1178                         timeout, errormsg);
1179
1180         /* FIXME: Error conditions in ctdb_control_send return NULL without
1181          * setting errormsg.  So, there is no way to distinguish between sucess
1182          * and failure when CTDB_CTRL_FLAG_NOREPLY is set */
1183         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1184                 if (status != NULL) {
1185                         *status = 0;
1186                 }
1187                 return 0;
1188         }
1189
1190         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
1191                         errormsg);
1192 }
1193
1194
1195
1196
1197 /*
1198   a process exists call. Returns 0 if process exists, -1 otherwise
1199  */
1200 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
1201 {
1202         int ret;
1203         TDB_DATA data;
1204         int32_t status;
1205
1206         data.dptr = (uint8_t*)&pid;
1207         data.dsize = sizeof(pid);
1208
1209         ret = ctdb_control(ctdb, destnode, 0, 
1210                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
1211                            NULL, NULL, &status, NULL, NULL);
1212         if (ret != 0) {
1213                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
1214                 return -1;
1215         }
1216
1217         return status;
1218 }
1219
1220 /*
1221   get remote statistics
1222  */
1223 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
1224 {
1225         int ret;
1226         TDB_DATA data;
1227         int32_t res;
1228
1229         ret = ctdb_control(ctdb, destnode, 0, 
1230                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
1231                            ctdb, &data, &res, NULL, NULL);
1232         if (ret != 0 || res != 0) {
1233                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
1234                 return -1;
1235         }
1236
1237         if (data.dsize != sizeof(struct ctdb_statistics)) {
1238                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1239                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1240                       return -1;
1241         }
1242
1243         *status = *(struct ctdb_statistics *)data.dptr;
1244         talloc_free(data.dptr);
1245                         
1246         return 0;
1247 }
1248
1249 /*
1250  * get db statistics
1251  */
1252 int ctdb_ctrl_dbstatistics(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1253                            TALLOC_CTX *mem_ctx, struct ctdb_db_statistics **dbstat)
1254 {
1255         int ret;
1256         TDB_DATA indata, outdata;
1257         int32_t res;
1258         struct ctdb_db_statistics *wire, *s;
1259         char *ptr;
1260         int i;
1261
1262         indata.dptr = (uint8_t *)&dbid;
1263         indata.dsize = sizeof(dbid);
1264
1265         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DB_STATISTICS,
1266                            0, indata, ctdb, &outdata, &res, NULL, NULL);
1267         if (ret != 0 || res != 0) {
1268                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for dbstatistics failed\n"));
1269                 return -1;
1270         }
1271
1272         if (outdata.dsize < offsetof(struct ctdb_db_statistics, hot_keys_wire)) {
1273                 DEBUG(DEBUG_ERR,(__location__ " Wrong dbstatistics size %zi - expected >= %lu\n",
1274                                  outdata.dsize,
1275                                  (long unsigned int)sizeof(struct ctdb_statistics)));
1276                 return -1;
1277         }
1278
1279         s = talloc_zero(mem_ctx, struct ctdb_db_statistics);
1280         if (s == NULL) {
1281                 talloc_free(outdata.dptr);
1282                 CTDB_NO_MEMORY(ctdb, s);
1283         }
1284
1285         wire = (struct ctdb_db_statistics *)outdata.dptr;
1286         *s = *wire;
1287         ptr = &wire->hot_keys_wire[0];
1288         for (i=0; i<wire->num_hot_keys; i++) {
1289                 s->hot_keys[i].key.dptr = talloc_size(mem_ctx, s->hot_keys[i].key.dsize);
1290                 if (s->hot_keys[i].key.dptr == NULL) {
1291                         talloc_free(outdata.dptr);
1292                         CTDB_NO_MEMORY(ctdb, s->hot_keys[i].key.dptr);
1293                 }
1294
1295                 memcpy(s->hot_keys[i].key.dptr, ptr, s->hot_keys[i].key.dsize);
1296                 ptr += wire->hot_keys[i].key.dsize;
1297         }
1298
1299         talloc_free(outdata.dptr);
1300         *dbstat = s;
1301         return 0;
1302 }
1303
1304 /*
1305   shutdown a remote ctdb node
1306  */
1307 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1308 {
1309         struct ctdb_client_control_state *state;
1310
1311         state = ctdb_control_send(ctdb, destnode, 0, 
1312                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1313                            NULL, &timeout, NULL);
1314         if (state == NULL) {
1315                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1316                 return -1;
1317         }
1318
1319         return 0;
1320 }
1321
1322 /*
1323   get vnn map from a remote node
1324  */
1325 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1326 {
1327         int ret;
1328         TDB_DATA outdata;
1329         int32_t res;
1330         struct ctdb_vnn_map_wire *map;
1331
1332         ret = ctdb_control(ctdb, destnode, 0, 
1333                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1334                            mem_ctx, &outdata, &res, &timeout, NULL);
1335         if (ret != 0 || res != 0) {
1336                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1337                 return -1;
1338         }
1339         
1340         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1341         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1342             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1343                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1344                 return -1;
1345         }
1346
1347         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1348         CTDB_NO_MEMORY(ctdb, *vnnmap);
1349         (*vnnmap)->generation = map->generation;
1350         (*vnnmap)->size       = map->size;
1351         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1352
1353         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1354         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1355         talloc_free(outdata.dptr);
1356                     
1357         return 0;
1358 }
1359
1360
1361 /*
1362   get the recovery mode of a remote node
1363  */
1364 struct ctdb_client_control_state *
1365 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1366 {
1367         return ctdb_control_send(ctdb, destnode, 0, 
1368                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1369                            mem_ctx, &timeout, NULL);
1370 }
1371
1372 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1373 {
1374         int ret;
1375         int32_t res;
1376
1377         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1378         if (ret != 0) {
1379                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1380                 return -1;
1381         }
1382
1383         if (recmode) {
1384                 *recmode = (uint32_t)res;
1385         }
1386
1387         return 0;
1388 }
1389
1390 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1391 {
1392         struct ctdb_client_control_state *state;
1393
1394         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1395         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1396 }
1397
1398
1399
1400
1401 /*
1402   set the recovery mode of a remote node
1403  */
1404 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1405 {
1406         int ret;
1407         TDB_DATA data;
1408         int32_t res;
1409
1410         data.dsize = sizeof(uint32_t);
1411         data.dptr = (unsigned char *)&recmode;
1412
1413         ret = ctdb_control(ctdb, destnode, 0, 
1414                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1415                            NULL, NULL, &res, &timeout, NULL);
1416         if (ret != 0 || res != 0) {
1417                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1418                 return -1;
1419         }
1420
1421         return 0;
1422 }
1423
1424
1425
1426 /*
1427   get the recovery master of a remote node
1428  */
1429 struct ctdb_client_control_state *
1430 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1431                         struct timeval timeout, uint32_t destnode)
1432 {
1433         return ctdb_control_send(ctdb, destnode, 0, 
1434                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1435                            mem_ctx, &timeout, NULL);
1436 }
1437
1438 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1439 {
1440         int ret;
1441         int32_t res;
1442
1443         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1444         if (ret != 0) {
1445                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1446                 return -1;
1447         }
1448
1449         if (recmaster) {
1450                 *recmaster = (uint32_t)res;
1451         }
1452
1453         return 0;
1454 }
1455
1456 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1457 {
1458         struct ctdb_client_control_state *state;
1459
1460         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1461         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1462 }
1463
1464
1465 /*
1466   set the recovery master of a remote node
1467  */
1468 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1469 {
1470         int ret;
1471         TDB_DATA data;
1472         int32_t res;
1473
1474         ZERO_STRUCT(data);
1475         data.dsize = sizeof(uint32_t);
1476         data.dptr = (unsigned char *)&recmaster;
1477
1478         ret = ctdb_control(ctdb, destnode, 0, 
1479                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1480                            NULL, NULL, &res, &timeout, NULL);
1481         if (ret != 0 || res != 0) {
1482                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1483                 return -1;
1484         }
1485
1486         return 0;
1487 }
1488
1489
1490 /*
1491   get a list of databases off a remote node
1492  */
1493 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1494                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1495 {
1496         int ret;
1497         TDB_DATA outdata;
1498         int32_t res;
1499
1500         ret = ctdb_control(ctdb, destnode, 0, 
1501                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1502                            mem_ctx, &outdata, &res, &timeout, NULL);
1503         if (ret != 0 || res != 0) {
1504                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1505                 return -1;
1506         }
1507
1508         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1509         talloc_free(outdata.dptr);
1510                     
1511         return 0;
1512 }
1513
1514 /*
1515   get a list of nodes (vnn and flags ) from a remote node
1516  */
1517 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1518                 struct timeval timeout, uint32_t destnode, 
1519                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1520 {
1521         int ret;
1522         TDB_DATA outdata;
1523         int32_t res;
1524
1525         ret = ctdb_control(ctdb, destnode, 0, 
1526                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1527                            mem_ctx, &outdata, &res, &timeout, NULL);
1528         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1529                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1530                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1531         }
1532         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1533                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1534                 return -1;
1535         }
1536
1537         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1538         talloc_free(outdata.dptr);
1539                     
1540         return 0;
1541 }
1542
1543 /*
1544   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1545  */
1546 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1547                 struct timeval timeout, uint32_t destnode, 
1548                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1549 {
1550         int ret, i, len;
1551         TDB_DATA outdata;
1552         struct ctdb_node_mapv4 *nodemapv4;
1553         int32_t res;
1554
1555         ret = ctdb_control(ctdb, destnode, 0, 
1556                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1557                            mem_ctx, &outdata, &res, &timeout, NULL);
1558         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1559                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1560                 return -1;
1561         }
1562
1563         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1564
1565         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1566         (*nodemap) = talloc_zero_size(mem_ctx, len);
1567         CTDB_NO_MEMORY(ctdb, (*nodemap));
1568
1569         (*nodemap)->num = nodemapv4->num;
1570         for (i=0; i<nodemapv4->num; i++) {
1571                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1572                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1573                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1574                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1575         }
1576                 
1577         talloc_free(outdata.dptr);
1578                     
1579         return 0;
1580 }
1581
1582 /*
1583   drop the transport, reload the nodes file and restart the transport
1584  */
1585 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1586                     struct timeval timeout, uint32_t destnode)
1587 {
1588         int ret;
1589         int32_t res;
1590
1591         ret = ctdb_control(ctdb, destnode, 0, 
1592                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1593                            NULL, NULL, &res, &timeout, NULL);
1594         if (ret != 0 || res != 0) {
1595                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1596                 return -1;
1597         }
1598
1599         return 0;
1600 }
1601
1602
1603 /*
1604   set vnn map on a node
1605  */
1606 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1607                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1608 {
1609         int ret;
1610         TDB_DATA data;
1611         int32_t res;
1612         struct ctdb_vnn_map_wire *map;
1613         size_t len;
1614
1615         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1616         map = talloc_size(mem_ctx, len);
1617         CTDB_NO_MEMORY(ctdb, map);
1618
1619         map->generation = vnnmap->generation;
1620         map->size = vnnmap->size;
1621         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1622         
1623         data.dsize = len;
1624         data.dptr  = (uint8_t *)map;
1625
1626         ret = ctdb_control(ctdb, destnode, 0, 
1627                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1628                            NULL, NULL, &res, &timeout, NULL);
1629         if (ret != 0 || res != 0) {
1630                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1631                 return -1;
1632         }
1633
1634         talloc_free(map);
1635
1636         return 0;
1637 }
1638
1639
1640 /*
1641   async send for pull database
1642  */
1643 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1644         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1645         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1646 {
1647         TDB_DATA indata;
1648         struct ctdb_control_pulldb *pull;
1649         struct ctdb_client_control_state *state;
1650
1651         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1652         CTDB_NO_MEMORY_NULL(ctdb, pull);
1653
1654         pull->db_id   = dbid;
1655         pull->lmaster = lmaster;
1656
1657         indata.dsize = sizeof(struct ctdb_control_pulldb);
1658         indata.dptr  = (unsigned char *)pull;
1659
1660         state = ctdb_control_send(ctdb, destnode, 0, 
1661                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1662                                   mem_ctx, &timeout, NULL);
1663         talloc_free(pull);
1664
1665         return state;
1666 }
1667
1668 /*
1669   async recv for pull database
1670  */
1671 int ctdb_ctrl_pulldb_recv(
1672         struct ctdb_context *ctdb, 
1673         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1674         TDB_DATA *outdata)
1675 {
1676         int ret;
1677         int32_t res;
1678
1679         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1680         if ( (ret != 0) || (res != 0) ){
1681                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1682                 return -1;
1683         }
1684
1685         return 0;
1686 }
1687
1688 /*
1689   pull all keys and records for a specific database on a node
1690  */
1691 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1692                 uint32_t dbid, uint32_t lmaster, 
1693                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1694                 TDB_DATA *outdata)
1695 {
1696         struct ctdb_client_control_state *state;
1697
1698         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1699                                       timeout);
1700         
1701         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1702 }
1703
1704
1705 /*
1706   change dmaster for all keys in the database to the new value
1707  */
1708 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1709                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1710 {
1711         int ret;
1712         TDB_DATA indata;
1713         int32_t res;
1714
1715         indata.dsize = 2*sizeof(uint32_t);
1716         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1717
1718         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1719         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1720
1721         ret = ctdb_control(ctdb, destnode, 0, 
1722                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1723                            NULL, NULL, &res, &timeout, NULL);
1724         if (ret != 0 || res != 0) {
1725                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1726                 return -1;
1727         }
1728
1729         return 0;
1730 }
1731
1732 /*
1733   ping a node, return number of clients connected
1734  */
1735 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1736 {
1737         int ret;
1738         int32_t res;
1739
1740         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1741                            tdb_null, NULL, NULL, &res, NULL, NULL);
1742         if (ret != 0) {
1743                 return -1;
1744         }
1745         return res;
1746 }
1747
1748 int ctdb_ctrl_get_runstate(struct ctdb_context *ctdb, 
1749                            struct timeval timeout, 
1750                            uint32_t destnode,
1751                            uint32_t *runstate)
1752 {
1753         TDB_DATA outdata;
1754         int32_t res;
1755         int ret;
1756
1757         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_RUNSTATE, 0,
1758                            tdb_null, ctdb, &outdata, &res, &timeout, NULL);
1759         if (ret != 0 || res != 0) {
1760                 DEBUG(DEBUG_ERR,("ctdb_control for get_runstate failed\n"));
1761                 return ret != 0 ? ret : res;
1762         }
1763
1764         if (outdata.dsize != sizeof(uint32_t)) {
1765                 DEBUG(DEBUG_ERR,("Invalid return data in get_runstate\n"));
1766                 talloc_free(outdata.dptr);
1767                 return -1;
1768         }
1769
1770         if (runstate != NULL) {
1771                 *runstate = *(uint32_t *)outdata.dptr;
1772         }
1773         talloc_free(outdata.dptr);
1774
1775         return 0;
1776 }
1777
1778 /*
1779   find the real path to a ltdb 
1780  */
1781 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1782                    const char **path)
1783 {
1784         int ret;
1785         int32_t res;
1786         TDB_DATA data;
1787
1788         data.dptr = (uint8_t *)&dbid;
1789         data.dsize = sizeof(dbid);
1790
1791         ret = ctdb_control(ctdb, destnode, 0, 
1792                            CTDB_CONTROL_GETDBPATH, 0, data, 
1793                            mem_ctx, &data, &res, &timeout, NULL);
1794         if (ret != 0 || res != 0) {
1795                 return -1;
1796         }
1797
1798         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1799         if ((*path) == NULL) {
1800                 return -1;
1801         }
1802
1803         talloc_free(data.dptr);
1804
1805         return 0;
1806 }
1807
1808 /*
1809   find the name of a db 
1810  */
1811 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1812                    const char **name)
1813 {
1814         int ret;
1815         int32_t res;
1816         TDB_DATA data;
1817
1818         data.dptr = (uint8_t *)&dbid;
1819         data.dsize = sizeof(dbid);
1820
1821         ret = ctdb_control(ctdb, destnode, 0, 
1822                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1823                            mem_ctx, &data, &res, &timeout, NULL);
1824         if (ret != 0 || res != 0) {
1825                 return -1;
1826         }
1827
1828         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1829         if ((*name) == NULL) {
1830                 return -1;
1831         }
1832
1833         talloc_free(data.dptr);
1834
1835         return 0;
1836 }
1837
1838 /*
1839   get the health status of a db
1840  */
1841 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1842                           struct timeval timeout,
1843                           uint32_t destnode,
1844                           uint32_t dbid, TALLOC_CTX *mem_ctx,
1845                           const char **reason)
1846 {
1847         int ret;
1848         int32_t res;
1849         TDB_DATA data;
1850
1851         data.dptr = (uint8_t *)&dbid;
1852         data.dsize = sizeof(dbid);
1853
1854         ret = ctdb_control(ctdb, destnode, 0,
1855                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1856                            mem_ctx, &data, &res, &timeout, NULL);
1857         if (ret != 0 || res != 0) {
1858                 return -1;
1859         }
1860
1861         if (data.dsize == 0) {
1862                 (*reason) = NULL;
1863                 return 0;
1864         }
1865
1866         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1867         if ((*reason) == NULL) {
1868                 return -1;
1869         }
1870
1871         talloc_free(data.dptr);
1872
1873         return 0;
1874 }
1875
1876 /*
1877  * get db sequence number
1878  */
1879 int ctdb_ctrl_getdbseqnum(struct ctdb_context *ctdb, struct timeval timeout,
1880                           uint32_t destnode, uint32_t dbid, uint64_t *seqnum)
1881 {
1882         int ret;
1883         int32_t res;
1884         TDB_DATA data, outdata;
1885
1886         data.dptr = (uint8_t *)&dbid;
1887         data.dsize = sizeof(uint64_t);  /* This is just wrong */
1888
1889         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DB_SEQNUM,
1890                            0, data, ctdb, &outdata, &res, &timeout, NULL);
1891         if (ret != 0 || res != 0) {
1892                 DEBUG(DEBUG_ERR,("ctdb_control for getdbesqnum failed\n"));
1893                 return -1;
1894         }
1895
1896         if (outdata.dsize != sizeof(uint64_t)) {
1897                 DEBUG(DEBUG_ERR,("Invalid return data in get_dbseqnum\n"));
1898                 talloc_free(outdata.dptr);
1899                 return -1;
1900         }
1901
1902         if (seqnum != NULL) {
1903                 *seqnum = *(uint64_t *)outdata.dptr;
1904         }
1905         talloc_free(outdata.dptr);
1906
1907         return 0;
1908 }
1909
1910 /*
1911   create a database
1912  */
1913 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1914                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1915 {
1916         int ret;
1917         int32_t res;
1918         TDB_DATA data;
1919         uint64_t tdb_flags = 0;
1920
1921         data.dptr = discard_const(name);
1922         data.dsize = strlen(name)+1;
1923
1924         /* Make sure that volatile databases use jenkins hash */
1925         if (!persistent) {
1926                 tdb_flags = TDB_INCOMPATIBLE_HASH;
1927         }
1928
1929 #ifdef TDB_MUTEX_LOCKING
1930         if (!persistent && ctdb->tunable.mutex_enabled == 1) {
1931                 tdb_flags |= TDB_MUTEX_LOCKING;
1932         }
1933 #endif
1934
1935         ret = ctdb_control(ctdb, destnode, tdb_flags,
1936                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1937                            0, data, 
1938                            mem_ctx, &data, &res, &timeout, NULL);
1939
1940         if (ret != 0 || res != 0) {
1941                 return -1;
1942         }
1943
1944         return 0;
1945 }
1946
1947 /*
1948   get debug level on a node
1949  */
1950 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1951 {
1952         int ret;
1953         int32_t res;
1954         TDB_DATA data;
1955
1956         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1957                            ctdb, &data, &res, NULL, NULL);
1958         if (ret != 0 || res != 0) {
1959                 return -1;
1960         }
1961         if (data.dsize != sizeof(int32_t)) {
1962                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1963                          (unsigned)data.dsize));
1964                 return -1;
1965         }
1966         *level = *(int32_t *)data.dptr;
1967         talloc_free(data.dptr);
1968         return 0;
1969 }
1970
1971 /*
1972   set debug level on a node
1973  */
1974 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1975 {
1976         int ret;
1977         int32_t res;
1978         TDB_DATA data;
1979
1980         data.dptr = (uint8_t *)&level;
1981         data.dsize = sizeof(level);
1982
1983         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1984                            NULL, NULL, &res, NULL, NULL);
1985         if (ret != 0 || res != 0) {
1986                 return -1;
1987         }
1988         return 0;
1989 }
1990
1991
1992 /*
1993   get a list of connected nodes
1994  */
1995 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1996                                 struct timeval timeout,
1997                                 TALLOC_CTX *mem_ctx,
1998                                 uint32_t *num_nodes)
1999 {
2000         struct ctdb_node_map *map=NULL;
2001         int ret, i;
2002         uint32_t *nodes;
2003
2004         *num_nodes = 0;
2005
2006         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
2007         if (ret != 0) {
2008                 return NULL;
2009         }
2010
2011         nodes = talloc_array(mem_ctx, uint32_t, map->num);
2012         if (nodes == NULL) {
2013                 return NULL;
2014         }
2015
2016         for (i=0;i<map->num;i++) {
2017                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2018                         nodes[*num_nodes] = map->nodes[i].pnn;
2019                         (*num_nodes)++;
2020                 }
2021         }
2022
2023         return nodes;
2024 }
2025
2026
2027 /*
2028   reset remote status
2029  */
2030 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
2031 {
2032         int ret;
2033         int32_t res;
2034
2035         ret = ctdb_control(ctdb, destnode, 0, 
2036                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
2037                            NULL, NULL, &res, NULL, NULL);
2038         if (ret != 0 || res != 0) {
2039                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
2040                 return -1;
2041         }
2042         return 0;
2043 }
2044
2045 /*
2046   attach to a specific database - client call
2047 */
2048 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
2049                                     struct timeval timeout,
2050                                     const char *name,
2051                                     bool persistent,
2052                                     uint32_t tdb_flags)
2053 {
2054         struct ctdb_db_context *ctdb_db;
2055         TDB_DATA data;
2056         int ret;
2057         int32_t res;
2058
2059         ctdb_db = ctdb_db_handle(ctdb, name);
2060         if (ctdb_db) {
2061                 return ctdb_db;
2062         }
2063
2064         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
2065         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
2066
2067         ctdb_db->ctdb = ctdb;
2068         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
2069         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
2070
2071         data.dptr = discard_const(name);
2072         data.dsize = strlen(name)+1;
2073
2074         /* CTDB has switched to using jenkins hash for volatile databases.
2075          * Even if tdb_flags do not explicitly mention TDB_INCOMPATIBLE_HASH,
2076          * always set it.
2077          */
2078         if (!persistent) {
2079                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
2080         }
2081
2082 #ifdef TDB_MUTEX_LOCKING
2083         if (!persistent && ctdb->tunable.mutex_enabled == 1) {
2084                 tdb_flags |= TDB_MUTEX_LOCKING;
2085         }
2086 #endif
2087
2088         /* tell ctdb daemon to attach */
2089         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
2090                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
2091                            0, data, ctdb_db, &data, &res, NULL, NULL);
2092         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
2093                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
2094                 talloc_free(ctdb_db);
2095                 return NULL;
2096         }
2097         
2098         ctdb_db->db_id = *(uint32_t *)data.dptr;
2099         talloc_free(data.dptr);
2100
2101         ret = ctdb_ctrl_getdbpath(ctdb, timeout, CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
2102         if (ret != 0) {
2103                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
2104                 talloc_free(ctdb_db);
2105                 return NULL;
2106         }
2107
2108         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
2109         if (ctdb->valgrinding) {
2110                 tdb_flags |= TDB_NOMMAP;
2111         }
2112         tdb_flags |= TDB_DISALLOW_NESTING;
2113
2114         ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path, 0, tdb_flags,
2115                                       O_RDWR, 0);
2116         if (ctdb_db->ltdb == NULL) {
2117                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
2118                 talloc_free(ctdb_db);
2119                 return NULL;
2120         }
2121
2122         ctdb_db->persistent = persistent;
2123
2124         DLIST_ADD(ctdb->db_list, ctdb_db);
2125
2126         /* add well known functions */
2127         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
2128         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
2129         ctdb_set_call(ctdb_db, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
2130
2131         return ctdb_db;
2132 }
2133
2134 /*
2135  * detach from a specific database - client call
2136  */
2137 int ctdb_detach(struct ctdb_context *ctdb, uint32_t db_id)
2138 {
2139         int ret;
2140         int32_t status;
2141         TDB_DATA data;
2142
2143         data.dsize = sizeof(db_id);
2144         data.dptr = (uint8_t *)&db_id;
2145
2146         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_DB_DETACH,
2147                            0, data, NULL, NULL, &status, NULL, NULL);
2148         if (ret != 0 || status != 0) {
2149                 return -1;
2150         }
2151         return 0;
2152 }
2153
2154 /*
2155   setup a call for a database
2156  */
2157 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
2158 {
2159         struct ctdb_registered_call *call;
2160
2161 #if 0
2162         TDB_DATA data;
2163         int32_t status;
2164         struct ctdb_control_set_call c;
2165         int ret;
2166
2167         /* this is no longer valid with the separate daemon architecture */
2168         c.db_id = ctdb_db->db_id;
2169         c.fn    = fn;
2170         c.id    = id;
2171
2172         data.dptr = (uint8_t *)&c;
2173         data.dsize = sizeof(c);
2174
2175         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
2176                            data, NULL, NULL, &status, NULL, NULL);
2177         if (ret != 0 || status != 0) {
2178                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
2179                 return -1;
2180         }
2181 #endif
2182
2183         /* also register locally */
2184         call = talloc(ctdb_db, struct ctdb_registered_call);
2185         call->fn = fn;
2186         call->id = id;
2187
2188         DLIST_ADD(ctdb_db->calls, call);        
2189         return 0;
2190 }
2191
2192
2193 struct traverse_state {
2194         bool done;
2195         uint32_t count;
2196         ctdb_traverse_func fn;
2197         void *private_data;
2198         bool listemptyrecords;
2199 };
2200
2201 /*
2202   called on each key during a ctdb_traverse
2203  */
2204 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
2205 {
2206         struct traverse_state *state = (struct traverse_state *)p;
2207         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
2208         TDB_DATA key;
2209
2210         if (data.dsize < sizeof(uint32_t) ||
2211             d->length != data.dsize) {
2212                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
2213                 state->done = true;
2214                 return;
2215         }
2216
2217         key.dsize = d->keylen;
2218         key.dptr  = &d->data[0];
2219         data.dsize = d->datalen;
2220         data.dptr = &d->data[d->keylen];
2221
2222         if (key.dsize == 0 && data.dsize == 0) {
2223                 /* end of traverse */
2224                 state->done = true;
2225                 return;
2226         }
2227
2228         if (!state->listemptyrecords &&
2229             data.dsize == sizeof(struct ctdb_ltdb_header))
2230         {
2231                 /* empty records are deleted records in ctdb */
2232                 return;
2233         }
2234
2235         if (state->fn(ctdb, key, data, state->private_data) != 0) {
2236                 state->done = true;
2237         }
2238
2239         state->count++;
2240 }
2241
2242 /**
2243  * start a cluster wide traverse, calling the supplied fn on each record
2244  * return the number of records traversed, or -1 on error
2245  *
2246  * Extendet variant with a flag to signal whether empty records should
2247  * be listed.
2248  */
2249 static int ctdb_traverse_ext(struct ctdb_db_context *ctdb_db,
2250                              ctdb_traverse_func fn,
2251                              bool withemptyrecords,
2252                              void *private_data)
2253 {
2254         TDB_DATA data;
2255         struct ctdb_traverse_start_ext t;
2256         int32_t status;
2257         int ret;
2258         uint64_t srvid = (getpid() | 0xFLL<<60);
2259         struct traverse_state state;
2260
2261         state.done = false;
2262         state.count = 0;
2263         state.private_data = private_data;
2264         state.fn = fn;
2265         state.listemptyrecords = withemptyrecords;
2266
2267         ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
2268         if (ret != 0) {
2269                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
2270                 return -1;
2271         }
2272
2273         t.db_id = ctdb_db->db_id;
2274         t.srvid = srvid;
2275         t.reqid = 0;
2276         t.withemptyrecords = withemptyrecords;
2277
2278         data.dptr = (uint8_t *)&t;
2279         data.dsize = sizeof(t);
2280
2281         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START_EXT, 0,
2282                            data, NULL, NULL, &status, NULL, NULL);
2283         if (ret != 0 || status != 0) {
2284                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
2285                 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2286                 return -1;
2287         }
2288
2289         while (!state.done) {
2290                 event_loop_once(ctdb_db->ctdb->ev);
2291         }
2292
2293         ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2294         if (ret != 0) {
2295                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
2296                 return -1;
2297         }
2298
2299         return state.count;
2300 }
2301
2302 /**
2303  * start a cluster wide traverse, calling the supplied fn on each record
2304  * return the number of records traversed, or -1 on error
2305  *
2306  * Standard version which does not list the empty records:
2307  * These are considered deleted.
2308  */
2309 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
2310 {
2311         return ctdb_traverse_ext(ctdb_db, fn, false, private_data);
2312 }
2313
2314 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
2315 /*
2316   called on each key during a catdb
2317  */
2318 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
2319 {
2320         int i;
2321         struct ctdb_dump_db_context *c = (struct ctdb_dump_db_context *)p;
2322         FILE *f = c->f;
2323         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
2324
2325         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
2326         for (i=0;i<key.dsize;i++) {
2327                 if (ISASCII(key.dptr[i])) {
2328                         fprintf(f, "%c", key.dptr[i]);
2329                 } else {
2330                         fprintf(f, "\\%02X", key.dptr[i]);
2331                 }
2332         }
2333         fprintf(f, "\"\n");
2334
2335         fprintf(f, "dmaster: %u\n", h->dmaster);
2336         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
2337
2338         if (c->printlmaster && ctdb->vnn_map != NULL) {
2339                 fprintf(f, "lmaster: %u\n", ctdb_lmaster(ctdb, &key));
2340         }
2341
2342         if (c->printhash) {
2343                 fprintf(f, "hash: 0x%08x\n", ctdb_hash(&key));
2344         }
2345
2346         if (c->printrecordflags) {
2347                 fprintf(f, "flags: 0x%08x", h->flags);
2348                 if (h->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) printf(" MIGRATED_WITH_DATA");
2349                 if (h->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) printf(" VACUUM_MIGRATED");
2350                 if (h->flags & CTDB_REC_FLAG_AUTOMATIC) printf(" AUTOMATIC");
2351                 if (h->flags & CTDB_REC_RO_HAVE_DELEGATIONS) printf(" RO_HAVE_DELEGATIONS");
2352                 if (h->flags & CTDB_REC_RO_HAVE_READONLY) printf(" RO_HAVE_READONLY");
2353                 if (h->flags & CTDB_REC_RO_REVOKING_READONLY) printf(" RO_REVOKING_READONLY");
2354                 if (h->flags & CTDB_REC_RO_REVOKE_COMPLETE) printf(" RO_REVOKE_COMPLETE");
2355                 fprintf(f, "\n");
2356         }
2357
2358         if (c->printdatasize) {
2359                 fprintf(f, "data size: %u\n", (unsigned)data.dsize);
2360         } else {
2361                 fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
2362                 for (i=sizeof(*h);i<data.dsize;i++) {
2363                         if (ISASCII(data.dptr[i])) {
2364                                 fprintf(f, "%c", data.dptr[i]);
2365                         } else {
2366                                 fprintf(f, "\\%02X", data.dptr[i]);
2367                         }
2368                 }
2369                 fprintf(f, "\"\n");
2370         }
2371
2372         fprintf(f, "\n");
2373
2374         return 0;
2375 }
2376
2377 /*
2378   convenience function to list all keys to stdout
2379  */
2380 int ctdb_dump_db(struct ctdb_db_context *ctdb_db,
2381                  struct ctdb_dump_db_context *ctx)
2382 {
2383         return ctdb_traverse_ext(ctdb_db, ctdb_dumpdb_record,
2384                                  ctx->printemptyrecords, ctx);
2385 }
2386
2387 /*
2388   get the pid of a ctdb daemon
2389  */
2390 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
2391 {
2392         int ret;
2393         int32_t res;
2394
2395         ret = ctdb_control(ctdb, destnode, 0, 
2396                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
2397                            NULL, NULL, &res, &timeout, NULL);
2398         if (ret != 0) {
2399                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
2400                 return -1;
2401         }
2402
2403         *pid = res;
2404
2405         return 0;
2406 }
2407
2408
2409 /*
2410   async freeze send control
2411  */
2412 struct ctdb_client_control_state *
2413 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
2414 {
2415         return ctdb_control_send(ctdb, destnode, priority, 
2416                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
2417                            mem_ctx, &timeout, NULL);
2418 }
2419
2420 /* 
2421    async freeze recv control
2422 */
2423 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
2424 {
2425         int ret;
2426         int32_t res;
2427
2428         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
2429         if ( (ret != 0) || (res != 0) ){
2430                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
2431                 return -1;
2432         }
2433
2434         return 0;
2435 }
2436
2437 /*
2438   freeze databases of a certain priority
2439  */
2440 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2441 {
2442         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2443         struct ctdb_client_control_state *state;
2444         int ret;
2445
2446         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
2447         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
2448         talloc_free(tmp_ctx);
2449
2450         return ret;
2451 }
2452
2453 /* Freeze all databases */
2454 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2455 {
2456         int i;
2457
2458         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2459                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2460                         return -1;
2461                 }
2462         }
2463         return 0;
2464 }
2465
2466 /*
2467   thaw databases of a certain priority
2468  */
2469 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2470 {
2471         int ret;
2472         int32_t res;
2473
2474         ret = ctdb_control(ctdb, destnode, priority, 
2475                            CTDB_CONTROL_THAW, 0, tdb_null, 
2476                            NULL, NULL, &res, &timeout, NULL);
2477         if (ret != 0 || res != 0) {
2478                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2479                 return -1;
2480         }
2481
2482         return 0;
2483 }
2484
2485 /* thaw all databases */
2486 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2487 {
2488         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2489 }
2490
2491 /*
2492   get pnn of a node, or -1
2493  */
2494 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2495 {
2496         int ret;
2497         int32_t res;
2498
2499         ret = ctdb_control(ctdb, destnode, 0, 
2500                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2501                            NULL, NULL, &res, &timeout, NULL);
2502         if (ret != 0) {
2503                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2504                 return -1;
2505         }
2506
2507         return res;
2508 }
2509
2510 /*
2511   get the monitoring mode of a remote node
2512  */
2513 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2514 {
2515         int ret;
2516         int32_t res;
2517
2518         ret = ctdb_control(ctdb, destnode, 0, 
2519                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2520                            NULL, NULL, &res, &timeout, NULL);
2521         if (ret != 0) {
2522                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2523                 return -1;
2524         }
2525
2526         *monmode = res;
2527
2528         return 0;
2529 }
2530
2531
2532 /*
2533  set the monitoring mode of a remote node to active
2534  */
2535 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2536 {
2537         int ret;
2538         
2539
2540         ret = ctdb_control(ctdb, destnode, 0, 
2541                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2542                            NULL, NULL,NULL, &timeout, NULL);
2543         if (ret != 0) {
2544                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2545                 return -1;
2546         }
2547
2548         
2549
2550         return 0;
2551 }
2552
2553 /*
2554   set the monitoring mode of a remote node to disable
2555  */
2556 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2557 {
2558         int ret;
2559         
2560
2561         ret = ctdb_control(ctdb, destnode, 0, 
2562                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2563                            NULL, NULL, NULL, &timeout, NULL);
2564         if (ret != 0) {
2565                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2566                 return -1;
2567         }
2568
2569         
2570
2571         return 0;
2572 }
2573
2574
2575
2576 /* 
2577   sent to a node to make it take over an ip address
2578 */
2579 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2580                           uint32_t destnode, struct ctdb_public_ip *ip)
2581 {
2582         TDB_DATA data;
2583         struct ctdb_public_ipv4 ipv4;
2584         int ret;
2585         int32_t res;
2586
2587         if (ip->addr.sa.sa_family == AF_INET) {
2588                 ipv4.pnn = ip->pnn;
2589                 ipv4.sin = ip->addr.ip;
2590
2591                 data.dsize = sizeof(ipv4);
2592                 data.dptr  = (uint8_t *)&ipv4;
2593
2594                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2595                            NULL, &res, &timeout, NULL);
2596         } else {
2597                 data.dsize = sizeof(*ip);
2598                 data.dptr  = (uint8_t *)ip;
2599
2600                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2601                            NULL, &res, &timeout, NULL);
2602         }
2603
2604         if (ret != 0 || res != 0) {
2605                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2606                 return -1;
2607         }
2608
2609         return 0;       
2610 }
2611
2612
2613 /* 
2614   sent to a node to make it release an ip address
2615 */
2616 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2617                          uint32_t destnode, struct ctdb_public_ip *ip)
2618 {
2619         TDB_DATA data;
2620         struct ctdb_public_ipv4 ipv4;
2621         int ret;
2622         int32_t res;
2623
2624         if (ip->addr.sa.sa_family == AF_INET) {
2625                 ipv4.pnn = ip->pnn;
2626                 ipv4.sin = ip->addr.ip;
2627
2628                 data.dsize = sizeof(ipv4);
2629                 data.dptr  = (uint8_t *)&ipv4;
2630
2631                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2632                                    NULL, &res, &timeout, NULL);
2633         } else {
2634                 data.dsize = sizeof(*ip);
2635                 data.dptr  = (uint8_t *)ip;
2636
2637                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2638                                    NULL, &res, &timeout, NULL);
2639         }
2640
2641         if (ret != 0 || res != 0) {
2642                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2643                 return -1;
2644         }
2645
2646         return 0;       
2647 }
2648
2649
2650 /*
2651   get a tunable
2652  */
2653 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2654                           struct timeval timeout, 
2655                           uint32_t destnode,
2656                           const char *name, uint32_t *value)
2657 {
2658         struct ctdb_control_get_tunable *t;
2659         TDB_DATA data, outdata;
2660         int32_t res;
2661         int ret;
2662
2663         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2664         data.dptr  = talloc_size(ctdb, data.dsize);
2665         CTDB_NO_MEMORY(ctdb, data.dptr);
2666
2667         t = (struct ctdb_control_get_tunable *)data.dptr;
2668         t->length = strlen(name)+1;
2669         memcpy(t->name, name, t->length);
2670
2671         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2672                            &outdata, &res, &timeout, NULL);
2673         talloc_free(data.dptr);
2674         if (ret != 0 || res != 0) {
2675                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2676                 return ret != 0 ? ret : res;
2677         }
2678
2679         if (outdata.dsize != sizeof(uint32_t)) {
2680                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2681                 talloc_free(outdata.dptr);
2682                 return -1;
2683         }
2684         
2685         *value = *(uint32_t *)outdata.dptr;
2686         talloc_free(outdata.dptr);
2687
2688         return 0;
2689 }
2690
2691 /*
2692   set a tunable
2693  */
2694 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2695                           struct timeval timeout, 
2696                           uint32_t destnode,
2697                           const char *name, uint32_t value)
2698 {
2699         struct ctdb_control_set_tunable *t;
2700         TDB_DATA data;
2701         int32_t res;
2702         int ret;
2703
2704         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2705         data.dptr  = talloc_size(ctdb, data.dsize);
2706         CTDB_NO_MEMORY(ctdb, data.dptr);
2707
2708         t = (struct ctdb_control_set_tunable *)data.dptr;
2709         t->length = strlen(name)+1;
2710         memcpy(t->name, name, t->length);
2711         t->value = value;
2712
2713         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2714                            NULL, &res, &timeout, NULL);
2715         talloc_free(data.dptr);
2716         if (ret != 0 || res != 0) {
2717                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2718                 return -1;
2719         }
2720
2721         return 0;
2722 }
2723
2724 /*
2725   list tunables
2726  */
2727 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2728                             struct timeval timeout, 
2729                             uint32_t destnode,
2730                             TALLOC_CTX *mem_ctx,
2731                             const char ***list, uint32_t *count)
2732 {
2733         TDB_DATA outdata;
2734         int32_t res;
2735         int ret;
2736         struct ctdb_control_list_tunable *t;
2737         char *p, *s, *ptr;
2738
2739         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2740                            mem_ctx, &outdata, &res, &timeout, NULL);
2741         if (ret != 0 || res != 0) {
2742                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2743                 return -1;
2744         }
2745
2746         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2747         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2748             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2749                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2750                 talloc_free(outdata.dptr);
2751                 return -1;              
2752         }
2753         
2754         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2755         CTDB_NO_MEMORY(ctdb, p);
2756
2757         talloc_free(outdata.dptr);
2758         
2759         (*list) = NULL;
2760         (*count) = 0;
2761
2762         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2763                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2764                 CTDB_NO_MEMORY(ctdb, *list);
2765                 (*list)[*count] = talloc_strdup(*list, s);
2766                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2767                 (*count)++;
2768         }
2769
2770         talloc_free(p);
2771
2772         return 0;
2773 }
2774
2775
2776 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2777                                    struct timeval timeout, uint32_t destnode,
2778                                    TALLOC_CTX *mem_ctx,
2779                                    uint32_t flags,
2780                                    struct ctdb_all_public_ips **ips)
2781 {
2782         int ret;
2783         TDB_DATA outdata;
2784         int32_t res;
2785
2786         ret = ctdb_control(ctdb, destnode, 0, 
2787                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2788                            mem_ctx, &outdata, &res, &timeout, NULL);
2789         if (ret == 0 && res == -1) {
2790                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2791                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2792         }
2793         if (ret != 0 || res != 0) {
2794           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2795                 return -1;
2796         }
2797
2798         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2799         talloc_free(outdata.dptr);
2800                     
2801         return 0;
2802 }
2803
2804 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2805                              struct timeval timeout, uint32_t destnode,
2806                              TALLOC_CTX *mem_ctx,
2807                              struct ctdb_all_public_ips **ips)
2808 {
2809         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2810                                               destnode, mem_ctx,
2811                                               0, ips);
2812 }
2813
2814 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2815                         struct timeval timeout, uint32_t destnode, 
2816                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2817 {
2818         int ret, i, len;
2819         TDB_DATA outdata;
2820         int32_t res;
2821         struct ctdb_all_public_ipsv4 *ipsv4;
2822
2823         ret = ctdb_control(ctdb, destnode, 0, 
2824                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2825                            mem_ctx, &outdata, &res, &timeout, NULL);
2826         if (ret != 0 || res != 0) {
2827                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2828                 return -1;
2829         }
2830
2831         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2832         len = offsetof(struct ctdb_all_public_ips, ips) +
2833                 ipsv4->num*sizeof(struct ctdb_public_ip);
2834         *ips = talloc_zero_size(mem_ctx, len);
2835         CTDB_NO_MEMORY(ctdb, *ips);
2836         (*ips)->num = ipsv4->num;
2837         for (i=0; i<ipsv4->num; i++) {
2838                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2839                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2840         }
2841
2842         talloc_free(outdata.dptr);
2843                     
2844         return 0;
2845 }
2846
2847 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2848                                  struct timeval timeout, uint32_t destnode,
2849                                  TALLOC_CTX *mem_ctx,
2850                                  const ctdb_sock_addr *addr,
2851                                  struct ctdb_control_public_ip_info **_info)
2852 {
2853         int ret;
2854         TDB_DATA indata;
2855         TDB_DATA outdata;
2856         int32_t res;
2857         struct ctdb_control_public_ip_info *info;
2858         uint32_t len;
2859         uint32_t i;
2860
2861         indata.dptr = discard_const_p(uint8_t, addr);
2862         indata.dsize = sizeof(*addr);
2863
2864         ret = ctdb_control(ctdb, destnode, 0,
2865                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2866                            mem_ctx, &outdata, &res, &timeout, NULL);
2867         if (ret != 0 || res != 0) {
2868                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2869                                 "failed ret:%d res:%d\n",
2870                                 ret, res));
2871                 return -1;
2872         }
2873
2874         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2875         if (len > outdata.dsize) {
2876                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2877                                 "returned invalid data with size %u > %u\n",
2878                                 (unsigned int)outdata.dsize,
2879                                 (unsigned int)len));
2880                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2881                 return -1;
2882         }
2883
2884         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2885         len += info->num*sizeof(struct ctdb_control_iface_info);
2886
2887         if (len > outdata.dsize) {
2888                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2889                                 "returned invalid data with size %u > %u\n",
2890                                 (unsigned int)outdata.dsize,
2891                                 (unsigned int)len));
2892                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2893                 return -1;
2894         }
2895
2896         /* make sure we null terminate the returned strings */
2897         for (i=0; i < info->num; i++) {
2898                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2899         }
2900
2901         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2902                                                                 outdata.dptr,
2903                                                                 outdata.dsize);
2904         talloc_free(outdata.dptr);
2905         if (*_info == NULL) {
2906                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2907                                 "talloc_memdup size %u failed\n",
2908                                 (unsigned int)outdata.dsize));
2909                 return -1;
2910         }
2911
2912         return 0;
2913 }
2914
2915 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2916                          struct timeval timeout, uint32_t destnode,
2917                          TALLOC_CTX *mem_ctx,
2918                          struct ctdb_control_get_ifaces **_ifaces)
2919 {
2920         int ret;
2921         TDB_DATA outdata;
2922         int32_t res;
2923         struct ctdb_control_get_ifaces *ifaces;
2924         uint32_t len;
2925         uint32_t i;
2926
2927         ret = ctdb_control(ctdb, destnode, 0,
2928                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2929                            mem_ctx, &outdata, &res, &timeout, NULL);
2930         if (ret != 0 || res != 0) {
2931                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2932                                 "failed ret:%d res:%d\n",
2933                                 ret, res));
2934                 return -1;
2935         }
2936
2937         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2938         if (len > outdata.dsize) {
2939                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2940                                 "returned invalid data with size %u > %u\n",
2941                                 (unsigned int)outdata.dsize,
2942                                 (unsigned int)len));
2943                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2944                 return -1;
2945         }
2946
2947         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2948         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2949
2950         if (len > outdata.dsize) {
2951                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2952                                 "returned invalid data with size %u > %u\n",
2953                                 (unsigned int)outdata.dsize,
2954                                 (unsigned int)len));
2955                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2956                 return -1;
2957         }
2958
2959         /* make sure we null terminate the returned strings */
2960         for (i=0; i < ifaces->num; i++) {
2961                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2962         }
2963
2964         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2965                                                                   outdata.dptr,
2966                                                                   outdata.dsize);
2967         talloc_free(outdata.dptr);
2968         if (*_ifaces == NULL) {
2969                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2970                                 "talloc_memdup size %u failed\n",
2971                                 (unsigned int)outdata.dsize));
2972                 return -1;
2973         }
2974
2975         return 0;
2976 }
2977
2978 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2979                              struct timeval timeout, uint32_t destnode,
2980                              TALLOC_CTX *mem_ctx,
2981                              const struct ctdb_control_iface_info *info)
2982 {
2983         int ret;
2984         TDB_DATA indata;
2985         int32_t res;
2986
2987         indata.dptr = discard_const_p(uint8_t, info);
2988         indata.dsize = sizeof(*info);
2989
2990         ret = ctdb_control(ctdb, destnode, 0,
2991                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2992                            mem_ctx, NULL, &res, &timeout, NULL);
2993         if (ret != 0 || res != 0) {
2994                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2995                                 "failed ret:%d res:%d\n",
2996                                 ret, res));
2997                 return -1;
2998         }
2999
3000         return 0;
3001 }
3002
3003 /*
3004   set/clear the permanent disabled bit on a remote node
3005  */
3006 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
3007                        uint32_t set, uint32_t clear)
3008 {
3009         int ret;
3010         TDB_DATA data;
3011         struct ctdb_node_map *nodemap=NULL;
3012         struct ctdb_node_flag_change c;
3013         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3014         uint32_t recmaster;
3015         uint32_t *nodes;
3016
3017
3018         /* find the recovery master */
3019         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
3020         if (ret != 0) {
3021                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
3022                 talloc_free(tmp_ctx);
3023                 return ret;
3024         }
3025
3026
3027         /* read the node flags from the recmaster */
3028         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
3029         if (ret != 0) {
3030                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
3031                 talloc_free(tmp_ctx);
3032                 return -1;
3033         }
3034         if (destnode >= nodemap->num) {
3035                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
3036                 talloc_free(tmp_ctx);
3037                 return -1;
3038         }
3039
3040         c.pnn       = destnode;
3041         c.old_flags = nodemap->nodes[destnode].flags;
3042         c.new_flags = c.old_flags;
3043         c.new_flags |= set;
3044         c.new_flags &= ~clear;
3045
3046         data.dsize = sizeof(c);
3047         data.dptr = (unsigned char *)&c;
3048
3049         /* send the flags update to all connected nodes */
3050         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
3051
3052         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
3053                                         nodes, 0,
3054                                         timeout, false, data,
3055                                         NULL, NULL,
3056                                         NULL) != 0) {
3057                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
3058
3059                 talloc_free(tmp_ctx);
3060                 return -1;
3061         }
3062
3063         talloc_free(tmp_ctx);
3064         return 0;
3065 }
3066
3067
3068 /*
3069   get all tunables
3070  */
3071 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
3072                                struct timeval timeout, 
3073                                uint32_t destnode,
3074                                struct ctdb_tunable *tunables)
3075 {
3076         TDB_DATA outdata;
3077         int ret;
3078         int32_t res;
3079
3080         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
3081                            &outdata, &res, &timeout, NULL);
3082         if (ret != 0 || res != 0) {
3083                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
3084                 return -1;
3085         }
3086
3087         if (outdata.dsize != sizeof(*tunables)) {
3088                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
3089                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
3090                 return -1;              
3091         }
3092
3093         *tunables = *(struct ctdb_tunable *)outdata.dptr;
3094         talloc_free(outdata.dptr);
3095         return 0;
3096 }
3097
3098 /*
3099   add a public address to a node
3100  */
3101 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
3102                       struct timeval timeout, 
3103                       uint32_t destnode,
3104                       struct ctdb_control_ip_iface *pub)
3105 {
3106         TDB_DATA data;
3107         int32_t res;
3108         int ret;
3109
3110         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
3111         data.dptr  = (unsigned char *)pub;
3112
3113         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
3114                            NULL, &res, &timeout, NULL);
3115         if (ret != 0 || res != 0) {
3116                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
3117                 return -1;
3118         }
3119
3120         return 0;
3121 }
3122
3123 /*
3124   delete a public address from a node
3125  */
3126 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
3127                       struct timeval timeout, 
3128                       uint32_t destnode,
3129                       struct ctdb_control_ip_iface *pub)
3130 {
3131         TDB_DATA data;
3132         int32_t res;
3133         int ret;
3134
3135         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
3136         data.dptr  = (unsigned char *)pub;
3137
3138         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
3139                            NULL, &res, &timeout, NULL);
3140         if (ret != 0 || res != 0) {
3141                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
3142                 return -1;
3143         }
3144
3145         return 0;
3146 }
3147
3148 /*
3149   kill a tcp connection
3150  */
3151 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
3152                       struct timeval timeout, 
3153                       uint32_t destnode,
3154                       struct ctdb_control_killtcp *killtcp)
3155 {
3156         TDB_DATA data;
3157         int32_t res;
3158         int ret;
3159
3160         data.dsize = sizeof(struct ctdb_control_killtcp);
3161         data.dptr  = (unsigned char *)killtcp;
3162
3163         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
3164                            NULL, &res, &timeout, NULL);
3165         if (ret != 0 || res != 0) {
3166                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
3167                 return -1;
3168         }
3169
3170         return 0;
3171 }
3172
3173 /*
3174   send a gratious arp
3175  */
3176 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
3177                       struct timeval timeout, 
3178                       uint32_t destnode,
3179                       ctdb_sock_addr *addr,
3180                       const char *ifname)
3181 {
3182         TDB_DATA data;
3183         int32_t res;
3184         int ret, len;
3185         struct ctdb_control_gratious_arp *gratious_arp;
3186         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3187
3188
3189         len = strlen(ifname)+1;
3190         gratious_arp = talloc_size(tmp_ctx, 
3191                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
3192         CTDB_NO_MEMORY(ctdb, gratious_arp);
3193
3194         gratious_arp->addr = *addr;
3195         gratious_arp->len = len;
3196         memcpy(&gratious_arp->iface[0], ifname, len);
3197
3198
3199         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
3200         data.dptr  = (unsigned char *)gratious_arp;
3201
3202         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
3203                            NULL, &res, &timeout, NULL);
3204         if (ret != 0 || res != 0) {
3205                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
3206                 talloc_free(tmp_ctx);
3207                 return -1;
3208         }
3209
3210         talloc_free(tmp_ctx);
3211         return 0;
3212 }
3213
3214 /*
3215   get a list of all tcp tickles that a node knows about for a particular vnn
3216  */
3217 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
3218                               struct timeval timeout, uint32_t destnode, 
3219                               TALLOC_CTX *mem_ctx, 
3220                               ctdb_sock_addr *addr,
3221                               struct ctdb_control_tcp_tickle_list **list)
3222 {
3223         int ret;
3224         TDB_DATA data, outdata;
3225         int32_t status;
3226
3227         data.dptr = (uint8_t*)addr;
3228         data.dsize = sizeof(ctdb_sock_addr);
3229
3230         ret = ctdb_control(ctdb, destnode, 0, 
3231                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
3232                            mem_ctx, &outdata, &status, NULL, NULL);
3233         if (ret != 0 || status != 0) {
3234                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
3235                 return -1;
3236         }
3237
3238         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
3239
3240         return status;
3241 }
3242
3243 /*
3244   register a server id
3245  */
3246 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
3247                       struct timeval timeout, 
3248                       struct ctdb_server_id *id)
3249 {
3250         TDB_DATA data;
3251         int32_t res;
3252         int ret;
3253
3254         data.dsize = sizeof(struct ctdb_server_id);
3255         data.dptr  = (unsigned char *)id;
3256
3257         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3258                         CTDB_CONTROL_REGISTER_SERVER_ID, 
3259                         0, data, NULL,
3260                         NULL, &res, &timeout, NULL);
3261         if (ret != 0 || res != 0) {
3262                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
3263                 return -1;
3264         }
3265
3266         return 0;
3267 }
3268
3269 /*
3270   unregister a server id
3271  */
3272 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
3273                       struct timeval timeout, 
3274                       struct ctdb_server_id *id)
3275 {
3276         TDB_DATA data;
3277         int32_t res;
3278         int ret;
3279
3280         data.dsize = sizeof(struct ctdb_server_id);
3281         data.dptr  = (unsigned char *)id;
3282
3283         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3284                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
3285                         0, data, NULL,
3286                         NULL, &res, &timeout, NULL);
3287         if (ret != 0 || res != 0) {
3288                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
3289                 return -1;
3290         }
3291
3292         return 0;
3293 }
3294
3295
3296 /*
3297   check if a server id exists
3298
3299   if a server id does exist, return *status == 1, otherwise *status == 0
3300  */
3301 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
3302                       struct timeval timeout, 
3303                       uint32_t destnode,
3304                       struct ctdb_server_id *id,
3305                       uint32_t *status)
3306 {
3307         TDB_DATA data;
3308         int32_t res;
3309         int ret;
3310
3311         data.dsize = sizeof(struct ctdb_server_id);
3312         data.dptr  = (unsigned char *)id;
3313
3314         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
3315                         0, data, NULL,
3316                         NULL, &res, &timeout, NULL);
3317         if (ret != 0) {
3318                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
3319                 return -1;
3320         }
3321
3322         if (res) {
3323                 *status = 1;
3324         } else {
3325                 *status = 0;
3326         }
3327
3328         return 0;
3329 }
3330
3331 /*
3332    get the list of server ids that are registered on a node
3333 */
3334 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
3335                 TALLOC_CTX *mem_ctx,
3336                 struct timeval timeout, uint32_t destnode, 
3337                 struct ctdb_server_id_list **svid_list)
3338 {
3339         int ret;
3340         TDB_DATA outdata;
3341         int32_t res;
3342
3343         ret = ctdb_control(ctdb, destnode, 0, 
3344                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
3345                            mem_ctx, &outdata, &res, &timeout, NULL);
3346         if (ret != 0 || res != 0) {
3347                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
3348                 return -1;
3349         }
3350
3351         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
3352                     
3353         return 0;
3354 }
3355
3356 /*
3357   initialise the ctdb daemon for client applications
3358
3359   NOTE: In current code the daemon does not fork. This is for testing purposes only
3360   and to simplify the code.
3361 */
3362 struct ctdb_context *ctdb_init(struct event_context *ev)
3363 {
3364         int ret;
3365         struct ctdb_context *ctdb;
3366
3367         ctdb = talloc_zero(ev, struct ctdb_context);
3368         if (ctdb == NULL) {
3369                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
3370                 return NULL;
3371         }
3372         ctdb->ev  = ev;
3373         ctdb->idr = idr_init(ctdb);
3374         /* Wrap early to exercise code. */
3375         ctdb->lastid = INT_MAX-200;
3376         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
3377
3378         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
3379         if (ret != 0) {
3380                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
3381                 talloc_free(ctdb);
3382                 return NULL;
3383         }
3384
3385         ctdb->statistics.statistics_start_time = timeval_current();
3386
3387         return ctdb;
3388 }
3389
3390
3391 /*
3392   set some ctdb flags
3393 */
3394 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
3395 {
3396         ctdb->flags |= flags;
3397 }
3398
3399 /*
3400   setup the local socket name
3401 */
3402 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
3403 {
3404         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
3405         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
3406
3407         return 0;
3408 }
3409
3410 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
3411 {
3412         return ctdb->daemon.name;
3413 }
3414
3415 /*
3416   return the pnn of this node
3417 */
3418 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
3419 {
3420         return ctdb->pnn;
3421 }
3422
3423
3424 /*
3425   get the uptime of a remote node
3426  */
3427 struct ctdb_client_control_state *
3428 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3429 {
3430         return ctdb_control_send(ctdb, destnode, 0, 
3431                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
3432                            mem_ctx, &timeout, NULL);
3433 }
3434
3435 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
3436 {
3437         int ret;
3438         int32_t res;
3439         TDB_DATA outdata;
3440
3441         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3442         if (ret != 0 || res != 0) {
3443                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
3444                 return -1;
3445         }
3446
3447         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
3448
3449         return 0;
3450 }
3451
3452 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3453 {
3454         struct ctdb_client_control_state *state;
3455
3456         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3457         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3458 }
3459
3460 /*
3461   send a control to execute the "recovered" event script on a node
3462  */
3463 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3464 {
3465         int ret;
3466         int32_t status;
3467
3468         ret = ctdb_control(ctdb, destnode, 0, 
3469                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
3470                            NULL, NULL, &status, &timeout, NULL);
3471         if (ret != 0 || status != 0) {
3472                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3473                 return -1;
3474         }
3475
3476         return 0;
3477 }
3478
3479 /* 
3480   callback for the async helpers used when sending the same control
3481   to multiple nodes in parallell.
3482 */
3483 static void async_callback(struct ctdb_client_control_state *state)
3484 {
3485         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3486         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3487         int ret;
3488         TDB_DATA outdata;
3489         int32_t res = -1;
3490         uint32_t destnode = state->c->hdr.destnode;
3491
3492         outdata.dsize = 0;
3493         outdata.dptr = NULL;
3494
3495         /* one more node has responded with recmode data */
3496         data->count--;
3497
3498         /* if we failed to push the db, then return an error and let
3499            the main loop try again.
3500         */
3501         if (state->state != CTDB_CONTROL_DONE) {
3502                 if ( !data->dont_log_errors) {
3503                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3504                 }
3505                 data->fail_count++;
3506                 if (state->state == CTDB_CONTROL_TIMEOUT) {
3507                         res = -ETIME;
3508                 } else {
3509                         res = -1;
3510                 }
3511                 if (data->fail_callback) {
3512                         data->fail_callback(ctdb, destnode, res, outdata,
3513                                         data->callback_data);
3514                 }
3515                 return;
3516         }
3517         
3518         state->async.fn = NULL;
3519
3520         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3521         if ((ret != 0) || (res != 0)) {
3522                 if ( !data->dont_log_errors) {
3523                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3524                 }
3525                 data->fail_count++;
3526                 if (data->fail_callback) {
3527                         data->fail_callback(ctdb, destnode, res, outdata,
3528                                         data->callback_data);
3529                 }
3530         }
3531         if ((ret == 0) && (data->callback != NULL)) {
3532                 data->callback(ctdb, destnode, res, outdata,
3533                                         data->callback_data);
3534         }
3535 }
3536
3537
3538 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3539 {
3540         /* set up the callback functions */
3541         state->async.fn = async_callback;
3542         state->async.private_data = data;
3543         
3544         /* one more control to wait for to complete */
3545         data->count++;
3546 }
3547
3548
3549 /* wait for up to the maximum number of seconds allowed
3550    or until all nodes we expect a response from has replied
3551 */
3552 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3553 {
3554         while (data->count > 0) {
3555                 event_loop_once(ctdb->ev);
3556         }
3557         if (data->fail_count != 0) {
3558                 if (!data->dont_log_errors) {
3559                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
3560                                  data->fail_count));
3561                 }
3562                 return -1;
3563         }
3564         return 0;
3565 }
3566
3567
3568 /* 
3569    perform a simple control on the listed nodes
3570    The control cannot return data
3571  */
3572 int ctdb_client_async_control(struct ctdb_context *ctdb,
3573                                 enum ctdb_controls opcode,
3574                                 uint32_t *nodes,
3575                                 uint64_t srvid,
3576                                 struct timeval timeout,
3577                                 bool dont_log_errors,
3578                                 TDB_DATA data,
3579                                 client_async_callback client_callback,
3580                                 client_async_callback fail_callback,
3581                                 void *callback_data)
3582 {
3583         struct client_async_data *async_data;
3584         struct ctdb_client_control_state *state;
3585         int j, num_nodes;
3586
3587         async_data = talloc_zero(ctdb, struct client_async_data);
3588         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3589         async_data->dont_log_errors = dont_log_errors;
3590         async_data->callback = client_callback;
3591         async_data->fail_callback = fail_callback;
3592         async_data->callback_data = callback_data;
3593         async_data->opcode        = opcode;
3594
3595         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3596
3597         /* loop over all nodes and send an async control to each of them */
3598         for (j=0; j<num_nodes; j++) {
3599                 uint32_t pnn = nodes[j];
3600
3601                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
3602                                           0, data, async_data, &timeout, NULL);
3603                 if (state == NULL) {
3604                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3605                         talloc_free(async_data);
3606                         return -1;
3607                 }
3608                 
3609                 ctdb_client_async_add(async_data, state);
3610         }
3611
3612         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3613                 talloc_free(async_data);
3614                 return -1;
3615         }
3616
3617         talloc_free(async_data);
3618         return 0;
3619 }
3620
3621 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3622                                 struct ctdb_vnn_map *vnn_map,
3623                                 TALLOC_CTX *mem_ctx,
3624                                 bool include_self)
3625 {
3626         int i, j, num_nodes;
3627         uint32_t *nodes;
3628
3629         for (i=num_nodes=0;i<vnn_map->size;i++) {
3630                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3631                         continue;
3632                 }
3633                 num_nodes++;
3634         } 
3635
3636         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3637         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3638
3639         for (i=j=0;i<vnn_map->size;i++) {
3640                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3641                         continue;
3642                 }
3643                 nodes[j++] = vnn_map->map[i];
3644         } 
3645
3646         return nodes;
3647 }
3648
3649 /* Get list of nodes not including those with flags specified by mask.
3650  * If exclude_pnn is not -1 then exclude that pnn from the list.
3651  */
3652 uint32_t *list_of_nodes(struct ctdb_context *ctdb,
3653                         struct ctdb_node_map *node_map,
3654                         TALLOC_CTX *mem_ctx,
3655                         uint32_t mask,
3656                         int exclude_pnn)
3657 {
3658         int i, j, num_nodes;
3659         uint32_t *nodes;
3660
3661         for (i=num_nodes=0;i<node_map->num;i++) {
3662                 if (node_map->nodes[i].flags & mask) {
3663                         continue;
3664                 }
3665                 if (node_map->nodes[i].pnn == exclude_pnn) {
3666                         continue;
3667                 }
3668                 num_nodes++;
3669         } 
3670
3671         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3672         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3673
3674         for (i=j=0;i<node_map->num;i++) {
3675                 if (node_map->nodes[i].flags & mask) {
3676                         continue;
3677                 }
3678                 if (node_map->nodes[i].pnn == exclude_pnn) {
3679                         continue;
3680                 }
3681                 nodes[j++] = node_map->nodes[i].pnn;
3682         } 
3683
3684         return nodes;
3685 }
3686
3687 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3688                                 struct ctdb_node_map *node_map,
3689                                 TALLOC_CTX *mem_ctx,
3690                                 bool include_self)
3691 {
3692         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_INACTIVE,
3693                              include_self ? -1 : ctdb->pnn);
3694 }
3695
3696 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3697                                 struct ctdb_node_map *node_map,
3698                                 TALLOC_CTX *mem_ctx,
3699                                 bool include_self)
3700 {
3701         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_DISCONNECTED,
3702                              include_self ? -1 : ctdb->pnn);
3703 }
3704
3705 /* 
3706   this is used to test if a pnn lock exists and if it exists will return
3707   the number of connections that pnn has reported or -1 if that recovery
3708   daemon is not running.
3709 */
3710 int
3711 ctdb_read_pnn_lock(int fd, int32_t pnn)
3712 {
3713         struct flock lock;
3714         char c;
3715
3716         lock.l_type = F_WRLCK;
3717         lock.l_whence = SEEK_SET;
3718         lock.l_start = pnn;
3719         lock.l_len = 1;
3720         lock.l_pid = 0;
3721
3722         if (fcntl(fd, F_GETLK, &lock) != 0) {
3723                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3724                 return -1;
3725         }
3726
3727         if (lock.l_type == F_UNLCK) {
3728                 return -1;
3729         }
3730
3731         if (pread(fd, &c, 1, pnn) == -1) {
3732                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3733                 return -1;
3734         }
3735
3736         return c;
3737 }
3738
3739 /*
3740   get capabilities of a remote node
3741  */
3742 struct ctdb_client_control_state *
3743 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3744 {
3745         return ctdb_control_send(ctdb, destnode, 0, 
3746                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3747                            mem_ctx, &timeout, NULL);
3748 }
3749
3750 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3751 {
3752         int ret;
3753         int32_t res;
3754         TDB_DATA outdata;
3755
3756         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3757         if ( (ret != 0) || (res != 0) ) {
3758                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3759                 return -1;
3760         }
3761
3762         if (capabilities) {
3763                 *capabilities = *((uint32_t *)outdata.dptr);
3764         }
3765
3766         return 0;
3767 }
3768
3769 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3770 {
3771         struct ctdb_client_control_state *state;
3772         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3773         int ret;
3774
3775         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3776         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3777         talloc_free(tmp_ctx);
3778         return ret;
3779 }
3780
3781 struct server_id {
3782         uint64_t pid;
3783         uint32_t task_id;
3784         uint32_t vnn;
3785         uint64_t unique_id;
3786 };
3787
3788 static struct server_id server_id_get(struct ctdb_context *ctdb, uint32_t reqid)
3789 {
3790         struct server_id id;
3791
3792         id.pid = getpid();
3793         id.task_id = reqid;
3794         id.vnn = ctdb_get_pnn(ctdb);
3795         id.unique_id = id.vnn;
3796         id.unique_id = (id.unique_id << 32) | reqid;
3797
3798         return id;
3799 }
3800
3801 static bool server_id_equal(struct server_id *id1, struct server_id *id2)
3802 {
3803         if (id1->pid != id2->pid) {
3804                 return false;
3805         }
3806
3807         if (id1->task_id != id2->task_id) {
3808                 return false;
3809         }
3810
3811         if (id1->vnn != id2->vnn) {
3812                 return false;
3813         }
3814
3815         if (id1->unique_id != id2->unique_id) {
3816                 return false;
3817         }
3818
3819         return true;
3820 }
3821
3822 static bool server_id_exists(struct ctdb_context *ctdb, struct server_id *id)
3823 {
3824         struct ctdb_server_id sid;
3825         int ret;
3826         uint32_t result;
3827
3828         sid.type = SERVER_TYPE_SAMBA;
3829         sid.pnn = id->vnn;
3830         sid.server_id = id->pid;
3831
3832         ret = ctdb_ctrl_check_server_id(ctdb, timeval_current_ofs(3,0),
3833                                         id->vnn, &sid, &result);
3834         if (ret != 0) {
3835                 /* If control times out, assume server_id exists. */
3836                 return true;
3837         }
3838
3839         if (result) {
3840                 return true;
3841         }
3842
3843         return false;
3844 }
3845
3846
3847 enum g_lock_type {
3848         G_LOCK_READ = 0,
3849         G_LOCK_WRITE = 1,
3850 };
3851
3852 struct g_lock_rec {
3853         enum g_lock_type type;
3854         struct server_id id;
3855 };
3856
3857 struct g_lock_recs {
3858         unsigned int num;
3859         struct g_lock_rec *lock;
3860 };
3861
3862 static bool g_lock_parse(TALLOC_CTX *mem_ctx, TDB_DATA data,
3863                          struct g_lock_recs **locks)
3864 {
3865         struct g_lock_recs *recs;
3866
3867         recs = talloc_zero(mem_ctx, struct g_lock_recs);
3868         if (recs == NULL) {
3869                 return false;
3870         }
3871
3872         if (data.dsize == 0) {
3873                 goto done;
3874         }
3875
3876         if (data.dsize % sizeof(struct g_lock_rec) != 0) {
3877                 DEBUG(DEBUG_ERR, (__location__ "invalid data size %lu in g_lock record\n",
3878                                   (unsigned long)data.dsize));
3879                 talloc_free(recs);
3880                 return false;
3881         }
3882
3883         recs->num = data.dsize / sizeof(struct g_lock_rec);
3884         recs->lock = talloc_memdup(mem_ctx, data.dptr, data.dsize);
3885         if (recs->lock == NULL) {
3886                 talloc_free(recs);
3887                 return false;
3888         }
3889
3890 done:
3891         if (locks != NULL) {
3892                 *locks = recs;
3893         }
3894
3895         return true;
3896 }
3897
3898
3899 static bool g_lock_lock(TALLOC_CTX *mem_ctx,
3900                         struct ctdb_db_context *ctdb_db,
3901                         const char *keyname, uint32_t reqid)
3902 {
3903         TDB_DATA key, data;
3904         struct ctdb_record_handle *h;
3905         struct g_lock_recs *locks;
3906         struct server_id id;
3907         struct timeval t_start;
3908         int i;
3909
3910         key.dptr = (uint8_t *)discard_const(keyname);
3911         key.dsize = strlen(keyname) + 1;
3912
3913         t_start = timeval_current();
3914
3915 again:
3916         /* Keep trying for an hour. */
3917         if (timeval_elapsed(&t_start) > 3600) {
3918                 return false;
3919         }
3920
3921         h = ctdb_fetch_lock(ctdb_db, mem_ctx, key, &data);
3922         if (h == NULL) {
3923                 return false;
3924         }
3925
3926         if (!g_lock_parse(h, data, &locks)) {
3927                 DEBUG(DEBUG_ERR, ("g_lock: error parsing locks\n"));
3928                 talloc_free(data.dptr);
3929                 talloc_free(h);
3930                 return false;
3931         }
3932
3933         talloc_free(data.dptr);
3934
3935         id = server_id_get(ctdb_db->ctdb, reqid);
3936
3937         i = 0;
3938         while (i < locks->num) {
3939                 if (server_id_equal(&locks->lock[i].id, &id)) {
3940                         /* Internal error */
3941                         talloc_free(h);
3942                         return false;
3943                 }
3944
3945                 if (!server_id_exists(ctdb_db->ctdb, &locks->lock[i].id)) {
3946                         if (i < locks->num-1) {
3947                                 locks->lock[i] = locks->lock[locks->num-1];
3948                         }
3949                         locks->num--;
3950                         continue;
3951                 }
3952
3953                 /* This entry is locked. */
3954                 DEBUG(DEBUG_INFO, ("g_lock: lock already granted for "
3955                                    "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
3956                                    (unsigned long long)id.pid,
3957                                    id.task_id, id.vnn,
3958                                    (unsigned long long)id.unique_id));
3959                 talloc_free(h);
3960                 goto again;
3961         }
3962
3963         locks->lock = talloc_realloc(locks, locks->lock, struct g_lock_rec,
3964                                      locks->num+1);
3965         if (locks->lock == NULL) {
3966                 talloc_free(h);
3967                 return false;
3968         }
3969
3970         locks->lock[locks->num].type = G_LOCK_WRITE;
3971         locks->lock[locks->num].id = id;
3972         locks->num++;
3973
3974         data.dptr = (uint8_t *)locks->lock;
3975         data.dsize = locks->num * sizeof(struct g_lock_rec);
3976
3977         if (ctdb_record_store(h, data) != 0) {
3978                 DEBUG(DEBUG_ERR, ("g_lock: failed to write transaction lock for "
3979                                   "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
3980                                   (unsigned long long)id.pid,
3981                                   id.task_id, id.vnn,
3982                                   (unsigned long long)id.unique_id));
3983                 talloc_free(h);
3984                 return false;
3985         }
3986
3987         DEBUG(DEBUG_INFO, ("g_lock: lock granted for "
3988                            "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
3989                            (unsigned long long)id.pid,
3990                            id.task_id, id.vnn,
3991                            (unsigned long long)id.unique_id));
3992
3993         talloc_free(h);
3994         return true;
3995 }
3996
3997 static bool g_lock_unlock(TALLOC_CTX *mem_ctx,
3998                           struct ctdb_db_context *ctdb_db,
3999                           const char *keyname, uint32_t reqid)
4000 {
4001         TDB_DATA key, data;
4002         struct ctdb_record_handle *h;
4003         struct g_lock_recs *locks;
4004         struct server_id id;
4005         int i;
4006         bool found = false;
4007
4008         key.dptr = (uint8_t *)discard_const(keyname);
4009         key.dsize = strlen(keyname) + 1;
4010         h = ctdb_fetch_lock(ctdb_db, mem_ctx, key, &data);
4011         if (h == NULL) {
4012                 return false;
4013         }
4014
4015         if (!g_lock_parse(h, data, &locks)) {
4016                 DEBUG(DEBUG_ERR, ("g_lock: error parsing locks\n"));
4017                 talloc_free(data.dptr);
4018                 talloc_free(h);
4019                 return false;
4020         }
4021
4022         talloc_free(data.dptr);
4023
4024         id = server_id_get(ctdb_db->ctdb, reqid);
4025
4026         for (i=0; i<locks->num; i++) {
4027                 if (server_id_equal(&locks->lock[i].id, &id)) {
4028                         if (i < locks->num-1) {
4029                                 locks->lock[i] = locks->lock[locks->num-1];
4030                         }
4031                         locks->num--;
4032                         found = true;
4033                         break;
4034                 }
4035         }
4036
4037         if (!found) {
4038                 DEBUG(DEBUG_ERR, ("g_lock: lock not found\n"));
4039                 talloc_free(h);
4040                 return false;
4041         }
4042
4043         data.dptr = (uint8_t *)locks->lock;
4044         data.dsize = locks->num * sizeof(struct g_lock_rec);
4045
4046         if (ctdb_record_store(h, data) != 0) {
4047                 talloc_free(h);
4048                 return false;
4049         }
4050
4051         talloc_free(h);
4052         return true;
4053 }
4054
4055
4056 struct ctdb_transaction_handle {
4057         struct ctdb_db_context *ctdb_db;
4058         struct ctdb_db_context *g_lock_db;
4059         char *lock_name;
4060         uint32_t reqid;
4061         /*
4062          * we store reads and writes done under a transaction:
4063          * - one list stores both reads and writes (m_all)
4064          * - the other just writes (m_write)
4065          */
4066         struct ctdb_marshall_buffer *m_all;
4067         struct ctdb_marshall_buffer *m_write;
4068 };
4069
4070 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
4071 {
4072         g_lock_unlock(h, h->g_lock_db, h->lock_name, h->reqid);
4073         ctdb_reqid_remove(h->ctdb_db->ctdb, h->reqid);
4074         return 0;
4075 }
4076
4077
4078 /**
4079  * start a transaction on a database
4080  */
4081 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
4082                                                        TALLOC_CTX *mem_ctx)
4083 {
4084         struct ctdb_transaction_handle *h;
4085         struct ctdb_server_id id;
4086
4087         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
4088         if (h == NULL) {
4089                 DEBUG(DEBUG_ERR, (__location__ " memory allocation error\n"));
4090                 return NULL;
4091         }
4092
4093         h->ctdb_db = ctdb_db;
4094         h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x",
4095                                        (unsigned int)ctdb_db->db_id);
4096         if (h->lock_name == NULL) {
4097                 DEBUG(DEBUG_ERR, (__location__ " talloc asprintf failed\n"));
4098                 talloc_free(h);
4099                 return NULL;
4100         }
4101
4102         h->g_lock_db = ctdb_attach(h->ctdb_db->ctdb, timeval_current_ofs(3,0),
4103                                    "g_lock.tdb", false, 0);
4104         if (!h->g_lock_db) {
4105                 DEBUG(DEBUG_ERR, (__location__ " unable to attach to g_lock.tdb\n"));
4106                 talloc_free(h);
4107                 return NULL;
4108         }
4109
4110         id.type = SERVER_TYPE_SAMBA;
4111         id.pnn = ctdb_get_pnn(ctdb_db->ctdb);
4112         id.server_id = getpid();
4113
4114         if (ctdb_ctrl_register_server_id(ctdb_db->ctdb, timeval_current_ofs(3,0),
4115                                          &id) != 0) {
4116                 DEBUG(DEBUG_ERR, (__location__ " unable to register server id\n"));
4117                 talloc_free(h);
4118                 return NULL;
4119         }
4120
4121         h->reqid = ctdb_reqid_new(h->ctdb_db->ctdb, h);
4122
4123         if (!g_lock_lock(h, h->g_lock_db, h->lock_name, h->reqid)) {
4124                 DEBUG(DEBUG_ERR, (__location__ " Error locking g_lock.tdb\n"));
4125                 talloc_free(h);
4126                 return NULL;
4127         }
4128
4129         talloc_set_destructor(h, ctdb_transaction_destructor);
4130         return h;
4131 }
4132
4133 /**
4134  * fetch a record inside a transaction
4135  */
4136 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h,
4137                            TALLOC_CTX *mem_ctx,
4138                            TDB_DATA key, TDB_DATA *data)
4139 {
4140         struct ctdb_ltdb_header header;
4141         int ret;
4142
4143         ZERO_STRUCT(header);
4144
4145         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
4146         if (ret == -1 && header.dmaster == (uint32_t)-1) {
4147                 /* record doesn't exist yet */
4148                 *data = tdb_null;
4149                 ret = 0;
4150         }
4151
4152         if (ret != 0) {
4153                 return ret;
4154         }
4155
4156         h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
4157         if (h->m_all == NULL) {
4158                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4159                 return -1;
4160         }
4161
4162         return 0;
4163 }
4164
4165 /**
4166  * stores a record inside a transaction
4167  */
4168 int ctdb_transaction_store(struct ctdb_transaction_handle *h,
4169                            TDB_DATA key, TDB_DATA data)
4170 {
4171         TALLOC_CTX *tmp_ctx = talloc_new(h);
4172         struct ctdb_ltdb_header header;
4173         TDB_DATA olddata;
4174         int ret;
4175
4176         /* we need the header so we can update the RSN */
4177         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
4178         if (ret == -1 && header.dmaster == (uint32_t)-1) {
4179                 /* the record doesn't exist - create one with us as dmaster.
4180                    This is only safe because we are in a transaction and this
4181                    is a persistent database */
4182                 ZERO_STRUCT(header);
4183         } else if (ret != 0) {
4184                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
4185                 talloc_free(tmp_ctx);
4186                 return ret;
4187         }
4188
4189         if (data.dsize == olddata.dsize &&
4190             memcmp(data.dptr, olddata.dptr, data.dsize) == 0 &&
4191             header.rsn != 0) {
4192                 /* save writing the same data */
4193                 talloc_free(tmp_ctx);
4194                 return 0;
4195         }
4196
4197         header.dmaster = h->ctdb_db->ctdb->pnn;
4198         header.rsn++;
4199
4200         h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
4201         if (h->m_all == NULL) {
4202                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4203                 talloc_free(tmp_ctx);
4204                 return -1;
4205         }
4206
4207         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
4208         if (h->m_write == NULL) {
4209                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4210                 talloc_free(tmp_ctx);
4211                 return -1;
4212         }
4213
4214         talloc_free(tmp_ctx);
4215         return 0;
4216 }
4217
4218 static int ctdb_fetch_db_seqnum(struct ctdb_db_context *ctdb_db, uint64_t *seqnum)
4219 {
4220         const char *keyname = CTDB_DB_SEQNUM_KEY;
4221         TDB_DATA key, data;
4222         struct ctdb_ltdb_header header;
4223         int ret;
4224
4225         key.dptr = (uint8_t *)discard_const(keyname);
4226         key.dsize = strlen(keyname) + 1;
4227
4228         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, ctdb_db, &data);
4229         if (ret != 0) {
4230                 *seqnum = 0;
4231                 return 0;
4232         }
4233
4234         if (data.dsize == 0) {
4235                 *seqnum = 0;
4236                 return 0;
4237         }
4238
4239         if (data.dsize != sizeof(*seqnum)) {
4240                 DEBUG(DEBUG_ERR, (__location__ " Invalid data recived len=%zi\n",
4241                                   data.dsize));
4242                 talloc_free(data.dptr);
4243                 return -1;
4244         }
4245
4246         *seqnum = *(uint64_t *)data.dptr;
4247         talloc_free(data.dptr);
4248
4249         return 0;
4250 }
4251
4252
4253 static int ctdb_store_db_seqnum(struct ctdb_transaction_handle *h,
4254                                 uint64_t seqnum)
4255 {
4256         const char *keyname = CTDB_DB_SEQNUM_KEY;
4257         TDB_DATA key, data;
4258
4259         key.dptr = (uint8_t *)discard_const(keyname);
4260         key.dsize = strlen(keyname) + 1;
4261
4262         data.dptr = (uint8_t *)&seqnum;
4263         data.dsize = sizeof(seqnum);
4264
4265         return ctdb_transaction_store(h, key, data);
4266 }
4267
4268
4269 /**
4270  * commit a transaction
4271  */
4272 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
4273 {
4274         int ret;
4275         uint64_t old_seqnum, new_seqnum;
4276         int32_t status;
4277         struct timeval timeout;
4278
4279         if (h->m_write == NULL) {
4280                 /* no changes were made */
4281                 talloc_free(h);
4282                 return 0;
4283         }
4284
4285         ret = ctdb_fetch_db_seqnum(h->ctdb_db, &old_seqnum);
4286         if (ret != 0) {
4287                 DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n"));
4288                 ret = -1;
4289                 goto done;
4290         }
4291
4292         new_seqnum = old_seqnum + 1;
4293         ret = ctdb_store_db_seqnum(h, new_seqnum);
4294         if (ret != 0) {
4295                 DEBUG(DEBUG_ERR, (__location__ " failed to store db sequence number\n"));
4296                 ret = -1;
4297                 goto done;
4298         }
4299
4300 again:
4301         timeout = timeval_current_ofs(3,0);
4302         ret = ctdb_control(h->ctdb_db->ctdb, CTDB_CURRENT_NODE,
4303                            h->ctdb_db->db_id,
4304                            CTDB_CONTROL_TRANS3_COMMIT, 0,
4305                            ctdb_marshall_finish(h->m_write), NULL, NULL,
4306                            &status, &timeout, NULL);
4307         if (ret != 0 || status != 0) {
4308                 /*
4309                  * TRANS3_COMMIT control will only fail if recovery has been
4310                  * triggered.  Check if the database has been updated or not.
4311                  */
4312                 ret = ctdb_fetch_db_seqnum(h->ctdb_db, &new_seqnum);
4313                 if (ret != 0) {
4314                         DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n"));
4315                         goto done;
4316                 }
4317
4318                 if (new_seqnum == old_seqnum) {
4319                         /* Database not yet updated, try again */
4320                         goto again;
4321                 }
4322
4323                 if (new_seqnum != (old_seqnum + 1)) {
4324                         DEBUG(DEBUG_ERR, (__location__ " new seqnum [%llu] != old seqnum [%llu] + 1\n",
4325                                           (long long unsigned)new_seqnum,
4326                                           (long long unsigned)old_seqnum));
4327                         ret = -1;
4328                         goto done;
4329                 }
4330         }
4331
4332         ret = 0;
4333
4334 done:
4335         talloc_free(h);
4336         return ret;
4337 }
4338
4339 /**
4340  * cancel a transaction
4341  */
4342 int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
4343 {
4344         talloc_free(h);
4345         return 0;
4346 }
4347
4348
4349 /*
4350   recovery daemon ping to main daemon
4351  */
4352 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
4353 {
4354         int ret;
4355         int32_t res;
4356
4357         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
4358                            ctdb, NULL, &res, NULL, NULL);
4359         if (ret != 0 || res != 0) {
4360                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
4361                 return -1;
4362         }
4363
4364         return 0;
4365 }
4366
4367 /* When forking the main daemon and the child process needs to connect
4368  * back to the daemon as a client process, this function can be used
4369  * to change the ctdb context from daemon into client mode.  The child
4370  * process must be created using ctdb_fork() and not fork() -
4371  * ctdb_fork() does some necessary housekeeping.
4372  */
4373 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
4374 {
4375         int ret;
4376         va_list ap;
4377
4378         /* Add extra information so we can identify this in the logs */
4379         va_start(ap, fmt);
4380         debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
4381         va_end(ap);
4382
4383         /* get a new event context */
4384         ctdb->ev = event_context_init(ctdb);
4385         tevent_loop_allow_nesting(ctdb->ev);
4386
4387         /* Connect to main CTDB daemon */
4388         ret = ctdb_socket_connect(ctdb);
4389         if (ret != 0) {
4390                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
4391                 return -1;
4392         }
4393
4394         ctdb->can_send_controls = true;
4395
4396         return 0;
4397 }
4398
4399 /*
4400   get the status of running the monitor eventscripts: NULL means never run.
4401  */
4402 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
4403                 struct timeval timeout, uint32_t destnode, 
4404                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
4405                 struct ctdb_scripts_wire **scripts)
4406 {
4407         int ret;
4408         TDB_DATA outdata, indata;
4409         int32_t res;
4410         uint32_t uinttype = type;
4411
4412         indata.dptr = (uint8_t *)&uinttype;
4413         indata.dsize = sizeof(uinttype);
4414
4415         ret = ctdb_control(ctdb, destnode, 0, 
4416                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
4417                            mem_ctx, &outdata, &res, &timeout, NULL);
4418         if (ret != 0 || res != 0) {
4419                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
4420                 return -1;
4421         }
4422
4423         if (outdata.dsize == 0) {
4424                 *scripts = NULL;
4425         } else {
4426                 *scripts = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4427                 talloc_free(outdata.dptr);
4428         }
4429                     
4430         return 0;
4431 }
4432
4433 /*
4434   tell the main daemon how long it took to lock the reclock file
4435  */
4436 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
4437 {
4438         int ret;
4439         int32_t res;
4440         TDB_DATA data;
4441
4442         data.dptr = (uint8_t *)&latency;
4443         data.dsize = sizeof(latency);
4444
4445         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
4446                            ctdb, NULL, &res, NULL, NULL);
4447         if (ret != 0 || res != 0) {
4448                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
4449                 return -1;
4450         }
4451
4452         return 0;
4453 }
4454
4455 /*
4456   get the name of the reclock file
4457  */
4458 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
4459                          uint32_t destnode, TALLOC_CTX *mem_ctx,
4460                          const char **name)
4461 {
4462         int ret;
4463         int32_t res;
4464         TDB_DATA data;
4465
4466         ret = ctdb_control(ctdb, destnode, 0, 
4467                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
4468                            mem_ctx, &data, &res, &timeout, NULL);
4469         if (ret != 0 || res != 0) {
4470                 return -1;
4471         }
4472
4473         if (data.dsize == 0) {
4474                 *name = NULL;
4475         } else {
4476                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
4477         }
4478         talloc_free(data.dptr);
4479
4480         return 0;
4481 }
4482
4483 /*
4484   set the reclock filename for a node
4485  */
4486 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
4487 {
4488         int ret;
4489         TDB_DATA data;
4490         int32_t res;
4491
4492         if (reclock == NULL) {
4493                 data.dsize = 0;
4494                 data.dptr  = NULL;
4495         } else {
4496                 data.dsize = strlen(reclock) + 1;
4497                 data.dptr  = discard_const(reclock);
4498         }
4499
4500         ret = ctdb_control(ctdb, destnode, 0, 
4501                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
4502                            NULL, NULL, &res, &timeout, NULL);
4503         if (ret != 0 || res != 0) {
4504                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
4505                 return -1;
4506         }
4507
4508         return 0;
4509 }
4510
4511 /*
4512   stop a node
4513  */
4514 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4515 {
4516         int ret;
4517         int32_t res;
4518
4519         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
4520                            ctdb, NULL, &res, &timeout, NULL);
4521         if (ret != 0 || res != 0) {
4522                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
4523                 return -1;
4524         }
4525
4526         return 0;
4527 }
4528
4529 /*
4530   continue a node
4531  */
4532 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4533 {
4534         int ret;
4535
4536         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
4537                            ctdb, NULL, NULL, &timeout, NULL);
4538         if (ret != 0) {
4539                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4540                 return -1;
4541         }
4542
4543         return 0;
4544 }
4545
4546 /*
4547   set the natgw state for a node
4548  */
4549 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4550 {
4551         int ret;
4552         TDB_DATA data;
4553         int32_t res;
4554
4555         data.dsize = sizeof(natgwstate);
4556         data.dptr  = (uint8_t *)&natgwstate;
4557
4558         ret = ctdb_control(ctdb, destnode, 0, 
4559                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
4560                            NULL, NULL, &res, &timeout, NULL);
4561         if (ret != 0 || res != 0) {
4562                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4563                 return -1;
4564         }
4565
4566         return 0;
4567 }
4568
4569 /*
4570   set the lmaster role for a node
4571  */
4572 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4573 {
4574         int ret;
4575         TDB_DATA data;
4576         int32_t res;
4577
4578         data.dsize = sizeof(lmasterrole);
4579         data.dptr  = (uint8_t *)&lmasterrole;
4580
4581         ret = ctdb_control(ctdb, destnode, 0, 
4582                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
4583                            NULL, NULL, &res, &timeout, NULL);
4584         if (ret != 0 || res != 0) {
4585                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4586                 return -1;
4587         }
4588
4589         return 0;
4590 }
4591
4592 /*
4593   set the recmaster role for a node
4594  */
4595 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4596 {
4597         int ret;
4598         TDB_DATA data;
4599         int32_t res;
4600
4601         data.dsize = sizeof(recmasterrole);
4602         data.dptr  = (uint8_t *)&recmasterrole;
4603
4604         ret = ctdb_control(ctdb, destnode, 0, 
4605                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
4606                            NULL, NULL, &res, &timeout, NULL);
4607         if (ret != 0 || res != 0) {
4608                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4609                 return -1;
4610         }
4611
4612         return 0;
4613 }
4614
4615 /* enable an eventscript
4616  */
4617 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4618 {
4619         int ret;
4620         TDB_DATA data;
4621         int32_t res;
4622
4623         data.dsize = strlen(script) + 1;
4624         data.dptr  = discard_const(script);
4625
4626         ret = ctdb_control(ctdb, destnode, 0, 
4627                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
4628                            NULL, NULL, &res, &timeout, NULL);
4629         if (ret != 0 || res != 0) {
4630                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4631                 return -1;
4632         }
4633
4634         return 0;
4635 }
4636
4637 /* disable an eventscript
4638  */
4639 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4640 {
4641         int ret;
4642         TDB_DATA data;
4643         int32_t res;
4644
4645         data.dsize = strlen(script) + 1;
4646         data.dptr  = discard_const(script);
4647
4648         ret = ctdb_control(ctdb, destnode, 0, 
4649                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
4650                            NULL, NULL, &res, &timeout, NULL);
4651         if (ret != 0 || res != 0) {
4652                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
4653                 return -1;
4654         }
4655
4656         return 0;
4657 }
4658
4659
4660 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
4661 {
4662         int ret;
4663         TDB_DATA data;
4664         int32_t res;
4665
4666         data.dsize = sizeof(*bantime);
4667         data.dptr  = (uint8_t *)bantime;
4668
4669         ret = ctdb_control(ctdb, destnode, 0, 
4670                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
4671                            NULL, NULL, &res, &timeout, NULL);
4672         if (ret != 0 || res != 0) {
4673                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4674                 return -1;
4675         }
4676
4677         return 0;
4678 }
4679
4680
4681 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4682 {
4683         int ret;
4684         TDB_DATA outdata;
4685         int32_t res;
4686         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4687
4688         ret = ctdb_control(ctdb, destnode, 0, 
4689                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4690                            tmp_ctx, &outdata, &res, &timeout, NULL);
4691         if (ret != 0 || res != 0) {
4692                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4693                 talloc_free(tmp_ctx);
4694                 return -1;
4695         }
4696
4697         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4698         talloc_free(tmp_ctx);
4699
4700         return 0;
4701 }
4702
4703
4704 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4705 {
4706         int ret;
4707         int32_t res;
4708         TDB_DATA data;
4709         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4710
4711         data.dptr = (uint8_t*)db_prio;
4712         data.dsize = sizeof(*db_prio);
4713
4714         ret = ctdb_control(ctdb, destnode, 0, 
4715                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4716                            tmp_ctx, NULL, &res, &timeout, NULL);
4717         if (ret != 0 || res != 0) {
4718                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4719                 talloc_free(tmp_ctx);
4720                 return -1;
4721         }
4722
4723         talloc_free(tmp_ctx);
4724
4725         return 0;
4726 }
4727
4728 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4729 {
4730         int ret;
4731         int32_t res;
4732         TDB_DATA data;
4733         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4734
4735         data.dptr = (uint8_t*)&db_id;
4736         data.dsize = sizeof(db_id);
4737
4738         ret = ctdb_control(ctdb, destnode, 0, 
4739                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4740                            tmp_ctx, NULL, &res, &timeout, NULL);
4741         if (ret != 0 || res < 0) {
4742                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_db_priority failed\n"));
4743                 talloc_free(tmp_ctx);
4744                 return -1;
4745         }
4746
4747         if (priority) {
4748                 *priority = res;
4749         }
4750
4751         talloc_free(tmp_ctx);
4752
4753         return 0;
4754 }
4755
4756 int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_statistics_wire **stats)
4757 {
4758         int ret;
4759         TDB_DATA outdata;
4760         int32_t res;
4761
4762         ret = ctdb_control(ctdb, destnode, 0, 
4763                            CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null, 
4764                            mem_ctx, &outdata, &res, &timeout, NULL);
4765         if (ret != 0 || res != 0 || outdata.dsize == 0) {
4766                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
4767                 return -1;
4768         }
4769
4770         *stats = (struct ctdb_statistics_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4771         talloc_free(outdata.dptr);
4772                     
4773         return 0;
4774 }
4775
4776 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)
4777 {
4778         if (h == NULL) {
4779                 return NULL;
4780         }
4781
4782         return &h->header;
4783 }
4784
4785
4786 struct ctdb_client_control_state *
4787 ctdb_ctrl_updaterecord_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4788 {
4789         struct ctdb_client_control_state *handle;
4790         struct ctdb_marshall_buffer *m;
4791         struct ctdb_rec_data *rec;
4792         TDB_DATA outdata;
4793
4794         m = talloc_zero(mem_ctx, struct ctdb_marshall_buffer);
4795         if (m == NULL) {
4796                 DEBUG(DEBUG_ERR, ("Failed to allocate marshall buffer for update record\n"));
4797                 return NULL;
4798         }
4799
4800         m->db_id = ctdb_db->db_id;
4801
4802         rec = ctdb_marshall_record(m, 0, key, header, data);
4803         if (rec == NULL) {
4804                 DEBUG(DEBUG_ERR,("Failed to marshall record for update record\n"));
4805                 talloc_free(m);
4806                 return NULL;
4807         }
4808         m = talloc_realloc_size(mem_ctx, m, rec->length + offsetof(struct ctdb_marshall_buffer, data));
4809         if (m == NULL) {
4810                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata\n"));
4811                 talloc_free(m);
4812                 return NULL;
4813         }
4814         m->count++;
4815         memcpy((uint8_t *)m + offsetof(struct ctdb_marshall_buffer, data), rec, rec->length);
4816
4817
4818         outdata.dptr = (uint8_t *)m;
4819         outdata.dsize = talloc_get_size(m);
4820
4821         handle = ctdb_control_send(ctdb, destnode, 0, 
4822                            CTDB_CONTROL_UPDATE_RECORD, 0, outdata,
4823                            mem_ctx, &timeout, NULL);
4824         talloc_free(m);
4825         return handle;
4826 }
4827
4828 int ctdb_ctrl_updaterecord_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4829 {
4830         int ret;
4831         int32_t res;
4832
4833         ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL);
4834         if ( (ret != 0) || (res != 0) ){
4835                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_update_record_recv failed\n"));
4836                 return -1;
4837         }
4838
4839         return 0;
4840 }
4841
4842 int
4843 ctdb_ctrl_updaterecord(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4844 {
4845         struct ctdb_client_control_state *state;
4846
4847         state = ctdb_ctrl_updaterecord_send(ctdb, mem_ctx, timeout, destnode, ctdb_db, key, header, data);
4848         return ctdb_ctrl_updaterecord_recv(ctdb, state);
4849 }
4850
4851
4852
4853
4854
4855
4856 /*
4857   set a database to be readonly
4858  */
4859 struct ctdb_client_control_state *
4860 ctdb_ctrl_set_db_readonly_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4861 {
4862         TDB_DATA data;
4863
4864         data.dptr = (uint8_t *)&dbid;
4865         data.dsize = sizeof(dbid);
4866
4867         return ctdb_control_send(ctdb, destnode, 0, 
4868                            CTDB_CONTROL_SET_DB_READONLY, 0, data, 
4869                            ctdb, NULL, NULL);
4870 }
4871
4872 int ctdb_ctrl_set_db_readonly_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4873 {
4874         int ret;
4875         int32_t res;
4876
4877         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4878         if (ret != 0 || res != 0) {
4879           DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_readonly_recv failed  ret:%d res:%d\n", ret, res));
4880                 return -1;
4881         }
4882
4883         return 0;
4884 }
4885
4886 int ctdb_ctrl_set_db_readonly(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4887 {
4888         struct ctdb_client_control_state *state;
4889
4890         state = ctdb_ctrl_set_db_readonly_send(ctdb, destnode, dbid);
4891         return ctdb_ctrl_set_db_readonly_recv(ctdb, state);
4892 }
4893
4894 /*
4895   set a database to be sticky
4896  */
4897 struct ctdb_client_control_state *
4898 ctdb_ctrl_set_db_sticky_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4899 {
4900         TDB_DATA data;
4901
4902         data.dptr = (uint8_t *)&dbid;
4903         data.dsize = sizeof(dbid);
4904
4905         return ctdb_control_send(ctdb, destnode, 0, 
4906                            CTDB_CONTROL_SET_DB_STICKY, 0, data, 
4907                            ctdb, NULL, NULL);
4908 }
4909
4910 int ctdb_ctrl_set_db_sticky_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4911 {
4912         int ret;
4913         int32_t res;
4914
4915         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4916         if (ret != 0 || res != 0) {
4917                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_sticky_recv failed  ret:%d res:%d\n", ret, res));
4918                 return -1;
4919         }
4920
4921         return 0;
4922 }
4923
4924 int ctdb_ctrl_set_db_sticky(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4925 {
4926         struct ctdb_client_control_state *state;
4927
4928         state = ctdb_ctrl_set_db_sticky_send(ctdb, destnode, dbid);
4929         return ctdb_ctrl_set_db_sticky_recv(ctdb, state);
4930 }