ctdb-daemon: formatting fix
[kai/samba-autobuild/.git] / ctdb / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/tdb_wrap/tdb_wrap.h"
23 #include "tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/locale.h"
28 #include <stdlib.h>
29 #include "../include/ctdb_private.h"
30 #include "lib/util/dlinklist.h"
31
32 /*
33   allocate a packet for use in client<->daemon communication
34  */
35 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
36                                             TALLOC_CTX *mem_ctx, 
37                                             enum ctdb_operation operation, 
38                                             size_t length, size_t slength,
39                                             const char *type)
40 {
41         int size;
42         struct ctdb_req_header *hdr;
43
44         length = MAX(length, slength);
45         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
46
47         hdr = (struct ctdb_req_header *)talloc_zero_size(mem_ctx, size);
48         if (hdr == NULL) {
49                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
50                          operation, (unsigned)length));
51                 return NULL;
52         }
53         talloc_set_name_const(hdr, type);
54         hdr->length       = length;
55         hdr->operation    = operation;
56         hdr->ctdb_magic   = CTDB_MAGIC;
57         hdr->ctdb_version = CTDB_PROTOCOL;
58         hdr->srcnode      = ctdb->pnn;
59         if (ctdb->vnn_map) {
60                 hdr->generation = ctdb->vnn_map->generation;
61         }
62
63         return hdr;
64 }
65
66 /*
67   local version of ctdb_call
68 */
69 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
70                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
71                     TDB_DATA *data, bool updatetdb)
72 {
73         struct ctdb_call_info *c;
74         struct ctdb_registered_call *fn;
75         struct ctdb_context *ctdb = ctdb_db->ctdb;
76         
77         c = talloc(ctdb, struct ctdb_call_info);
78         CTDB_NO_MEMORY(ctdb, c);
79
80         c->key = call->key;
81         c->call_data = &call->call_data;
82         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
83         c->record_data.dsize = data->dsize;
84         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
85         c->new_data = NULL;
86         c->reply_data = NULL;
87         c->status = 0;
88         c->header = header;
89
90         for (fn=ctdb_db->calls;fn;fn=fn->next) {
91                 if (fn->id == call->call_id) break;
92         }
93         if (fn == NULL) {
94                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
95                 talloc_free(c);
96                 return -1;
97         }
98
99         if (fn->fn(c) != 0) {
100                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
101                 talloc_free(c);
102                 return -1;
103         }
104
105         /* we need to force the record to be written out if this was a remote access */
106         if (c->new_data == NULL) {
107                 c->new_data = &c->record_data;
108         }
109
110         if (c->new_data && updatetdb) {
111                 /* XXX check that we always have the lock here? */
112                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
113                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
114                         talloc_free(c);
115                         return -1;
116                 }
117         }
118
119         if (c->reply_data) {
120                 call->reply_data = *c->reply_data;
121
122                 talloc_steal(call, call->reply_data.dptr);
123                 talloc_set_name_const(call->reply_data.dptr, __location__);
124         } else {
125                 call->reply_data.dptr = NULL;
126                 call->reply_data.dsize = 0;
127         }
128         call->status = c->status;
129
130         talloc_free(c);
131
132         return 0;
133 }
134
135
136 /*
137   queue a packet for sending from client to daemon
138 */
139 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
140 {
141         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
142 }
143
144
145 /*
146   called when a CTDB_REPLY_CALL packet comes in in the client
147
148   This packet comes in response to a CTDB_REQ_CALL request packet. It
149   contains any reply data from the call
150 */
151 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
152 {
153         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
154         struct ctdb_client_call_state *state;
155
156         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
157         if (state == NULL) {
158                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
159                 return;
160         }
161
162         if (hdr->reqid != state->reqid) {
163                 /* we found a record  but it was the wrong one */
164                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
165                 return;
166         }
167
168         state->call->reply_data.dptr = c->data;
169         state->call->reply_data.dsize = c->datalen;
170         state->call->status = c->status;
171
172         talloc_steal(state, c);
173
174         state->state = CTDB_CALL_DONE;
175
176         if (state->async.fn) {
177                 state->async.fn(state);
178         }
179 }
180
181 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
182
183 /*
184   this is called in the client, when data comes in from the daemon
185  */
186 void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
187 {
188         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
189         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
190         TALLOC_CTX *tmp_ctx;
191
192         /* place the packet as a child of a tmp_ctx. We then use
193            talloc_free() below to free it. If any of the calls want
194            to keep it, then they will steal it somewhere else, and the
195            talloc_free() will be a no-op */
196         tmp_ctx = talloc_new(ctdb);
197         talloc_steal(tmp_ctx, hdr);
198
199         if (cnt == 0) {
200                 DEBUG(DEBUG_CRIT,("Daemon has exited - shutting down client\n"));
201                 exit(1);
202         }
203
204         if (cnt < sizeof(*hdr)) {
205                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
206                 goto done;
207         }
208         if (cnt != hdr->length) {
209                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
210                                (unsigned)hdr->length, (unsigned)cnt);
211                 goto done;
212         }
213
214         if (hdr->ctdb_magic != CTDB_MAGIC) {
215                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
216                 goto done;
217         }
218
219         if (hdr->ctdb_version != CTDB_PROTOCOL) {
220                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
221                 goto done;
222         }
223
224         switch (hdr->operation) {
225         case CTDB_REPLY_CALL:
226                 ctdb_client_reply_call(ctdb, hdr);
227                 break;
228
229         case CTDB_REQ_MESSAGE:
230                 ctdb_request_message(ctdb, hdr);
231                 break;
232
233         case CTDB_REPLY_CONTROL:
234                 ctdb_client_reply_control(ctdb, hdr);
235                 break;
236
237         default:
238                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
239         }
240
241 done:
242         talloc_free(tmp_ctx);
243 }
244
245 /*
246   connect to a unix domain socket
247 */
248 int ctdb_socket_connect(struct ctdb_context *ctdb)
249 {
250         struct sockaddr_un addr;
251
252         memset(&addr, 0, sizeof(addr));
253         addr.sun_family = AF_UNIX;
254         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
255
256         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
257         if (ctdb->daemon.sd == -1) {
258                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
259                 return -1;
260         }
261
262         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
263                 close(ctdb->daemon.sd);
264                 ctdb->daemon.sd = -1;
265                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
266                 return -1;
267         }
268
269         set_nonblocking(ctdb->daemon.sd);
270         set_close_on_exec(ctdb->daemon.sd);
271         
272         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
273                                               CTDB_DS_ALIGNMENT, 
274                                               ctdb_client_read_cb, ctdb, "to-ctdbd");
275         return 0;
276 }
277
278
279 struct ctdb_record_handle {
280         struct ctdb_db_context *ctdb_db;
281         TDB_DATA key;
282         TDB_DATA *data;
283         struct ctdb_ltdb_header header;
284 };
285
286
287 /*
288   make a recv call to the local ctdb daemon - called from client context
289
290   This is called when the program wants to wait for a ctdb_call to complete and get the 
291   results. This call will block unless the call has already completed.
292 */
293 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
294 {
295         if (state == NULL) {
296                 return -1;
297         }
298
299         while (state->state < CTDB_CALL_DONE) {
300                 event_loop_once(state->ctdb_db->ctdb->ev);
301         }
302         if (state->state != CTDB_CALL_DONE) {
303                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
304                 talloc_free(state);
305                 return -1;
306         }
307
308         if (state->call->reply_data.dsize) {
309                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
310                                                       state->call->reply_data.dptr,
311                                                       state->call->reply_data.dsize);
312                 call->reply_data.dsize = state->call->reply_data.dsize;
313         } else {
314                 call->reply_data.dptr = NULL;
315                 call->reply_data.dsize = 0;
316         }
317         call->status = state->call->status;
318         talloc_free(state);
319
320         return call->status;
321 }
322
323
324
325
326 /*
327   destroy a ctdb_call in client
328 */
329 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
330 {
331         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
332         return 0;
333 }
334
335 /*
336   construct an event driven local ctdb_call
337
338   this is used so that locally processed ctdb_call requests are processed
339   in an event driven manner
340 */
341 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
342                                                                   struct ctdb_call *call,
343                                                                   struct ctdb_ltdb_header *header,
344                                                                   TDB_DATA *data)
345 {
346         struct ctdb_client_call_state *state;
347         struct ctdb_context *ctdb = ctdb_db->ctdb;
348         int ret;
349
350         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
351         CTDB_NO_MEMORY_NULL(ctdb, state);
352         state->call = talloc_zero(state, struct ctdb_call);
353         CTDB_NO_MEMORY_NULL(ctdb, state->call);
354
355         talloc_steal(state, data->dptr);
356
357         state->state   = CTDB_CALL_DONE;
358         *(state->call) = *call;
359         state->ctdb_db = ctdb_db;
360
361         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
362         if (ret != 0) {
363                 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
364         }
365
366         return state;
367 }
368
369 /*
370   make a ctdb call to the local daemon - async send. Called from client context.
371
372   This constructs a ctdb_call request and queues it for processing. 
373   This call never blocks.
374 */
375 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
376                                               struct ctdb_call *call)
377 {
378         struct ctdb_client_call_state *state;
379         struct ctdb_context *ctdb = ctdb_db->ctdb;
380         struct ctdb_ltdb_header header;
381         TDB_DATA data;
382         int ret;
383         size_t len;
384         struct ctdb_req_call *c;
385
386         /* if the domain socket is not yet open, open it */
387         if (ctdb->daemon.sd==-1) {
388                 ctdb_socket_connect(ctdb);
389         }
390
391         ret = ctdb_ltdb_lock(ctdb_db, call->key);
392         if (ret != 0) {
393                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
394                 return NULL;
395         }
396
397         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
398
399         if ((call->flags & CTDB_IMMEDIATE_MIGRATION) && (header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
400                 ret = -1;
401         }
402
403         if (ret == 0 && header.dmaster == ctdb->pnn) {
404                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
405                 talloc_free(data.dptr);
406                 ctdb_ltdb_unlock(ctdb_db, call->key);
407                 return state;
408         }
409
410         ctdb_ltdb_unlock(ctdb_db, call->key);
411         talloc_free(data.dptr);
412
413         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
414         if (state == NULL) {
415                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
416                 return NULL;
417         }
418         state->call = talloc_zero(state, struct ctdb_call);
419         if (state->call == NULL) {
420                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
421                 return NULL;
422         }
423
424         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
425         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
426         if (c == NULL) {
427                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
428                 return NULL;
429         }
430
431         state->reqid     = ctdb_reqid_new(ctdb, state);
432         state->ctdb_db = ctdb_db;
433         talloc_set_destructor(state, ctdb_client_call_destructor);
434
435         c->hdr.reqid     = state->reqid;
436         c->flags         = call->flags;
437         c->db_id         = ctdb_db->db_id;
438         c->callid        = call->call_id;
439         c->hopcount      = 0;
440         c->keylen        = call->key.dsize;
441         c->calldatalen   = call->call_data.dsize;
442         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
443         memcpy(&c->data[call->key.dsize], 
444                call->call_data.dptr, call->call_data.dsize);
445         *(state->call)              = *call;
446         state->call->call_data.dptr = &c->data[call->key.dsize];
447         state->call->key.dptr       = &c->data[0];
448
449         state->state  = CTDB_CALL_WAIT;
450
451
452         ctdb_client_queue_pkt(ctdb, &c->hdr);
453
454         return state;
455 }
456
457
458 /*
459   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
460 */
461 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
462 {
463         struct ctdb_client_call_state *state;
464
465         state = ctdb_call_send(ctdb_db, call);
466         return ctdb_call_recv(state, call);
467 }
468
469
470 /*
471   tell the daemon what messaging srvid we will use, and register the message
472   handler function in the client
473 */
474 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
475                              ctdb_msg_fn_t handler,
476                              void *private_data)
477 {
478         int res;
479         int32_t status;
480
481         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
482                            tdb_null, NULL, NULL, &status, NULL, NULL);
483         if (res != 0 || status != 0) {
484                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
485                 return -1;
486         }
487
488         /* also need to register the handler with our own ctdb structure */
489         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
490 }
491
492 /*
493   tell the daemon we no longer want a srvid
494 */
495 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
496 {
497         int res;
498         int32_t status;
499
500         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
501                            tdb_null, NULL, NULL, &status, NULL, NULL);
502         if (res != 0 || status != 0) {
503                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
504                 return -1;
505         }
506
507         /* also need to register the handler with our own ctdb structure */
508         ctdb_deregister_message_handler(ctdb, srvid, private_data);
509         return 0;
510 }
511
512 /*
513  * check server ids
514  */
515 int ctdb_client_check_message_handlers(struct ctdb_context *ctdb, uint64_t *ids, uint32_t num,
516                                        uint8_t *result)
517 {
518         TDB_DATA indata, outdata;
519         int res;
520         int32_t status;
521         int i;
522
523         indata.dptr = (uint8_t *)ids;
524         indata.dsize = num * sizeof(*ids);
525
526         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_CHECK_SRVIDS, 0,
527                            indata, ctdb, &outdata, &status, NULL, NULL);
528         if (res != 0 || status != 0) {
529                 DEBUG(DEBUG_ERR, (__location__ " failed to check srvids\n"));
530                 return -1;
531         }
532
533         if (outdata.dsize != num*sizeof(uint8_t)) {
534                 DEBUG(DEBUG_ERR, (__location__ " expected %lu bytes, received %zi bytes\n",
535                                   (long unsigned int)num*sizeof(uint8_t),
536                                   outdata.dsize));
537                 talloc_free(outdata.dptr);
538                 return -1;
539         }
540
541         for (i=0; i<num; i++) {
542                 result[i] = outdata.dptr[i];
543         }
544
545         talloc_free(outdata.dptr);
546         return 0;
547 }
548
549 /*
550   send a message - from client context
551  */
552 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
553                       uint64_t srvid, TDB_DATA data)
554 {
555         struct ctdb_req_message *r;
556         int len, res;
557
558         len = offsetof(struct ctdb_req_message, data) + data.dsize;
559         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
560                                len, struct ctdb_req_message);
561         CTDB_NO_MEMORY(ctdb, r);
562
563         r->hdr.destnode  = pnn;
564         r->srvid         = srvid;
565         r->datalen       = data.dsize;
566         memcpy(&r->data[0], data.dptr, data.dsize);
567         
568         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
569         talloc_free(r);
570         return res;
571 }
572
573
574 /*
575   cancel a ctdb_fetch_lock operation, releasing the lock
576  */
577 static int fetch_lock_destructor(struct ctdb_record_handle *h)
578 {
579         ctdb_ltdb_unlock(h->ctdb_db, h->key);
580         return 0;
581 }
582
583 /*
584   force the migration of a record to this node
585  */
586 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
587 {
588         struct ctdb_call call;
589         ZERO_STRUCT(call);
590         call.call_id = CTDB_NULL_FUNC;
591         call.key = key;
592         call.flags = CTDB_IMMEDIATE_MIGRATION;
593         return ctdb_call(ctdb_db, &call);
594 }
595
596 /*
597   try to fetch a readonly copy of a record
598  */
599 static int
600 ctdb_client_fetch_readonly(struct ctdb_db_context *ctdb_db, TDB_DATA key, TALLOC_CTX *mem_ctx, struct ctdb_ltdb_header **hdr, TDB_DATA *data)
601 {
602         int ret;
603
604         struct ctdb_call call;
605         ZERO_STRUCT(call);
606
607         call.call_id = CTDB_FETCH_WITH_HEADER_FUNC;
608         call.call_data.dptr = NULL;
609         call.call_data.dsize = 0;
610         call.key = key;
611         call.flags = CTDB_WANT_READONLY;
612         ret = ctdb_call(ctdb_db, &call);
613
614         if (ret != 0) {
615                 return -1;
616         }
617         if (call.reply_data.dsize < sizeof(struct ctdb_ltdb_header)) {
618                 return -1;
619         }
620
621         *hdr = talloc_memdup(mem_ctx, &call.reply_data.dptr[0], sizeof(struct ctdb_ltdb_header));
622         if (*hdr == NULL) {
623                 talloc_free(call.reply_data.dptr);
624                 return -1;
625         }
626
627         data->dsize = call.reply_data.dsize - sizeof(struct ctdb_ltdb_header);
628         data->dptr  = talloc_memdup(mem_ctx, &call.reply_data.dptr[sizeof(struct ctdb_ltdb_header)], data->dsize);
629         if (data->dptr == NULL) {
630                 talloc_free(call.reply_data.dptr);
631                 talloc_free(hdr);
632                 return -1;
633         }
634
635         return 0;
636 }
637
638 /*
639   get a lock on a record, and return the records data. Blocks until it gets the lock
640  */
641 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
642                                            TDB_DATA key, TDB_DATA *data)
643 {
644         int ret;
645         struct ctdb_record_handle *h;
646
647         /*
648           procedure is as follows:
649
650           1) get the chain lock. 
651           2) check if we are dmaster
652           3) if we are the dmaster then return handle 
653           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
654              reply from ctdbd
655           5) when we get the reply, goto (1)
656          */
657
658         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
659         if (h == NULL) {
660                 return NULL;
661         }
662
663         h->ctdb_db = ctdb_db;
664         h->key     = key;
665         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
666         if (h->key.dptr == NULL) {
667                 talloc_free(h);
668                 return NULL;
669         }
670         h->data    = data;
671
672         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
673                  (const char *)key.dptr));
674
675 again:
676         /* step 1 - get the chain lock */
677         ret = ctdb_ltdb_lock(ctdb_db, key);
678         if (ret != 0) {
679                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
680                 talloc_free(h);
681                 return NULL;
682         }
683
684         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
685
686         talloc_set_destructor(h, fetch_lock_destructor);
687
688         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
689
690         /* when torturing, ensure we test the remote path */
691         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
692             random() % 5 == 0) {
693                 h->header.dmaster = (uint32_t)-1;
694         }
695
696
697         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
698
699         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
700                 ctdb_ltdb_unlock(ctdb_db, key);
701                 ret = ctdb_client_force_migration(ctdb_db, key);
702                 if (ret != 0) {
703                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
704                         talloc_free(h);
705                         return NULL;
706                 }
707                 goto again;
708         }
709
710         /* if this is a request for read/write and we have delegations
711            we have to revoke all delegations first
712         */
713         if ((h->header.dmaster == ctdb_db->ctdb->pnn) &&
714             (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
715                 ctdb_ltdb_unlock(ctdb_db, key);
716                 ret = ctdb_client_force_migration(ctdb_db, key);
717                 if (ret != 0) {
718                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
719                         talloc_free(h);
720                         return NULL;
721                 }
722                 goto again;
723         }
724
725         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
726         return h;
727 }
728
729 /*
730   get a readonly lock on a record, and return the records data. Blocks until it gets the lock
731  */
732 struct ctdb_record_handle *
733 ctdb_fetch_readonly_lock(
734         struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
735         TDB_DATA key, TDB_DATA *data,
736         int read_only)
737 {
738         int ret;
739         struct ctdb_record_handle *h;
740         struct ctdb_ltdb_header *roheader = NULL;
741
742         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
743         if (h == NULL) {
744                 return NULL;
745         }
746
747         h->ctdb_db = ctdb_db;
748         h->key     = key;
749         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
750         if (h->key.dptr == NULL) {
751                 talloc_free(h);
752                 return NULL;
753         }
754         h->data    = data;
755
756         data->dptr = NULL;
757         data->dsize = 0;
758
759
760 again:
761         talloc_free(roheader);
762         roheader = NULL;
763
764         talloc_free(data->dptr);
765         data->dptr = NULL;
766         data->dsize = 0;
767
768         /* Lock the record/chain */
769         ret = ctdb_ltdb_lock(ctdb_db, key);
770         if (ret != 0) {
771                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
772                 talloc_free(h);
773                 return NULL;
774         }
775
776         talloc_set_destructor(h, fetch_lock_destructor);
777
778         /* Check if record exists yet in the TDB */
779         ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
780         if (ret != 0) {
781                 ctdb_ltdb_unlock(ctdb_db, key);
782                 ret = ctdb_client_force_migration(ctdb_db, key);
783                 if (ret != 0) {
784                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
785                         talloc_free(h);
786                         return NULL;
787                 }
788                 goto again;
789         }
790
791         /* if this is a request for read/write and we have delegations
792            we have to revoke all delegations first
793         */
794         if ((read_only == 0) 
795         &&  (h->header.dmaster == ctdb_db->ctdb->pnn)
796         &&  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
797                 ctdb_ltdb_unlock(ctdb_db, key);
798                 ret = ctdb_client_force_migration(ctdb_db, key);
799                 if (ret != 0) {
800                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
801                         talloc_free(h);
802                         return NULL;
803                 }
804                 goto again;
805         }
806
807         /* if we are dmaster, just return the handle */
808         if (h->header.dmaster == ctdb_db->ctdb->pnn) {
809                 return h;
810         }
811
812         if (read_only != 0) {
813                 TDB_DATA rodata = {NULL, 0};
814
815                 if ((h->header.flags & CTDB_REC_RO_HAVE_READONLY)
816                 ||  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
817                         return h;
818                 }
819
820                 ctdb_ltdb_unlock(ctdb_db, key);
821                 ret = ctdb_client_fetch_readonly(ctdb_db, key, h, &roheader, &rodata);
822                 if (ret != 0) {
823                         DEBUG(DEBUG_ERR,("ctdb_fetch_readonly_lock:  failed. force migration and try again\n"));
824                         ret = ctdb_client_force_migration(ctdb_db, key);
825                         if (ret != 0) {
826                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
827                                 talloc_free(h);
828                                 return NULL;
829                         }
830
831                         goto again;
832                 }
833
834                 if (!(roheader->flags&CTDB_REC_RO_HAVE_READONLY)) {
835                         ret = ctdb_client_force_migration(ctdb_db, key);
836                         if (ret != 0) {
837                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
838                                 talloc_free(h);
839                                 return NULL;
840                         }
841
842                         goto again;
843                 }
844
845                 ret = ctdb_ltdb_lock(ctdb_db, key);
846                 if (ret != 0) {
847                         DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
848                         talloc_free(h);
849                         return NULL;
850                 }
851
852                 ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
853                 if (ret != 0) {
854                         ctdb_ltdb_unlock(ctdb_db, key);
855
856                         ret = ctdb_client_force_migration(ctdb_db, key);
857                         if (ret != 0) {
858                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
859                                 talloc_free(h);
860                                 return NULL;
861                         }
862
863                         goto again;
864                 }
865
866                 return h;
867         }
868
869         /* we are not dmaster and this was not a request for a readonly lock
870          * so unlock the record, migrate it and try again
871          */
872         ctdb_ltdb_unlock(ctdb_db, key);
873         ret = ctdb_client_force_migration(ctdb_db, key);
874         if (ret != 0) {
875                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
876                 talloc_free(h);
877                 return NULL;
878         }
879         goto again;
880 }
881
882 /*
883   store some data to the record that was locked with ctdb_fetch_lock()
884 */
885 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
886 {
887         if (h->ctdb_db->persistent) {
888                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
889                 return -1;
890         }
891
892         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
893 }
894
895 /*
896   non-locking fetch of a record
897  */
898 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
899                TDB_DATA key, TDB_DATA *data)
900 {
901         struct ctdb_call call;
902         int ret;
903
904         call.call_id = CTDB_FETCH_FUNC;
905         call.call_data.dptr = NULL;
906         call.call_data.dsize = 0;
907         call.key = key;
908
909         ret = ctdb_call(ctdb_db, &call);
910
911         if (ret == 0) {
912                 *data = call.reply_data;
913                 talloc_steal(mem_ctx, data->dptr);
914         }
915
916         return ret;
917 }
918
919
920
921 /*
922    called when a control completes or timesout to invoke the callback
923    function the user provided
924 */
925 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
926         struct timeval t, void *private_data)
927 {
928         struct ctdb_client_control_state *state;
929         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
930         int ret;
931
932         state = talloc_get_type(private_data, struct ctdb_client_control_state);
933         talloc_steal(tmp_ctx, state);
934
935         ret = ctdb_control_recv(state->ctdb, state, state,
936                         NULL, 
937                         NULL, 
938                         NULL);
939         if (ret != 0) {
940                 DEBUG(DEBUG_DEBUG,("ctdb_control_recv() failed, ignoring return code %d\n", ret));
941         }
942
943         talloc_free(tmp_ctx);
944 }
945
946 /*
947   called when a CTDB_REPLY_CONTROL packet comes in in the client
948
949   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
950   contains any reply data from the control
951 */
952 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
953                                       struct ctdb_req_header *hdr)
954 {
955         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
956         struct ctdb_client_control_state *state;
957
958         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
959         if (state == NULL) {
960                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
961                 return;
962         }
963
964         if (hdr->reqid != state->reqid) {
965                 /* we found a record  but it was the wrong one */
966                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
967                 return;
968         }
969
970         state->outdata.dptr = c->data;
971         state->outdata.dsize = c->datalen;
972         state->status = c->status;
973         if (c->errorlen) {
974                 state->errormsg = talloc_strndup(state, 
975                                                  (char *)&c->data[c->datalen], 
976                                                  c->errorlen);
977         }
978
979         /* state->outdata now uses resources from c so we dont want c
980            to just dissappear from under us while state is still alive
981         */
982         talloc_steal(state, c);
983
984         state->state = CTDB_CONTROL_DONE;
985
986         /* if we had a callback registered for this control, pull the response
987            and call the callback.
988         */
989         if (state->async.fn) {
990                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
991         }
992 }
993
994
995 /*
996   destroy a ctdb_control in client
997 */
998 static int ctdb_client_control_destructor(struct ctdb_client_control_state *state)
999 {
1000         ctdb_reqid_remove(state->ctdb, state->reqid);
1001         return 0;
1002 }
1003
1004
1005 /* time out handler for ctdb_control */
1006 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
1007         struct timeval t, void *private_data)
1008 {
1009         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
1010
1011         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
1012                          "dstnode:%u\n", state->reqid, state->c->opcode,
1013                          state->c->hdr.destnode));
1014
1015         state->state = CTDB_CONTROL_TIMEOUT;
1016
1017         /* if we had a callback registered for this control, pull the response
1018            and call the callback.
1019         */
1020         if (state->async.fn) {
1021                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
1022         }
1023 }
1024
1025 /* async version of send control request */
1026 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
1027                 uint32_t destnode, uint64_t srvid, 
1028                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
1029                 TALLOC_CTX *mem_ctx,
1030                 struct timeval *timeout,
1031                 char **errormsg)
1032 {
1033         struct ctdb_client_control_state *state;
1034         size_t len;
1035         struct ctdb_req_control *c;
1036         int ret;
1037
1038         if (errormsg) {
1039                 *errormsg = NULL;
1040         }
1041
1042         /* if the domain socket is not yet open, open it */
1043         if (ctdb->daemon.sd==-1) {
1044                 ctdb_socket_connect(ctdb);
1045         }
1046
1047         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
1048         CTDB_NO_MEMORY_NULL(ctdb, state);
1049
1050         state->ctdb       = ctdb;
1051         state->reqid      = ctdb_reqid_new(ctdb, state);
1052         state->state      = CTDB_CONTROL_WAIT;
1053         state->errormsg   = NULL;
1054
1055         talloc_set_destructor(state, ctdb_client_control_destructor);
1056
1057         len = offsetof(struct ctdb_req_control, data) + data.dsize;
1058         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
1059                                len, struct ctdb_req_control);
1060         state->c            = c;        
1061         CTDB_NO_MEMORY_NULL(ctdb, c);
1062         c->hdr.reqid        = state->reqid;
1063         c->hdr.destnode     = destnode;
1064         c->opcode           = opcode;
1065         c->client_id        = 0;
1066         c->flags            = flags;
1067         c->srvid            = srvid;
1068         c->datalen          = data.dsize;
1069         if (data.dsize) {
1070                 memcpy(&c->data[0], data.dptr, data.dsize);
1071         }
1072
1073         /* timeout */
1074         if (timeout && !timeval_is_zero(timeout)) {
1075                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
1076         }
1077
1078         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
1079         if (ret != 0) {
1080                 talloc_free(state);
1081                 return NULL;
1082         }
1083
1084         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1085                 talloc_free(state);
1086                 return NULL;
1087         }
1088
1089         return state;
1090 }
1091
1092
1093 /* async version of receive control reply */
1094 int ctdb_control_recv(struct ctdb_context *ctdb, 
1095                 struct ctdb_client_control_state *state, 
1096                 TALLOC_CTX *mem_ctx,
1097                 TDB_DATA *outdata, int32_t *status, char **errormsg)
1098 {
1099         TALLOC_CTX *tmp_ctx;
1100
1101         if (status != NULL) {
1102                 *status = -1;
1103         }
1104         if (errormsg != NULL) {
1105                 *errormsg = NULL;
1106         }
1107
1108         if (state == NULL) {
1109                 return -1;
1110         }
1111
1112         /* prevent double free of state */
1113         tmp_ctx = talloc_new(ctdb);
1114         talloc_steal(tmp_ctx, state);
1115
1116         /* loop one event at a time until we either timeout or the control
1117            completes.
1118         */
1119         while (state->state == CTDB_CONTROL_WAIT) {
1120                 event_loop_once(ctdb->ev);
1121         }
1122
1123         if (state->state != CTDB_CONTROL_DONE) {
1124                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
1125                 if (state->async.fn) {
1126                         state->async.fn(state);
1127                 }
1128                 talloc_free(tmp_ctx);
1129                 return -1;
1130         }
1131
1132         if (state->errormsg) {
1133                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
1134                 if (errormsg) {
1135                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
1136                 }
1137                 if (state->async.fn) {
1138                         state->async.fn(state);
1139                 }
1140                 talloc_free(tmp_ctx);
1141                 return (status == 0 ? -1 : state->status);
1142         }
1143
1144         if (outdata) {
1145                 *outdata = state->outdata;
1146                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
1147         }
1148
1149         if (status) {
1150                 *status = state->status;
1151         }
1152
1153         if (state->async.fn) {
1154                 state->async.fn(state);
1155         }
1156
1157         talloc_free(tmp_ctx);
1158         return 0;
1159 }
1160
1161
1162
1163 /*
1164   send a ctdb control message
1165   timeout specifies how long we should wait for a reply.
1166   if timeout is NULL we wait indefinitely
1167  */
1168 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
1169                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
1170                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
1171                  struct timeval *timeout,
1172                  char **errormsg)
1173 {
1174         struct ctdb_client_control_state *state;
1175
1176         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
1177                         flags, data, mem_ctx,
1178                         timeout, errormsg);
1179
1180         /* FIXME: Error conditions in ctdb_control_send return NULL without
1181          * setting errormsg.  So, there is no way to distinguish between sucess
1182          * and failure when CTDB_CTRL_FLAG_NOREPLY is set */
1183         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1184                 if (status != NULL) {
1185                         *status = 0;
1186                 }
1187                 return 0;
1188         }
1189
1190         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
1191                         errormsg);
1192 }
1193
1194
1195
1196
1197 /*
1198   a process exists call. Returns 0 if process exists, -1 otherwise
1199  */
1200 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
1201 {
1202         int ret;
1203         TDB_DATA data;
1204         int32_t status;
1205
1206         data.dptr = (uint8_t*)&pid;
1207         data.dsize = sizeof(pid);
1208
1209         ret = ctdb_control(ctdb, destnode, 0, 
1210                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
1211                            NULL, NULL, &status, NULL, NULL);
1212         if (ret != 0) {
1213                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
1214                 return -1;
1215         }
1216
1217         return status;
1218 }
1219
1220 /*
1221   get remote statistics
1222  */
1223 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
1224 {
1225         int ret;
1226         TDB_DATA data;
1227         int32_t res;
1228
1229         ret = ctdb_control(ctdb, destnode, 0, 
1230                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
1231                            ctdb, &data, &res, NULL, NULL);
1232         if (ret != 0 || res != 0) {
1233                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
1234                 return -1;
1235         }
1236
1237         if (data.dsize != sizeof(struct ctdb_statistics)) {
1238                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1239                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1240                       return -1;
1241         }
1242
1243         *status = *(struct ctdb_statistics *)data.dptr;
1244         talloc_free(data.dptr);
1245                         
1246         return 0;
1247 }
1248
1249 /*
1250  * get db statistics
1251  */
1252 int ctdb_ctrl_dbstatistics(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1253                            TALLOC_CTX *mem_ctx, struct ctdb_db_statistics **dbstat)
1254 {
1255         int ret;
1256         TDB_DATA indata, outdata;
1257         int32_t res;
1258         struct ctdb_db_statistics *wire, *s;
1259         char *ptr;
1260         int i;
1261
1262         indata.dptr = (uint8_t *)&dbid;
1263         indata.dsize = sizeof(dbid);
1264
1265         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DB_STATISTICS,
1266                            0, indata, ctdb, &outdata, &res, NULL, NULL);
1267         if (ret != 0 || res != 0) {
1268                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for dbstatistics failed\n"));
1269                 return -1;
1270         }
1271
1272         if (outdata.dsize < offsetof(struct ctdb_db_statistics, hot_keys_wire)) {
1273                 DEBUG(DEBUG_ERR,(__location__ " Wrong dbstatistics size %zi - expected >= %lu\n",
1274                                  outdata.dsize,
1275                                  (long unsigned int)sizeof(struct ctdb_statistics)));
1276                 return -1;
1277         }
1278
1279         s = talloc_zero(mem_ctx, struct ctdb_db_statistics);
1280         if (s == NULL) {
1281                 talloc_free(outdata.dptr);
1282                 CTDB_NO_MEMORY(ctdb, s);
1283         }
1284
1285         wire = (struct ctdb_db_statistics *)outdata.dptr;
1286         memcpy(s, wire, offsetof(struct ctdb_db_statistics, hot_keys_wire));
1287         ptr = &wire->hot_keys_wire[0];
1288         for (i=0; i<wire->num_hot_keys; i++) {
1289                 s->hot_keys[i].key.dptr = talloc_size(mem_ctx, s->hot_keys[i].key.dsize);
1290                 if (s->hot_keys[i].key.dptr == NULL) {
1291                         talloc_free(outdata.dptr);
1292                         CTDB_NO_MEMORY(ctdb, s->hot_keys[i].key.dptr);
1293                 }
1294
1295                 memcpy(s->hot_keys[i].key.dptr, ptr, s->hot_keys[i].key.dsize);
1296                 ptr += wire->hot_keys[i].key.dsize;
1297         }
1298
1299         talloc_free(outdata.dptr);
1300         *dbstat = s;
1301         return 0;
1302 }
1303
1304 /*
1305   shutdown a remote ctdb node
1306  */
1307 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1308 {
1309         struct ctdb_client_control_state *state;
1310
1311         state = ctdb_control_send(ctdb, destnode, 0, 
1312                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1313                            NULL, &timeout, NULL);
1314         if (state == NULL) {
1315                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1316                 return -1;
1317         }
1318
1319         return 0;
1320 }
1321
1322 /*
1323   get vnn map from a remote node
1324  */
1325 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1326 {
1327         int ret;
1328         TDB_DATA outdata;
1329         int32_t res;
1330         struct ctdb_vnn_map_wire *map;
1331
1332         ret = ctdb_control(ctdb, destnode, 0, 
1333                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1334                            mem_ctx, &outdata, &res, &timeout, NULL);
1335         if (ret != 0 || res != 0) {
1336                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1337                 return -1;
1338         }
1339         
1340         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1341         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1342             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1343                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1344                 return -1;
1345         }
1346
1347         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1348         CTDB_NO_MEMORY(ctdb, *vnnmap);
1349         (*vnnmap)->generation = map->generation;
1350         (*vnnmap)->size       = map->size;
1351         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1352
1353         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1354         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1355         talloc_free(outdata.dptr);
1356                     
1357         return 0;
1358 }
1359
1360
1361 /*
1362   get the recovery mode of a remote node
1363  */
1364 struct ctdb_client_control_state *
1365 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1366 {
1367         return ctdb_control_send(ctdb, destnode, 0, 
1368                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1369                            mem_ctx, &timeout, NULL);
1370 }
1371
1372 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1373 {
1374         int ret;
1375         int32_t res;
1376
1377         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1378         if (ret != 0) {
1379                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1380                 return -1;
1381         }
1382
1383         if (recmode) {
1384                 *recmode = (uint32_t)res;
1385         }
1386
1387         return 0;
1388 }
1389
1390 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1391 {
1392         struct ctdb_client_control_state *state;
1393
1394         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1395         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1396 }
1397
1398
1399
1400
1401 /*
1402   set the recovery mode of a remote node
1403  */
1404 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1405 {
1406         int ret;
1407         TDB_DATA data;
1408         int32_t res;
1409
1410         data.dsize = sizeof(uint32_t);
1411         data.dptr = (unsigned char *)&recmode;
1412
1413         ret = ctdb_control(ctdb, destnode, 0, 
1414                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1415                            NULL, NULL, &res, &timeout, NULL);
1416         if (ret != 0 || res != 0) {
1417                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1418                 return -1;
1419         }
1420
1421         return 0;
1422 }
1423
1424
1425
1426 /*
1427   get the recovery master of a remote node
1428  */
1429 struct ctdb_client_control_state *
1430 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1431                         struct timeval timeout, uint32_t destnode)
1432 {
1433         return ctdb_control_send(ctdb, destnode, 0, 
1434                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1435                            mem_ctx, &timeout, NULL);
1436 }
1437
1438 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1439 {
1440         int ret;
1441         int32_t res;
1442
1443         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1444         if (ret != 0) {
1445                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1446                 return -1;
1447         }
1448
1449         if (recmaster) {
1450                 *recmaster = (uint32_t)res;
1451         }
1452
1453         return 0;
1454 }
1455
1456 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1457 {
1458         struct ctdb_client_control_state *state;
1459
1460         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1461         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1462 }
1463
1464
1465 /*
1466   set the recovery master of a remote node
1467  */
1468 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1469 {
1470         int ret;
1471         TDB_DATA data;
1472         int32_t res;
1473
1474         ZERO_STRUCT(data);
1475         data.dsize = sizeof(uint32_t);
1476         data.dptr = (unsigned char *)&recmaster;
1477
1478         ret = ctdb_control(ctdb, destnode, 0, 
1479                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1480                            NULL, NULL, &res, &timeout, NULL);
1481         if (ret != 0 || res != 0) {
1482                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1483                 return -1;
1484         }
1485
1486         return 0;
1487 }
1488
1489
1490 /*
1491   get a list of databases off a remote node
1492  */
1493 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1494                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1495 {
1496         int ret;
1497         TDB_DATA outdata;
1498         int32_t res;
1499
1500         ret = ctdb_control(ctdb, destnode, 0, 
1501                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1502                            mem_ctx, &outdata, &res, &timeout, NULL);
1503         if (ret != 0 || res != 0) {
1504                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1505                 return -1;
1506         }
1507
1508         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1509         talloc_free(outdata.dptr);
1510                     
1511         return 0;
1512 }
1513
1514 /*
1515   get a list of nodes (vnn and flags ) from a remote node
1516  */
1517 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1518                 struct timeval timeout, uint32_t destnode, 
1519                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1520 {
1521         int ret;
1522         TDB_DATA outdata;
1523         int32_t res;
1524
1525         ret = ctdb_control(ctdb, destnode, 0,
1526                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null,
1527                            mem_ctx, &outdata, &res, &timeout, NULL);
1528         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1529                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1530                 return -1;
1531         }
1532
1533         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1534         talloc_free(outdata.dptr);
1535         return 0;
1536 }
1537
1538 /*
1539   load nodes file on a remote node and return as a node map
1540  */
1541 int ctdb_ctrl_getnodesfile(struct ctdb_context *ctdb,
1542                            struct timeval timeout, uint32_t destnode,
1543                            TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1544 {
1545         int ret;
1546         TDB_DATA outdata;
1547         int32_t res;
1548
1549         ret = ctdb_control(ctdb, destnode, 0,
1550                            CTDB_CONTROL_GET_NODES_FILE, 0, tdb_null,
1551                            mem_ctx, &outdata, &res, &timeout, NULL);
1552         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1553                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1554                 return -1;
1555         }
1556
1557         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1558         talloc_free(outdata.dptr);
1559
1560         return 0;
1561 }
1562
1563 /*
1564   drop the transport, reload the nodes file and restart the transport
1565  */
1566 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1567                     struct timeval timeout, uint32_t destnode)
1568 {
1569         int ret;
1570         int32_t res;
1571
1572         ret = ctdb_control(ctdb, destnode, 0, 
1573                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1574                            NULL, NULL, &res, &timeout, NULL);
1575         if (ret != 0 || res != 0) {
1576                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1577                 return -1;
1578         }
1579
1580         return 0;
1581 }
1582
1583
1584 /*
1585   set vnn map on a node
1586  */
1587 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1588                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1589 {
1590         int ret;
1591         TDB_DATA data;
1592         int32_t res;
1593         struct ctdb_vnn_map_wire *map;
1594         size_t len;
1595
1596         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1597         map = talloc_size(mem_ctx, len);
1598         CTDB_NO_MEMORY(ctdb, map);
1599
1600         map->generation = vnnmap->generation;
1601         map->size = vnnmap->size;
1602         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1603         
1604         data.dsize = len;
1605         data.dptr  = (uint8_t *)map;
1606
1607         ret = ctdb_control(ctdb, destnode, 0, 
1608                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1609                            NULL, NULL, &res, &timeout, NULL);
1610         if (ret != 0 || res != 0) {
1611                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1612                 return -1;
1613         }
1614
1615         talloc_free(map);
1616
1617         return 0;
1618 }
1619
1620
1621 /*
1622   async send for pull database
1623  */
1624 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1625         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1626         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1627 {
1628         TDB_DATA indata;
1629         struct ctdb_control_pulldb *pull;
1630         struct ctdb_client_control_state *state;
1631
1632         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1633         CTDB_NO_MEMORY_NULL(ctdb, pull);
1634
1635         pull->db_id   = dbid;
1636         pull->lmaster = lmaster;
1637
1638         indata.dsize = sizeof(struct ctdb_control_pulldb);
1639         indata.dptr  = (unsigned char *)pull;
1640
1641         state = ctdb_control_send(ctdb, destnode, 0, 
1642                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1643                                   mem_ctx, &timeout, NULL);
1644         talloc_free(pull);
1645
1646         return state;
1647 }
1648
1649 /*
1650   async recv for pull database
1651  */
1652 int ctdb_ctrl_pulldb_recv(
1653         struct ctdb_context *ctdb, 
1654         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1655         TDB_DATA *outdata)
1656 {
1657         int ret;
1658         int32_t res;
1659
1660         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1661         if ( (ret != 0) || (res != 0) ){
1662                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1663                 return -1;
1664         }
1665
1666         return 0;
1667 }
1668
1669 /*
1670   pull all keys and records for a specific database on a node
1671  */
1672 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1673                 uint32_t dbid, uint32_t lmaster, 
1674                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1675                 TDB_DATA *outdata)
1676 {
1677         struct ctdb_client_control_state *state;
1678
1679         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1680                                       timeout);
1681         
1682         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1683 }
1684
1685
1686 /*
1687   change dmaster for all keys in the database to the new value
1688  */
1689 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1690                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1691 {
1692         int ret;
1693         TDB_DATA indata;
1694         int32_t res;
1695
1696         indata.dsize = 2*sizeof(uint32_t);
1697         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1698
1699         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1700         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1701
1702         ret = ctdb_control(ctdb, destnode, 0, 
1703                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1704                            NULL, NULL, &res, &timeout, NULL);
1705         if (ret != 0 || res != 0) {
1706                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1707                 return -1;
1708         }
1709
1710         return 0;
1711 }
1712
1713 /*
1714   ping a node, return number of clients connected
1715  */
1716 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1717 {
1718         int ret;
1719         int32_t res;
1720
1721         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1722                            tdb_null, NULL, NULL, &res, NULL, NULL);
1723         if (ret != 0) {
1724                 return -1;
1725         }
1726         return res;
1727 }
1728
1729 int ctdb_ctrl_get_runstate(struct ctdb_context *ctdb, 
1730                            struct timeval timeout, 
1731                            uint32_t destnode,
1732                            uint32_t *runstate)
1733 {
1734         TDB_DATA outdata;
1735         int32_t res;
1736         int ret;
1737
1738         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_RUNSTATE, 0,
1739                            tdb_null, ctdb, &outdata, &res, &timeout, NULL);
1740         if (ret != 0 || res != 0) {
1741                 DEBUG(DEBUG_ERR,("ctdb_control for get_runstate failed\n"));
1742                 return ret != 0 ? ret : res;
1743         }
1744
1745         if (outdata.dsize != sizeof(uint32_t)) {
1746                 DEBUG(DEBUG_ERR,("Invalid return data in get_runstate\n"));
1747                 talloc_free(outdata.dptr);
1748                 return -1;
1749         }
1750
1751         if (runstate != NULL) {
1752                 *runstate = *(uint32_t *)outdata.dptr;
1753         }
1754         talloc_free(outdata.dptr);
1755
1756         return 0;
1757 }
1758
1759 /*
1760   find the real path to a ltdb 
1761  */
1762 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1763                    const char **path)
1764 {
1765         int ret;
1766         int32_t res;
1767         TDB_DATA data;
1768
1769         data.dptr = (uint8_t *)&dbid;
1770         data.dsize = sizeof(dbid);
1771
1772         ret = ctdb_control(ctdb, destnode, 0, 
1773                            CTDB_CONTROL_GETDBPATH, 0, data, 
1774                            mem_ctx, &data, &res, &timeout, NULL);
1775         if (ret != 0 || res != 0) {
1776                 return -1;
1777         }
1778
1779         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1780         if ((*path) == NULL) {
1781                 return -1;
1782         }
1783
1784         talloc_free(data.dptr);
1785
1786         return 0;
1787 }
1788
1789 /*
1790   find the name of a db 
1791  */
1792 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1793                    const char **name)
1794 {
1795         int ret;
1796         int32_t res;
1797         TDB_DATA data;
1798
1799         data.dptr = (uint8_t *)&dbid;
1800         data.dsize = sizeof(dbid);
1801
1802         ret = ctdb_control(ctdb, destnode, 0, 
1803                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1804                            mem_ctx, &data, &res, &timeout, NULL);
1805         if (ret != 0 || res != 0) {
1806                 return -1;
1807         }
1808
1809         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1810         if ((*name) == NULL) {
1811                 return -1;
1812         }
1813
1814         talloc_free(data.dptr);
1815
1816         return 0;
1817 }
1818
1819 /*
1820   get the health status of a db
1821  */
1822 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1823                           struct timeval timeout,
1824                           uint32_t destnode,
1825                           uint32_t dbid, TALLOC_CTX *mem_ctx,
1826                           const char **reason)
1827 {
1828         int ret;
1829         int32_t res;
1830         TDB_DATA data;
1831
1832         data.dptr = (uint8_t *)&dbid;
1833         data.dsize = sizeof(dbid);
1834
1835         ret = ctdb_control(ctdb, destnode, 0,
1836                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1837                            mem_ctx, &data, &res, &timeout, NULL);
1838         if (ret != 0 || res != 0) {
1839                 return -1;
1840         }
1841
1842         if (data.dsize == 0) {
1843                 (*reason) = NULL;
1844                 return 0;
1845         }
1846
1847         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1848         if ((*reason) == NULL) {
1849                 return -1;
1850         }
1851
1852         talloc_free(data.dptr);
1853
1854         return 0;
1855 }
1856
1857 /*
1858  * get db sequence number
1859  */
1860 int ctdb_ctrl_getdbseqnum(struct ctdb_context *ctdb, struct timeval timeout,
1861                           uint32_t destnode, uint32_t dbid, uint64_t *seqnum)
1862 {
1863         int ret;
1864         int32_t res;
1865         TDB_DATA data, outdata;
1866
1867         data.dptr = (uint8_t *)&dbid;
1868         data.dsize = sizeof(uint64_t);  /* This is just wrong */
1869
1870         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DB_SEQNUM,
1871                            0, data, ctdb, &outdata, &res, &timeout, NULL);
1872         if (ret != 0 || res != 0) {
1873                 DEBUG(DEBUG_ERR,("ctdb_control for getdbesqnum failed\n"));
1874                 return -1;
1875         }
1876
1877         if (outdata.dsize != sizeof(uint64_t)) {
1878                 DEBUG(DEBUG_ERR,("Invalid return data in get_dbseqnum\n"));
1879                 talloc_free(outdata.dptr);
1880                 return -1;
1881         }
1882
1883         if (seqnum != NULL) {
1884                 *seqnum = *(uint64_t *)outdata.dptr;
1885         }
1886         talloc_free(outdata.dptr);
1887
1888         return 0;
1889 }
1890
1891 /*
1892   create a database
1893  */
1894 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1895                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1896 {
1897         int ret;
1898         int32_t res;
1899         TDB_DATA data;
1900         uint64_t tdb_flags = 0;
1901
1902         data.dptr = discard_const(name);
1903         data.dsize = strlen(name)+1;
1904
1905         /* Make sure that volatile databases use jenkins hash */
1906         if (!persistent) {
1907                 tdb_flags = TDB_INCOMPATIBLE_HASH;
1908         }
1909
1910 #ifdef TDB_MUTEX_LOCKING
1911         if (!persistent && ctdb->tunable.mutex_enabled == 1) {
1912                 tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
1913         }
1914 #endif
1915
1916         ret = ctdb_control(ctdb, destnode, tdb_flags,
1917                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1918                            0, data, 
1919                            mem_ctx, &data, &res, &timeout, NULL);
1920
1921         if (ret != 0 || res != 0) {
1922                 return -1;
1923         }
1924
1925         return 0;
1926 }
1927
1928 /*
1929   get debug level on a node
1930  */
1931 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1932 {
1933         int ret;
1934         int32_t res;
1935         TDB_DATA data;
1936
1937         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1938                            ctdb, &data, &res, NULL, NULL);
1939         if (ret != 0 || res != 0) {
1940                 return -1;
1941         }
1942         if (data.dsize != sizeof(int32_t)) {
1943                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1944                          (unsigned)data.dsize));
1945                 return -1;
1946         }
1947         *level = *(int32_t *)data.dptr;
1948         talloc_free(data.dptr);
1949         return 0;
1950 }
1951
1952 /*
1953   set debug level on a node
1954  */
1955 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1956 {
1957         int ret;
1958         int32_t res;
1959         TDB_DATA data;
1960
1961         data.dptr = (uint8_t *)&level;
1962         data.dsize = sizeof(level);
1963
1964         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1965                            NULL, NULL, &res, NULL, NULL);
1966         if (ret != 0 || res != 0) {
1967                 return -1;
1968         }
1969         return 0;
1970 }
1971
1972
1973 /*
1974   get a list of connected nodes
1975  */
1976 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1977                                 struct timeval timeout,
1978                                 TALLOC_CTX *mem_ctx,
1979                                 uint32_t *num_nodes)
1980 {
1981         struct ctdb_node_map *map=NULL;
1982         int ret, i;
1983         uint32_t *nodes;
1984
1985         *num_nodes = 0;
1986
1987         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1988         if (ret != 0) {
1989                 return NULL;
1990         }
1991
1992         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1993         if (nodes == NULL) {
1994                 return NULL;
1995         }
1996
1997         for (i=0;i<map->num;i++) {
1998                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1999                         nodes[*num_nodes] = map->nodes[i].pnn;
2000                         (*num_nodes)++;
2001                 }
2002         }
2003
2004         return nodes;
2005 }
2006
2007
2008 /*
2009   reset remote status
2010  */
2011 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
2012 {
2013         int ret;
2014         int32_t res;
2015
2016         ret = ctdb_control(ctdb, destnode, 0, 
2017                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
2018                            NULL, NULL, &res, NULL, NULL);
2019         if (ret != 0 || res != 0) {
2020                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
2021                 return -1;
2022         }
2023         return 0;
2024 }
2025
2026 /*
2027   attach to a specific database - client call
2028 */
2029 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
2030                                     struct timeval timeout,
2031                                     const char *name,
2032                                     bool persistent,
2033                                     uint32_t tdb_flags)
2034 {
2035         struct ctdb_db_context *ctdb_db;
2036         TDB_DATA data;
2037         int ret;
2038         int32_t res;
2039 #ifdef TDB_MUTEX_LOCKING
2040         uint32_t mutex_enabled = 0;
2041 #endif
2042
2043         ctdb_db = ctdb_db_handle(ctdb, name);
2044         if (ctdb_db) {
2045                 return ctdb_db;
2046         }
2047
2048         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
2049         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
2050
2051         ctdb_db->ctdb = ctdb;
2052         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
2053         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
2054
2055         data.dptr = discard_const(name);
2056         data.dsize = strlen(name)+1;
2057
2058         /* CTDB has switched to using jenkins hash for volatile databases.
2059          * Even if tdb_flags do not explicitly mention TDB_INCOMPATIBLE_HASH,
2060          * always set it.
2061          */
2062         if (!persistent) {
2063                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
2064         }
2065
2066 #ifdef TDB_MUTEX_LOCKING
2067         if (!persistent) {
2068                 ret = ctdb_ctrl_get_tunable(ctdb, timeval_current_ofs(3,0),
2069                                             CTDB_CURRENT_NODE,
2070                                             "TDBMutexEnabled",
2071                                             &mutex_enabled);
2072                 if (ret != 0) {
2073                         DEBUG(DEBUG_WARNING, ("Assuming no mutex support.\n"));
2074                 }
2075
2076                 if (mutex_enabled == 1) {
2077                         tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
2078                 }
2079         }
2080 #endif
2081
2082         /* tell ctdb daemon to attach */
2083         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
2084                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
2085                            0, data, ctdb_db, &data, &res, NULL, NULL);
2086         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
2087                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
2088                 talloc_free(ctdb_db);
2089                 return NULL;
2090         }
2091         
2092         ctdb_db->db_id = *(uint32_t *)data.dptr;
2093         talloc_free(data.dptr);
2094
2095         ret = ctdb_ctrl_getdbpath(ctdb, timeout, CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
2096         if (ret != 0) {
2097                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
2098                 talloc_free(ctdb_db);
2099                 return NULL;
2100         }
2101
2102         if (persistent) {
2103                 tdb_flags = TDB_DEFAULT;
2104         } else {
2105                 tdb_flags = TDB_NOSYNC;
2106 #ifdef TDB_MUTEX_LOCKING
2107                 if (mutex_enabled) {
2108                         tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
2109                 }
2110 #endif
2111         }
2112         if (ctdb->valgrinding) {
2113                 tdb_flags |= TDB_NOMMAP;
2114         }
2115         tdb_flags |= TDB_DISALLOW_NESTING;
2116
2117         ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path, 0, tdb_flags,
2118                                       O_RDWR, 0);
2119         if (ctdb_db->ltdb == NULL) {
2120                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
2121                 talloc_free(ctdb_db);
2122                 return NULL;
2123         }
2124
2125         ctdb_db->persistent = persistent;
2126
2127         DLIST_ADD(ctdb->db_list, ctdb_db);
2128
2129         /* add well known functions */
2130         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
2131         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
2132         ctdb_set_call(ctdb_db, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
2133
2134         return ctdb_db;
2135 }
2136
2137 /*
2138  * detach from a specific database - client call
2139  */
2140 int ctdb_detach(struct ctdb_context *ctdb, uint32_t db_id)
2141 {
2142         int ret;
2143         int32_t status;
2144         TDB_DATA data;
2145
2146         data.dsize = sizeof(db_id);
2147         data.dptr = (uint8_t *)&db_id;
2148
2149         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_DB_DETACH,
2150                            0, data, NULL, NULL, &status, NULL, NULL);
2151         if (ret != 0 || status != 0) {
2152                 return -1;
2153         }
2154         return 0;
2155 }
2156
2157 /*
2158   setup a call for a database
2159  */
2160 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
2161 {
2162         struct ctdb_registered_call *call;
2163
2164         /* register locally */
2165         call = talloc(ctdb_db, struct ctdb_registered_call);
2166         call->fn = fn;
2167         call->id = id;
2168
2169         DLIST_ADD(ctdb_db->calls, call);
2170         return 0;
2171 }
2172
2173
2174 struct traverse_state {
2175         bool done;
2176         uint32_t count;
2177         ctdb_traverse_func fn;
2178         void *private_data;
2179         bool listemptyrecords;
2180 };
2181
2182 /*
2183   called on each key during a ctdb_traverse
2184  */
2185 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
2186 {
2187         struct traverse_state *state = (struct traverse_state *)p;
2188         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
2189         TDB_DATA key;
2190
2191         if (data.dsize < sizeof(uint32_t) || d->length != data.dsize) {
2192                 DEBUG(DEBUG_ERR, ("Bad data size %u in traverse_handler\n",
2193                                   (unsigned)data.dsize));
2194                 state->done = true;
2195                 return;
2196         }
2197
2198         key.dsize = d->keylen;
2199         key.dptr  = &d->data[0];
2200         data.dsize = d->datalen;
2201         data.dptr = &d->data[d->keylen];
2202
2203         if (key.dsize == 0 && data.dsize == 0) {
2204                 /* end of traverse */
2205                 state->done = true;
2206                 return;
2207         }
2208
2209         if (!state->listemptyrecords &&
2210             data.dsize == sizeof(struct ctdb_ltdb_header))
2211         {
2212                 /* empty records are deleted records in ctdb */
2213                 return;
2214         }
2215
2216         if (state->fn(key, data, state->private_data) != 0) {
2217                 state->done = true;
2218         }
2219
2220         state->count++;
2221 }
2222
2223 /**
2224  * start a cluster wide traverse, calling the supplied fn on each record
2225  * return the number of records traversed, or -1 on error
2226  *
2227  * Extendet variant with a flag to signal whether empty records should
2228  * be listed.
2229  */
2230 static int ctdb_traverse_ext(struct ctdb_db_context *ctdb_db,
2231                              ctdb_traverse_func fn,
2232                              bool withemptyrecords,
2233                              void *private_data)
2234 {
2235         TDB_DATA data;
2236         struct ctdb_traverse_start_ext t;
2237         int32_t status;
2238         int ret;
2239         uint64_t srvid = (getpid() | 0xFLL<<60);
2240         struct traverse_state state;
2241
2242         state.done = false;
2243         state.count = 0;
2244         state.private_data = private_data;
2245         state.fn = fn;
2246         state.listemptyrecords = withemptyrecords;
2247
2248         ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
2249         if (ret != 0) {
2250                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
2251                 return -1;
2252         }
2253
2254         t.db_id = ctdb_db->db_id;
2255         t.srvid = srvid;
2256         t.reqid = 0;
2257         t.withemptyrecords = withemptyrecords;
2258
2259         data.dptr = (uint8_t *)&t;
2260         data.dsize = sizeof(t);
2261
2262         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START_EXT, 0,
2263                            data, NULL, NULL, &status, NULL, NULL);
2264         if (ret != 0 || status != 0) {
2265                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
2266                 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2267                 return -1;
2268         }
2269
2270         while (!state.done) {
2271                 event_loop_once(ctdb_db->ctdb->ev);
2272         }
2273
2274         ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2275         if (ret != 0) {
2276                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
2277                 return -1;
2278         }
2279
2280         return state.count;
2281 }
2282
2283 /**
2284  * start a cluster wide traverse, calling the supplied fn on each record
2285  * return the number of records traversed, or -1 on error
2286  *
2287  * Standard version which does not list the empty records:
2288  * These are considered deleted.
2289  */
2290 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
2291 {
2292         return ctdb_traverse_ext(ctdb_db, fn, false, private_data);
2293 }
2294
2295 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
2296 /*
2297   called on each key during a catdb
2298  */
2299 int ctdb_dumpdb_record(TDB_DATA key, TDB_DATA data, void *p)
2300 {
2301         int i;
2302         struct ctdb_dump_db_context *c = (struct ctdb_dump_db_context *)p;
2303         FILE *f = c->f;
2304         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
2305
2306         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
2307         for (i=0;i<key.dsize;i++) {
2308                 if (ISASCII(key.dptr[i])) {
2309                         fprintf(f, "%c", key.dptr[i]);
2310                 } else {
2311                         fprintf(f, "\\%02X", key.dptr[i]);
2312                 }
2313         }
2314         fprintf(f, "\"\n");
2315
2316         fprintf(f, "dmaster: %u\n", h->dmaster);
2317         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
2318
2319         if (c->printlmaster && c->ctdb->vnn_map != NULL) {
2320                 fprintf(f, "lmaster: %u\n", ctdb_lmaster(c->ctdb, &key));
2321         }
2322
2323         if (c->printhash) {
2324                 fprintf(f, "hash: 0x%08x\n", ctdb_hash(&key));
2325         }
2326
2327         if (c->printrecordflags) {
2328                 fprintf(f, "flags: 0x%08x", h->flags);
2329                 if (h->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) printf(" MIGRATED_WITH_DATA");
2330                 if (h->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) printf(" VACUUM_MIGRATED");
2331                 if (h->flags & CTDB_REC_FLAG_AUTOMATIC) printf(" AUTOMATIC");
2332                 if (h->flags & CTDB_REC_RO_HAVE_DELEGATIONS) printf(" RO_HAVE_DELEGATIONS");
2333                 if (h->flags & CTDB_REC_RO_HAVE_READONLY) printf(" RO_HAVE_READONLY");
2334                 if (h->flags & CTDB_REC_RO_REVOKING_READONLY) printf(" RO_REVOKING_READONLY");
2335                 if (h->flags & CTDB_REC_RO_REVOKE_COMPLETE) printf(" RO_REVOKE_COMPLETE");
2336                 fprintf(f, "\n");
2337         }
2338
2339         if (c->printdatasize) {
2340                 fprintf(f, "data size: %u\n", (unsigned)data.dsize);
2341         } else {
2342                 fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
2343                 for (i=sizeof(*h);i<data.dsize;i++) {
2344                         if (ISASCII(data.dptr[i])) {
2345                                 fprintf(f, "%c", data.dptr[i]);
2346                         } else {
2347                                 fprintf(f, "\\%02X", data.dptr[i]);
2348                         }
2349                 }
2350                 fprintf(f, "\"\n");
2351         }
2352
2353         fprintf(f, "\n");
2354
2355         return 0;
2356 }
2357
2358 /*
2359   convenience function to list all keys to stdout
2360  */
2361 int ctdb_dump_db(struct ctdb_db_context *ctdb_db,
2362                  struct ctdb_dump_db_context *ctx)
2363 {
2364         return ctdb_traverse_ext(ctdb_db, ctdb_dumpdb_record,
2365                                  ctx->printemptyrecords, ctx);
2366 }
2367
2368 /*
2369   get the pid of a ctdb daemon
2370  */
2371 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
2372 {
2373         int ret;
2374         int32_t res;
2375
2376         ret = ctdb_control(ctdb, destnode, 0, 
2377                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
2378                            NULL, NULL, &res, &timeout, NULL);
2379         if (ret != 0) {
2380                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
2381                 return -1;
2382         }
2383
2384         *pid = res;
2385
2386         return 0;
2387 }
2388
2389
2390 /*
2391   async freeze send control
2392  */
2393 struct ctdb_client_control_state *
2394 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
2395 {
2396         return ctdb_control_send(ctdb, destnode, priority, 
2397                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
2398                            mem_ctx, &timeout, NULL);
2399 }
2400
2401 /* 
2402    async freeze recv control
2403 */
2404 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
2405 {
2406         int ret;
2407         int32_t res;
2408
2409         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
2410         if ( (ret != 0) || (res != 0) ){
2411                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
2412                 return -1;
2413         }
2414
2415         return 0;
2416 }
2417
2418 /*
2419   freeze databases of a certain priority
2420  */
2421 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2422 {
2423         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2424         struct ctdb_client_control_state *state;
2425         int ret;
2426
2427         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
2428         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
2429         talloc_free(tmp_ctx);
2430
2431         return ret;
2432 }
2433
2434 /* Freeze all databases */
2435 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2436 {
2437         int i;
2438
2439         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2440                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2441                         return -1;
2442                 }
2443         }
2444         return 0;
2445 }
2446
2447 /*
2448   thaw databases of a certain priority
2449  */
2450 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2451 {
2452         int ret;
2453         int32_t res;
2454
2455         ret = ctdb_control(ctdb, destnode, priority, 
2456                            CTDB_CONTROL_THAW, 0, tdb_null, 
2457                            NULL, NULL, &res, &timeout, NULL);
2458         if (ret != 0 || res != 0) {
2459                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2460                 return -1;
2461         }
2462
2463         return 0;
2464 }
2465
2466 /* thaw all databases */
2467 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2468 {
2469         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2470 }
2471
2472 /*
2473   get pnn of a node, or -1
2474  */
2475 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2476 {
2477         int ret;
2478         int32_t res;
2479
2480         ret = ctdb_control(ctdb, destnode, 0, 
2481                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2482                            NULL, NULL, &res, &timeout, NULL);
2483         if (ret != 0) {
2484                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2485                 return -1;
2486         }
2487
2488         return res;
2489 }
2490
2491 /*
2492   get the monitoring mode of a remote node
2493  */
2494 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2495 {
2496         int ret;
2497         int32_t res;
2498
2499         ret = ctdb_control(ctdb, destnode, 0, 
2500                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2501                            NULL, NULL, &res, &timeout, NULL);
2502         if (ret != 0) {
2503                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2504                 return -1;
2505         }
2506
2507         *monmode = res;
2508
2509         return 0;
2510 }
2511
2512
2513 /*
2514  set the monitoring mode of a remote node to active
2515  */
2516 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2517 {
2518         int ret;
2519         
2520
2521         ret = ctdb_control(ctdb, destnode, 0, 
2522                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2523                            NULL, NULL,NULL, &timeout, NULL);
2524         if (ret != 0) {
2525                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2526                 return -1;
2527         }
2528
2529         
2530
2531         return 0;
2532 }
2533
2534 /*
2535   set the monitoring mode of a remote node to disable
2536  */
2537 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2538 {
2539         int ret;
2540         
2541
2542         ret = ctdb_control(ctdb, destnode, 0, 
2543                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2544                            NULL, NULL, NULL, &timeout, NULL);
2545         if (ret != 0) {
2546                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2547                 return -1;
2548         }
2549
2550         
2551
2552         return 0;
2553 }
2554
2555
2556
2557 /*
2558   sent to a node to make it take over an ip address
2559 */
2560 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout,
2561                           uint32_t destnode, struct ctdb_public_ip *ip)
2562 {
2563         TDB_DATA data;
2564         int ret;
2565         int32_t res;
2566
2567         data.dsize = sizeof(*ip);
2568         data.dptr  = (uint8_t *)ip;
2569
2570         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0,
2571                            data, NULL, NULL, &res, &timeout, NULL);
2572         if (ret != 0 || res != 0) {
2573                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2574                 return -1;
2575         }
2576
2577         return 0;
2578 }
2579
2580
2581 /*
2582   sent to a node to make it release an ip address
2583 */
2584 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout,
2585                          uint32_t destnode, struct ctdb_public_ip *ip)
2586 {
2587         TDB_DATA data;
2588         int ret;
2589         int32_t res;
2590
2591         data.dsize = sizeof(*ip);
2592         data.dptr  = (uint8_t *)ip;
2593
2594         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0,
2595                            data, NULL, NULL, &res, &timeout, NULL);
2596         if (ret != 0 || res != 0) {
2597                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2598                 return -1;
2599         }
2600
2601         return 0;
2602 }
2603
2604
2605 /*
2606   get a tunable
2607  */
2608 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2609                           struct timeval timeout, 
2610                           uint32_t destnode,
2611                           const char *name, uint32_t *value)
2612 {
2613         struct ctdb_control_get_tunable *t;
2614         TDB_DATA data, outdata;
2615         int32_t res;
2616         int ret;
2617
2618         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2619         data.dptr  = talloc_size(ctdb, data.dsize);
2620         CTDB_NO_MEMORY(ctdb, data.dptr);
2621
2622         t = (struct ctdb_control_get_tunable *)data.dptr;
2623         t->length = strlen(name)+1;
2624         memcpy(t->name, name, t->length);
2625
2626         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2627                            &outdata, &res, &timeout, NULL);
2628         talloc_free(data.dptr);
2629         if (ret != 0 || res != 0) {
2630                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2631                 return ret != 0 ? ret : res;
2632         }
2633
2634         if (outdata.dsize != sizeof(uint32_t)) {
2635                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2636                 talloc_free(outdata.dptr);
2637                 return -1;
2638         }
2639         
2640         *value = *(uint32_t *)outdata.dptr;
2641         talloc_free(outdata.dptr);
2642
2643         return 0;
2644 }
2645
2646 /*
2647   set a tunable
2648  */
2649 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2650                           struct timeval timeout, 
2651                           uint32_t destnode,
2652                           const char *name, uint32_t value)
2653 {
2654         struct ctdb_control_set_tunable *t;
2655         TDB_DATA data;
2656         int32_t res;
2657         int ret;
2658
2659         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2660         data.dptr  = talloc_size(ctdb, data.dsize);
2661         CTDB_NO_MEMORY(ctdb, data.dptr);
2662
2663         t = (struct ctdb_control_set_tunable *)data.dptr;
2664         t->length = strlen(name)+1;
2665         memcpy(t->name, name, t->length);
2666         t->value = value;
2667
2668         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2669                            NULL, &res, &timeout, NULL);
2670         talloc_free(data.dptr);
2671         if ((ret != 0) || (res == -1)) {
2672                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2673                 return -1;
2674         }
2675
2676         return res;
2677 }
2678
2679 /*
2680   list tunables
2681  */
2682 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2683                             struct timeval timeout, 
2684                             uint32_t destnode,
2685                             TALLOC_CTX *mem_ctx,
2686                             const char ***list, uint32_t *count)
2687 {
2688         TDB_DATA outdata;
2689         int32_t res;
2690         int ret;
2691         struct ctdb_control_list_tunable *t;
2692         char *p, *s, *ptr;
2693
2694         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2695                            mem_ctx, &outdata, &res, &timeout, NULL);
2696         if (ret != 0 || res != 0) {
2697                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2698                 return -1;
2699         }
2700
2701         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2702         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2703             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2704                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2705                 talloc_free(outdata.dptr);
2706                 return -1;              
2707         }
2708         
2709         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2710         CTDB_NO_MEMORY(ctdb, p);
2711
2712         talloc_free(outdata.dptr);
2713         
2714         (*list) = NULL;
2715         (*count) = 0;
2716
2717         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2718                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2719                 CTDB_NO_MEMORY(ctdb, *list);
2720                 (*list)[*count] = talloc_strdup(*list, s);
2721                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2722                 (*count)++;
2723         }
2724
2725         talloc_free(p);
2726
2727         return 0;
2728 }
2729
2730
2731 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2732                                    struct timeval timeout, uint32_t destnode,
2733                                    TALLOC_CTX *mem_ctx,
2734                                    uint32_t flags,
2735                                    struct ctdb_all_public_ips **ips)
2736 {
2737         int ret;
2738         TDB_DATA outdata;
2739         int32_t res;
2740
2741         ret = ctdb_control(ctdb, destnode, 0,
2742                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2743                            mem_ctx, &outdata, &res, &timeout, NULL);
2744         if (ret != 0 || res != 0) {
2745                 DEBUG(DEBUG_ERR,(__location__
2746                                  " ctdb_control for getpublicips failed ret:%d res:%d\n",
2747                                  ret, res));
2748                 return -1;
2749         }
2750
2751         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2752         talloc_free(outdata.dptr);
2753
2754         return 0;
2755 }
2756
2757 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2758                              struct timeval timeout, uint32_t destnode,
2759                              TALLOC_CTX *mem_ctx,
2760                              struct ctdb_all_public_ips **ips)
2761 {
2762         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2763                                               destnode, mem_ctx,
2764                                               0, ips);
2765 }
2766
2767 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2768                                  struct timeval timeout, uint32_t destnode,
2769                                  TALLOC_CTX *mem_ctx,
2770                                  const ctdb_sock_addr *addr,
2771                                  struct ctdb_control_public_ip_info **_info)
2772 {
2773         int ret;
2774         TDB_DATA indata;
2775         TDB_DATA outdata;
2776         int32_t res;
2777         struct ctdb_control_public_ip_info *info;
2778         uint32_t len;
2779         uint32_t i;
2780
2781         indata.dptr = discard_const_p(uint8_t, addr);
2782         indata.dsize = sizeof(*addr);
2783
2784         ret = ctdb_control(ctdb, destnode, 0,
2785                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2786                            mem_ctx, &outdata, &res, &timeout, NULL);
2787         if (ret != 0 || res != 0) {
2788                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2789                                 "failed ret:%d res:%d\n",
2790                                 ret, res));
2791                 return -1;
2792         }
2793
2794         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2795         if (len > outdata.dsize) {
2796                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2797                                 "returned invalid data with size %u > %u\n",
2798                                 (unsigned int)outdata.dsize,
2799                                 (unsigned int)len));
2800                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2801                 return -1;
2802         }
2803
2804         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2805         len += info->num*sizeof(struct ctdb_control_iface_info);
2806
2807         if (len > outdata.dsize) {
2808                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2809                                 "returned invalid data with size %u > %u\n",
2810                                 (unsigned int)outdata.dsize,
2811                                 (unsigned int)len));
2812                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2813                 return -1;
2814         }
2815
2816         /* make sure we null terminate the returned strings */
2817         for (i=0; i < info->num; i++) {
2818                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2819         }
2820
2821         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2822                                                                 outdata.dptr,
2823                                                                 outdata.dsize);
2824         talloc_free(outdata.dptr);
2825         if (*_info == NULL) {
2826                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2827                                 "talloc_memdup size %u failed\n",
2828                                 (unsigned int)outdata.dsize));
2829                 return -1;
2830         }
2831
2832         return 0;
2833 }
2834
2835 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2836                          struct timeval timeout, uint32_t destnode,
2837                          TALLOC_CTX *mem_ctx,
2838                          struct ctdb_control_get_ifaces **_ifaces)
2839 {
2840         int ret;
2841         TDB_DATA outdata;
2842         int32_t res;
2843         struct ctdb_control_get_ifaces *ifaces;
2844         uint32_t len;
2845         uint32_t i;
2846
2847         ret = ctdb_control(ctdb, destnode, 0,
2848                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2849                            mem_ctx, &outdata, &res, &timeout, NULL);
2850         if (ret != 0 || res != 0) {
2851                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2852                                 "failed ret:%d res:%d\n",
2853                                 ret, res));
2854                 return -1;
2855         }
2856
2857         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2858         if (len > outdata.dsize) {
2859                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2860                                 "returned invalid data with size %u > %u\n",
2861                                 (unsigned int)outdata.dsize,
2862                                 (unsigned int)len));
2863                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2864                 return -1;
2865         }
2866
2867         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2868         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2869
2870         if (len > outdata.dsize) {
2871                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2872                                 "returned invalid data with size %u > %u\n",
2873                                 (unsigned int)outdata.dsize,
2874                                 (unsigned int)len));
2875                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2876                 return -1;
2877         }
2878
2879         /* make sure we null terminate the returned strings */
2880         for (i=0; i < ifaces->num; i++) {
2881                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2882         }
2883
2884         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2885                                                                   outdata.dptr,
2886                                                                   outdata.dsize);
2887         talloc_free(outdata.dptr);
2888         if (*_ifaces == NULL) {
2889                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2890                                 "talloc_memdup size %u failed\n",
2891                                 (unsigned int)outdata.dsize));
2892                 return -1;
2893         }
2894
2895         return 0;
2896 }
2897
2898 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2899                              struct timeval timeout, uint32_t destnode,
2900                              TALLOC_CTX *mem_ctx,
2901                              const struct ctdb_control_iface_info *info)
2902 {
2903         int ret;
2904         TDB_DATA indata;
2905         int32_t res;
2906
2907         indata.dptr = discard_const_p(uint8_t, info);
2908         indata.dsize = sizeof(*info);
2909
2910         ret = ctdb_control(ctdb, destnode, 0,
2911                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2912                            mem_ctx, NULL, &res, &timeout, NULL);
2913         if (ret != 0 || res != 0) {
2914                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2915                                 "failed ret:%d res:%d\n",
2916                                 ret, res));
2917                 return -1;
2918         }
2919
2920         return 0;
2921 }
2922
2923 /*
2924   set/clear the permanent disabled bit on a remote node
2925  */
2926 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2927                        uint32_t set, uint32_t clear)
2928 {
2929         int ret;
2930         TDB_DATA data;
2931         struct ctdb_node_map *nodemap=NULL;
2932         struct ctdb_node_flag_change c;
2933         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2934         uint32_t recmaster;
2935         uint32_t *nodes;
2936
2937
2938         /* find the recovery master */
2939         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2940         if (ret != 0) {
2941                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2942                 talloc_free(tmp_ctx);
2943                 return ret;
2944         }
2945
2946
2947         /* read the node flags from the recmaster */
2948         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2949         if (ret != 0) {
2950                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2951                 talloc_free(tmp_ctx);
2952                 return -1;
2953         }
2954         if (destnode >= nodemap->num) {
2955                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2956                 talloc_free(tmp_ctx);
2957                 return -1;
2958         }
2959
2960         c.pnn       = destnode;
2961         c.old_flags = nodemap->nodes[destnode].flags;
2962         c.new_flags = c.old_flags;
2963         c.new_flags |= set;
2964         c.new_flags &= ~clear;
2965
2966         data.dsize = sizeof(c);
2967         data.dptr = (unsigned char *)&c;
2968
2969         /* send the flags update to all connected nodes */
2970         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2971
2972         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2973                                         nodes, 0,
2974                                         timeout, false, data,
2975                                         NULL, NULL,
2976                                         NULL) != 0) {
2977                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2978
2979                 talloc_free(tmp_ctx);
2980                 return -1;
2981         }
2982
2983         talloc_free(tmp_ctx);
2984         return 0;
2985 }
2986
2987
2988 /*
2989   get all tunables
2990  */
2991 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
2992                                struct timeval timeout, 
2993                                uint32_t destnode,
2994                                struct ctdb_tunable *tunables)
2995 {
2996         TDB_DATA outdata;
2997         int ret;
2998         int32_t res;
2999
3000         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
3001                            &outdata, &res, &timeout, NULL);
3002         if (ret != 0 || res != 0) {
3003                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
3004                 return -1;
3005         }
3006
3007         if (outdata.dsize != sizeof(*tunables)) {
3008                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
3009                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
3010                 return -1;              
3011         }
3012
3013         *tunables = *(struct ctdb_tunable *)outdata.dptr;
3014         talloc_free(outdata.dptr);
3015         return 0;
3016 }
3017
3018 /*
3019   add a public address to a node
3020  */
3021 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
3022                       struct timeval timeout, 
3023                       uint32_t destnode,
3024                       struct ctdb_control_ip_iface *pub)
3025 {
3026         TDB_DATA data;
3027         int32_t res;
3028         int ret;
3029
3030         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
3031         data.dptr  = (unsigned char *)pub;
3032
3033         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
3034                            NULL, &res, &timeout, NULL);
3035         if (ret != 0 || res != 0) {
3036                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
3037                 return -1;
3038         }
3039
3040         return 0;
3041 }
3042
3043 /*
3044   delete a public address from a node
3045  */
3046 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
3047                       struct timeval timeout, 
3048                       uint32_t destnode,
3049                       struct ctdb_control_ip_iface *pub)
3050 {
3051         TDB_DATA data;
3052         int32_t res;
3053         int ret;
3054
3055         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
3056         data.dptr  = (unsigned char *)pub;
3057
3058         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
3059                            NULL, &res, &timeout, NULL);
3060         if (ret != 0 || res != 0) {
3061                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
3062                 return -1;
3063         }
3064
3065         return 0;
3066 }
3067
3068 /*
3069   kill a tcp connection
3070  */
3071 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
3072                       struct timeval timeout, 
3073                       uint32_t destnode,
3074                       struct ctdb_tcp_connection *killtcp)
3075 {
3076         TDB_DATA data;
3077         int32_t res;
3078         int ret;
3079
3080         data.dsize = sizeof(struct ctdb_tcp_connection);
3081         data.dptr  = (unsigned char *)killtcp;
3082
3083         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
3084                            NULL, &res, &timeout, NULL);
3085         if (ret != 0 || res != 0) {
3086                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
3087                 return -1;
3088         }
3089
3090         return 0;
3091 }
3092
3093 /*
3094   send a gratious arp
3095  */
3096 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
3097                       struct timeval timeout, 
3098                       uint32_t destnode,
3099                       ctdb_sock_addr *addr,
3100                       const char *ifname)
3101 {
3102         TDB_DATA data;
3103         int32_t res;
3104         int ret, len;
3105         struct ctdb_control_gratious_arp *gratious_arp;
3106         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3107
3108
3109         len = strlen(ifname)+1;
3110         gratious_arp = talloc_size(tmp_ctx, 
3111                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
3112         CTDB_NO_MEMORY(ctdb, gratious_arp);
3113
3114         gratious_arp->addr = *addr;
3115         gratious_arp->len = len;
3116         memcpy(&gratious_arp->iface[0], ifname, len);
3117
3118
3119         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
3120         data.dptr  = (unsigned char *)gratious_arp;
3121
3122         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
3123                            NULL, &res, &timeout, NULL);
3124         if (ret != 0 || res != 0) {
3125                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
3126                 talloc_free(tmp_ctx);
3127                 return -1;
3128         }
3129
3130         talloc_free(tmp_ctx);
3131         return 0;
3132 }
3133
3134 /*
3135   get a list of all tcp tickles that a node knows about for a particular vnn
3136  */
3137 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
3138                               struct timeval timeout, uint32_t destnode, 
3139                               TALLOC_CTX *mem_ctx, 
3140                               ctdb_sock_addr *addr,
3141                               struct ctdb_control_tcp_tickle_list **list)
3142 {
3143         int ret;
3144         TDB_DATA data, outdata;
3145         int32_t status;
3146
3147         data.dptr = (uint8_t*)addr;
3148         data.dsize = sizeof(ctdb_sock_addr);
3149
3150         ret = ctdb_control(ctdb, destnode, 0, 
3151                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
3152                            mem_ctx, &outdata, &status, NULL, NULL);
3153         if (ret != 0 || status != 0) {
3154                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
3155                 return -1;
3156         }
3157
3158         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
3159
3160         return status;
3161 }
3162
3163 /*
3164   register a server id
3165  */
3166 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
3167                       struct timeval timeout, 
3168                       struct ctdb_server_id *id)
3169 {
3170         TDB_DATA data;
3171         int32_t res;
3172         int ret;
3173
3174         data.dsize = sizeof(struct ctdb_server_id);
3175         data.dptr  = (unsigned char *)id;
3176
3177         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3178                         CTDB_CONTROL_REGISTER_SERVER_ID, 
3179                         0, data, NULL,
3180                         NULL, &res, &timeout, NULL);
3181         if (ret != 0 || res != 0) {
3182                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
3183                 return -1;
3184         }
3185
3186         return 0;
3187 }
3188
3189 /*
3190   unregister a server id
3191  */
3192 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
3193                       struct timeval timeout, 
3194                       struct ctdb_server_id *id)
3195 {
3196         TDB_DATA data;
3197         int32_t res;
3198         int ret;
3199
3200         data.dsize = sizeof(struct ctdb_server_id);
3201         data.dptr  = (unsigned char *)id;
3202
3203         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3204                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
3205                         0, data, NULL,
3206                         NULL, &res, &timeout, NULL);
3207         if (ret != 0 || res != 0) {
3208                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
3209                 return -1;
3210         }
3211
3212         return 0;
3213 }
3214
3215
3216 /*
3217   check if a server id exists
3218
3219   if a server id does exist, return *status == 1, otherwise *status == 0
3220  */
3221 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
3222                       struct timeval timeout, 
3223                       uint32_t destnode,
3224                       struct ctdb_server_id *id,
3225                       uint32_t *status)
3226 {
3227         TDB_DATA data;
3228         int32_t res;
3229         int ret;
3230
3231         data.dsize = sizeof(struct ctdb_server_id);
3232         data.dptr  = (unsigned char *)id;
3233
3234         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
3235                         0, data, NULL,
3236                         NULL, &res, &timeout, NULL);
3237         if (ret != 0) {
3238                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
3239                 return -1;
3240         }
3241
3242         if (res) {
3243                 *status = 1;
3244         } else {
3245                 *status = 0;
3246         }
3247
3248         return 0;
3249 }
3250
3251 /*
3252    get the list of server ids that are registered on a node
3253 */
3254 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
3255                 TALLOC_CTX *mem_ctx,
3256                 struct timeval timeout, uint32_t destnode, 
3257                 struct ctdb_server_id_list **svid_list)
3258 {
3259         int ret;
3260         TDB_DATA outdata;
3261         int32_t res;
3262
3263         ret = ctdb_control(ctdb, destnode, 0, 
3264                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
3265                            mem_ctx, &outdata, &res, &timeout, NULL);
3266         if (ret != 0 || res != 0) {
3267                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
3268                 return -1;
3269         }
3270
3271         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
3272                     
3273         return 0;
3274 }
3275
3276 /*
3277   initialise the ctdb daemon for client applications
3278
3279   NOTE: In current code the daemon does not fork. This is for testing purposes only
3280   and to simplify the code.
3281 */
3282 struct ctdb_context *ctdb_init(struct event_context *ev)
3283 {
3284         int ret;
3285         struct ctdb_context *ctdb;
3286
3287         ctdb = talloc_zero(ev, struct ctdb_context);
3288         if (ctdb == NULL) {
3289                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
3290                 return NULL;
3291         }
3292         ctdb->ev  = ev;
3293         ctdb->idr = idr_init(ctdb);
3294         /* Wrap early to exercise code. */
3295         ctdb->lastid = INT_MAX-200;
3296         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
3297
3298         ret = ctdb_set_socketname(ctdb, CTDB_SOCKET);
3299         if (ret != 0) {
3300                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
3301                 talloc_free(ctdb);
3302                 return NULL;
3303         }
3304
3305         ctdb->statistics.statistics_start_time = timeval_current();
3306
3307         return ctdb;
3308 }
3309
3310
3311 /*
3312   set some ctdb flags
3313 */
3314 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
3315 {
3316         ctdb->flags |= flags;
3317 }
3318
3319 /*
3320   setup the local socket name
3321 */
3322 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
3323 {
3324         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
3325         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
3326
3327         return 0;
3328 }
3329
3330 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
3331 {
3332         return ctdb->daemon.name;
3333 }
3334
3335 /*
3336   return the pnn of this node
3337 */
3338 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
3339 {
3340         return ctdb->pnn;
3341 }
3342
3343
3344 /*
3345   get the uptime of a remote node
3346  */
3347 struct ctdb_client_control_state *
3348 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3349 {
3350         return ctdb_control_send(ctdb, destnode, 0, 
3351                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
3352                            mem_ctx, &timeout, NULL);
3353 }
3354
3355 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
3356 {
3357         int ret;
3358         int32_t res;
3359         TDB_DATA outdata;
3360
3361         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3362         if (ret != 0 || res != 0) {
3363                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
3364                 return -1;
3365         }
3366
3367         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
3368
3369         return 0;
3370 }
3371
3372 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3373 {
3374         struct ctdb_client_control_state *state;
3375
3376         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3377         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3378 }
3379
3380 /*
3381   send a control to execute the "recovered" event script on a node
3382  */
3383 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3384 {
3385         int ret;
3386         int32_t status;
3387
3388         ret = ctdb_control(ctdb, destnode, 0, 
3389                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
3390                            NULL, NULL, &status, &timeout, NULL);
3391         if (ret != 0 || status != 0) {
3392                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3393                 return -1;
3394         }
3395
3396         return 0;
3397 }
3398
3399 /* 
3400   callback for the async helpers used when sending the same control
3401   to multiple nodes in parallell.
3402 */
3403 static void async_callback(struct ctdb_client_control_state *state)
3404 {
3405         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3406         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3407         int ret;
3408         TDB_DATA outdata;
3409         int32_t res = -1;
3410         uint32_t destnode = state->c->hdr.destnode;
3411
3412         outdata.dsize = 0;
3413         outdata.dptr = NULL;
3414
3415         /* one more node has responded with recmode data */
3416         data->count--;
3417
3418         /* if we failed to push the db, then return an error and let
3419            the main loop try again.
3420         */
3421         if (state->state != CTDB_CONTROL_DONE) {
3422                 if ( !data->dont_log_errors) {
3423                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3424                 }
3425                 data->fail_count++;
3426                 if (state->state == CTDB_CONTROL_TIMEOUT) {
3427                         res = -ETIME;
3428                 } else {
3429                         res = -1;
3430                 }
3431                 if (data->fail_callback) {
3432                         data->fail_callback(ctdb, destnode, res, outdata,
3433                                         data->callback_data);
3434                 }
3435                 return;
3436         }
3437         
3438         state->async.fn = NULL;
3439
3440         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3441         if ((ret != 0) || (res != 0)) {
3442                 if ( !data->dont_log_errors) {
3443                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3444                 }
3445                 data->fail_count++;
3446                 if (data->fail_callback) {
3447                         data->fail_callback(ctdb, destnode, res, outdata,
3448                                         data->callback_data);
3449                 }
3450         }
3451         if ((ret == 0) && (data->callback != NULL)) {
3452                 data->callback(ctdb, destnode, res, outdata,
3453                                         data->callback_data);
3454         }
3455 }
3456
3457
3458 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3459 {
3460         /* set up the callback functions */
3461         state->async.fn = async_callback;
3462         state->async.private_data = data;
3463         
3464         /* one more control to wait for to complete */
3465         data->count++;
3466 }
3467
3468
3469 /* wait for up to the maximum number of seconds allowed
3470    or until all nodes we expect a response from has replied
3471 */
3472 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3473 {
3474         while (data->count > 0) {
3475                 event_loop_once(ctdb->ev);
3476         }
3477         if (data->fail_count != 0) {
3478                 if (!data->dont_log_errors) {
3479                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
3480                                  data->fail_count));
3481                 }
3482                 return -1;
3483         }
3484         return 0;
3485 }
3486
3487
3488 /* 
3489    perform a simple control on the listed nodes
3490    The control cannot return data
3491  */
3492 int ctdb_client_async_control(struct ctdb_context *ctdb,
3493                                 enum ctdb_controls opcode,
3494                                 uint32_t *nodes,
3495                                 uint64_t srvid,
3496                                 struct timeval timeout,
3497                                 bool dont_log_errors,
3498                                 TDB_DATA data,
3499                                 client_async_callback client_callback,
3500                                 client_async_callback fail_callback,
3501                                 void *callback_data)
3502 {
3503         struct client_async_data *async_data;
3504         struct ctdb_client_control_state *state;
3505         int j, num_nodes;
3506
3507         async_data = talloc_zero(ctdb, struct client_async_data);
3508         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3509         async_data->dont_log_errors = dont_log_errors;
3510         async_data->callback = client_callback;
3511         async_data->fail_callback = fail_callback;
3512         async_data->callback_data = callback_data;
3513         async_data->opcode        = opcode;
3514
3515         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3516
3517         /* loop over all nodes and send an async control to each of them */
3518         for (j=0; j<num_nodes; j++) {
3519                 uint32_t pnn = nodes[j];
3520
3521                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
3522                                           0, data, async_data, &timeout, NULL);
3523                 if (state == NULL) {
3524                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3525                         talloc_free(async_data);
3526                         return -1;
3527                 }
3528                 
3529                 ctdb_client_async_add(async_data, state);
3530         }
3531
3532         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3533                 talloc_free(async_data);
3534                 return -1;
3535         }
3536
3537         talloc_free(async_data);
3538         return 0;
3539 }
3540
3541 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3542                                 struct ctdb_vnn_map *vnn_map,
3543                                 TALLOC_CTX *mem_ctx,
3544                                 bool include_self)
3545 {
3546         int i, j, num_nodes;
3547         uint32_t *nodes;
3548
3549         for (i=num_nodes=0;i<vnn_map->size;i++) {
3550                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3551                         continue;
3552                 }
3553                 num_nodes++;
3554         } 
3555
3556         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3557         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3558
3559         for (i=j=0;i<vnn_map->size;i++) {
3560                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3561                         continue;
3562                 }
3563                 nodes[j++] = vnn_map->map[i];
3564         } 
3565
3566         return nodes;
3567 }
3568
3569 /* Get list of nodes not including those with flags specified by mask.
3570  * If exclude_pnn is not -1 then exclude that pnn from the list.
3571  */
3572 uint32_t *list_of_nodes(struct ctdb_context *ctdb,
3573                         struct ctdb_node_map *node_map,
3574                         TALLOC_CTX *mem_ctx,
3575                         uint32_t mask,
3576                         int exclude_pnn)
3577 {
3578         int i, j, num_nodes;
3579         uint32_t *nodes;
3580
3581         for (i=num_nodes=0;i<node_map->num;i++) {
3582                 if (node_map->nodes[i].flags & mask) {
3583                         continue;
3584                 }
3585                 if (node_map->nodes[i].pnn == exclude_pnn) {
3586                         continue;
3587                 }
3588                 num_nodes++;
3589         } 
3590
3591         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3592         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3593
3594         for (i=j=0;i<node_map->num;i++) {
3595                 if (node_map->nodes[i].flags & mask) {
3596                         continue;
3597                 }
3598                 if (node_map->nodes[i].pnn == exclude_pnn) {
3599                         continue;
3600                 }
3601                 nodes[j++] = node_map->nodes[i].pnn;
3602         } 
3603
3604         return nodes;
3605 }
3606
3607 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3608                                 struct ctdb_node_map *node_map,
3609                                 TALLOC_CTX *mem_ctx,
3610                                 bool include_self)
3611 {
3612         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_INACTIVE,
3613                              include_self ? -1 : ctdb->pnn);
3614 }
3615
3616 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3617                                 struct ctdb_node_map *node_map,
3618                                 TALLOC_CTX *mem_ctx,
3619                                 bool include_self)
3620 {
3621         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_DISCONNECTED,
3622                              include_self ? -1 : ctdb->pnn);
3623 }
3624
3625 /* 
3626   this is used to test if a pnn lock exists and if it exists will return
3627   the number of connections that pnn has reported or -1 if that recovery
3628   daemon is not running.
3629 */
3630 int
3631 ctdb_read_pnn_lock(int fd, int32_t pnn)
3632 {
3633         struct flock lock;
3634         char c;
3635
3636         lock.l_type = F_WRLCK;
3637         lock.l_whence = SEEK_SET;
3638         lock.l_start = pnn;
3639         lock.l_len = 1;
3640         lock.l_pid = 0;
3641
3642         if (fcntl(fd, F_GETLK, &lock) != 0) {
3643                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3644                 return -1;
3645         }
3646
3647         if (lock.l_type == F_UNLCK) {
3648                 return -1;
3649         }
3650
3651         if (pread(fd, &c, 1, pnn) == -1) {
3652                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3653                 return -1;
3654         }
3655
3656         return c;
3657 }
3658
3659 /*
3660   get capabilities of a remote node
3661  */
3662 struct ctdb_client_control_state *
3663 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3664 {
3665         return ctdb_control_send(ctdb, destnode, 0, 
3666                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3667                            mem_ctx, &timeout, NULL);
3668 }
3669
3670 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3671 {
3672         int ret;
3673         int32_t res;
3674         TDB_DATA outdata;
3675
3676         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3677         if ( (ret != 0) || (res != 0) ) {
3678                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3679                 return -1;
3680         }
3681
3682         if (capabilities) {
3683                 *capabilities = *((uint32_t *)outdata.dptr);
3684         }
3685
3686         return 0;
3687 }
3688
3689 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3690 {
3691         struct ctdb_client_control_state *state;
3692         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3693         int ret;
3694
3695         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3696         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3697         talloc_free(tmp_ctx);
3698         return ret;
3699 }
3700
3701 static void get_capabilities_callback(struct ctdb_context *ctdb,
3702                                       uint32_t node_pnn, int32_t res,
3703                                       TDB_DATA outdata, void *callback_data)
3704 {
3705         struct ctdb_node_capabilities *caps =
3706                 talloc_get_type(callback_data,
3707                                 struct ctdb_node_capabilities);
3708
3709         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
3710                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
3711                 return;
3712         }
3713
3714         if (node_pnn >= talloc_array_length(caps)) {
3715                 DEBUG(DEBUG_ERR,
3716                       (__location__ " unexpected PNN %u\n", node_pnn));
3717                 return;
3718         }
3719
3720         caps[node_pnn].retrieved = true;
3721         caps[node_pnn].capabilities = *((uint32_t *)outdata.dptr);
3722 }
3723
3724 struct ctdb_node_capabilities *
3725 ctdb_get_capabilities(struct ctdb_context *ctdb,
3726                       TALLOC_CTX *mem_ctx,
3727                       struct timeval timeout,
3728                       struct ctdb_node_map *nodemap)
3729 {
3730         uint32_t *nodes;
3731         uint32_t i, res;
3732         struct ctdb_node_capabilities *ret;
3733
3734         nodes = list_of_connected_nodes(ctdb, nodemap, mem_ctx, true);
3735
3736         ret = talloc_array(mem_ctx, struct ctdb_node_capabilities,
3737                            nodemap->num);
3738         CTDB_NO_MEMORY_NULL(ctdb, ret);
3739         /* Prepopulate the expected PNNs */
3740         for (i = 0; i < talloc_array_length(ret); i++) {
3741                 ret[i].retrieved = false;
3742         }
3743
3744         res = ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
3745                                         nodes, 0, timeout,
3746                                         false, tdb_null,
3747                                         get_capabilities_callback, NULL,
3748                                         ret);
3749         if (res != 0) {
3750                 DEBUG(DEBUG_ERR,
3751                       (__location__ " Failed to read node capabilities.\n"));
3752                 TALLOC_FREE(ret);
3753         }
3754
3755         return ret;
3756 }
3757
3758 uint32_t *
3759 ctdb_get_node_capabilities(struct ctdb_node_capabilities *caps,
3760                            uint32_t pnn)
3761 {
3762         if (pnn < talloc_array_length(caps) && caps[pnn].retrieved) {
3763                 return &caps[pnn].capabilities;
3764         }
3765
3766         return NULL;
3767 }
3768
3769 bool ctdb_node_has_capabilities(struct ctdb_node_capabilities *caps,
3770                                 uint32_t pnn,
3771                                 uint32_t capabilities_required)
3772 {
3773         uint32_t *capp = ctdb_get_node_capabilities(caps, pnn);
3774         return (capp != NULL) &&
3775                 ((*capp & capabilities_required) == capabilities_required);
3776 }
3777
3778
3779 struct server_id {
3780         uint64_t pid;
3781         uint32_t task_id;
3782         uint32_t vnn;
3783         uint64_t unique_id;
3784 };
3785
3786 static struct server_id server_id_fetch(struct ctdb_context *ctdb, uint32_t reqid)
3787 {
3788         struct server_id id;
3789
3790         id.pid = getpid();
3791         id.task_id = reqid;
3792         id.vnn = ctdb_get_pnn(ctdb);
3793         id.unique_id = id.vnn;
3794         id.unique_id = (id.unique_id << 32) | reqid;
3795
3796         return id;
3797 }
3798
3799 /* This is basically a copy from Samba's server_id.*.  However, a
3800  * dependency chain stops us from using Samba's version, so use a
3801  * renamed copy until a better solution is found. */
3802 static bool ctdb_server_id_equal(struct server_id *id1, struct server_id *id2)
3803 {
3804         if (id1->pid != id2->pid) {
3805                 return false;
3806         }
3807
3808         if (id1->task_id != id2->task_id) {
3809                 return false;
3810         }
3811
3812         if (id1->vnn != id2->vnn) {
3813                 return false;
3814         }
3815
3816         if (id1->unique_id != id2->unique_id) {
3817                 return false;
3818         }
3819
3820         return true;
3821 }
3822
3823 static bool server_id_exists(struct ctdb_context *ctdb, struct server_id *id)
3824 {
3825         struct ctdb_server_id sid;
3826         int ret;
3827         uint32_t result = 0;
3828
3829         sid.type = SERVER_TYPE_SAMBA;
3830         sid.pnn = id->vnn;
3831         sid.server_id = id->pid;
3832
3833         ret = ctdb_ctrl_check_server_id(ctdb, timeval_current_ofs(3,0),
3834                                         id->vnn, &sid, &result);
3835         if (ret != 0) {
3836                 /* If control times out, assume server_id exists. */
3837                 return true;
3838         }
3839
3840         if (result) {
3841                 return true;
3842         }
3843
3844         return false;
3845 }
3846
3847
3848 enum g_lock_type {
3849         G_LOCK_READ = 0,
3850         G_LOCK_WRITE = 1,
3851 };
3852
3853 struct g_lock_rec {
3854         enum g_lock_type type;
3855         struct server_id id;
3856 };
3857
3858 struct g_lock_recs {
3859         unsigned int num;
3860         struct g_lock_rec *lock;
3861 };
3862
3863 static bool g_lock_parse(TALLOC_CTX *mem_ctx, TDB_DATA data,
3864                          struct g_lock_recs **locks)
3865 {
3866         struct g_lock_recs *recs;
3867
3868         recs = talloc_zero(mem_ctx, struct g_lock_recs);
3869         if (recs == NULL) {
3870                 return false;
3871         }
3872
3873         if (data.dsize == 0) {
3874                 goto done;
3875         }
3876
3877         if (data.dsize % sizeof(struct g_lock_rec) != 0) {
3878                 DEBUG(DEBUG_ERR, (__location__ "invalid data size %lu in g_lock record\n",
3879                                   (unsigned long)data.dsize));
3880                 talloc_free(recs);
3881                 return false;
3882         }
3883
3884         recs->num = data.dsize / sizeof(struct g_lock_rec);
3885         recs->lock = talloc_memdup(mem_ctx, data.dptr, data.dsize);
3886         if (recs->lock == NULL) {
3887                 talloc_free(recs);
3888                 return false;
3889         }
3890
3891 done:
3892         if (locks != NULL) {
3893                 *locks = recs;
3894         }
3895
3896         return true;
3897 }
3898
3899
3900 static bool g_lock_lock(TALLOC_CTX *mem_ctx,
3901                         struct ctdb_db_context *ctdb_db,
3902                         const char *keyname, uint32_t reqid)
3903 {
3904         TDB_DATA key, data;
3905         struct ctdb_record_handle *h;
3906         struct g_lock_recs *locks;
3907         struct server_id id;
3908         struct timeval t_start;
3909         int i;
3910
3911         key.dptr = (uint8_t *)discard_const(keyname);
3912         key.dsize = strlen(keyname) + 1;
3913
3914         t_start = timeval_current();
3915
3916 again:
3917         /* Keep trying for an hour. */
3918         if (timeval_elapsed(&t_start) > 3600) {
3919                 return false;
3920         }
3921
3922         h = ctdb_fetch_lock(ctdb_db, mem_ctx, key, &data);
3923         if (h == NULL) {
3924                 return false;
3925         }
3926
3927         if (!g_lock_parse(h, data, &locks)) {
3928                 DEBUG(DEBUG_ERR, ("g_lock: error parsing locks\n"));
3929                 talloc_free(data.dptr);
3930                 talloc_free(h);
3931                 return false;
3932         }
3933
3934         talloc_free(data.dptr);
3935
3936         id = server_id_fetch(ctdb_db->ctdb, reqid);
3937
3938         i = 0;
3939         while (i < locks->num) {
3940                 if (ctdb_server_id_equal(&locks->lock[i].id, &id)) {
3941                         /* Internal error */
3942                         talloc_free(h);
3943                         return false;
3944                 }
3945
3946                 if (!server_id_exists(ctdb_db->ctdb, &locks->lock[i].id)) {
3947                         if (i < locks->num-1) {
3948                                 locks->lock[i] = locks->lock[locks->num-1];
3949                         }
3950                         locks->num--;
3951                         continue;
3952                 }
3953
3954                 /* This entry is locked. */
3955                 DEBUG(DEBUG_INFO, ("g_lock: lock already granted for "
3956                                    "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
3957                                    (unsigned long long)id.pid,
3958                                    id.task_id, id.vnn,
3959                                    (unsigned long long)id.unique_id));
3960                 talloc_free(h);
3961                 goto again;
3962         }
3963
3964         locks->lock = talloc_realloc(locks, locks->lock, struct g_lock_rec,
3965                                      locks->num+1);
3966         if (locks->lock == NULL) {
3967                 talloc_free(h);
3968                 return false;
3969         }
3970
3971         locks->lock[locks->num].type = G_LOCK_WRITE;
3972         locks->lock[locks->num].id = id;
3973         locks->num++;
3974
3975         data.dptr = (uint8_t *)locks->lock;
3976         data.dsize = locks->num * sizeof(struct g_lock_rec);
3977
3978         if (ctdb_record_store(h, data) != 0) {
3979                 DEBUG(DEBUG_ERR, ("g_lock: failed to write transaction lock for "
3980                                   "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
3981                                   (unsigned long long)id.pid,
3982                                   id.task_id, id.vnn,
3983                                   (unsigned long long)id.unique_id));
3984                 talloc_free(h);
3985                 return false;
3986         }
3987
3988         DEBUG(DEBUG_INFO, ("g_lock: lock granted for "
3989                            "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
3990                            (unsigned long long)id.pid,
3991                            id.task_id, id.vnn,
3992                            (unsigned long long)id.unique_id));
3993
3994         talloc_free(h);
3995         return true;
3996 }
3997
3998 static bool g_lock_unlock(TALLOC_CTX *mem_ctx,
3999                           struct ctdb_db_context *ctdb_db,
4000                           const char *keyname, uint32_t reqid)
4001 {
4002         TDB_DATA key, data;
4003         struct ctdb_record_handle *h;
4004         struct g_lock_recs *locks;
4005         struct server_id id;
4006         int i;
4007         bool found = false;
4008
4009         key.dptr = (uint8_t *)discard_const(keyname);
4010         key.dsize = strlen(keyname) + 1;
4011         h = ctdb_fetch_lock(ctdb_db, mem_ctx, key, &data);
4012         if (h == NULL) {
4013                 return false;
4014         }
4015
4016         if (!g_lock_parse(h, data, &locks)) {
4017                 DEBUG(DEBUG_ERR, ("g_lock: error parsing locks\n"));
4018                 talloc_free(data.dptr);
4019                 talloc_free(h);
4020                 return false;
4021         }
4022
4023         talloc_free(data.dptr);
4024
4025         id = server_id_fetch(ctdb_db->ctdb, reqid);
4026
4027         for (i=0; i<locks->num; i++) {
4028                 if (ctdb_server_id_equal(&locks->lock[i].id, &id)) {
4029                         if (i < locks->num-1) {
4030                                 locks->lock[i] = locks->lock[locks->num-1];
4031                         }
4032                         locks->num--;
4033                         found = true;
4034                         break;
4035                 }
4036         }
4037
4038         if (!found) {
4039                 DEBUG(DEBUG_ERR, ("g_lock: lock not found\n"));
4040                 talloc_free(h);
4041                 return false;
4042         }
4043
4044         data.dptr = (uint8_t *)locks->lock;
4045         data.dsize = locks->num * sizeof(struct g_lock_rec);
4046
4047         if (ctdb_record_store(h, data) != 0) {
4048                 talloc_free(h);
4049                 return false;
4050         }
4051
4052         talloc_free(h);
4053         return true;
4054 }
4055
4056
4057 struct ctdb_transaction_handle {
4058         struct ctdb_db_context *ctdb_db;
4059         struct ctdb_db_context *g_lock_db;
4060         char *lock_name;
4061         uint32_t reqid;
4062         /*
4063          * we store reads and writes done under a transaction:
4064          * - one list stores both reads and writes (m_all)
4065          * - the other just writes (m_write)
4066          */
4067         struct ctdb_marshall_buffer *m_all;
4068         struct ctdb_marshall_buffer *m_write;
4069 };
4070
4071 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
4072 {
4073         g_lock_unlock(h, h->g_lock_db, h->lock_name, h->reqid);
4074         ctdb_reqid_remove(h->ctdb_db->ctdb, h->reqid);
4075         return 0;
4076 }
4077
4078
4079 /**
4080  * start a transaction on a database
4081  */
4082 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
4083                                                        TALLOC_CTX *mem_ctx)
4084 {
4085         struct ctdb_transaction_handle *h;
4086         struct ctdb_server_id id;
4087
4088         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
4089         if (h == NULL) {
4090                 DEBUG(DEBUG_ERR, (__location__ " memory allocation error\n"));
4091                 return NULL;
4092         }
4093
4094         h->ctdb_db = ctdb_db;
4095         h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x",
4096                                        (unsigned int)ctdb_db->db_id);
4097         if (h->lock_name == NULL) {
4098                 DEBUG(DEBUG_ERR, (__location__ " talloc asprintf failed\n"));
4099                 talloc_free(h);
4100                 return NULL;
4101         }
4102
4103         h->g_lock_db = ctdb_attach(h->ctdb_db->ctdb, timeval_current_ofs(3,0),
4104                                    "g_lock.tdb", false, 0);
4105         if (!h->g_lock_db) {
4106                 DEBUG(DEBUG_ERR, (__location__ " unable to attach to g_lock.tdb\n"));
4107                 talloc_free(h);
4108                 return NULL;
4109         }
4110
4111         id.type = SERVER_TYPE_SAMBA;
4112         id.pnn = ctdb_get_pnn(ctdb_db->ctdb);
4113         id.server_id = getpid();
4114
4115         if (ctdb_ctrl_register_server_id(ctdb_db->ctdb, timeval_current_ofs(3,0),
4116                                          &id) != 0) {
4117                 DEBUG(DEBUG_ERR, (__location__ " unable to register server id\n"));
4118                 talloc_free(h);
4119                 return NULL;
4120         }
4121
4122         h->reqid = ctdb_reqid_new(h->ctdb_db->ctdb, h);
4123
4124         if (!g_lock_lock(h, h->g_lock_db, h->lock_name, h->reqid)) {
4125                 DEBUG(DEBUG_ERR, (__location__ " Error locking g_lock.tdb\n"));
4126                 talloc_free(h);
4127                 return NULL;
4128         }
4129
4130         talloc_set_destructor(h, ctdb_transaction_destructor);
4131         return h;
4132 }
4133
4134 /**
4135  * fetch a record inside a transaction
4136  */
4137 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h,
4138                            TALLOC_CTX *mem_ctx,
4139                            TDB_DATA key, TDB_DATA *data)
4140 {
4141         struct ctdb_ltdb_header header;
4142         int ret;
4143
4144         ZERO_STRUCT(header);
4145
4146         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
4147         if (ret == -1 && header.dmaster == (uint32_t)-1) {
4148                 /* record doesn't exist yet */
4149                 *data = tdb_null;
4150                 ret = 0;
4151         }
4152
4153         if (ret != 0) {
4154                 return ret;
4155         }
4156
4157         h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
4158         if (h->m_all == NULL) {
4159                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4160                 return -1;
4161         }
4162
4163         return 0;
4164 }
4165
4166 /**
4167  * stores a record inside a transaction
4168  */
4169 int ctdb_transaction_store(struct ctdb_transaction_handle *h,
4170                            TDB_DATA key, TDB_DATA data)
4171 {
4172         TALLOC_CTX *tmp_ctx = talloc_new(h);
4173         struct ctdb_ltdb_header header;
4174         TDB_DATA olddata;
4175         int ret;
4176
4177         /* we need the header so we can update the RSN */
4178         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
4179         if (ret == -1 && header.dmaster == (uint32_t)-1) {
4180                 /* the record doesn't exist - create one with us as dmaster.
4181                    This is only safe because we are in a transaction and this
4182                    is a persistent database */
4183                 ZERO_STRUCT(header);
4184         } else if (ret != 0) {
4185                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
4186                 talloc_free(tmp_ctx);
4187                 return ret;
4188         }
4189
4190         if (data.dsize == olddata.dsize &&
4191             memcmp(data.dptr, olddata.dptr, data.dsize) == 0 &&
4192             header.rsn != 0) {
4193                 /* save writing the same data */
4194                 talloc_free(tmp_ctx);
4195                 return 0;
4196         }
4197
4198         header.dmaster = h->ctdb_db->ctdb->pnn;
4199         header.rsn++;
4200
4201         h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
4202         if (h->m_all == NULL) {
4203                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4204                 talloc_free(tmp_ctx);
4205                 return -1;
4206         }
4207
4208         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
4209         if (h->m_write == NULL) {
4210                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4211                 talloc_free(tmp_ctx);
4212                 return -1;
4213         }
4214
4215         talloc_free(tmp_ctx);
4216         return 0;
4217 }
4218
4219 static int ctdb_fetch_db_seqnum(struct ctdb_db_context *ctdb_db, uint64_t *seqnum)
4220 {
4221         const char *keyname = CTDB_DB_SEQNUM_KEY;
4222         TDB_DATA key, data;
4223         struct ctdb_ltdb_header header;
4224         int ret;
4225
4226         key.dptr = (uint8_t *)discard_const(keyname);
4227         key.dsize = strlen(keyname) + 1;
4228
4229         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, ctdb_db, &data);
4230         if (ret != 0) {
4231                 *seqnum = 0;
4232                 return 0;
4233         }
4234
4235         if (data.dsize == 0) {
4236                 *seqnum = 0;
4237                 return 0;
4238         }
4239
4240         if (data.dsize != sizeof(*seqnum)) {
4241                 DEBUG(DEBUG_ERR, (__location__ " Invalid data recived len=%zi\n",
4242                                   data.dsize));
4243                 talloc_free(data.dptr);
4244                 return -1;
4245         }
4246
4247         *seqnum = *(uint64_t *)data.dptr;
4248         talloc_free(data.dptr);
4249
4250         return 0;
4251 }
4252
4253
4254 static int ctdb_store_db_seqnum(struct ctdb_transaction_handle *h,
4255                                 uint64_t seqnum)
4256 {
4257         const char *keyname = CTDB_DB_SEQNUM_KEY;
4258         TDB_DATA key, data;
4259
4260         key.dptr = (uint8_t *)discard_const(keyname);
4261         key.dsize = strlen(keyname) + 1;
4262
4263         data.dptr = (uint8_t *)&seqnum;
4264         data.dsize = sizeof(seqnum);
4265
4266         return ctdb_transaction_store(h, key, data);
4267 }
4268
4269
4270 /**
4271  * commit a transaction
4272  */
4273 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
4274 {
4275         int ret;
4276         uint64_t old_seqnum, new_seqnum;
4277         int32_t status;
4278         struct timeval timeout;
4279
4280         if (h->m_write == NULL) {
4281                 /* no changes were made */
4282                 talloc_free(h);
4283                 return 0;
4284         }
4285
4286         ret = ctdb_fetch_db_seqnum(h->ctdb_db, &old_seqnum);
4287         if (ret != 0) {
4288                 DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n"));
4289                 ret = -1;
4290                 goto done;
4291         }
4292
4293         new_seqnum = old_seqnum + 1;
4294         ret = ctdb_store_db_seqnum(h, new_seqnum);
4295         if (ret != 0) {
4296                 DEBUG(DEBUG_ERR, (__location__ " failed to store db sequence number\n"));
4297                 ret = -1;
4298                 goto done;
4299         }
4300
4301 again:
4302         timeout = timeval_current_ofs(3,0);
4303         ret = ctdb_control(h->ctdb_db->ctdb, CTDB_CURRENT_NODE,
4304                            h->ctdb_db->db_id,
4305                            CTDB_CONTROL_TRANS3_COMMIT, 0,
4306                            ctdb_marshall_finish(h->m_write), NULL, NULL,
4307                            &status, &timeout, NULL);
4308         if (ret != 0 || status != 0) {
4309                 /*
4310                  * TRANS3_COMMIT control will only fail if recovery has been
4311                  * triggered.  Check if the database has been updated or not.
4312                  */
4313                 ret = ctdb_fetch_db_seqnum(h->ctdb_db, &new_seqnum);
4314                 if (ret != 0) {
4315                         DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n"));
4316                         goto done;
4317                 }
4318
4319                 if (new_seqnum == old_seqnum) {
4320                         /* Database not yet updated, try again */
4321                         goto again;
4322                 }
4323
4324                 if (new_seqnum != (old_seqnum + 1)) {
4325                         DEBUG(DEBUG_ERR, (__location__ " new seqnum [%llu] != old seqnum [%llu] + 1\n",
4326                                           (long long unsigned)new_seqnum,
4327                                           (long long unsigned)old_seqnum));
4328                         ret = -1;
4329                         goto done;
4330                 }
4331         }
4332
4333         ret = 0;
4334
4335 done:
4336         talloc_free(h);
4337         return ret;
4338 }
4339
4340 /**
4341  * cancel a transaction
4342  */
4343 int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
4344 {
4345         talloc_free(h);
4346         return 0;
4347 }
4348
4349
4350 /*
4351   recovery daemon ping to main daemon
4352  */
4353 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
4354 {
4355         int ret;
4356         int32_t res;
4357
4358         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
4359                            ctdb, NULL, &res, NULL, NULL);
4360         if (ret != 0 || res != 0) {
4361                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
4362                 return -1;
4363         }
4364
4365         return 0;
4366 }
4367
4368 /* When forking the main daemon and the child process needs to connect
4369  * back to the daemon as a client process, this function can be used
4370  * to change the ctdb context from daemon into client mode.  The child
4371  * process must be created using ctdb_fork() and not fork() -
4372  * ctdb_fork() does some necessary housekeeping.
4373  */
4374 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
4375 {
4376         int ret;
4377         va_list ap;
4378
4379         /* Add extra information so we can identify this in the logs */
4380         va_start(ap, fmt);
4381         debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
4382         va_end(ap);
4383
4384         /* get a new event context */
4385         ctdb->ev = event_context_init(ctdb);
4386         tevent_loop_allow_nesting(ctdb->ev);
4387
4388         /* Connect to main CTDB daemon */
4389         ret = ctdb_socket_connect(ctdb);
4390         if (ret != 0) {
4391                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
4392                 return -1;
4393         }
4394
4395         ctdb->can_send_controls = true;
4396
4397         return 0;
4398 }
4399
4400 /*
4401   get the status of running the monitor eventscripts: NULL means never run.
4402  */
4403 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
4404                 struct timeval timeout, uint32_t destnode, 
4405                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
4406                 struct ctdb_scripts_wire **scripts)
4407 {
4408         int ret;
4409         TDB_DATA outdata, indata;
4410         int32_t res;
4411         uint32_t uinttype = type;
4412
4413         indata.dptr = (uint8_t *)&uinttype;
4414         indata.dsize = sizeof(uinttype);
4415
4416         ret = ctdb_control(ctdb, destnode, 0, 
4417                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
4418                            mem_ctx, &outdata, &res, &timeout, NULL);
4419         if (ret != 0 || res != 0) {
4420                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
4421                 return -1;
4422         }
4423
4424         if (outdata.dsize == 0) {
4425                 *scripts = NULL;
4426         } else {
4427                 *scripts = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4428                 talloc_free(outdata.dptr);
4429         }
4430                     
4431         return 0;
4432 }
4433
4434 /*
4435   tell the main daemon how long it took to lock the reclock file
4436  */
4437 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
4438 {
4439         int ret;
4440         int32_t res;
4441         TDB_DATA data;
4442
4443         data.dptr = (uint8_t *)&latency;
4444         data.dsize = sizeof(latency);
4445
4446         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
4447                            ctdb, NULL, &res, NULL, NULL);
4448         if (ret != 0 || res != 0) {
4449                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
4450                 return -1;
4451         }
4452
4453         return 0;
4454 }
4455
4456 /*
4457   get the name of the reclock file
4458  */
4459 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
4460                          uint32_t destnode, TALLOC_CTX *mem_ctx,
4461                          const char **name)
4462 {
4463         int ret;
4464         int32_t res;
4465         TDB_DATA data;
4466
4467         ret = ctdb_control(ctdb, destnode, 0, 
4468                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
4469                            mem_ctx, &data, &res, &timeout, NULL);
4470         if (ret != 0 || res != 0) {
4471                 return -1;
4472         }
4473
4474         if (data.dsize == 0) {
4475                 *name = NULL;
4476         } else {
4477                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
4478         }
4479         talloc_free(data.dptr);
4480
4481         return 0;
4482 }
4483
4484 /*
4485   set the reclock filename for a node
4486  */
4487 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
4488 {
4489         int ret;
4490         TDB_DATA data;
4491         int32_t res;
4492
4493         if (reclock == NULL) {
4494                 data.dsize = 0;
4495                 data.dptr  = NULL;
4496         } else {
4497                 data.dsize = strlen(reclock) + 1;
4498                 data.dptr  = discard_const(reclock);
4499         }
4500
4501         ret = ctdb_control(ctdb, destnode, 0, 
4502                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
4503                            NULL, NULL, &res, &timeout, NULL);
4504         if (ret != 0 || res != 0) {
4505                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
4506                 return -1;
4507         }
4508
4509         return 0;
4510 }
4511
4512 /*
4513   stop a node
4514  */
4515 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4516 {
4517         int ret;
4518         int32_t res;
4519
4520         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
4521                            ctdb, NULL, &res, &timeout, NULL);
4522         if (ret != 0 || res != 0) {
4523                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
4524                 return -1;
4525         }
4526
4527         return 0;
4528 }
4529
4530 /*
4531   continue a node
4532  */
4533 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4534 {
4535         int ret;
4536
4537         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
4538                            ctdb, NULL, NULL, &timeout, NULL);
4539         if (ret != 0) {
4540                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4541                 return -1;
4542         }
4543
4544         return 0;
4545 }
4546
4547 /*
4548   set the natgw state for a node
4549  */
4550 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4551 {
4552         int ret;
4553         TDB_DATA data;
4554         int32_t res;
4555
4556         data.dsize = sizeof(natgwstate);
4557         data.dptr  = (uint8_t *)&natgwstate;
4558
4559         ret = ctdb_control(ctdb, destnode, 0, 
4560                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
4561                            NULL, NULL, &res, &timeout, NULL);
4562         if (ret != 0 || res != 0) {
4563                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4564                 return -1;
4565         }
4566
4567         return 0;
4568 }
4569
4570 /*
4571   set the lmaster role for a node
4572  */
4573 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4574 {
4575         int ret;
4576         TDB_DATA data;
4577         int32_t res;
4578
4579         data.dsize = sizeof(lmasterrole);
4580         data.dptr  = (uint8_t *)&lmasterrole;
4581
4582         ret = ctdb_control(ctdb, destnode, 0, 
4583                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
4584                            NULL, NULL, &res, &timeout, NULL);
4585         if (ret != 0 || res != 0) {
4586                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4587                 return -1;
4588         }
4589
4590         return 0;
4591 }
4592
4593 /*
4594   set the recmaster role for a node
4595  */
4596 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4597 {
4598         int ret;
4599         TDB_DATA data;
4600         int32_t res;
4601
4602         data.dsize = sizeof(recmasterrole);
4603         data.dptr  = (uint8_t *)&recmasterrole;
4604
4605         ret = ctdb_control(ctdb, destnode, 0, 
4606                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
4607                            NULL, NULL, &res, &timeout, NULL);
4608         if (ret != 0 || res != 0) {
4609                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4610                 return -1;
4611         }
4612
4613         return 0;
4614 }
4615
4616 /* enable an eventscript
4617  */
4618 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4619 {
4620         int ret;
4621         TDB_DATA data;
4622         int32_t res;
4623
4624         data.dsize = strlen(script) + 1;
4625         data.dptr  = discard_const(script);
4626
4627         ret = ctdb_control(ctdb, destnode, 0, 
4628                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
4629                            NULL, NULL, &res, &timeout, NULL);
4630         if (ret != 0 || res != 0) {
4631                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4632                 return -1;
4633         }
4634
4635         return 0;
4636 }
4637
4638 /* disable an eventscript
4639  */
4640 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4641 {
4642         int ret;
4643         TDB_DATA data;
4644         int32_t res;
4645
4646         data.dsize = strlen(script) + 1;
4647         data.dptr  = discard_const(script);
4648
4649         ret = ctdb_control(ctdb, destnode, 0, 
4650                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
4651                            NULL, NULL, &res, &timeout, NULL);
4652         if (ret != 0 || res != 0) {
4653                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
4654                 return -1;
4655         }
4656
4657         return 0;
4658 }
4659
4660
4661 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
4662 {
4663         int ret;
4664         TDB_DATA data;
4665         int32_t res;
4666
4667         data.dsize = sizeof(*bantime);
4668         data.dptr  = (uint8_t *)bantime;
4669
4670         ret = ctdb_control(ctdb, destnode, 0, 
4671                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
4672                            NULL, NULL, &res, &timeout, NULL);
4673         if (ret != 0 || res != 0) {
4674                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4675                 return -1;
4676         }
4677
4678         return 0;
4679 }
4680
4681
4682 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4683 {
4684         int ret;
4685         TDB_DATA outdata;
4686         int32_t res;
4687         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4688
4689         ret = ctdb_control(ctdb, destnode, 0, 
4690                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4691                            tmp_ctx, &outdata, &res, &timeout, NULL);
4692         if (ret != 0 || res != 0) {
4693                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4694                 talloc_free(tmp_ctx);
4695                 return -1;
4696         }
4697
4698         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4699         talloc_free(tmp_ctx);
4700
4701         return 0;
4702 }
4703
4704
4705 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4706 {
4707         int ret;
4708         int32_t res;
4709         TDB_DATA data;
4710         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4711
4712         data.dptr = (uint8_t*)db_prio;
4713         data.dsize = sizeof(*db_prio);
4714
4715         ret = ctdb_control(ctdb, destnode, 0, 
4716                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4717                            tmp_ctx, NULL, &res, &timeout, NULL);
4718         if (ret != 0 || res != 0) {
4719                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4720                 talloc_free(tmp_ctx);
4721                 return -1;
4722         }
4723
4724         talloc_free(tmp_ctx);
4725
4726         return 0;
4727 }
4728
4729 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4730 {
4731         int ret;
4732         int32_t res;
4733         TDB_DATA data;
4734         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4735
4736         data.dptr = (uint8_t*)&db_id;
4737         data.dsize = sizeof(db_id);
4738
4739         ret = ctdb_control(ctdb, destnode, 0, 
4740                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4741                            tmp_ctx, NULL, &res, &timeout, NULL);
4742         if (ret != 0 || res < 0) {
4743                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_db_priority failed\n"));
4744                 talloc_free(tmp_ctx);
4745                 return -1;
4746         }
4747
4748         if (priority) {
4749                 *priority = res;
4750         }
4751
4752         talloc_free(tmp_ctx);
4753
4754         return 0;
4755 }
4756
4757 int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_statistics_wire **stats)
4758 {
4759         int ret;
4760         TDB_DATA outdata;
4761         int32_t res;
4762
4763         ret = ctdb_control(ctdb, destnode, 0, 
4764                            CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null, 
4765                            mem_ctx, &outdata, &res, &timeout, NULL);
4766         if (ret != 0 || res != 0 || outdata.dsize == 0) {
4767                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
4768                 return -1;
4769         }
4770
4771         *stats = (struct ctdb_statistics_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4772         talloc_free(outdata.dptr);
4773                     
4774         return 0;
4775 }
4776
4777 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)
4778 {
4779         if (h == NULL) {
4780                 return NULL;
4781         }
4782
4783         return &h->header;
4784 }
4785
4786
4787 struct ctdb_client_control_state *
4788 ctdb_ctrl_updaterecord_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4789 {
4790         struct ctdb_client_control_state *handle;
4791         struct ctdb_marshall_buffer *m;
4792         struct ctdb_rec_data *rec;
4793         TDB_DATA outdata;
4794
4795         m = talloc_zero(mem_ctx, struct ctdb_marshall_buffer);
4796         if (m == NULL) {
4797                 DEBUG(DEBUG_ERR, ("Failed to allocate marshall buffer for update record\n"));
4798                 return NULL;
4799         }
4800
4801         m->db_id = ctdb_db->db_id;
4802
4803         rec = ctdb_marshall_record(m, 0, key, header, data);
4804         if (rec == NULL) {
4805                 DEBUG(DEBUG_ERR,("Failed to marshall record for update record\n"));
4806                 talloc_free(m);
4807                 return NULL;
4808         }
4809         m = talloc_realloc_size(mem_ctx, m, rec->length + offsetof(struct ctdb_marshall_buffer, data));
4810         if (m == NULL) {
4811                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata\n"));
4812                 talloc_free(m);
4813                 return NULL;
4814         }
4815         m->count++;
4816         memcpy((uint8_t *)m + offsetof(struct ctdb_marshall_buffer, data), rec, rec->length);
4817
4818
4819         outdata.dptr = (uint8_t *)m;
4820         outdata.dsize = talloc_get_size(m);
4821
4822         handle = ctdb_control_send(ctdb, destnode, 0, 
4823                            CTDB_CONTROL_UPDATE_RECORD, 0, outdata,
4824                            mem_ctx, &timeout, NULL);
4825         talloc_free(m);
4826         return handle;
4827 }
4828
4829 int ctdb_ctrl_updaterecord_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4830 {
4831         int ret;
4832         int32_t res;
4833
4834         ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL);
4835         if ( (ret != 0) || (res != 0) ){
4836                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_update_record_recv failed\n"));
4837                 return -1;
4838         }
4839
4840         return 0;
4841 }
4842
4843 int
4844 ctdb_ctrl_updaterecord(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4845 {
4846         struct ctdb_client_control_state *state;
4847
4848         state = ctdb_ctrl_updaterecord_send(ctdb, mem_ctx, timeout, destnode, ctdb_db, key, header, data);
4849         return ctdb_ctrl_updaterecord_recv(ctdb, state);
4850 }
4851
4852
4853
4854
4855
4856
4857 /*
4858   set a database to be readonly
4859  */
4860 struct ctdb_client_control_state *
4861 ctdb_ctrl_set_db_readonly_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4862 {
4863         TDB_DATA data;
4864
4865         data.dptr = (uint8_t *)&dbid;
4866         data.dsize = sizeof(dbid);
4867
4868         return ctdb_control_send(ctdb, destnode, 0, 
4869                            CTDB_CONTROL_SET_DB_READONLY, 0, data, 
4870                            ctdb, NULL, NULL);
4871 }
4872
4873 int ctdb_ctrl_set_db_readonly_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4874 {
4875         int ret;
4876         int32_t res;
4877
4878         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4879         if (ret != 0 || res != 0) {
4880           DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_readonly_recv failed  ret:%d res:%d\n", ret, res));
4881                 return -1;
4882         }
4883
4884         return 0;
4885 }
4886
4887 int ctdb_ctrl_set_db_readonly(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4888 {
4889         struct ctdb_client_control_state *state;
4890
4891         state = ctdb_ctrl_set_db_readonly_send(ctdb, destnode, dbid);
4892         return ctdb_ctrl_set_db_readonly_recv(ctdb, state);
4893 }
4894
4895 /*
4896   set a database to be sticky
4897  */
4898 struct ctdb_client_control_state *
4899 ctdb_ctrl_set_db_sticky_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4900 {
4901         TDB_DATA data;
4902
4903         data.dptr = (uint8_t *)&dbid;
4904         data.dsize = sizeof(dbid);
4905
4906         return ctdb_control_send(ctdb, destnode, 0, 
4907                            CTDB_CONTROL_SET_DB_STICKY, 0, data, 
4908                            ctdb, NULL, NULL);
4909 }
4910
4911 int ctdb_ctrl_set_db_sticky_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4912 {
4913         int ret;
4914         int32_t res;
4915
4916         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4917         if (ret != 0 || res != 0) {
4918                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_sticky_recv failed  ret:%d res:%d\n", ret, res));
4919                 return -1;
4920         }
4921
4922         return 0;
4923 }
4924
4925 int ctdb_ctrl_set_db_sticky(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4926 {
4927         struct ctdb_client_control_state *state;
4928
4929         state = ctdb_ctrl_set_db_sticky_send(ctdb, destnode, dbid);
4930         return ctdb_ctrl_set_db_sticky_recv(ctdb, state);
4931 }