ebd448ccbfda0b4ac970a86b487177ce56e1d868
[vlendec/samba-autobuild/.git] / ctdb / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/locale.h"
28 #include <stdlib.h>
29 #include "../include/ctdb_private.h"
30 #include "lib/util/dlinklist.h"
31
32 pid_t ctdbd_pid;
33
34 /*
35   allocate a packet for use in client<->daemon communication
36  */
37 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
38                                             TALLOC_CTX *mem_ctx, 
39                                             enum ctdb_operation operation, 
40                                             size_t length, size_t slength,
41                                             const char *type)
42 {
43         int size;
44         struct ctdb_req_header *hdr;
45
46         length = MAX(length, slength);
47         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
48
49         hdr = (struct ctdb_req_header *)talloc_zero_size(mem_ctx, size);
50         if (hdr == NULL) {
51                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
52                          operation, (unsigned)length));
53                 return NULL;
54         }
55         talloc_set_name_const(hdr, type);
56         hdr->length       = length;
57         hdr->operation    = operation;
58         hdr->ctdb_magic   = CTDB_MAGIC;
59         hdr->ctdb_version = CTDB_VERSION;
60         hdr->srcnode      = ctdb->pnn;
61         if (ctdb->vnn_map) {
62                 hdr->generation = ctdb->vnn_map->generation;
63         }
64
65         return hdr;
66 }
67
68 /*
69   local version of ctdb_call
70 */
71 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
72                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
73                     TDB_DATA *data, bool updatetdb, uint32_t caller)
74 {
75         struct ctdb_call_info *c;
76         struct ctdb_registered_call *fn;
77         struct ctdb_context *ctdb = ctdb_db->ctdb;
78         
79         c = talloc(ctdb, struct ctdb_call_info);
80         CTDB_NO_MEMORY(ctdb, c);
81
82         c->key = call->key;
83         c->call_data = &call->call_data;
84         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
85         c->record_data.dsize = data->dsize;
86         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
87         c->new_data = NULL;
88         c->reply_data = NULL;
89         c->status = 0;
90         c->header = header;
91
92         for (fn=ctdb_db->calls;fn;fn=fn->next) {
93                 if (fn->id == call->call_id) break;
94         }
95         if (fn == NULL) {
96                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
97                 talloc_free(c);
98                 return -1;
99         }
100
101         if (fn->fn(c) != 0) {
102                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
103                 talloc_free(c);
104                 return -1;
105         }
106
107         /* we need to force the record to be written out if this was a remote access */
108         if (header->laccessor != caller) {
109                 header->lacount = 0;
110         }
111         header->laccessor = caller;
112         header->lacount++;
113
114         /* we need to force the record to be written out if this was a remote access,
115            so that the lacount is updated */
116         if (c->new_data == NULL && header->laccessor != ctdb->pnn) {
117                 c->new_data = &c->record_data;
118         }
119
120         if (c->new_data && updatetdb) {
121                 /* XXX check that we always have the lock here? */
122                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
123                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
124                         talloc_free(c);
125                         return -1;
126                 }
127         }
128
129         if (c->reply_data) {
130                 call->reply_data = *c->reply_data;
131
132                 talloc_steal(call, call->reply_data.dptr);
133                 talloc_set_name_const(call->reply_data.dptr, __location__);
134         } else {
135                 call->reply_data.dptr = NULL;
136                 call->reply_data.dsize = 0;
137         }
138         call->status = c->status;
139
140         talloc_free(c);
141
142         return 0;
143 }
144
145
146 /*
147   queue a packet for sending from client to daemon
148 */
149 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
150 {
151         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
152 }
153
154
155 /*
156   called when a CTDB_REPLY_CALL packet comes in in the client
157
158   This packet comes in response to a CTDB_REQ_CALL request packet. It
159   contains any reply data from the call
160 */
161 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
162 {
163         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
164         struct ctdb_client_call_state *state;
165
166         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
167         if (state == NULL) {
168                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
169                 return;
170         }
171
172         if (hdr->reqid != state->reqid) {
173                 /* we found a record  but it was the wrong one */
174                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
175                 return;
176         }
177
178         state->call->reply_data.dptr = c->data;
179         state->call->reply_data.dsize = c->datalen;
180         state->call->status = c->status;
181
182         talloc_steal(state, c);
183
184         state->state = CTDB_CALL_DONE;
185
186         if (state->async.fn) {
187                 state->async.fn(state);
188         }
189 }
190
191 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
192
193 /*
194   this is called in the client, when data comes in from the daemon
195  */
196 void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
197 {
198         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
199         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
200         TALLOC_CTX *tmp_ctx;
201
202         /* place the packet as a child of a tmp_ctx. We then use
203            talloc_free() below to free it. If any of the calls want
204            to keep it, then they will steal it somewhere else, and the
205            talloc_free() will be a no-op */
206         tmp_ctx = talloc_new(ctdb);
207         talloc_steal(tmp_ctx, hdr);
208
209         if (cnt == 0) {
210                 DEBUG(DEBUG_CRIT,("Daemon has exited - shutting down client\n"));
211                 exit(1);
212         }
213
214         if (cnt < sizeof(*hdr)) {
215                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
216                 goto done;
217         }
218         if (cnt != hdr->length) {
219                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
220                                (unsigned)hdr->length, (unsigned)cnt);
221                 goto done;
222         }
223
224         if (hdr->ctdb_magic != CTDB_MAGIC) {
225                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
226                 goto done;
227         }
228
229         if (hdr->ctdb_version != CTDB_VERSION) {
230                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
231                 goto done;
232         }
233
234         switch (hdr->operation) {
235         case CTDB_REPLY_CALL:
236                 ctdb_client_reply_call(ctdb, hdr);
237                 break;
238
239         case CTDB_REQ_MESSAGE:
240                 ctdb_request_message(ctdb, hdr);
241                 break;
242
243         case CTDB_REPLY_CONTROL:
244                 ctdb_client_reply_control(ctdb, hdr);
245                 break;
246
247         default:
248                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
249         }
250
251 done:
252         talloc_free(tmp_ctx);
253 }
254
255 /*
256   connect to a unix domain socket
257 */
258 int ctdb_socket_connect(struct ctdb_context *ctdb)
259 {
260         struct sockaddr_un addr;
261
262         memset(&addr, 0, sizeof(addr));
263         addr.sun_family = AF_UNIX;
264         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
265
266         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
267         if (ctdb->daemon.sd == -1) {
268                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
269                 return -1;
270         }
271
272         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
273                 close(ctdb->daemon.sd);
274                 ctdb->daemon.sd = -1;
275                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
276                 return -1;
277         }
278
279         set_nonblocking(ctdb->daemon.sd);
280         set_close_on_exec(ctdb->daemon.sd);
281         
282         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
283                                               CTDB_DS_ALIGNMENT, 
284                                               ctdb_client_read_cb, ctdb, "to-ctdbd");
285         return 0;
286 }
287
288
289 struct ctdb_record_handle {
290         struct ctdb_db_context *ctdb_db;
291         TDB_DATA key;
292         TDB_DATA *data;
293         struct ctdb_ltdb_header header;
294 };
295
296
297 /*
298   make a recv call to the local ctdb daemon - called from client context
299
300   This is called when the program wants to wait for a ctdb_call to complete and get the 
301   results. This call will block unless the call has already completed.
302 */
303 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
304 {
305         if (state == NULL) {
306                 return -1;
307         }
308
309         while (state->state < CTDB_CALL_DONE) {
310                 event_loop_once(state->ctdb_db->ctdb->ev);
311         }
312         if (state->state != CTDB_CALL_DONE) {
313                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
314                 talloc_free(state);
315                 return -1;
316         }
317
318         if (state->call->reply_data.dsize) {
319                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
320                                                       state->call->reply_data.dptr,
321                                                       state->call->reply_data.dsize);
322                 call->reply_data.dsize = state->call->reply_data.dsize;
323         } else {
324                 call->reply_data.dptr = NULL;
325                 call->reply_data.dsize = 0;
326         }
327         call->status = state->call->status;
328         talloc_free(state);
329
330         return call->status;
331 }
332
333
334
335
336 /*
337   destroy a ctdb_call in client
338 */
339 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
340 {
341         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
342         return 0;
343 }
344
345 /*
346   construct an event driven local ctdb_call
347
348   this is used so that locally processed ctdb_call requests are processed
349   in an event driven manner
350 */
351 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
352                                                                   struct ctdb_call *call,
353                                                                   struct ctdb_ltdb_header *header,
354                                                                   TDB_DATA *data)
355 {
356         struct ctdb_client_call_state *state;
357         struct ctdb_context *ctdb = ctdb_db->ctdb;
358         int ret;
359
360         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
361         CTDB_NO_MEMORY_NULL(ctdb, state);
362         state->call = talloc_zero(state, struct ctdb_call);
363         CTDB_NO_MEMORY_NULL(ctdb, state->call);
364
365         talloc_steal(state, data->dptr);
366
367         state->state   = CTDB_CALL_DONE;
368         *(state->call) = *call;
369         state->ctdb_db = ctdb_db;
370
371         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true, ctdb->pnn);
372         if (ret != 0) {
373                 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
374         }
375
376         return state;
377 }
378
379 /*
380   make a ctdb call to the local daemon - async send. Called from client context.
381
382   This constructs a ctdb_call request and queues it for processing. 
383   This call never blocks.
384 */
385 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
386                                               struct ctdb_call *call)
387 {
388         struct ctdb_client_call_state *state;
389         struct ctdb_context *ctdb = ctdb_db->ctdb;
390         struct ctdb_ltdb_header header;
391         TDB_DATA data;
392         int ret;
393         size_t len;
394         struct ctdb_req_call *c;
395
396         /* if the domain socket is not yet open, open it */
397         if (ctdb->daemon.sd==-1) {
398                 ctdb_socket_connect(ctdb);
399         }
400
401         ret = ctdb_ltdb_lock(ctdb_db, call->key);
402         if (ret != 0) {
403                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
404                 return NULL;
405         }
406
407         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
408
409         if ((call->flags & CTDB_IMMEDIATE_MIGRATION) && (header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
410                 ret = -1;
411         }
412
413         if (ret == 0 && header.dmaster == ctdb->pnn) {
414                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
415                 talloc_free(data.dptr);
416                 ctdb_ltdb_unlock(ctdb_db, call->key);
417                 return state;
418         }
419
420         ctdb_ltdb_unlock(ctdb_db, call->key);
421         talloc_free(data.dptr);
422
423         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
424         if (state == NULL) {
425                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
426                 return NULL;
427         }
428         state->call = talloc_zero(state, struct ctdb_call);
429         if (state->call == NULL) {
430                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
431                 return NULL;
432         }
433
434         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
435         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
436         if (c == NULL) {
437                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
438                 return NULL;
439         }
440
441         state->reqid     = ctdb_reqid_new(ctdb, state);
442         state->ctdb_db = ctdb_db;
443         talloc_set_destructor(state, ctdb_client_call_destructor);
444
445         c->hdr.reqid     = state->reqid;
446         c->flags         = call->flags;
447         c->db_id         = ctdb_db->db_id;
448         c->callid        = call->call_id;
449         c->hopcount      = 0;
450         c->keylen        = call->key.dsize;
451         c->calldatalen   = call->call_data.dsize;
452         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
453         memcpy(&c->data[call->key.dsize], 
454                call->call_data.dptr, call->call_data.dsize);
455         *(state->call)              = *call;
456         state->call->call_data.dptr = &c->data[call->key.dsize];
457         state->call->key.dptr       = &c->data[0];
458
459         state->state  = CTDB_CALL_WAIT;
460
461
462         ctdb_client_queue_pkt(ctdb, &c->hdr);
463
464         return state;
465 }
466
467
468 /*
469   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
470 */
471 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
472 {
473         struct ctdb_client_call_state *state;
474
475         state = ctdb_call_send(ctdb_db, call);
476         return ctdb_call_recv(state, call);
477 }
478
479
480 /*
481   tell the daemon what messaging srvid we will use, and register the message
482   handler function in the client
483 */
484 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
485                              ctdb_msg_fn_t handler,
486                              void *private_data)
487                                     
488 {
489         int res;
490         int32_t status;
491         
492         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
493                            tdb_null, NULL, NULL, &status, NULL, NULL);
494         if (res != 0 || status != 0) {
495                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
496                 return -1;
497         }
498
499         /* also need to register the handler with our own ctdb structure */
500         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
501 }
502
503 /*
504   tell the daemon we no longer want a srvid
505 */
506 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
507 {
508         int res;
509         int32_t status;
510         
511         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
512                            tdb_null, NULL, NULL, &status, NULL, NULL);
513         if (res != 0 || status != 0) {
514                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
515                 return -1;
516         }
517
518         /* also need to register the handler with our own ctdb structure */
519         ctdb_deregister_message_handler(ctdb, srvid, private_data);
520         return 0;
521 }
522
523
524 /*
525   send a message - from client context
526  */
527 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
528                       uint64_t srvid, TDB_DATA data)
529 {
530         struct ctdb_req_message *r;
531         int len, res;
532
533         len = offsetof(struct ctdb_req_message, data) + data.dsize;
534         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
535                                len, struct ctdb_req_message);
536         CTDB_NO_MEMORY(ctdb, r);
537
538         r->hdr.destnode  = pnn;
539         r->srvid         = srvid;
540         r->datalen       = data.dsize;
541         memcpy(&r->data[0], data.dptr, data.dsize);
542         
543         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
544         talloc_free(r);
545         return res;
546 }
547
548
549 /*
550   cancel a ctdb_fetch_lock operation, releasing the lock
551  */
552 static int fetch_lock_destructor(struct ctdb_record_handle *h)
553 {
554         ctdb_ltdb_unlock(h->ctdb_db, h->key);
555         return 0;
556 }
557
558 /*
559   force the migration of a record to this node
560  */
561 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
562 {
563         struct ctdb_call call;
564         ZERO_STRUCT(call);
565         call.call_id = CTDB_NULL_FUNC;
566         call.key = key;
567         call.flags = CTDB_IMMEDIATE_MIGRATION;
568         return ctdb_call(ctdb_db, &call);
569 }
570
571 /*
572   try to fetch a readonly copy of a record
573  */
574 static int
575 ctdb_client_fetch_readonly(struct ctdb_db_context *ctdb_db, TDB_DATA key, TALLOC_CTX *mem_ctx, struct ctdb_ltdb_header **hdr, TDB_DATA *data)
576 {
577         int ret;
578
579         struct ctdb_call call;
580         ZERO_STRUCT(call);
581
582         call.call_id = CTDB_FETCH_WITH_HEADER_FUNC;
583         call.call_data.dptr = NULL;
584         call.call_data.dsize = 0;
585         call.key = key;
586         call.flags = CTDB_WANT_READONLY;
587         ret = ctdb_call(ctdb_db, &call);
588
589         if (ret != 0) {
590                 return -1;
591         }
592         if (call.reply_data.dsize < sizeof(struct ctdb_ltdb_header)) {
593                 return -1;
594         }
595
596         *hdr = talloc_memdup(mem_ctx, &call.reply_data.dptr[0], sizeof(struct ctdb_ltdb_header));
597         if (*hdr == NULL) {
598                 talloc_free(call.reply_data.dptr);
599                 return -1;
600         }
601
602         data->dsize = call.reply_data.dsize - sizeof(struct ctdb_ltdb_header);
603         data->dptr  = talloc_memdup(mem_ctx, &call.reply_data.dptr[sizeof(struct ctdb_ltdb_header)], data->dsize);
604         if (data->dptr == NULL) {
605                 talloc_free(call.reply_data.dptr);
606                 talloc_free(hdr);
607                 return -1;
608         }
609
610         return 0;
611 }
612
613 /*
614   get a lock on a record, and return the records data. Blocks until it gets the lock
615  */
616 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
617                                            TDB_DATA key, TDB_DATA *data)
618 {
619         int ret;
620         struct ctdb_record_handle *h;
621
622         /*
623           procedure is as follows:
624
625           1) get the chain lock. 
626           2) check if we are dmaster
627           3) if we are the dmaster then return handle 
628           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
629              reply from ctdbd
630           5) when we get the reply, goto (1)
631          */
632
633         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
634         if (h == NULL) {
635                 return NULL;
636         }
637
638         h->ctdb_db = ctdb_db;
639         h->key     = key;
640         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
641         if (h->key.dptr == NULL) {
642                 talloc_free(h);
643                 return NULL;
644         }
645         h->data    = data;
646
647         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
648                  (const char *)key.dptr));
649
650 again:
651         /* step 1 - get the chain lock */
652         ret = ctdb_ltdb_lock(ctdb_db, key);
653         if (ret != 0) {
654                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
655                 talloc_free(h);
656                 return NULL;
657         }
658
659         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
660
661         talloc_set_destructor(h, fetch_lock_destructor);
662
663         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
664
665         /* when torturing, ensure we test the remote path */
666         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
667             random() % 5 == 0) {
668                 h->header.dmaster = (uint32_t)-1;
669         }
670
671
672         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
673
674         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
675                 ctdb_ltdb_unlock(ctdb_db, key);
676                 ret = ctdb_client_force_migration(ctdb_db, key);
677                 if (ret != 0) {
678                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
679                         talloc_free(h);
680                         return NULL;
681                 }
682                 goto again;
683         }
684
685         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
686         return h;
687 }
688
689 /*
690   get a readonly lock on a record, and return the records data. Blocks until it gets the lock
691  */
692 struct ctdb_record_handle *
693 ctdb_fetch_readonly_lock(
694         struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
695         TDB_DATA key, TDB_DATA *data,
696         int read_only)
697 {
698         int ret;
699         struct ctdb_record_handle *h;
700         struct ctdb_ltdb_header *roheader = NULL;
701
702         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
703         if (h == NULL) {
704                 return NULL;
705         }
706
707         h->ctdb_db = ctdb_db;
708         h->key     = key;
709         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
710         if (h->key.dptr == NULL) {
711                 talloc_free(h);
712                 return NULL;
713         }
714         h->data    = data;
715
716         data->dptr = NULL;
717         data->dsize = 0;
718
719
720 again:
721         talloc_free(roheader);
722         roheader = NULL;
723
724         talloc_free(data->dptr);
725         data->dptr = NULL;
726         data->dsize = 0;
727
728         /* Lock the record/chain */
729         ret = ctdb_ltdb_lock(ctdb_db, key);
730         if (ret != 0) {
731                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
732                 talloc_free(h);
733                 return NULL;
734         }
735
736         talloc_set_destructor(h, fetch_lock_destructor);
737
738         /* Check if record exists yet in the TDB */
739         ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
740         if (ret != 0) {
741                 ctdb_ltdb_unlock(ctdb_db, key);
742                 ret = ctdb_client_force_migration(ctdb_db, key);
743                 if (ret != 0) {
744                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
745                         talloc_free(h);
746                         return NULL;
747                 }
748                 goto again;
749         }
750
751         /* if this is a request for read/write and we have delegations
752            we have to revoke all delegations first
753         */
754         if ((read_only == 0) 
755         &&  (h->header.dmaster == ctdb_db->ctdb->pnn)
756         &&  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
757                 ctdb_ltdb_unlock(ctdb_db, key);
758                 ret = ctdb_client_force_migration(ctdb_db, key);
759                 if (ret != 0) {
760                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
761                         talloc_free(h);
762                         return NULL;
763                 }
764                 goto again;
765         }
766
767         /* if we are dmaster, just return the handle */
768         if (h->header.dmaster == ctdb_db->ctdb->pnn) {
769                 return h;
770         }
771
772         if (read_only != 0) {
773                 TDB_DATA rodata = {NULL, 0};
774
775                 if ((h->header.flags & CTDB_REC_RO_HAVE_READONLY)
776                 ||  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
777                         return h;
778                 }
779
780                 ctdb_ltdb_unlock(ctdb_db, key);
781                 ret = ctdb_client_fetch_readonly(ctdb_db, key, h, &roheader, &rodata);
782                 if (ret != 0) {
783                         DEBUG(DEBUG_ERR,("ctdb_fetch_readonly_lock:  failed. force migration and try again\n"));
784                         ret = ctdb_client_force_migration(ctdb_db, key);
785                         if (ret != 0) {
786                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
787                                 talloc_free(h);
788                                 return NULL;
789                         }
790
791                         goto again;
792                 }
793
794                 if (!(roheader->flags&CTDB_REC_RO_HAVE_READONLY)) {
795                         ret = ctdb_client_force_migration(ctdb_db, key);
796                         if (ret != 0) {
797                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
798                                 talloc_free(h);
799                                 return NULL;
800                         }
801
802                         goto again;
803                 }
804
805                 ret = ctdb_ltdb_lock(ctdb_db, key);
806                 if (ret != 0) {
807                         DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
808                         talloc_free(h);
809                         return NULL;
810                 }
811
812                 ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
813                 if (ret != 0) {
814                         ctdb_ltdb_unlock(ctdb_db, key);
815
816                         ret = ctdb_client_force_migration(ctdb_db, key);
817                         if (ret != 0) {
818                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
819                                 talloc_free(h);
820                                 return NULL;
821                         }
822
823                         goto again;
824                 }
825
826                 return h;
827         }
828
829         /* we are not dmaster and this was not a request for a readonly lock
830          * so unlock the record, migrate it and try again
831          */
832         ctdb_ltdb_unlock(ctdb_db, key);
833         ret = ctdb_client_force_migration(ctdb_db, key);
834         if (ret != 0) {
835                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
836                 talloc_free(h);
837                 return NULL;
838         }
839         goto again;
840 }
841
842 /*
843   store some data to the record that was locked with ctdb_fetch_lock()
844 */
845 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
846 {
847         if (h->ctdb_db->persistent) {
848                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
849                 return -1;
850         }
851
852         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
853 }
854
855 /*
856   non-locking fetch of a record
857  */
858 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
859                TDB_DATA key, TDB_DATA *data)
860 {
861         struct ctdb_call call;
862         int ret;
863
864         call.call_id = CTDB_FETCH_FUNC;
865         call.call_data.dptr = NULL;
866         call.call_data.dsize = 0;
867         call.key = key;
868
869         ret = ctdb_call(ctdb_db, &call);
870
871         if (ret == 0) {
872                 *data = call.reply_data;
873                 talloc_steal(mem_ctx, data->dptr);
874         }
875
876         return ret;
877 }
878
879
880
881 /*
882    called when a control completes or timesout to invoke the callback
883    function the user provided
884 */
885 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
886         struct timeval t, void *private_data)
887 {
888         struct ctdb_client_control_state *state;
889         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
890         int ret;
891
892         state = talloc_get_type(private_data, struct ctdb_client_control_state);
893         talloc_steal(tmp_ctx, state);
894
895         ret = ctdb_control_recv(state->ctdb, state, state,
896                         NULL, 
897                         NULL, 
898                         NULL);
899         if (ret != 0) {
900                 DEBUG(DEBUG_DEBUG,("ctdb_control_recv() failed, ignoring return code %d\n", ret));
901         }
902
903         talloc_free(tmp_ctx);
904 }
905
906 /*
907   called when a CTDB_REPLY_CONTROL packet comes in in the client
908
909   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
910   contains any reply data from the control
911 */
912 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
913                                       struct ctdb_req_header *hdr)
914 {
915         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
916         struct ctdb_client_control_state *state;
917
918         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
919         if (state == NULL) {
920                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
921                 return;
922         }
923
924         if (hdr->reqid != state->reqid) {
925                 /* we found a record  but it was the wrong one */
926                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
927                 return;
928         }
929
930         state->outdata.dptr = c->data;
931         state->outdata.dsize = c->datalen;
932         state->status = c->status;
933         if (c->errorlen) {
934                 state->errormsg = talloc_strndup(state, 
935                                                  (char *)&c->data[c->datalen], 
936                                                  c->errorlen);
937         }
938
939         /* state->outdata now uses resources from c so we dont want c
940            to just dissappear from under us while state is still alive
941         */
942         talloc_steal(state, c);
943
944         state->state = CTDB_CONTROL_DONE;
945
946         /* if we had a callback registered for this control, pull the response
947            and call the callback.
948         */
949         if (state->async.fn) {
950                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
951         }
952 }
953
954
955 /*
956   destroy a ctdb_control in client
957 */
958 static int ctdb_client_control_destructor(struct ctdb_client_control_state *state)
959 {
960         ctdb_reqid_remove(state->ctdb, state->reqid);
961         return 0;
962 }
963
964
965 /* time out handler for ctdb_control */
966 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
967         struct timeval t, void *private_data)
968 {
969         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
970
971         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
972                          "dstnode:%u\n", state->reqid, state->c->opcode,
973                          state->c->hdr.destnode));
974
975         state->state = CTDB_CONTROL_TIMEOUT;
976
977         /* if we had a callback registered for this control, pull the response
978            and call the callback.
979         */
980         if (state->async.fn) {
981                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
982         }
983 }
984
985 /* async version of send control request */
986 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
987                 uint32_t destnode, uint64_t srvid, 
988                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
989                 TALLOC_CTX *mem_ctx,
990                 struct timeval *timeout,
991                 char **errormsg)
992 {
993         struct ctdb_client_control_state *state;
994         size_t len;
995         struct ctdb_req_control *c;
996         int ret;
997
998         if (errormsg) {
999                 *errormsg = NULL;
1000         }
1001
1002         /* if the domain socket is not yet open, open it */
1003         if (ctdb->daemon.sd==-1) {
1004                 ctdb_socket_connect(ctdb);
1005         }
1006
1007         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
1008         CTDB_NO_MEMORY_NULL(ctdb, state);
1009
1010         state->ctdb       = ctdb;
1011         state->reqid      = ctdb_reqid_new(ctdb, state);
1012         state->state      = CTDB_CONTROL_WAIT;
1013         state->errormsg   = NULL;
1014
1015         talloc_set_destructor(state, ctdb_client_control_destructor);
1016
1017         len = offsetof(struct ctdb_req_control, data) + data.dsize;
1018         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
1019                                len, struct ctdb_req_control);
1020         state->c            = c;        
1021         CTDB_NO_MEMORY_NULL(ctdb, c);
1022         c->hdr.reqid        = state->reqid;
1023         c->hdr.destnode     = destnode;
1024         c->opcode           = opcode;
1025         c->client_id        = 0;
1026         c->flags            = flags;
1027         c->srvid            = srvid;
1028         c->datalen          = data.dsize;
1029         if (data.dsize) {
1030                 memcpy(&c->data[0], data.dptr, data.dsize);
1031         }
1032
1033         /* timeout */
1034         if (timeout && !timeval_is_zero(timeout)) {
1035                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
1036         }
1037
1038         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
1039         if (ret != 0) {
1040                 talloc_free(state);
1041                 return NULL;
1042         }
1043
1044         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1045                 talloc_free(state);
1046                 return NULL;
1047         }
1048
1049         return state;
1050 }
1051
1052
1053 /* async version of receive control reply */
1054 int ctdb_control_recv(struct ctdb_context *ctdb, 
1055                 struct ctdb_client_control_state *state, 
1056                 TALLOC_CTX *mem_ctx,
1057                 TDB_DATA *outdata, int32_t *status, char **errormsg)
1058 {
1059         TALLOC_CTX *tmp_ctx;
1060
1061         if (status != NULL) {
1062                 *status = -1;
1063         }
1064         if (errormsg != NULL) {
1065                 *errormsg = NULL;
1066         }
1067
1068         if (state == NULL) {
1069                 return -1;
1070         }
1071
1072         /* prevent double free of state */
1073         tmp_ctx = talloc_new(ctdb);
1074         talloc_steal(tmp_ctx, state);
1075
1076         /* loop one event at a time until we either timeout or the control
1077            completes.
1078         */
1079         while (state->state == CTDB_CONTROL_WAIT) {
1080                 event_loop_once(ctdb->ev);
1081         }
1082
1083         if (state->state != CTDB_CONTROL_DONE) {
1084                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
1085                 if (state->async.fn) {
1086                         state->async.fn(state);
1087                 }
1088                 talloc_free(tmp_ctx);
1089                 return -1;
1090         }
1091
1092         if (state->errormsg) {
1093                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
1094                 if (errormsg) {
1095                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
1096                 }
1097                 if (state->async.fn) {
1098                         state->async.fn(state);
1099                 }
1100                 talloc_free(tmp_ctx);
1101                 return -1;
1102         }
1103
1104         if (outdata) {
1105                 *outdata = state->outdata;
1106                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
1107         }
1108
1109         if (status) {
1110                 *status = state->status;
1111         }
1112
1113         if (state->async.fn) {
1114                 state->async.fn(state);
1115         }
1116
1117         talloc_free(tmp_ctx);
1118         return 0;
1119 }
1120
1121
1122
1123 /*
1124   send a ctdb control message
1125   timeout specifies how long we should wait for a reply.
1126   if timeout is NULL we wait indefinitely
1127  */
1128 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
1129                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
1130                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
1131                  struct timeval *timeout,
1132                  char **errormsg)
1133 {
1134         struct ctdb_client_control_state *state;
1135
1136         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
1137                         flags, data, mem_ctx,
1138                         timeout, errormsg);
1139
1140         /* FIXME: Error conditions in ctdb_control_send return NULL without
1141          * setting errormsg.  So, there is no way to distinguish between sucess
1142          * and failure when CTDB_CTRL_FLAG_NOREPLY is set */
1143         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1144                 if (status != NULL) {
1145                         *status = 0;
1146                 }
1147                 return 0;
1148         }
1149
1150         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
1151                         errormsg);
1152 }
1153
1154
1155
1156
1157 /*
1158   a process exists call. Returns 0 if process exists, -1 otherwise
1159  */
1160 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
1161 {
1162         int ret;
1163         TDB_DATA data;
1164         int32_t status;
1165
1166         data.dptr = (uint8_t*)&pid;
1167         data.dsize = sizeof(pid);
1168
1169         ret = ctdb_control(ctdb, destnode, 0, 
1170                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
1171                            NULL, NULL, &status, NULL, NULL);
1172         if (ret != 0) {
1173                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
1174                 return -1;
1175         }
1176
1177         return status;
1178 }
1179
1180 /*
1181   get remote statistics
1182  */
1183 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
1184 {
1185         int ret;
1186         TDB_DATA data;
1187         int32_t res;
1188
1189         ret = ctdb_control(ctdb, destnode, 0, 
1190                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
1191                            ctdb, &data, &res, NULL, NULL);
1192         if (ret != 0 || res != 0) {
1193                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
1194                 return -1;
1195         }
1196
1197         if (data.dsize != sizeof(struct ctdb_statistics)) {
1198                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1199                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1200                       return -1;
1201         }
1202
1203         *status = *(struct ctdb_statistics *)data.dptr;
1204         talloc_free(data.dptr);
1205                         
1206         return 0;
1207 }
1208
1209 /*
1210   shutdown a remote ctdb node
1211  */
1212 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1213 {
1214         struct ctdb_client_control_state *state;
1215
1216         state = ctdb_control_send(ctdb, destnode, 0, 
1217                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1218                            NULL, &timeout, NULL);
1219         if (state == NULL) {
1220                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1221                 return -1;
1222         }
1223
1224         return 0;
1225 }
1226
1227 /*
1228   get vnn map from a remote node
1229  */
1230 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1231 {
1232         int ret;
1233         TDB_DATA outdata;
1234         int32_t res;
1235         struct ctdb_vnn_map_wire *map;
1236
1237         ret = ctdb_control(ctdb, destnode, 0, 
1238                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1239                            mem_ctx, &outdata, &res, &timeout, NULL);
1240         if (ret != 0 || res != 0) {
1241                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1242                 return -1;
1243         }
1244         
1245         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1246         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1247             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1248                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1249                 return -1;
1250         }
1251
1252         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1253         CTDB_NO_MEMORY(ctdb, *vnnmap);
1254         (*vnnmap)->generation = map->generation;
1255         (*vnnmap)->size       = map->size;
1256         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1257
1258         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1259         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1260         talloc_free(outdata.dptr);
1261                     
1262         return 0;
1263 }
1264
1265
1266 /*
1267   get the recovery mode of a remote node
1268  */
1269 struct ctdb_client_control_state *
1270 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1271 {
1272         return ctdb_control_send(ctdb, destnode, 0, 
1273                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1274                            mem_ctx, &timeout, NULL);
1275 }
1276
1277 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1278 {
1279         int ret;
1280         int32_t res;
1281
1282         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1283         if (ret != 0) {
1284                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1285                 return -1;
1286         }
1287
1288         if (recmode) {
1289                 *recmode = (uint32_t)res;
1290         }
1291
1292         return 0;
1293 }
1294
1295 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1296 {
1297         struct ctdb_client_control_state *state;
1298
1299         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1300         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1301 }
1302
1303
1304
1305
1306 /*
1307   set the recovery mode of a remote node
1308  */
1309 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1310 {
1311         int ret;
1312         TDB_DATA data;
1313         int32_t res;
1314
1315         data.dsize = sizeof(uint32_t);
1316         data.dptr = (unsigned char *)&recmode;
1317
1318         ret = ctdb_control(ctdb, destnode, 0, 
1319                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1320                            NULL, NULL, &res, &timeout, NULL);
1321         if (ret != 0 || res != 0) {
1322                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1323                 return -1;
1324         }
1325
1326         return 0;
1327 }
1328
1329
1330
1331 /*
1332   get the recovery master of a remote node
1333  */
1334 struct ctdb_client_control_state *
1335 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1336                         struct timeval timeout, uint32_t destnode)
1337 {
1338         return ctdb_control_send(ctdb, destnode, 0, 
1339                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1340                            mem_ctx, &timeout, NULL);
1341 }
1342
1343 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1344 {
1345         int ret;
1346         int32_t res;
1347
1348         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1349         if (ret != 0) {
1350                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1351                 return -1;
1352         }
1353
1354         if (recmaster) {
1355                 *recmaster = (uint32_t)res;
1356         }
1357
1358         return 0;
1359 }
1360
1361 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1362 {
1363         struct ctdb_client_control_state *state;
1364
1365         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1366         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1367 }
1368
1369
1370 /*
1371   set the recovery master of a remote node
1372  */
1373 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1374 {
1375         int ret;
1376         TDB_DATA data;
1377         int32_t res;
1378
1379         ZERO_STRUCT(data);
1380         data.dsize = sizeof(uint32_t);
1381         data.dptr = (unsigned char *)&recmaster;
1382
1383         ret = ctdb_control(ctdb, destnode, 0, 
1384                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1385                            NULL, NULL, &res, &timeout, NULL);
1386         if (ret != 0 || res != 0) {
1387                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1388                 return -1;
1389         }
1390
1391         return 0;
1392 }
1393
1394
1395 /*
1396   get a list of databases off a remote node
1397  */
1398 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1399                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1400 {
1401         int ret;
1402         TDB_DATA outdata;
1403         int32_t res;
1404
1405         ret = ctdb_control(ctdb, destnode, 0, 
1406                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1407                            mem_ctx, &outdata, &res, &timeout, NULL);
1408         if (ret != 0 || res != 0) {
1409                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1410                 return -1;
1411         }
1412
1413         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1414         talloc_free(outdata.dptr);
1415                     
1416         return 0;
1417 }
1418
1419 /*
1420   get a list of nodes (vnn and flags ) from a remote node
1421  */
1422 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1423                 struct timeval timeout, uint32_t destnode, 
1424                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1425 {
1426         int ret;
1427         TDB_DATA outdata;
1428         int32_t res;
1429
1430         ret = ctdb_control(ctdb, destnode, 0, 
1431                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1432                            mem_ctx, &outdata, &res, &timeout, NULL);
1433         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1434                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1435                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1436         }
1437         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1438                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1439                 return -1;
1440         }
1441
1442         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1443         talloc_free(outdata.dptr);
1444                     
1445         return 0;
1446 }
1447
1448 /*
1449   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1450  */
1451 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1452                 struct timeval timeout, uint32_t destnode, 
1453                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1454 {
1455         int ret, i, len;
1456         TDB_DATA outdata;
1457         struct ctdb_node_mapv4 *nodemapv4;
1458         int32_t res;
1459
1460         ret = ctdb_control(ctdb, destnode, 0, 
1461                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1462                            mem_ctx, &outdata, &res, &timeout, NULL);
1463         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1464                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1465                 return -1;
1466         }
1467
1468         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1469
1470         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1471         (*nodemap) = talloc_zero_size(mem_ctx, len);
1472         CTDB_NO_MEMORY(ctdb, (*nodemap));
1473
1474         (*nodemap)->num = nodemapv4->num;
1475         for (i=0; i<nodemapv4->num; i++) {
1476                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1477                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1478                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1479                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1480         }
1481                 
1482         talloc_free(outdata.dptr);
1483                     
1484         return 0;
1485 }
1486
1487 /*
1488   drop the transport, reload the nodes file and restart the transport
1489  */
1490 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1491                     struct timeval timeout, uint32_t destnode)
1492 {
1493         int ret;
1494         int32_t res;
1495
1496         ret = ctdb_control(ctdb, destnode, 0, 
1497                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1498                            NULL, NULL, &res, &timeout, NULL);
1499         if (ret != 0 || res != 0) {
1500                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1501                 return -1;
1502         }
1503
1504         return 0;
1505 }
1506
1507
1508 /*
1509   set vnn map on a node
1510  */
1511 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1512                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1513 {
1514         int ret;
1515         TDB_DATA data;
1516         int32_t res;
1517         struct ctdb_vnn_map_wire *map;
1518         size_t len;
1519
1520         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1521         map = talloc_size(mem_ctx, len);
1522         CTDB_NO_MEMORY(ctdb, map);
1523
1524         map->generation = vnnmap->generation;
1525         map->size = vnnmap->size;
1526         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1527         
1528         data.dsize = len;
1529         data.dptr  = (uint8_t *)map;
1530
1531         ret = ctdb_control(ctdb, destnode, 0, 
1532                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1533                            NULL, NULL, &res, &timeout, NULL);
1534         if (ret != 0 || res != 0) {
1535                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1536                 return -1;
1537         }
1538
1539         talloc_free(map);
1540
1541         return 0;
1542 }
1543
1544
1545 /*
1546   async send for pull database
1547  */
1548 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1549         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1550         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1551 {
1552         TDB_DATA indata;
1553         struct ctdb_control_pulldb *pull;
1554         struct ctdb_client_control_state *state;
1555
1556         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1557         CTDB_NO_MEMORY_NULL(ctdb, pull);
1558
1559         pull->db_id   = dbid;
1560         pull->lmaster = lmaster;
1561
1562         indata.dsize = sizeof(struct ctdb_control_pulldb);
1563         indata.dptr  = (unsigned char *)pull;
1564
1565         state = ctdb_control_send(ctdb, destnode, 0, 
1566                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1567                                   mem_ctx, &timeout, NULL);
1568         talloc_free(pull);
1569
1570         return state;
1571 }
1572
1573 /*
1574   async recv for pull database
1575  */
1576 int ctdb_ctrl_pulldb_recv(
1577         struct ctdb_context *ctdb, 
1578         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1579         TDB_DATA *outdata)
1580 {
1581         int ret;
1582         int32_t res;
1583
1584         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1585         if ( (ret != 0) || (res != 0) ){
1586                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1587                 return -1;
1588         }
1589
1590         return 0;
1591 }
1592
1593 /*
1594   pull all keys and records for a specific database on a node
1595  */
1596 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1597                 uint32_t dbid, uint32_t lmaster, 
1598                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1599                 TDB_DATA *outdata)
1600 {
1601         struct ctdb_client_control_state *state;
1602
1603         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1604                                       timeout);
1605         
1606         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1607 }
1608
1609
1610 /*
1611   change dmaster for all keys in the database to the new value
1612  */
1613 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1614                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1615 {
1616         int ret;
1617         TDB_DATA indata;
1618         int32_t res;
1619
1620         indata.dsize = 2*sizeof(uint32_t);
1621         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1622
1623         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1624         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1625
1626         ret = ctdb_control(ctdb, destnode, 0, 
1627                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1628                            NULL, NULL, &res, &timeout, NULL);
1629         if (ret != 0 || res != 0) {
1630                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1631                 return -1;
1632         }
1633
1634         return 0;
1635 }
1636
1637 /*
1638   ping a node, return number of clients connected
1639  */
1640 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1641 {
1642         int ret;
1643         int32_t res;
1644
1645         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1646                            tdb_null, NULL, NULL, &res, NULL, NULL);
1647         if (ret != 0) {
1648                 return -1;
1649         }
1650         return res;
1651 }
1652
1653 int ctdb_ctrl_get_runstate(struct ctdb_context *ctdb, 
1654                            struct timeval timeout, 
1655                            uint32_t destnode,
1656                            uint32_t *runstate)
1657 {
1658         TDB_DATA outdata;
1659         int32_t res;
1660         int ret;
1661
1662         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_RUNSTATE, 0,
1663                            tdb_null, ctdb, &outdata, &res, &timeout, NULL);
1664         if (ret != 0 || res != 0) {
1665                 DEBUG(DEBUG_ERR,("ctdb_control for get_runstate failed\n"));
1666                 return ret != 0 ? ret : res;
1667         }
1668
1669         if (outdata.dsize != sizeof(uint32_t)) {
1670                 DEBUG(DEBUG_ERR,("Invalid return data in get_runstate\n"));
1671                 talloc_free(outdata.dptr);
1672                 return -1;
1673         }
1674
1675         if (runstate != NULL) {
1676                 *runstate = *(uint32_t *)outdata.dptr;
1677         }
1678         talloc_free(outdata.dptr);
1679
1680         return 0;
1681 }
1682
1683 /*
1684   find the real path to a ltdb 
1685  */
1686 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1687                    const char **path)
1688 {
1689         int ret;
1690         int32_t res;
1691         TDB_DATA data;
1692
1693         data.dptr = (uint8_t *)&dbid;
1694         data.dsize = sizeof(dbid);
1695
1696         ret = ctdb_control(ctdb, destnode, 0, 
1697                            CTDB_CONTROL_GETDBPATH, 0, data, 
1698                            mem_ctx, &data, &res, &timeout, NULL);
1699         if (ret != 0 || res != 0) {
1700                 return -1;
1701         }
1702
1703         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1704         if ((*path) == NULL) {
1705                 return -1;
1706         }
1707
1708         talloc_free(data.dptr);
1709
1710         return 0;
1711 }
1712
1713 /*
1714   find the name of a db 
1715  */
1716 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1717                    const char **name)
1718 {
1719         int ret;
1720         int32_t res;
1721         TDB_DATA data;
1722
1723         data.dptr = (uint8_t *)&dbid;
1724         data.dsize = sizeof(dbid);
1725
1726         ret = ctdb_control(ctdb, destnode, 0, 
1727                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1728                            mem_ctx, &data, &res, &timeout, NULL);
1729         if (ret != 0 || res != 0) {
1730                 return -1;
1731         }
1732
1733         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1734         if ((*name) == NULL) {
1735                 return -1;
1736         }
1737
1738         talloc_free(data.dptr);
1739
1740         return 0;
1741 }
1742
1743 /*
1744   get the health status of a db
1745  */
1746 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1747                           struct timeval timeout,
1748                           uint32_t destnode,
1749                           uint32_t dbid, TALLOC_CTX *mem_ctx,
1750                           const char **reason)
1751 {
1752         int ret;
1753         int32_t res;
1754         TDB_DATA data;
1755
1756         data.dptr = (uint8_t *)&dbid;
1757         data.dsize = sizeof(dbid);
1758
1759         ret = ctdb_control(ctdb, destnode, 0,
1760                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1761                            mem_ctx, &data, &res, &timeout, NULL);
1762         if (ret != 0 || res != 0) {
1763                 return -1;
1764         }
1765
1766         if (data.dsize == 0) {
1767                 (*reason) = NULL;
1768                 return 0;
1769         }
1770
1771         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1772         if ((*reason) == NULL) {
1773                 return -1;
1774         }
1775
1776         talloc_free(data.dptr);
1777
1778         return 0;
1779 }
1780
1781 /*
1782   create a database
1783  */
1784 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
1785                        TALLOC_CTX *mem_ctx, const char *name, uint32_t tdb_flags)
1786 {
1787         int ret;
1788         int32_t res;
1789         TDB_DATA data;
1790         bool persistent;
1791
1792         data.dptr = discard_const(name);
1793         data.dsize = strlen(name)+1;
1794
1795         persistent = (tdb_flags & CTDB_DB_FLAGS_PERSISTENT);
1796         ret = ctdb_control(ctdb, destnode, 0,
1797                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1798                            tdb_flags, data,
1799                            mem_ctx, &data, &res, &timeout, NULL);
1800
1801         if (ret != 0 || res != 0) {
1802                 return -1;
1803         }
1804
1805         return 0;
1806 }
1807
1808 /*
1809   get debug level on a node
1810  */
1811 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1812 {
1813         int ret;
1814         int32_t res;
1815         TDB_DATA data;
1816
1817         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1818                            ctdb, &data, &res, NULL, NULL);
1819         if (ret != 0 || res != 0) {
1820                 return -1;
1821         }
1822         if (data.dsize != sizeof(int32_t)) {
1823                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1824                          (unsigned)data.dsize));
1825                 return -1;
1826         }
1827         *level = *(int32_t *)data.dptr;
1828         talloc_free(data.dptr);
1829         return 0;
1830 }
1831
1832 /*
1833   set debug level on a node
1834  */
1835 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1836 {
1837         int ret;
1838         int32_t res;
1839         TDB_DATA data;
1840
1841         data.dptr = (uint8_t *)&level;
1842         data.dsize = sizeof(level);
1843
1844         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1845                            NULL, NULL, &res, NULL, NULL);
1846         if (ret != 0 || res != 0) {
1847                 return -1;
1848         }
1849         return 0;
1850 }
1851
1852
1853 /*
1854   get a list of connected nodes
1855  */
1856 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1857                                 struct timeval timeout,
1858                                 TALLOC_CTX *mem_ctx,
1859                                 uint32_t *num_nodes)
1860 {
1861         struct ctdb_node_map *map=NULL;
1862         int ret, i;
1863         uint32_t *nodes;
1864
1865         *num_nodes = 0;
1866
1867         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1868         if (ret != 0) {
1869                 return NULL;
1870         }
1871
1872         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1873         if (nodes == NULL) {
1874                 return NULL;
1875         }
1876
1877         for (i=0;i<map->num;i++) {
1878                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1879                         nodes[*num_nodes] = map->nodes[i].pnn;
1880                         (*num_nodes)++;
1881                 }
1882         }
1883
1884         return nodes;
1885 }
1886
1887
1888 /*
1889   reset remote status
1890  */
1891 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1892 {
1893         int ret;
1894         int32_t res;
1895
1896         ret = ctdb_control(ctdb, destnode, 0, 
1897                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1898                            NULL, NULL, &res, NULL, NULL);
1899         if (ret != 0 || res != 0) {
1900                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1901                 return -1;
1902         }
1903         return 0;
1904 }
1905
1906 /*
1907   attach to a specific database - client call
1908 */
1909 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
1910                                     struct timeval timeout,
1911                                     const char *name,
1912                                     bool persistent,
1913                                     uint32_t tdb_flags)
1914 {
1915         struct ctdb_db_context *ctdb_db;
1916         TDB_DATA data;
1917         int ret;
1918         int32_t res;
1919
1920         ctdb_db = ctdb_db_handle(ctdb, name);
1921         if (ctdb_db) {
1922                 return ctdb_db;
1923         }
1924
1925         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1926         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1927
1928         ctdb_db->ctdb = ctdb;
1929         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1930         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1931
1932         data.dptr = discard_const(name);
1933         data.dsize = strlen(name)+1;
1934
1935         /* CTDB has switched to using jenkins hash for volatile databases.
1936          * Even if tdb_flags do not explicitly mention TDB_INCOMPATIBLE_HASH,
1937          * always set it.
1938          */
1939         if (!persistent) {
1940                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
1941         }
1942
1943         /* tell ctdb daemon to attach */
1944         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1945                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1946                            0, data, ctdb_db, &data, &res, NULL, NULL);
1947         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1948                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1949                 talloc_free(ctdb_db);
1950                 return NULL;
1951         }
1952         
1953         ctdb_db->db_id = *(uint32_t *)data.dptr;
1954         talloc_free(data.dptr);
1955
1956         ret = ctdb_ctrl_getdbpath(ctdb, timeout, CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1957         if (ret != 0) {
1958                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1959                 talloc_free(ctdb_db);
1960                 return NULL;
1961         }
1962
1963         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1964         if (ctdb->valgrinding) {
1965                 tdb_flags |= TDB_NOMMAP;
1966         }
1967         tdb_flags |= TDB_DISALLOW_NESTING;
1968
1969         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1970         if (ctdb_db->ltdb == NULL) {
1971                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1972                 talloc_free(ctdb_db);
1973                 return NULL;
1974         }
1975
1976         ctdb_db->persistent = persistent;
1977
1978         DLIST_ADD(ctdb->db_list, ctdb_db);
1979
1980         /* add well known functions */
1981         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1982         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1983         ctdb_set_call(ctdb_db, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1984
1985         return ctdb_db;
1986 }
1987
1988
1989 /*
1990   setup a call for a database
1991  */
1992 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1993 {
1994         struct ctdb_registered_call *call;
1995
1996 #if 0
1997         TDB_DATA data;
1998         int32_t status;
1999         struct ctdb_control_set_call c;
2000         int ret;
2001
2002         /* this is no longer valid with the separate daemon architecture */
2003         c.db_id = ctdb_db->db_id;
2004         c.fn    = fn;
2005         c.id    = id;
2006
2007         data.dptr = (uint8_t *)&c;
2008         data.dsize = sizeof(c);
2009
2010         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
2011                            data, NULL, NULL, &status, NULL, NULL);
2012         if (ret != 0 || status != 0) {
2013                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
2014                 return -1;
2015         }
2016 #endif
2017
2018         /* also register locally */
2019         call = talloc(ctdb_db, struct ctdb_registered_call);
2020         call->fn = fn;
2021         call->id = id;
2022
2023         DLIST_ADD(ctdb_db->calls, call);        
2024         return 0;
2025 }
2026
2027
2028 struct traverse_state {
2029         bool done;
2030         uint32_t count;
2031         ctdb_traverse_func fn;
2032         void *private_data;
2033         bool listemptyrecords;
2034 };
2035
2036 /*
2037   called on each key during a ctdb_traverse
2038  */
2039 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
2040 {
2041         struct traverse_state *state = (struct traverse_state *)p;
2042         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
2043         TDB_DATA key;
2044
2045         if (data.dsize < sizeof(uint32_t) ||
2046             d->length != data.dsize) {
2047                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
2048                 state->done = true;
2049                 return;
2050         }
2051
2052         key.dsize = d->keylen;
2053         key.dptr  = &d->data[0];
2054         data.dsize = d->datalen;
2055         data.dptr = &d->data[d->keylen];
2056
2057         if (key.dsize == 0 && data.dsize == 0) {
2058                 /* end of traverse */
2059                 state->done = true;
2060                 return;
2061         }
2062
2063         if (!state->listemptyrecords &&
2064             data.dsize == sizeof(struct ctdb_ltdb_header))
2065         {
2066                 /* empty records are deleted records in ctdb */
2067                 return;
2068         }
2069
2070         if (state->fn(ctdb, key, data, state->private_data) != 0) {
2071                 state->done = true;
2072         }
2073
2074         state->count++;
2075 }
2076
2077 /**
2078  * start a cluster wide traverse, calling the supplied fn on each record
2079  * return the number of records traversed, or -1 on error
2080  *
2081  * Extendet variant with a flag to signal whether empty records should
2082  * be listed.
2083  */
2084 static int ctdb_traverse_ext(struct ctdb_db_context *ctdb_db,
2085                              ctdb_traverse_func fn,
2086                              bool withemptyrecords,
2087                              void *private_data)
2088 {
2089         TDB_DATA data;
2090         struct ctdb_traverse_start_ext t;
2091         int32_t status;
2092         int ret;
2093         uint64_t srvid = (getpid() | 0xFLL<<60);
2094         struct traverse_state state;
2095
2096         state.done = false;
2097         state.count = 0;
2098         state.private_data = private_data;
2099         state.fn = fn;
2100         state.listemptyrecords = withemptyrecords;
2101
2102         ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
2103         if (ret != 0) {
2104                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
2105                 return -1;
2106         }
2107
2108         t.db_id = ctdb_db->db_id;
2109         t.srvid = srvid;
2110         t.reqid = 0;
2111         t.withemptyrecords = withemptyrecords;
2112
2113         data.dptr = (uint8_t *)&t;
2114         data.dsize = sizeof(t);
2115
2116         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START_EXT, 0,
2117                            data, NULL, NULL, &status, NULL, NULL);
2118         if (ret != 0 || status != 0) {
2119                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
2120                 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2121                 return -1;
2122         }
2123
2124         while (!state.done) {
2125                 event_loop_once(ctdb_db->ctdb->ev);
2126         }
2127
2128         ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2129         if (ret != 0) {
2130                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
2131                 return -1;
2132         }
2133
2134         return state.count;
2135 }
2136
2137 /**
2138  * start a cluster wide traverse, calling the supplied fn on each record
2139  * return the number of records traversed, or -1 on error
2140  *
2141  * Standard version which does not list the empty records:
2142  * These are considered deleted.
2143  */
2144 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
2145 {
2146         return ctdb_traverse_ext(ctdb_db, fn, false, private_data);
2147 }
2148
2149 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
2150 /*
2151   called on each key during a catdb
2152  */
2153 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
2154 {
2155         int i;
2156         struct ctdb_dump_db_context *c = (struct ctdb_dump_db_context *)p;
2157         FILE *f = c->f;
2158         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
2159
2160         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
2161         for (i=0;i<key.dsize;i++) {
2162                 if (ISASCII(key.dptr[i])) {
2163                         fprintf(f, "%c", key.dptr[i]);
2164                 } else {
2165                         fprintf(f, "\\%02X", key.dptr[i]);
2166                 }
2167         }
2168         fprintf(f, "\"\n");
2169
2170         fprintf(f, "dmaster: %u\n", h->dmaster);
2171         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
2172
2173         if (c->printlmaster && ctdb->vnn_map != NULL) {
2174                 fprintf(f, "lmaster: %u\n", ctdb_lmaster(ctdb, &key));
2175         }
2176
2177         if (c->printhash) {
2178                 fprintf(f, "hash: 0x%08x\n", ctdb_hash(&key));
2179         }
2180
2181         if (c->printrecordflags) {
2182                 fprintf(f, "flags: 0x%08x", h->flags);
2183                 if (h->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) printf(" MIGRATED_WITH_DATA");
2184                 if (h->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) printf(" VACUUM_MIGRATED");
2185                 if (h->flags & CTDB_REC_FLAG_AUTOMATIC) printf(" AUTOMATIC");
2186                 if (h->flags & CTDB_REC_RO_HAVE_DELEGATIONS) printf(" RO_HAVE_DELEGATIONS");
2187                 if (h->flags & CTDB_REC_RO_HAVE_READONLY) printf(" RO_HAVE_READONLY");
2188                 if (h->flags & CTDB_REC_RO_REVOKING_READONLY) printf(" RO_REVOKING_READONLY");
2189                 if (h->flags & CTDB_REC_RO_REVOKE_COMPLETE) printf(" RO_REVOKE_COMPLETE");
2190                 fprintf(f, "\n");
2191         }
2192
2193         if (c->printdatasize) {
2194                 fprintf(f, "data size: %u\n", (unsigned)data.dsize);
2195         } else {
2196                 fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
2197                 for (i=sizeof(*h);i<data.dsize;i++) {
2198                         if (ISASCII(data.dptr[i])) {
2199                                 fprintf(f, "%c", data.dptr[i]);
2200                         } else {
2201                                 fprintf(f, "\\%02X", data.dptr[i]);
2202                         }
2203                 }
2204                 fprintf(f, "\"\n");
2205         }
2206
2207         fprintf(f, "\n");
2208
2209         return 0;
2210 }
2211
2212 /*
2213   convenience function to list all keys to stdout
2214  */
2215 int ctdb_dump_db(struct ctdb_db_context *ctdb_db,
2216                  struct ctdb_dump_db_context *ctx)
2217 {
2218         return ctdb_traverse_ext(ctdb_db, ctdb_dumpdb_record,
2219                                  ctx->printemptyrecords, ctx);
2220 }
2221
2222 /*
2223   get the pid of a ctdb daemon
2224  */
2225 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
2226 {
2227         int ret;
2228         int32_t res;
2229
2230         ret = ctdb_control(ctdb, destnode, 0, 
2231                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
2232                            NULL, NULL, &res, &timeout, NULL);
2233         if (ret != 0) {
2234                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
2235                 return -1;
2236         }
2237
2238         *pid = res;
2239
2240         return 0;
2241 }
2242
2243
2244 /*
2245   async freeze send control
2246  */
2247 struct ctdb_client_control_state *
2248 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
2249 {
2250         return ctdb_control_send(ctdb, destnode, priority, 
2251                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
2252                            mem_ctx, &timeout, NULL);
2253 }
2254
2255 /* 
2256    async freeze recv control
2257 */
2258 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
2259 {
2260         int ret;
2261         int32_t res;
2262
2263         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
2264         if ( (ret != 0) || (res != 0) ){
2265                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
2266                 return -1;
2267         }
2268
2269         return 0;
2270 }
2271
2272 /*
2273   freeze databases of a certain priority
2274  */
2275 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2276 {
2277         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2278         struct ctdb_client_control_state *state;
2279         int ret;
2280
2281         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
2282         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
2283         talloc_free(tmp_ctx);
2284
2285         return ret;
2286 }
2287
2288 /* Freeze all databases */
2289 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2290 {
2291         int i;
2292
2293         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2294                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2295                         return -1;
2296                 }
2297         }
2298         return 0;
2299 }
2300
2301 /*
2302   thaw databases of a certain priority
2303  */
2304 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2305 {
2306         int ret;
2307         int32_t res;
2308
2309         ret = ctdb_control(ctdb, destnode, priority, 
2310                            CTDB_CONTROL_THAW, 0, tdb_null, 
2311                            NULL, NULL, &res, &timeout, NULL);
2312         if (ret != 0 || res != 0) {
2313                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2314                 return -1;
2315         }
2316
2317         return 0;
2318 }
2319
2320 /* thaw all databases */
2321 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2322 {
2323         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2324 }
2325
2326 /*
2327   get pnn of a node, or -1
2328  */
2329 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2330 {
2331         int ret;
2332         int32_t res;
2333
2334         ret = ctdb_control(ctdb, destnode, 0, 
2335                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2336                            NULL, NULL, &res, &timeout, NULL);
2337         if (ret != 0) {
2338                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2339                 return -1;
2340         }
2341
2342         return res;
2343 }
2344
2345 /*
2346   get the monitoring mode of a remote node
2347  */
2348 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2349 {
2350         int ret;
2351         int32_t res;
2352
2353         ret = ctdb_control(ctdb, destnode, 0, 
2354                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2355                            NULL, NULL, &res, &timeout, NULL);
2356         if (ret != 0) {
2357                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2358                 return -1;
2359         }
2360
2361         *monmode = res;
2362
2363         return 0;
2364 }
2365
2366
2367 /*
2368  set the monitoring mode of a remote node to active
2369  */
2370 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2371 {
2372         int ret;
2373         
2374
2375         ret = ctdb_control(ctdb, destnode, 0, 
2376                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2377                            NULL, NULL,NULL, &timeout, NULL);
2378         if (ret != 0) {
2379                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2380                 return -1;
2381         }
2382
2383         
2384
2385         return 0;
2386 }
2387
2388 /*
2389   set the monitoring mode of a remote node to disable
2390  */
2391 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2392 {
2393         int ret;
2394         
2395
2396         ret = ctdb_control(ctdb, destnode, 0, 
2397                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2398                            NULL, NULL, NULL, &timeout, NULL);
2399         if (ret != 0) {
2400                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2401                 return -1;
2402         }
2403
2404         
2405
2406         return 0;
2407 }
2408
2409
2410
2411 /* 
2412   sent to a node to make it take over an ip address
2413 */
2414 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2415                           uint32_t destnode, struct ctdb_public_ip *ip)
2416 {
2417         TDB_DATA data;
2418         struct ctdb_public_ipv4 ipv4;
2419         int ret;
2420         int32_t res;
2421
2422         if (ip->addr.sa.sa_family == AF_INET) {
2423                 ipv4.pnn = ip->pnn;
2424                 ipv4.sin = ip->addr.ip;
2425
2426                 data.dsize = sizeof(ipv4);
2427                 data.dptr  = (uint8_t *)&ipv4;
2428
2429                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2430                            NULL, &res, &timeout, NULL);
2431         } else {
2432                 data.dsize = sizeof(*ip);
2433                 data.dptr  = (uint8_t *)ip;
2434
2435                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2436                            NULL, &res, &timeout, NULL);
2437         }
2438
2439         if (ret != 0 || res != 0) {
2440                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2441                 return -1;
2442         }
2443
2444         return 0;       
2445 }
2446
2447
2448 /* 
2449   sent to a node to make it release an ip address
2450 */
2451 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2452                          uint32_t destnode, struct ctdb_public_ip *ip)
2453 {
2454         TDB_DATA data;
2455         struct ctdb_public_ipv4 ipv4;
2456         int ret;
2457         int32_t res;
2458
2459         if (ip->addr.sa.sa_family == AF_INET) {
2460                 ipv4.pnn = ip->pnn;
2461                 ipv4.sin = ip->addr.ip;
2462
2463                 data.dsize = sizeof(ipv4);
2464                 data.dptr  = (uint8_t *)&ipv4;
2465
2466                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2467                                    NULL, &res, &timeout, NULL);
2468         } else {
2469                 data.dsize = sizeof(*ip);
2470                 data.dptr  = (uint8_t *)ip;
2471
2472                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2473                                    NULL, &res, &timeout, NULL);
2474         }
2475
2476         if (ret != 0 || res != 0) {
2477                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2478                 return -1;
2479         }
2480
2481         return 0;       
2482 }
2483
2484
2485 /*
2486   get a tunable
2487  */
2488 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2489                           struct timeval timeout, 
2490                           uint32_t destnode,
2491                           const char *name, uint32_t *value)
2492 {
2493         struct ctdb_control_get_tunable *t;
2494         TDB_DATA data, outdata;
2495         int32_t res;
2496         int ret;
2497
2498         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2499         data.dptr  = talloc_size(ctdb, data.dsize);
2500         CTDB_NO_MEMORY(ctdb, data.dptr);
2501
2502         t = (struct ctdb_control_get_tunable *)data.dptr;
2503         t->length = strlen(name)+1;
2504         memcpy(t->name, name, t->length);
2505
2506         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2507                            &outdata, &res, &timeout, NULL);
2508         talloc_free(data.dptr);
2509         if (ret != 0 || res != 0) {
2510                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2511                 return ret != 0 ? ret : res;
2512         }
2513
2514         if (outdata.dsize != sizeof(uint32_t)) {
2515                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2516                 talloc_free(outdata.dptr);
2517                 return -1;
2518         }
2519         
2520         *value = *(uint32_t *)outdata.dptr;
2521         talloc_free(outdata.dptr);
2522
2523         return 0;
2524 }
2525
2526 /*
2527   set a tunable
2528  */
2529 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2530                           struct timeval timeout, 
2531                           uint32_t destnode,
2532                           const char *name, uint32_t value)
2533 {
2534         struct ctdb_control_set_tunable *t;
2535         TDB_DATA data;
2536         int32_t res;
2537         int ret;
2538
2539         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2540         data.dptr  = talloc_size(ctdb, data.dsize);
2541         CTDB_NO_MEMORY(ctdb, data.dptr);
2542
2543         t = (struct ctdb_control_set_tunable *)data.dptr;
2544         t->length = strlen(name)+1;
2545         memcpy(t->name, name, t->length);
2546         t->value = value;
2547
2548         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2549                            NULL, &res, &timeout, NULL);
2550         talloc_free(data.dptr);
2551         if (ret != 0 || res != 0) {
2552                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2553                 return -1;
2554         }
2555
2556         return 0;
2557 }
2558
2559 /*
2560   list tunables
2561  */
2562 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2563                             struct timeval timeout, 
2564                             uint32_t destnode,
2565                             TALLOC_CTX *mem_ctx,
2566                             const char ***list, uint32_t *count)
2567 {
2568         TDB_DATA outdata;
2569         int32_t res;
2570         int ret;
2571         struct ctdb_control_list_tunable *t;
2572         char *p, *s, *ptr;
2573
2574         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2575                            mem_ctx, &outdata, &res, &timeout, NULL);
2576         if (ret != 0 || res != 0) {
2577                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2578                 return -1;
2579         }
2580
2581         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2582         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2583             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2584                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2585                 talloc_free(outdata.dptr);
2586                 return -1;              
2587         }
2588         
2589         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2590         CTDB_NO_MEMORY(ctdb, p);
2591
2592         talloc_free(outdata.dptr);
2593         
2594         (*list) = NULL;
2595         (*count) = 0;
2596
2597         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2598                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2599                 CTDB_NO_MEMORY(ctdb, *list);
2600                 (*list)[*count] = talloc_strdup(*list, s);
2601                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2602                 (*count)++;
2603         }
2604
2605         talloc_free(p);
2606
2607         return 0;
2608 }
2609
2610
2611 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2612                                    struct timeval timeout, uint32_t destnode,
2613                                    TALLOC_CTX *mem_ctx,
2614                                    uint32_t flags,
2615                                    struct ctdb_all_public_ips **ips)
2616 {
2617         int ret;
2618         TDB_DATA outdata;
2619         int32_t res;
2620
2621         ret = ctdb_control(ctdb, destnode, 0, 
2622                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2623                            mem_ctx, &outdata, &res, &timeout, NULL);
2624         if (ret == 0 && res == -1) {
2625                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2626                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2627         }
2628         if (ret != 0 || res != 0) {
2629           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2630                 return -1;
2631         }
2632
2633         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2634         talloc_free(outdata.dptr);
2635                     
2636         return 0;
2637 }
2638
2639 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2640                              struct timeval timeout, uint32_t destnode,
2641                              TALLOC_CTX *mem_ctx,
2642                              struct ctdb_all_public_ips **ips)
2643 {
2644         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2645                                               destnode, mem_ctx,
2646                                               0, ips);
2647 }
2648
2649 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2650                         struct timeval timeout, uint32_t destnode, 
2651                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2652 {
2653         int ret, i, len;
2654         TDB_DATA outdata;
2655         int32_t res;
2656         struct ctdb_all_public_ipsv4 *ipsv4;
2657
2658         ret = ctdb_control(ctdb, destnode, 0, 
2659                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2660                            mem_ctx, &outdata, &res, &timeout, NULL);
2661         if (ret != 0 || res != 0) {
2662                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2663                 return -1;
2664         }
2665
2666         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2667         len = offsetof(struct ctdb_all_public_ips, ips) +
2668                 ipsv4->num*sizeof(struct ctdb_public_ip);
2669         *ips = talloc_zero_size(mem_ctx, len);
2670         CTDB_NO_MEMORY(ctdb, *ips);
2671         (*ips)->num = ipsv4->num;
2672         for (i=0; i<ipsv4->num; i++) {
2673                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2674                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2675         }
2676
2677         talloc_free(outdata.dptr);
2678                     
2679         return 0;
2680 }
2681
2682 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2683                                  struct timeval timeout, uint32_t destnode,
2684                                  TALLOC_CTX *mem_ctx,
2685                                  const ctdb_sock_addr *addr,
2686                                  struct ctdb_control_public_ip_info **_info)
2687 {
2688         int ret;
2689         TDB_DATA indata;
2690         TDB_DATA outdata;
2691         int32_t res;
2692         struct ctdb_control_public_ip_info *info;
2693         uint32_t len;
2694         uint32_t i;
2695
2696         indata.dptr = discard_const_p(uint8_t, addr);
2697         indata.dsize = sizeof(*addr);
2698
2699         ret = ctdb_control(ctdb, destnode, 0,
2700                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2701                            mem_ctx, &outdata, &res, &timeout, NULL);
2702         if (ret != 0 || res != 0) {
2703                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2704                                 "failed ret:%d res:%d\n",
2705                                 ret, res));
2706                 return -1;
2707         }
2708
2709         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2710         if (len > outdata.dsize) {
2711                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2712                                 "returned invalid data with size %u > %u\n",
2713                                 (unsigned int)outdata.dsize,
2714                                 (unsigned int)len));
2715                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2716                 return -1;
2717         }
2718
2719         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2720         len += info->num*sizeof(struct ctdb_control_iface_info);
2721
2722         if (len > outdata.dsize) {
2723                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2724                                 "returned invalid data with size %u > %u\n",
2725                                 (unsigned int)outdata.dsize,
2726                                 (unsigned int)len));
2727                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2728                 return -1;
2729         }
2730
2731         /* make sure we null terminate the returned strings */
2732         for (i=0; i < info->num; i++) {
2733                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2734         }
2735
2736         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2737                                                                 outdata.dptr,
2738                                                                 outdata.dsize);
2739         talloc_free(outdata.dptr);
2740         if (*_info == NULL) {
2741                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2742                                 "talloc_memdup size %u failed\n",
2743                                 (unsigned int)outdata.dsize));
2744                 return -1;
2745         }
2746
2747         return 0;
2748 }
2749
2750 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2751                          struct timeval timeout, uint32_t destnode,
2752                          TALLOC_CTX *mem_ctx,
2753                          struct ctdb_control_get_ifaces **_ifaces)
2754 {
2755         int ret;
2756         TDB_DATA outdata;
2757         int32_t res;
2758         struct ctdb_control_get_ifaces *ifaces;
2759         uint32_t len;
2760         uint32_t i;
2761
2762         ret = ctdb_control(ctdb, destnode, 0,
2763                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2764                            mem_ctx, &outdata, &res, &timeout, NULL);
2765         if (ret != 0 || res != 0) {
2766                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2767                                 "failed ret:%d res:%d\n",
2768                                 ret, res));
2769                 return -1;
2770         }
2771
2772         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2773         if (len > outdata.dsize) {
2774                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2775                                 "returned invalid data with size %u > %u\n",
2776                                 (unsigned int)outdata.dsize,
2777                                 (unsigned int)len));
2778                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2779                 return -1;
2780         }
2781
2782         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2783         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2784
2785         if (len > outdata.dsize) {
2786                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2787                                 "returned invalid data with size %u > %u\n",
2788                                 (unsigned int)outdata.dsize,
2789                                 (unsigned int)len));
2790                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2791                 return -1;
2792         }
2793
2794         /* make sure we null terminate the returned strings */
2795         for (i=0; i < ifaces->num; i++) {
2796                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2797         }
2798
2799         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2800                                                                   outdata.dptr,
2801                                                                   outdata.dsize);
2802         talloc_free(outdata.dptr);
2803         if (*_ifaces == NULL) {
2804                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2805                                 "talloc_memdup size %u failed\n",
2806                                 (unsigned int)outdata.dsize));
2807                 return -1;
2808         }
2809
2810         return 0;
2811 }
2812
2813 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2814                              struct timeval timeout, uint32_t destnode,
2815                              TALLOC_CTX *mem_ctx,
2816                              const struct ctdb_control_iface_info *info)
2817 {
2818         int ret;
2819         TDB_DATA indata;
2820         int32_t res;
2821
2822         indata.dptr = discard_const_p(uint8_t, info);
2823         indata.dsize = sizeof(*info);
2824
2825         ret = ctdb_control(ctdb, destnode, 0,
2826                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2827                            mem_ctx, NULL, &res, &timeout, NULL);
2828         if (ret != 0 || res != 0) {
2829                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2830                                 "failed ret:%d res:%d\n",
2831                                 ret, res));
2832                 return -1;
2833         }
2834
2835         return 0;
2836 }
2837
2838 /*
2839   set/clear the permanent disabled bit on a remote node
2840  */
2841 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2842                        uint32_t set, uint32_t clear)
2843 {
2844         int ret;
2845         TDB_DATA data;
2846         struct ctdb_node_map *nodemap=NULL;
2847         struct ctdb_node_flag_change c;
2848         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2849         uint32_t recmaster;
2850         uint32_t *nodes;
2851
2852
2853         /* find the recovery master */
2854         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2855         if (ret != 0) {
2856                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2857                 talloc_free(tmp_ctx);
2858                 return ret;
2859         }
2860
2861
2862         /* read the node flags from the recmaster */
2863         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2864         if (ret != 0) {
2865                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2866                 talloc_free(tmp_ctx);
2867                 return -1;
2868         }
2869         if (destnode >= nodemap->num) {
2870                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2871                 talloc_free(tmp_ctx);
2872                 return -1;
2873         }
2874
2875         c.pnn       = destnode;
2876         c.old_flags = nodemap->nodes[destnode].flags;
2877         c.new_flags = c.old_flags;
2878         c.new_flags |= set;
2879         c.new_flags &= ~clear;
2880
2881         data.dsize = sizeof(c);
2882         data.dptr = (unsigned char *)&c;
2883
2884         /* send the flags update to all connected nodes */
2885         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2886
2887         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2888                                         nodes, 0,
2889                                         timeout, false, data,
2890                                         NULL, NULL,
2891                                         NULL) != 0) {
2892                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2893
2894                 talloc_free(tmp_ctx);
2895                 return -1;
2896         }
2897
2898         talloc_free(tmp_ctx);
2899         return 0;
2900 }
2901
2902
2903 /*
2904   get all tunables
2905  */
2906 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
2907                                struct timeval timeout, 
2908                                uint32_t destnode,
2909                                struct ctdb_tunable *tunables)
2910 {
2911         TDB_DATA outdata;
2912         int ret;
2913         int32_t res;
2914
2915         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2916                            &outdata, &res, &timeout, NULL);
2917         if (ret != 0 || res != 0) {
2918                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2919                 return -1;
2920         }
2921
2922         if (outdata.dsize != sizeof(*tunables)) {
2923                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2924                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2925                 return -1;              
2926         }
2927
2928         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2929         talloc_free(outdata.dptr);
2930         return 0;
2931 }
2932
2933 /*
2934   add a public address to a node
2935  */
2936 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
2937                       struct timeval timeout, 
2938                       uint32_t destnode,
2939                       struct ctdb_control_ip_iface *pub)
2940 {
2941         TDB_DATA data;
2942         int32_t res;
2943         int ret;
2944
2945         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2946         data.dptr  = (unsigned char *)pub;
2947
2948         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2949                            NULL, &res, &timeout, NULL);
2950         if (ret != 0 || res != 0) {
2951                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2952                 return -1;
2953         }
2954
2955         return 0;
2956 }
2957
2958 /*
2959   delete a public address from a node
2960  */
2961 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
2962                       struct timeval timeout, 
2963                       uint32_t destnode,
2964                       struct ctdb_control_ip_iface *pub)
2965 {
2966         TDB_DATA data;
2967         int32_t res;
2968         int ret;
2969
2970         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2971         data.dptr  = (unsigned char *)pub;
2972
2973         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2974                            NULL, &res, &timeout, NULL);
2975         if (ret != 0 || res != 0) {
2976                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2977                 return -1;
2978         }
2979
2980         return 0;
2981 }
2982
2983 /*
2984   kill a tcp connection
2985  */
2986 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2987                       struct timeval timeout, 
2988                       uint32_t destnode,
2989                       struct ctdb_control_killtcp *killtcp)
2990 {
2991         TDB_DATA data;
2992         int32_t res;
2993         int ret;
2994
2995         data.dsize = sizeof(struct ctdb_control_killtcp);
2996         data.dptr  = (unsigned char *)killtcp;
2997
2998         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2999                            NULL, &res, &timeout, NULL);
3000         if (ret != 0 || res != 0) {
3001                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
3002                 return -1;
3003         }
3004
3005         return 0;
3006 }
3007
3008 /*
3009   send a gratious arp
3010  */
3011 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
3012                       struct timeval timeout, 
3013                       uint32_t destnode,
3014                       ctdb_sock_addr *addr,
3015                       const char *ifname)
3016 {
3017         TDB_DATA data;
3018         int32_t res;
3019         int ret, len;
3020         struct ctdb_control_gratious_arp *gratious_arp;
3021         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3022
3023
3024         len = strlen(ifname)+1;
3025         gratious_arp = talloc_size(tmp_ctx, 
3026                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
3027         CTDB_NO_MEMORY(ctdb, gratious_arp);
3028
3029         gratious_arp->addr = *addr;
3030         gratious_arp->len = len;
3031         memcpy(&gratious_arp->iface[0], ifname, len);
3032
3033
3034         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
3035         data.dptr  = (unsigned char *)gratious_arp;
3036
3037         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
3038                            NULL, &res, &timeout, NULL);
3039         if (ret != 0 || res != 0) {
3040                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
3041                 talloc_free(tmp_ctx);
3042                 return -1;
3043         }
3044
3045         talloc_free(tmp_ctx);
3046         return 0;
3047 }
3048
3049 /*
3050   get a list of all tcp tickles that a node knows about for a particular vnn
3051  */
3052 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
3053                               struct timeval timeout, uint32_t destnode, 
3054                               TALLOC_CTX *mem_ctx, 
3055                               ctdb_sock_addr *addr,
3056                               struct ctdb_control_tcp_tickle_list **list)
3057 {
3058         int ret;
3059         TDB_DATA data, outdata;
3060         int32_t status;
3061
3062         data.dptr = (uint8_t*)addr;
3063         data.dsize = sizeof(ctdb_sock_addr);
3064
3065         ret = ctdb_control(ctdb, destnode, 0, 
3066                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
3067                            mem_ctx, &outdata, &status, NULL, NULL);
3068         if (ret != 0 || status != 0) {
3069                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
3070                 return -1;
3071         }
3072
3073         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
3074
3075         return status;
3076 }
3077
3078 /*
3079   register a server id
3080  */
3081 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
3082                       struct timeval timeout, 
3083                       struct ctdb_server_id *id)
3084 {
3085         TDB_DATA data;
3086         int32_t res;
3087         int ret;
3088
3089         data.dsize = sizeof(struct ctdb_server_id);
3090         data.dptr  = (unsigned char *)id;
3091
3092         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3093                         CTDB_CONTROL_REGISTER_SERVER_ID, 
3094                         0, data, NULL,
3095                         NULL, &res, &timeout, NULL);
3096         if (ret != 0 || res != 0) {
3097                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
3098                 return -1;
3099         }
3100
3101         return 0;
3102 }
3103
3104 /*
3105   unregister a server id
3106  */
3107 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
3108                       struct timeval timeout, 
3109                       struct ctdb_server_id *id)
3110 {
3111         TDB_DATA data;
3112         int32_t res;
3113         int ret;
3114
3115         data.dsize = sizeof(struct ctdb_server_id);
3116         data.dptr  = (unsigned char *)id;
3117
3118         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3119                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
3120                         0, data, NULL,
3121                         NULL, &res, &timeout, NULL);
3122         if (ret != 0 || res != 0) {
3123                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
3124                 return -1;
3125         }
3126
3127         return 0;
3128 }
3129
3130
3131 /*
3132   check if a server id exists
3133
3134   if a server id does exist, return *status == 1, otherwise *status == 0
3135  */
3136 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
3137                       struct timeval timeout, 
3138                       uint32_t destnode,
3139                       struct ctdb_server_id *id,
3140                       uint32_t *status)
3141 {
3142         TDB_DATA data;
3143         int32_t res;
3144         int ret;
3145
3146         data.dsize = sizeof(struct ctdb_server_id);
3147         data.dptr  = (unsigned char *)id;
3148
3149         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
3150                         0, data, NULL,
3151                         NULL, &res, &timeout, NULL);
3152         if (ret != 0) {
3153                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
3154                 return -1;
3155         }
3156
3157         if (res) {
3158                 *status = 1;
3159         } else {
3160                 *status = 0;
3161         }
3162
3163         return 0;
3164 }
3165
3166 /*
3167    get the list of server ids that are registered on a node
3168 */
3169 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
3170                 TALLOC_CTX *mem_ctx,
3171                 struct timeval timeout, uint32_t destnode, 
3172                 struct ctdb_server_id_list **svid_list)
3173 {
3174         int ret;
3175         TDB_DATA outdata;
3176         int32_t res;
3177
3178         ret = ctdb_control(ctdb, destnode, 0, 
3179                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
3180                            mem_ctx, &outdata, &res, &timeout, NULL);
3181         if (ret != 0 || res != 0) {
3182                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
3183                 return -1;
3184         }
3185
3186         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
3187                     
3188         return 0;
3189 }
3190
3191 /*
3192   initialise the ctdb daemon for client applications
3193
3194   NOTE: In current code the daemon does not fork. This is for testing purposes only
3195   and to simplify the code.
3196 */
3197 struct ctdb_context *ctdb_init(struct event_context *ev)
3198 {
3199         int ret;
3200         struct ctdb_context *ctdb;
3201
3202         ctdb = talloc_zero(ev, struct ctdb_context);
3203         if (ctdb == NULL) {
3204                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
3205                 return NULL;
3206         }
3207         ctdb->ev  = ev;
3208         ctdb->idr = idr_init(ctdb);
3209         /* Wrap early to exercise code. */
3210         ctdb->lastid = INT_MAX-200;
3211         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
3212
3213         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
3214         if (ret != 0) {
3215                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
3216                 talloc_free(ctdb);
3217                 return NULL;
3218         }
3219
3220         ctdb->statistics.statistics_start_time = timeval_current();
3221
3222         return ctdb;
3223 }
3224
3225
3226 /*
3227   set some ctdb flags
3228 */
3229 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
3230 {
3231         ctdb->flags |= flags;
3232 }
3233
3234 /*
3235   setup the local socket name
3236 */
3237 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
3238 {
3239         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
3240         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
3241
3242         return 0;
3243 }
3244
3245 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
3246 {
3247         return ctdb->daemon.name;
3248 }
3249
3250 /*
3251   return the pnn of this node
3252 */
3253 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
3254 {
3255         return ctdb->pnn;
3256 }
3257
3258
3259 /*
3260   get the uptime of a remote node
3261  */
3262 struct ctdb_client_control_state *
3263 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3264 {
3265         return ctdb_control_send(ctdb, destnode, 0, 
3266                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
3267                            mem_ctx, &timeout, NULL);
3268 }
3269
3270 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
3271 {
3272         int ret;
3273         int32_t res;
3274         TDB_DATA outdata;
3275
3276         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3277         if (ret != 0 || res != 0) {
3278                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
3279                 return -1;
3280         }
3281
3282         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
3283
3284         return 0;
3285 }
3286
3287 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3288 {
3289         struct ctdb_client_control_state *state;
3290
3291         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3292         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3293 }
3294
3295 /*
3296   send a control to execute the "recovered" event script on a node
3297  */
3298 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3299 {
3300         int ret;
3301         int32_t status;
3302
3303         ret = ctdb_control(ctdb, destnode, 0, 
3304                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
3305                            NULL, NULL, &status, &timeout, NULL);
3306         if (ret != 0 || status != 0) {
3307                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3308                 return -1;
3309         }
3310
3311         return 0;
3312 }
3313
3314 /* 
3315   callback for the async helpers used when sending the same control
3316   to multiple nodes in parallell.
3317 */
3318 static void async_callback(struct ctdb_client_control_state *state)
3319 {
3320         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3321         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3322         int ret;
3323         TDB_DATA outdata;
3324         int32_t res = -1;
3325         uint32_t destnode = state->c->hdr.destnode;
3326
3327         /* one more node has responded with recmode data */
3328         data->count--;
3329
3330         /* if we failed to push the db, then return an error and let
3331            the main loop try again.
3332         */
3333         if (state->state != CTDB_CONTROL_DONE) {
3334                 if ( !data->dont_log_errors) {
3335                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3336                 }
3337                 data->fail_count++;
3338                 if (state->state == CTDB_CONTROL_TIMEOUT) {
3339                         res = -ETIME;
3340                 } else {
3341                         res = -1;
3342                 }
3343                 if (data->fail_callback) {
3344                         data->fail_callback(ctdb, destnode, res, outdata,
3345                                         data->callback_data);
3346                 }
3347                 return;
3348         }
3349         
3350         state->async.fn = NULL;
3351
3352         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3353         if ((ret != 0) || (res != 0)) {
3354                 if ( !data->dont_log_errors) {
3355                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3356                 }
3357                 data->fail_count++;
3358                 if (data->fail_callback) {
3359                         data->fail_callback(ctdb, destnode, res, outdata,
3360                                         data->callback_data);
3361                 }
3362         }
3363         if ((ret == 0) && (data->callback != NULL)) {
3364                 data->callback(ctdb, destnode, res, outdata,
3365                                         data->callback_data);
3366         }
3367 }
3368
3369
3370 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3371 {
3372         /* set up the callback functions */
3373         state->async.fn = async_callback;
3374         state->async.private_data = data;
3375         
3376         /* one more control to wait for to complete */
3377         data->count++;
3378 }
3379
3380
3381 /* wait for up to the maximum number of seconds allowed
3382    or until all nodes we expect a response from has replied
3383 */
3384 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3385 {
3386         while (data->count > 0) {
3387                 event_loop_once(ctdb->ev);
3388         }
3389         if (data->fail_count != 0) {
3390                 if (!data->dont_log_errors) {
3391                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
3392                                  data->fail_count));
3393                 }
3394                 return -1;
3395         }
3396         return 0;
3397 }
3398
3399
3400 /* 
3401    perform a simple control on the listed nodes
3402    The control cannot return data
3403  */
3404 int ctdb_client_async_control(struct ctdb_context *ctdb,
3405                                 enum ctdb_controls opcode,
3406                                 uint32_t *nodes,
3407                                 uint64_t srvid,
3408                                 struct timeval timeout,
3409                                 bool dont_log_errors,
3410                                 TDB_DATA data,
3411                                 client_async_callback client_callback,
3412                                 client_async_callback fail_callback,
3413                                 void *callback_data)
3414 {
3415         struct client_async_data *async_data;
3416         struct ctdb_client_control_state *state;
3417         int j, num_nodes;
3418
3419         async_data = talloc_zero(ctdb, struct client_async_data);
3420         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3421         async_data->dont_log_errors = dont_log_errors;
3422         async_data->callback = client_callback;
3423         async_data->fail_callback = fail_callback;
3424         async_data->callback_data = callback_data;
3425         async_data->opcode        = opcode;
3426
3427         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3428
3429         /* loop over all nodes and send an async control to each of them */
3430         for (j=0; j<num_nodes; j++) {
3431                 uint32_t pnn = nodes[j];
3432
3433                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
3434                                           0, data, async_data, &timeout, NULL);
3435                 if (state == NULL) {
3436                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3437                         talloc_free(async_data);
3438                         return -1;
3439                 }
3440                 
3441                 ctdb_client_async_add(async_data, state);
3442         }
3443
3444         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3445                 talloc_free(async_data);
3446                 return -1;
3447         }
3448
3449         talloc_free(async_data);
3450         return 0;
3451 }
3452
3453 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3454                                 struct ctdb_vnn_map *vnn_map,
3455                                 TALLOC_CTX *mem_ctx,
3456                                 bool include_self)
3457 {
3458         int i, j, num_nodes;
3459         uint32_t *nodes;
3460
3461         for (i=num_nodes=0;i<vnn_map->size;i++) {
3462                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3463                         continue;
3464                 }
3465                 num_nodes++;
3466         } 
3467
3468         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3469         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3470
3471         for (i=j=0;i<vnn_map->size;i++) {
3472                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3473                         continue;
3474                 }
3475                 nodes[j++] = vnn_map->map[i];
3476         } 
3477
3478         return nodes;
3479 }
3480
3481 /* Get list of nodes not including those with flags specified by mask.
3482  * If exclude_pnn is not -1 then exclude that pnn from the list.
3483  */
3484 uint32_t *list_of_nodes(struct ctdb_context *ctdb,
3485                         struct ctdb_node_map *node_map,
3486                         TALLOC_CTX *mem_ctx,
3487                         uint32_t mask,
3488                         int exclude_pnn)
3489 {
3490         int i, j, num_nodes;
3491         uint32_t *nodes;
3492
3493         for (i=num_nodes=0;i<node_map->num;i++) {
3494                 if (node_map->nodes[i].flags & mask) {
3495                         continue;
3496                 }
3497                 if (node_map->nodes[i].pnn == exclude_pnn) {
3498                         continue;
3499                 }
3500                 num_nodes++;
3501         } 
3502
3503         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3504         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3505
3506         for (i=j=0;i<node_map->num;i++) {
3507                 if (node_map->nodes[i].flags & mask) {
3508                         continue;
3509                 }
3510                 if (node_map->nodes[i].pnn == exclude_pnn) {
3511                         continue;
3512                 }
3513                 nodes[j++] = node_map->nodes[i].pnn;
3514         } 
3515
3516         return nodes;
3517 }
3518
3519 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3520                                 struct ctdb_node_map *node_map,
3521                                 TALLOC_CTX *mem_ctx,
3522                                 bool include_self)
3523 {
3524         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_INACTIVE,
3525                              include_self ? -1 : ctdb->pnn);
3526 }
3527
3528 uint32_t *list_of_active_nodes_except_pnn(struct ctdb_context *ctdb,
3529                                 struct ctdb_node_map *node_map,
3530                                 TALLOC_CTX *mem_ctx,
3531                                 uint32_t pnn)
3532 {
3533         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_INACTIVE, pnn);
3534 }
3535
3536 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3537                                 struct ctdb_node_map *node_map,
3538                                 TALLOC_CTX *mem_ctx,
3539                                 bool include_self)
3540 {
3541         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_DISCONNECTED,
3542                              include_self ? -1 : ctdb->pnn);
3543 }
3544
3545 /* 
3546   this is used to test if a pnn lock exists and if it exists will return
3547   the number of connections that pnn has reported or -1 if that recovery
3548   daemon is not running.
3549 */
3550 int
3551 ctdb_read_pnn_lock(int fd, int32_t pnn)
3552 {
3553         struct flock lock;
3554         char c;
3555
3556         lock.l_type = F_WRLCK;
3557         lock.l_whence = SEEK_SET;
3558         lock.l_start = pnn;
3559         lock.l_len = 1;
3560         lock.l_pid = 0;
3561
3562         if (fcntl(fd, F_GETLK, &lock) != 0) {
3563                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3564                 return -1;
3565         }
3566
3567         if (lock.l_type == F_UNLCK) {
3568                 return -1;
3569         }
3570
3571         if (pread(fd, &c, 1, pnn) == -1) {
3572                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3573                 return -1;
3574         }
3575
3576         return c;
3577 }
3578
3579 /*
3580   get capabilities of a remote node
3581  */
3582 struct ctdb_client_control_state *
3583 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3584 {
3585         return ctdb_control_send(ctdb, destnode, 0, 
3586                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3587                            mem_ctx, &timeout, NULL);
3588 }
3589
3590 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3591 {
3592         int ret;
3593         int32_t res;
3594         TDB_DATA outdata;
3595
3596         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3597         if ( (ret != 0) || (res != 0) ) {
3598                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3599                 return -1;
3600         }
3601
3602         if (capabilities) {
3603                 *capabilities = *((uint32_t *)outdata.dptr);
3604         }
3605
3606         return 0;
3607 }
3608
3609 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3610 {
3611         struct ctdb_client_control_state *state;
3612         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3613         int ret;
3614
3615         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3616         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3617         talloc_free(tmp_ctx);
3618         return ret;
3619 }
3620
3621 /**
3622  * check whether a transaction is active on a given db on a given node
3623  */
3624 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
3625                                      uint32_t destnode,
3626                                      uint32_t db_id)
3627 {
3628         int32_t status;
3629         int ret;
3630         TDB_DATA indata;
3631
3632         indata.dptr = (uint8_t *)&db_id;
3633         indata.dsize = sizeof(db_id);
3634
3635         ret = ctdb_control(ctdb, destnode, 0,
3636                            CTDB_CONTROL_TRANS2_ACTIVE,
3637                            0, indata, NULL, NULL, &status,
3638                            NULL, NULL);
3639
3640         if (ret != 0) {
3641                 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
3642                 return -1;
3643         }
3644
3645         return status;
3646 }
3647
3648
3649 struct ctdb_transaction_handle {
3650         struct ctdb_db_context *ctdb_db;
3651         bool in_replay;
3652         /*
3653          * we store the reads and writes done under a transaction:
3654          * - one list stores both reads and writes (m_all),
3655          * - the other just writes (m_write)
3656          */
3657         struct ctdb_marshall_buffer *m_all;
3658         struct ctdb_marshall_buffer *m_write;
3659 };
3660
3661 /* start a transaction on a database */
3662 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3663 {
3664         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3665         return 0;
3666 }
3667
3668 /* start a transaction on a database */
3669 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
3670 {
3671         struct ctdb_record_handle *rh;
3672         TDB_DATA key;
3673         TDB_DATA data;
3674         struct ctdb_ltdb_header header;
3675         TALLOC_CTX *tmp_ctx;
3676         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
3677         int ret;
3678         struct ctdb_db_context *ctdb_db = h->ctdb_db;
3679         pid_t pid;
3680         int32_t status;
3681
3682         key.dptr = discard_const(keyname);
3683         key.dsize = strlen(keyname);
3684
3685         if (!ctdb_db->persistent) {
3686                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
3687                 return -1;
3688         }
3689
3690 again:
3691         tmp_ctx = talloc_new(h);
3692
3693         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
3694         if (rh == NULL) {
3695                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
3696                 talloc_free(tmp_ctx);
3697                 return -1;
3698         }
3699
3700         status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
3701                                               CTDB_CURRENT_NODE,
3702                                               ctdb_db->db_id);
3703         if (status == 1) {
3704                 unsigned long int usec = (1000 + random()) % 100000;
3705                 DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
3706                                     "on db_id[0x%08x]. waiting for %lu "
3707                                     "microseconds\n",
3708                                     ctdb_db->db_id, usec));
3709                 talloc_free(tmp_ctx);
3710                 usleep(usec);
3711                 goto again;
3712         }
3713
3714         /*
3715          * store the pid in the database:
3716          * it is not enough that the node is dmaster...
3717          */
3718         pid = getpid();
3719         data.dptr = (unsigned char *)&pid;
3720         data.dsize = sizeof(pid_t);
3721         rh->header.rsn++;
3722         rh->header.dmaster = ctdb_db->ctdb->pnn;
3723         ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
3724         if (ret != 0) {
3725                 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
3726                                   "transaction record\n"));
3727                 talloc_free(tmp_ctx);
3728                 return -1;
3729         }
3730
3731         talloc_free(rh);
3732
3733         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3734         if (ret != 0) {
3735                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
3736                 talloc_free(tmp_ctx);
3737                 return -1;
3738         }
3739
3740         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
3741         if (ret != 0) {
3742                 DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
3743                                  "lock record inside transaction\n"));
3744                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3745                 talloc_free(tmp_ctx);
3746                 goto again;
3747         }
3748
3749         if (header.dmaster != ctdb_db->ctdb->pnn) {
3750                 DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
3751                                    "transaction lock record\n"));
3752                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3753                 talloc_free(tmp_ctx);
3754                 goto again;
3755         }
3756
3757         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
3758                 DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
3759                                     "the transaction lock record\n"));
3760                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3761                 talloc_free(tmp_ctx);
3762                 goto again;
3763         }
3764
3765         talloc_free(tmp_ctx);
3766
3767         return 0;
3768 }
3769
3770
3771 /* start a transaction on a database */
3772 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3773                                                        TALLOC_CTX *mem_ctx)
3774 {
3775         struct ctdb_transaction_handle *h;
3776         int ret;
3777
3778         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3779         if (h == NULL) {
3780                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
3781                 return NULL;
3782         }
3783
3784         h->ctdb_db = ctdb_db;
3785
3786         ret = ctdb_transaction_fetch_start(h);
3787         if (ret != 0) {
3788                 talloc_free(h);
3789                 return NULL;
3790         }
3791
3792         talloc_set_destructor(h, ctdb_transaction_destructor);
3793
3794         return h;
3795 }
3796
3797
3798
3799 /*
3800   fetch a record inside a transaction
3801  */
3802 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
3803                            TALLOC_CTX *mem_ctx, 
3804                            TDB_DATA key, TDB_DATA *data)
3805 {
3806         struct ctdb_ltdb_header header;
3807         int ret;
3808
3809         ZERO_STRUCT(header);
3810
3811         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3812         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3813                 /* record doesn't exist yet */
3814                 *data = tdb_null;
3815                 ret = 0;
3816         }
3817         
3818         if (ret != 0) {
3819                 return ret;
3820         }
3821
3822         if (!h->in_replay) {
3823                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3824                 if (h->m_all == NULL) {
3825                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3826                         return -1;
3827                 }
3828         }
3829
3830         return 0;
3831 }
3832
3833 /*
3834   stores a record inside a transaction
3835  */
3836 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
3837                            TDB_DATA key, TDB_DATA data)
3838 {
3839         TALLOC_CTX *tmp_ctx = talloc_new(h);
3840         struct ctdb_ltdb_header header;
3841         TDB_DATA olddata;
3842         int ret;
3843
3844         ZERO_STRUCT(header);
3845
3846         /* we need the header so we can update the RSN */
3847         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3848         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3849                 /* the record doesn't exist - create one with us as dmaster.
3850                    This is only safe because we are in a transaction and this
3851                    is a persistent database */
3852                 ZERO_STRUCT(header);
3853         } else if (ret != 0) {
3854                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3855                 talloc_free(tmp_ctx);
3856                 return ret;
3857         }
3858
3859         if (data.dsize == olddata.dsize &&
3860             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3861                 /* save writing the same data */
3862                 talloc_free(tmp_ctx);
3863                 return 0;
3864         }
3865
3866         header.dmaster = h->ctdb_db->ctdb->pnn;
3867         header.rsn++;
3868
3869         if (!h->in_replay) {
3870                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3871                 if (h->m_all == NULL) {
3872                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3873                         talloc_free(tmp_ctx);
3874                         return -1;
3875                 }
3876         }               
3877
3878         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3879         if (h->m_write == NULL) {
3880                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3881                 talloc_free(tmp_ctx);
3882                 return -1;
3883         }
3884         
3885         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3886
3887         talloc_free(tmp_ctx);
3888         
3889         return ret;
3890 }
3891
3892 /*
3893   replay a transaction
3894  */
3895 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3896 {
3897         int ret, i;
3898         struct ctdb_rec_data *rec = NULL;
3899
3900         h->in_replay = true;
3901         talloc_free(h->m_write);
3902         h->m_write = NULL;
3903
3904         ret = ctdb_transaction_fetch_start(h);
3905         if (ret != 0) {
3906                 return ret;
3907         }
3908
3909         for (i=0;i<h->m_all->count;i++) {
3910                 TDB_DATA key, data;
3911
3912                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3913                 if (rec == NULL) {
3914                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3915                         goto failed;
3916                 }
3917
3918                 if (rec->reqid == 0) {
3919                         /* its a store */
3920                         if (ctdb_transaction_store(h, key, data) != 0) {
3921                                 goto failed;
3922                         }
3923                 } else {
3924                         TDB_DATA data2;
3925                         TALLOC_CTX *tmp_ctx = talloc_new(h);
3926
3927                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3928                                 talloc_free(tmp_ctx);
3929                                 goto failed;
3930                         }
3931                         if (data2.dsize != data.dsize ||
3932                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3933                                 /* the record has changed on us - we have to give up */
3934                                 talloc_free(tmp_ctx);
3935                                 goto failed;
3936                         }
3937                         talloc_free(tmp_ctx);
3938                 }
3939         }
3940         
3941         return 0;
3942
3943 failed:
3944         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3945         return -1;
3946 }
3947
3948
3949 /*
3950   commit a transaction
3951  */
3952 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3953 {
3954         int ret, retries=0;
3955         int32_t status;
3956         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3957         struct timeval timeout;
3958         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3959
3960         talloc_set_destructor(h, NULL);
3961
3962         /* our commit strategy is quite complex.
3963
3964            - we first try to commit the changes to all other nodes
3965
3966            - if that works, then we commit locally and we are done
3967
3968            - if a commit on another node fails, then we need to cancel
3969              the transaction, then restart the transaction (thus
3970              opening a window of time for a pending recovery to
3971              complete), then replay the transaction, checking all the
3972              reads and writes (checking that reads give the same data,
3973              and writes succeed). Then we retry the transaction to the
3974              other nodes
3975         */
3976
3977 again:
3978         if (h->m_write == NULL) {
3979                 /* no changes were made */
3980                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3981                 talloc_free(h);
3982                 return 0;
3983         }
3984
3985         /* tell ctdbd to commit to the other nodes */
3986         timeout = timeval_current_ofs(1, 0);
3987         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3988                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
3989                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
3990                            &timeout, NULL);
3991         if (ret != 0 || status != 0) {
3992                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3993                 DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
3994                                      ", retrying after 1 second...\n",
3995                                      (retries==0)?"":"retry "));
3996                 sleep(1);
3997
3998                 if (ret != 0) {
3999                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
4000                 } else {
4001                         /* work out what error code we will give if we 
4002                            have to fail the operation */
4003                         switch ((enum ctdb_trans2_commit_error)status) {
4004                         case CTDB_TRANS2_COMMIT_SUCCESS:
4005                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
4006                         case CTDB_TRANS2_COMMIT_TIMEOUT:
4007                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
4008                                 break;
4009                         case CTDB_TRANS2_COMMIT_ALLFAIL:
4010                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
4011                                 break;
4012                         }
4013                 }
4014
4015                 if (++retries == 100) {
4016                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
4017                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
4018                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4019                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4020                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
4021                         talloc_free(h);
4022                         return -1;
4023                 }               
4024
4025                 if (ctdb_replay_transaction(h) != 0) {
4026                         DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
4027                                           "transaction on db 0x%08x, "
4028                                           "failure control =%u\n",
4029                                           h->ctdb_db->db_id,
4030                                           (unsigned)failure_control));
4031                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4032                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4033                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
4034                         talloc_free(h);
4035                         return -1;
4036                 }
4037                 goto again;
4038         } else {
4039                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
4040         }
4041
4042         /* do the real commit locally */
4043         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
4044         if (ret != 0) {
4045                 DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
4046                                   "on db id 0x%08x locally, "
4047                                   "failure_control=%u\n",
4048                                   h->ctdb_db->db_id,
4049                                   (unsigned)failure_control));
4050                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4051                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4052                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
4053                 talloc_free(h);
4054                 return ret;
4055         }
4056
4057         /* tell ctdbd that we are finished with our local commit */
4058         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4059                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
4060                      tdb_null, NULL, NULL, NULL, NULL, NULL);
4061         talloc_free(h);
4062         return 0;
4063 }
4064
4065 /*
4066   recovery daemon ping to main daemon
4067  */
4068 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
4069 {
4070         int ret;
4071         int32_t res;
4072
4073         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
4074                            ctdb, NULL, &res, NULL, NULL);
4075         if (ret != 0 || res != 0) {
4076                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
4077                 return -1;
4078         }
4079
4080         return 0;
4081 }
4082
4083 /* When forking the main daemon and the child process needs to connect
4084  * back to the daemon as a client process, this function can be used
4085  * to change the ctdb context from daemon into client mode.  The child
4086  * process must be created using ctdb_fork() and not fork() -
4087  * ctdb_fork() does some necessary housekeeping.
4088  */
4089 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
4090 {
4091         int ret;
4092         va_list ap;
4093
4094         /* Add extra information so we can identify this in the logs */
4095         va_start(ap, fmt);
4096         debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
4097         va_end(ap);
4098
4099         /* get a new event context */
4100         ctdb->ev = event_context_init(ctdb);
4101         tevent_loop_allow_nesting(ctdb->ev);
4102
4103         /* Connect to main CTDB daemon */
4104         ret = ctdb_socket_connect(ctdb);
4105         if (ret != 0) {
4106                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
4107                 return -1;
4108         }
4109
4110         ctdb->can_send_controls = true;
4111
4112         return 0;
4113 }
4114
4115 /*
4116   get the status of running the monitor eventscripts: NULL means never run.
4117  */
4118 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
4119                 struct timeval timeout, uint32_t destnode, 
4120                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
4121                 struct ctdb_scripts_wire **scripts)
4122 {
4123         int ret;
4124         TDB_DATA outdata, indata;
4125         int32_t res;
4126         uint32_t uinttype = type;
4127
4128         indata.dptr = (uint8_t *)&uinttype;
4129         indata.dsize = sizeof(uinttype);
4130
4131         ret = ctdb_control(ctdb, destnode, 0, 
4132                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
4133                            mem_ctx, &outdata, &res, &timeout, NULL);
4134         if (ret != 0 || res != 0) {
4135                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
4136                 return -1;
4137         }
4138
4139         if (outdata.dsize == 0) {
4140                 *scripts = NULL;
4141         } else {
4142                 *scripts = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4143                 talloc_free(outdata.dptr);
4144         }
4145                     
4146         return 0;
4147 }
4148
4149 /*
4150   tell the main daemon how long it took to lock the reclock file
4151  */
4152 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
4153 {
4154         int ret;
4155         int32_t res;
4156         TDB_DATA data;
4157
4158         data.dptr = (uint8_t *)&latency;
4159         data.dsize = sizeof(latency);
4160
4161         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
4162                            ctdb, NULL, &res, NULL, NULL);
4163         if (ret != 0 || res != 0) {
4164                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
4165                 return -1;
4166         }
4167
4168         return 0;
4169 }
4170
4171 /*
4172   get the name of the reclock file
4173  */
4174 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
4175                          uint32_t destnode, TALLOC_CTX *mem_ctx,
4176                          const char **name)
4177 {
4178         int ret;
4179         int32_t res;
4180         TDB_DATA data;
4181
4182         ret = ctdb_control(ctdb, destnode, 0, 
4183                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
4184                            mem_ctx, &data, &res, &timeout, NULL);
4185         if (ret != 0 || res != 0) {
4186                 return -1;
4187         }
4188
4189         if (data.dsize == 0) {
4190                 *name = NULL;
4191         } else {
4192                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
4193         }
4194         talloc_free(data.dptr);
4195
4196         return 0;
4197 }
4198
4199 /*
4200   set the reclock filename for a node
4201  */
4202 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
4203 {
4204         int ret;
4205         TDB_DATA data;
4206         int32_t res;
4207
4208         if (reclock == NULL) {
4209                 data.dsize = 0;
4210                 data.dptr  = NULL;
4211         } else {
4212                 data.dsize = strlen(reclock) + 1;
4213                 data.dptr  = discard_const(reclock);
4214         }
4215
4216         ret = ctdb_control(ctdb, destnode, 0, 
4217                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
4218                            NULL, NULL, &res, &timeout, NULL);
4219         if (ret != 0 || res != 0) {
4220                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
4221                 return -1;
4222         }
4223
4224         return 0;
4225 }
4226
4227 /*
4228   stop a node
4229  */
4230 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4231 {
4232         int ret;
4233         int32_t res;
4234
4235         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
4236                            ctdb, NULL, &res, &timeout, NULL);
4237         if (ret != 0 || res != 0) {
4238                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
4239                 return -1;
4240         }
4241
4242         return 0;
4243 }
4244
4245 /*
4246   continue a node
4247  */
4248 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4249 {
4250         int ret;
4251
4252         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
4253                            ctdb, NULL, NULL, &timeout, NULL);
4254         if (ret != 0) {
4255                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4256                 return -1;
4257         }
4258
4259         return 0;
4260 }
4261
4262 /*
4263   set the natgw state for a node
4264  */
4265 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4266 {
4267         int ret;
4268         TDB_DATA data;
4269         int32_t res;
4270
4271         data.dsize = sizeof(natgwstate);
4272         data.dptr  = (uint8_t *)&natgwstate;
4273
4274         ret = ctdb_control(ctdb, destnode, 0, 
4275                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
4276                            NULL, NULL, &res, &timeout, NULL);
4277         if (ret != 0 || res != 0) {
4278                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4279                 return -1;
4280         }
4281
4282         return 0;
4283 }
4284
4285 /*
4286   set the lmaster role for a node
4287  */
4288 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4289 {
4290         int ret;
4291         TDB_DATA data;
4292         int32_t res;
4293
4294         data.dsize = sizeof(lmasterrole);
4295         data.dptr  = (uint8_t *)&lmasterrole;
4296
4297         ret = ctdb_control(ctdb, destnode, 0, 
4298                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
4299                            NULL, NULL, &res, &timeout, NULL);
4300         if (ret != 0 || res != 0) {
4301                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4302                 return -1;
4303         }
4304
4305         return 0;
4306 }
4307
4308 /*
4309   set the recmaster role for a node
4310  */
4311 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4312 {
4313         int ret;
4314         TDB_DATA data;
4315         int32_t res;
4316
4317         data.dsize = sizeof(recmasterrole);
4318         data.dptr  = (uint8_t *)&recmasterrole;
4319
4320         ret = ctdb_control(ctdb, destnode, 0, 
4321                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
4322                            NULL, NULL, &res, &timeout, NULL);
4323         if (ret != 0 || res != 0) {
4324                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4325                 return -1;
4326         }
4327
4328         return 0;
4329 }
4330
4331 /* enable an eventscript
4332  */
4333 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4334 {
4335         int ret;
4336         TDB_DATA data;
4337         int32_t res;
4338
4339         data.dsize = strlen(script) + 1;
4340         data.dptr  = discard_const(script);
4341
4342         ret = ctdb_control(ctdb, destnode, 0, 
4343                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
4344                            NULL, NULL, &res, &timeout, NULL);
4345         if (ret != 0 || res != 0) {
4346                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4347                 return -1;
4348         }
4349
4350         return 0;
4351 }
4352
4353 /* disable an eventscript
4354  */
4355 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4356 {
4357         int ret;
4358         TDB_DATA data;
4359         int32_t res;
4360
4361         data.dsize = strlen(script) + 1;
4362         data.dptr  = discard_const(script);
4363
4364         ret = ctdb_control(ctdb, destnode, 0, 
4365                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
4366                            NULL, NULL, &res, &timeout, NULL);
4367         if (ret != 0 || res != 0) {
4368                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
4369                 return -1;
4370         }
4371
4372         return 0;
4373 }
4374
4375
4376 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
4377 {
4378         int ret;
4379         TDB_DATA data;
4380         int32_t res;
4381
4382         data.dsize = sizeof(*bantime);
4383         data.dptr  = (uint8_t *)bantime;
4384
4385         ret = ctdb_control(ctdb, destnode, 0, 
4386                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
4387                            NULL, NULL, &res, &timeout, NULL);
4388         if (ret != 0 || res != 0) {
4389                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4390                 return -1;
4391         }
4392
4393         return 0;
4394 }
4395
4396
4397 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4398 {
4399         int ret;
4400         TDB_DATA outdata;
4401         int32_t res;
4402         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4403
4404         ret = ctdb_control(ctdb, destnode, 0, 
4405                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4406                            tmp_ctx, &outdata, &res, &timeout, NULL);
4407         if (ret != 0 || res != 0) {
4408                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4409                 talloc_free(tmp_ctx);
4410                 return -1;
4411         }
4412
4413         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4414         talloc_free(tmp_ctx);
4415
4416         return 0;
4417 }
4418
4419
4420 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4421 {
4422         int ret;
4423         int32_t res;
4424         TDB_DATA data;
4425         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4426
4427         data.dptr = (uint8_t*)db_prio;
4428         data.dsize = sizeof(*db_prio);
4429
4430         ret = ctdb_control(ctdb, destnode, 0, 
4431                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4432                            tmp_ctx, NULL, &res, &timeout, NULL);
4433         if (ret != 0 || res != 0) {
4434                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4435                 talloc_free(tmp_ctx);
4436                 return -1;
4437         }
4438
4439         talloc_free(tmp_ctx);
4440
4441         return 0;
4442 }
4443
4444 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4445 {
4446         int ret;
4447         int32_t res;
4448         TDB_DATA data;
4449         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4450
4451         data.dptr = (uint8_t*)&db_id;
4452         data.dsize = sizeof(db_id);
4453
4454         ret = ctdb_control(ctdb, destnode, 0, 
4455                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4456                            tmp_ctx, NULL, &res, &timeout, NULL);
4457         if (ret != 0 || res < 0) {
4458                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_db_priority failed\n"));
4459                 talloc_free(tmp_ctx);
4460                 return -1;
4461         }
4462
4463         if (priority) {
4464                 *priority = res;
4465         }
4466
4467         talloc_free(tmp_ctx);
4468
4469         return 0;
4470 }
4471
4472 int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_statistics_wire **stats)
4473 {
4474         int ret;
4475         TDB_DATA outdata;
4476         int32_t res;
4477
4478         ret = ctdb_control(ctdb, destnode, 0, 
4479                            CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null, 
4480                            mem_ctx, &outdata, &res, &timeout, NULL);
4481         if (ret != 0 || res != 0 || outdata.dsize == 0) {
4482                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
4483                 return -1;
4484         }
4485
4486         *stats = (struct ctdb_statistics_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4487         talloc_free(outdata.dptr);
4488                     
4489         return 0;
4490 }
4491
4492 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)
4493 {
4494         if (h == NULL) {
4495                 return NULL;
4496         }
4497
4498         return &h->header;
4499 }
4500
4501
4502 struct ctdb_client_control_state *
4503 ctdb_ctrl_updaterecord_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4504 {
4505         struct ctdb_client_control_state *handle;
4506         struct ctdb_marshall_buffer *m;
4507         struct ctdb_rec_data *rec;
4508         TDB_DATA outdata;
4509
4510         m = talloc_zero(mem_ctx, struct ctdb_marshall_buffer);
4511         if (m == NULL) {
4512                 DEBUG(DEBUG_ERR, ("Failed to allocate marshall buffer for update record\n"));
4513                 return NULL;
4514         }
4515
4516         m->db_id = ctdb_db->db_id;
4517
4518         rec = ctdb_marshall_record(m, 0, key, header, data);
4519         if (rec == NULL) {
4520                 DEBUG(DEBUG_ERR,("Failed to marshall record for update record\n"));
4521                 talloc_free(m);
4522                 return NULL;
4523         }
4524         m = talloc_realloc_size(mem_ctx, m, rec->length + offsetof(struct ctdb_marshall_buffer, data));
4525         if (m == NULL) {
4526                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata\n"));
4527                 talloc_free(m);
4528                 return NULL;
4529         }
4530         m->count++;
4531         memcpy((uint8_t *)m + offsetof(struct ctdb_marshall_buffer, data), rec, rec->length);
4532
4533
4534         outdata.dptr = (uint8_t *)m;
4535         outdata.dsize = talloc_get_size(m);
4536
4537         handle = ctdb_control_send(ctdb, destnode, 0, 
4538                            CTDB_CONTROL_UPDATE_RECORD, 0, outdata,
4539                            mem_ctx, &timeout, NULL);
4540         talloc_free(m);
4541         return handle;
4542 }
4543
4544 int ctdb_ctrl_updaterecord_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4545 {
4546         int ret;
4547         int32_t res;
4548
4549         ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL);
4550         if ( (ret != 0) || (res != 0) ){
4551                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_update_record_recv failed\n"));
4552                 return -1;
4553         }
4554
4555         return 0;
4556 }
4557
4558 int
4559 ctdb_ctrl_updaterecord(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4560 {
4561         struct ctdb_client_control_state *state;
4562
4563         state = ctdb_ctrl_updaterecord_send(ctdb, mem_ctx, timeout, destnode, ctdb_db, key, header, data);
4564         return ctdb_ctrl_updaterecord_recv(ctdb, state);
4565 }
4566
4567
4568
4569
4570
4571
4572 /*
4573   set a database to be readonly
4574  */
4575 struct ctdb_client_control_state *
4576 ctdb_ctrl_set_db_readonly_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4577 {
4578         TDB_DATA data;
4579
4580         data.dptr = (uint8_t *)&dbid;
4581         data.dsize = sizeof(dbid);
4582
4583         return ctdb_control_send(ctdb, destnode, 0, 
4584                            CTDB_CONTROL_SET_DB_READONLY, 0, data, 
4585                            ctdb, NULL, NULL);
4586 }
4587
4588 int ctdb_ctrl_set_db_readonly_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4589 {
4590         int ret;
4591         int32_t res;
4592
4593         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4594         if (ret != 0 || res != 0) {
4595           DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_readonly_recv failed  ret:%d res:%d\n", ret, res));
4596                 return -1;
4597         }
4598
4599         return 0;
4600 }
4601
4602 int ctdb_ctrl_set_db_readonly(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4603 {
4604         struct ctdb_client_control_state *state;
4605
4606         state = ctdb_ctrl_set_db_readonly_send(ctdb, destnode, dbid);
4607         return ctdb_ctrl_set_db_readonly_recv(ctdb, state);
4608 }
4609
4610 /*
4611   set a database to be sticky
4612  */
4613 struct ctdb_client_control_state *
4614 ctdb_ctrl_set_db_sticky_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4615 {
4616         TDB_DATA data;
4617
4618         data.dptr = (uint8_t *)&dbid;
4619         data.dsize = sizeof(dbid);
4620
4621         return ctdb_control_send(ctdb, destnode, 0, 
4622                            CTDB_CONTROL_SET_DB_STICKY, 0, data, 
4623                            ctdb, NULL, NULL);
4624 }
4625
4626 int ctdb_ctrl_set_db_sticky_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4627 {
4628         int ret;
4629         int32_t res;
4630
4631         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4632         if (ret != 0 || res != 0) {
4633                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_sticky_recv failed  ret:%d res:%d\n", ret, res));
4634                 return -1;
4635         }
4636
4637         return 0;
4638 }
4639
4640 int ctdb_ctrl_set_db_sticky(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4641 {
4642         struct ctdb_client_control_state *state;
4643
4644         state = ctdb_ctrl_set_db_sticky_send(ctdb, destnode, dbid);
4645         return ctdb_ctrl_set_db_sticky_recv(ctdb, state);
4646 }