ReadOnly: When the client wants a readwrite lock but the local node is the dmaster...
[obnox/samba/samba-obnox.git] / ctdb / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/tevent/tevent.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
29 #include <stdlib.h>
30 #include "../include/ctdb_private.h"
31 #include "lib/util/dlinklist.h"
32
33 pid_t ctdbd_pid;
34
35 /*
36   allocate a packet for use in client<->daemon communication
37  */
38 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
39                                             TALLOC_CTX *mem_ctx, 
40                                             enum ctdb_operation operation, 
41                                             size_t length, size_t slength,
42                                             const char *type)
43 {
44         int size;
45         struct ctdb_req_header *hdr;
46
47         length = MAX(length, slength);
48         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
49
50         hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
51         if (hdr == NULL) {
52                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
53                          operation, (unsigned)length));
54                 return NULL;
55         }
56         talloc_set_name_const(hdr, type);
57         memset(hdr, 0, slength);
58         hdr->length       = length;
59         hdr->operation    = operation;
60         hdr->ctdb_magic   = CTDB_MAGIC;
61         hdr->ctdb_version = CTDB_VERSION;
62         hdr->srcnode      = ctdb->pnn;
63         if (ctdb->vnn_map) {
64                 hdr->generation = ctdb->vnn_map->generation;
65         }
66
67         return hdr;
68 }
69
70 /*
71   local version of ctdb_call
72 */
73 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
74                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
75                     TDB_DATA *data, bool updatetdb)
76 {
77         struct ctdb_call_info *c;
78         struct ctdb_registered_call *fn;
79         struct ctdb_context *ctdb = ctdb_db->ctdb;
80         
81         c = talloc(ctdb, struct ctdb_call_info);
82         CTDB_NO_MEMORY(ctdb, c);
83
84         c->key = call->key;
85         c->call_data = &call->call_data;
86         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
87         c->record_data.dsize = data->dsize;
88         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
89         c->new_data = NULL;
90         c->reply_data = NULL;
91         c->status = 0;
92         c->header = header;
93
94         for (fn=ctdb_db->calls;fn;fn=fn->next) {
95                 if (fn->id == call->call_id) break;
96         }
97         if (fn == NULL) {
98                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
99                 talloc_free(c);
100                 return -1;
101         }
102
103         if (fn->fn(c) != 0) {
104                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
105                 talloc_free(c);
106                 return -1;
107         }
108
109         /* we need to force the record to be written out if this was a remote access */
110         if (c->new_data == NULL) {
111                 c->new_data = &c->record_data;
112         }
113
114         if (c->new_data && updatetdb) {
115                 /* XXX check that we always have the lock here? */
116                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
117                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
118                         talloc_free(c);
119                         return -1;
120                 }
121         }
122
123         if (c->reply_data) {
124                 call->reply_data = *c->reply_data;
125
126                 talloc_steal(call, call->reply_data.dptr);
127                 talloc_set_name_const(call->reply_data.dptr, __location__);
128         } else {
129                 call->reply_data.dptr = NULL;
130                 call->reply_data.dsize = 0;
131         }
132         call->status = c->status;
133
134         talloc_free(c);
135
136         return 0;
137 }
138
139
140 /*
141   queue a packet for sending from client to daemon
142 */
143 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
144 {
145         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
146 }
147
148
149 /*
150   called when a CTDB_REPLY_CALL packet comes in in the client
151
152   This packet comes in response to a CTDB_REQ_CALL request packet. It
153   contains any reply data from the call
154 */
155 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
156 {
157         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
158         struct ctdb_client_call_state *state;
159
160         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
161         if (state == NULL) {
162                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
163                 return;
164         }
165
166         if (hdr->reqid != state->reqid) {
167                 /* we found a record  but it was the wrong one */
168                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
169                 return;
170         }
171
172         state->call->reply_data.dptr = c->data;
173         state->call->reply_data.dsize = c->datalen;
174         state->call->status = c->status;
175
176         talloc_steal(state, c);
177
178         state->state = CTDB_CALL_DONE;
179
180         if (state->async.fn) {
181                 state->async.fn(state);
182         }
183 }
184
185 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
186
187 /*
188   this is called in the client, when data comes in from the daemon
189  */
190 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
191 {
192         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
193         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
194         TALLOC_CTX *tmp_ctx;
195
196         /* place the packet as a child of a tmp_ctx. We then use
197            talloc_free() below to free it. If any of the calls want
198            to keep it, then they will steal it somewhere else, and the
199            talloc_free() will be a no-op */
200         tmp_ctx = talloc_new(ctdb);
201         talloc_steal(tmp_ctx, hdr);
202
203         if (cnt == 0) {
204                 DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n"));
205                 exit(0);
206         }
207
208         if (cnt < sizeof(*hdr)) {
209                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
210                 goto done;
211         }
212         if (cnt != hdr->length) {
213                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
214                                (unsigned)hdr->length, (unsigned)cnt);
215                 goto done;
216         }
217
218         if (hdr->ctdb_magic != CTDB_MAGIC) {
219                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
220                 goto done;
221         }
222
223         if (hdr->ctdb_version != CTDB_VERSION) {
224                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
225                 goto done;
226         }
227
228         switch (hdr->operation) {
229         case CTDB_REPLY_CALL:
230                 ctdb_client_reply_call(ctdb, hdr);
231                 break;
232
233         case CTDB_REQ_MESSAGE:
234                 ctdb_request_message(ctdb, hdr);
235                 break;
236
237         case CTDB_REPLY_CONTROL:
238                 ctdb_client_reply_control(ctdb, hdr);
239                 break;
240
241         default:
242                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
243         }
244
245 done:
246         talloc_free(tmp_ctx);
247 }
248
249 /*
250   connect with exponential backoff, thanks Stevens
251 */
252 #define CONNECT_MAXSLEEP 64
253 static int ctdb_connect_retry(struct ctdb_context *ctdb)
254 {
255         struct sockaddr_un addr;
256         int secs;
257         int ret = 0;
258
259         memset(&addr, 0, sizeof(addr));
260         addr.sun_family = AF_UNIX;
261         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
262
263         for (secs = 1; secs <= CONNECT_MAXSLEEP; secs *= 2) {
264                 ret = connect(ctdb->daemon.sd, (struct sockaddr *)&addr,
265                               sizeof(addr));
266                 if ((ret == 0) || (errno != EAGAIN)) {
267                         break;
268                 }
269
270                 if (secs <= (CONNECT_MAXSLEEP / 2)) {
271                         DEBUG(DEBUG_ERR,("connect failed: %s, retry in %d second(s)\n",
272                                          strerror(errno), secs));
273                         sleep(secs);
274                 }
275         }
276
277         return ret;
278 }
279
280 /*
281   connect to a unix domain socket
282 */
283 int ctdb_socket_connect(struct ctdb_context *ctdb)
284 {
285         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
286         if (ctdb->daemon.sd == -1) {
287                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
288                 return -1;
289         }
290
291         set_nonblocking(ctdb->daemon.sd);
292         set_close_on_exec(ctdb->daemon.sd);
293
294         if (ctdb_connect_retry(ctdb) == -1) {
295                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
296                 close(ctdb->daemon.sd);
297                 ctdb->daemon.sd = -1;
298                 return -1;
299         }
300
301         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
302                                               CTDB_DS_ALIGNMENT, 
303                                               ctdb_client_read_cb, ctdb, "to-ctdbd");
304         return 0;
305 }
306
307
308 struct ctdb_record_handle {
309         struct ctdb_db_context *ctdb_db;
310         TDB_DATA key;
311         TDB_DATA *data;
312         struct ctdb_ltdb_header header;
313 };
314
315
316 /*
317   make a recv call to the local ctdb daemon - called from client context
318
319   This is called when the program wants to wait for a ctdb_call to complete and get the 
320   results. This call will block unless the call has already completed.
321 */
322 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
323 {
324         if (state == NULL) {
325                 return -1;
326         }
327
328         while (state->state < CTDB_CALL_DONE) {
329                 event_loop_once(state->ctdb_db->ctdb->ev);
330         }
331         if (state->state != CTDB_CALL_DONE) {
332                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
333                 talloc_free(state);
334                 return -1;
335         }
336
337         if (state->call->reply_data.dsize) {
338                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
339                                                       state->call->reply_data.dptr,
340                                                       state->call->reply_data.dsize);
341                 call->reply_data.dsize = state->call->reply_data.dsize;
342         } else {
343                 call->reply_data.dptr = NULL;
344                 call->reply_data.dsize = 0;
345         }
346         call->status = state->call->status;
347         talloc_free(state);
348
349         return call->status;
350 }
351
352
353
354
355 /*
356   destroy a ctdb_call in client
357 */
358 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
359 {
360         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
361         return 0;
362 }
363
364 /*
365   construct an event driven local ctdb_call
366
367   this is used so that locally processed ctdb_call requests are processed
368   in an event driven manner
369 */
370 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
371                                                                   struct ctdb_call *call,
372                                                                   struct ctdb_ltdb_header *header,
373                                                                   TDB_DATA *data)
374 {
375         struct ctdb_client_call_state *state;
376         struct ctdb_context *ctdb = ctdb_db->ctdb;
377         int ret;
378
379         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
380         CTDB_NO_MEMORY_NULL(ctdb, state);
381         state->call = talloc_zero(state, struct ctdb_call);
382         CTDB_NO_MEMORY_NULL(ctdb, state->call);
383
384         talloc_steal(state, data->dptr);
385
386         state->state   = CTDB_CALL_DONE;
387         *(state->call) = *call;
388         state->ctdb_db = ctdb_db;
389
390         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
391
392         return state;
393 }
394
395 /*
396   make a ctdb call to the local daemon - async send. Called from client context.
397
398   This constructs a ctdb_call request and queues it for processing. 
399   This call never blocks.
400 */
401 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
402                                               struct ctdb_call *call)
403 {
404         struct ctdb_client_call_state *state;
405         struct ctdb_context *ctdb = ctdb_db->ctdb;
406         struct ctdb_ltdb_header header;
407         TDB_DATA data;
408         int ret;
409         size_t len;
410         struct ctdb_req_call *c;
411
412         /* if the domain socket is not yet open, open it */
413         if (ctdb->daemon.sd==-1) {
414                 ctdb_socket_connect(ctdb);
415         }
416
417         ret = ctdb_ltdb_lock(ctdb_db, call->key);
418         if (ret != 0) {
419                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
420                 return NULL;
421         }
422
423         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
424
425         if ((call->flags & CTDB_IMMEDIATE_MIGRATION) && (header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
426                 ret = -1;
427         }
428
429         if (ret == 0 && header.dmaster == ctdb->pnn) {
430                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
431                 talloc_free(data.dptr);
432                 ctdb_ltdb_unlock(ctdb_db, call->key);
433                 return state;
434         }
435
436         ctdb_ltdb_unlock(ctdb_db, call->key);
437         talloc_free(data.dptr);
438
439         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
440         if (state == NULL) {
441                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
442                 return NULL;
443         }
444         state->call = talloc_zero(state, struct ctdb_call);
445         if (state->call == NULL) {
446                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
447                 return NULL;
448         }
449
450         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
451         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
452         if (c == NULL) {
453                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
454                 return NULL;
455         }
456
457         state->reqid     = ctdb_reqid_new(ctdb, state);
458         state->ctdb_db = ctdb_db;
459         talloc_set_destructor(state, ctdb_client_call_destructor);
460
461         c->hdr.reqid     = state->reqid;
462         c->flags         = call->flags;
463         c->db_id         = ctdb_db->db_id;
464         c->callid        = call->call_id;
465         c->hopcount      = 0;
466         c->keylen        = call->key.dsize;
467         c->calldatalen   = call->call_data.dsize;
468         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
469         memcpy(&c->data[call->key.dsize], 
470                call->call_data.dptr, call->call_data.dsize);
471         *(state->call)              = *call;
472         state->call->call_data.dptr = &c->data[call->key.dsize];
473         state->call->key.dptr       = &c->data[0];
474
475         state->state  = CTDB_CALL_WAIT;
476
477
478         ctdb_client_queue_pkt(ctdb, &c->hdr);
479
480         return state;
481 }
482
483
484 /*
485   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
486 */
487 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
488 {
489         struct ctdb_client_call_state *state;
490
491         state = ctdb_call_send(ctdb_db, call);
492         return ctdb_call_recv(state, call);
493 }
494
495
496 /*
497   tell the daemon what messaging srvid we will use, and register the message
498   handler function in the client
499 */
500 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
501                              ctdb_msg_fn_t handler,
502                              void *private_data)
503                                     
504 {
505         int res;
506         int32_t status;
507         
508         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
509                            tdb_null, NULL, NULL, &status, NULL, NULL);
510         if (res != 0 || status != 0) {
511                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
512                 return -1;
513         }
514
515         /* also need to register the handler with our own ctdb structure */
516         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
517 }
518
519 /*
520   tell the daemon we no longer want a srvid
521 */
522 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
523 {
524         int res;
525         int32_t status;
526         
527         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
528                            tdb_null, NULL, NULL, &status, NULL, NULL);
529         if (res != 0 || status != 0) {
530                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
531                 return -1;
532         }
533
534         /* also need to register the handler with our own ctdb structure */
535         ctdb_deregister_message_handler(ctdb, srvid, private_data);
536         return 0;
537 }
538
539
540 /*
541   send a message - from client context
542  */
543 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
544                       uint64_t srvid, TDB_DATA data)
545 {
546         struct ctdb_req_message *r;
547         int len, res;
548
549         len = offsetof(struct ctdb_req_message, data) + data.dsize;
550         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
551                                len, struct ctdb_req_message);
552         CTDB_NO_MEMORY(ctdb, r);
553
554         r->hdr.destnode  = pnn;
555         r->srvid         = srvid;
556         r->datalen       = data.dsize;
557         memcpy(&r->data[0], data.dptr, data.dsize);
558         
559         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
560         if (res != 0) {
561                 return res;
562         }
563
564         talloc_free(r);
565         return 0;
566 }
567
568
569 /*
570   cancel a ctdb_fetch_lock operation, releasing the lock
571  */
572 static int fetch_lock_destructor(struct ctdb_record_handle *h)
573 {
574         ctdb_ltdb_unlock(h->ctdb_db, h->key);
575         return 0;
576 }
577
578 /*
579   force the migration of a record to this node
580  */
581 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
582 {
583         struct ctdb_call call;
584         ZERO_STRUCT(call);
585         call.call_id = CTDB_NULL_FUNC;
586         call.key = key;
587         call.flags = CTDB_IMMEDIATE_MIGRATION;
588         return ctdb_call(ctdb_db, &call);
589 }
590
591 /*
592   try to fetch a readonly copy of a record
593  */
594 static int
595 ctdb_client_fetch_readonly(struct ctdb_db_context *ctdb_db, TDB_DATA key, TALLOC_CTX *mem_ctx, struct ctdb_ltdb_header **hdr, TDB_DATA *data)
596 {
597         int ret;
598
599         struct ctdb_call call;
600         ZERO_STRUCT(call);
601
602         call.call_id = CTDB_FETCH_WITH_HEADER_FUNC;
603         call.call_data.dptr = NULL;
604         call.call_data.dsize = 0;
605         call.key = key;
606         call.flags = CTDB_WANT_READONLY;
607         ret = ctdb_call(ctdb_db, &call);
608
609         if (ret != 0) {
610                 return -1;
611         }
612         if (call.reply_data.dsize < sizeof(struct ctdb_ltdb_header)) {
613                 return -1;
614         }
615
616         *hdr = talloc_memdup(mem_ctx, &call.reply_data.dptr[0], sizeof(struct ctdb_ltdb_header));
617         if (*hdr == NULL) {
618                 talloc_free(call.reply_data.dptr);
619                 return -1;
620         }
621
622         data->dsize = call.reply_data.dsize - sizeof(struct ctdb_ltdb_header);
623         data->dptr  = talloc_memdup(mem_ctx, &call.reply_data.dptr[sizeof(struct ctdb_ltdb_header)], data->dsize);
624         if (data->dptr == NULL) {
625                 talloc_free(call.reply_data.dptr);
626                 talloc_free(hdr);
627                 return -1;
628         }
629
630         return 0;
631 }
632
633 /*
634   get a lock on a record, and return the records data. Blocks until it gets the lock
635  */
636 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
637                                            TDB_DATA key, TDB_DATA *data)
638 {
639         int ret;
640         struct ctdb_record_handle *h;
641
642         /*
643           procedure is as follows:
644
645           1) get the chain lock. 
646           2) check if we are dmaster
647           3) if we are the dmaster then return handle 
648           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
649              reply from ctdbd
650           5) when we get the reply, goto (1)
651          */
652
653         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
654         if (h == NULL) {
655                 return NULL;
656         }
657
658         h->ctdb_db = ctdb_db;
659         h->key     = key;
660         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
661         if (h->key.dptr == NULL) {
662                 talloc_free(h);
663                 return NULL;
664         }
665         h->data    = data;
666
667         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
668                  (const char *)key.dptr));
669
670 again:
671         /* step 1 - get the chain lock */
672         ret = ctdb_ltdb_lock(ctdb_db, key);
673         if (ret != 0) {
674                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
675                 talloc_free(h);
676                 return NULL;
677         }
678
679         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
680
681         talloc_set_destructor(h, fetch_lock_destructor);
682
683         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
684
685         /* when torturing, ensure we test the remote path */
686         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
687             random() % 5 == 0) {
688                 h->header.dmaster = (uint32_t)-1;
689         }
690
691
692         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
693
694         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
695                 ctdb_ltdb_unlock(ctdb_db, key);
696                 ret = ctdb_client_force_migration(ctdb_db, key);
697                 if (ret != 0) {
698                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
699                         talloc_free(h);
700                         return NULL;
701                 }
702                 goto again;
703         }
704
705         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
706         return h;
707 }
708
709 /*
710   get a readonly lock on a record, and return the records data. Blocks until it gets the lock
711  */
712 struct ctdb_record_handle *
713 ctdb_fetch_readonly_lock(
714         struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
715         TDB_DATA key, TDB_DATA *data,
716         int read_only)
717 {
718         int ret;
719         struct ctdb_record_handle *h;
720         struct ctdb_ltdb_header *roheader = NULL;
721
722         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
723         if (h == NULL) {
724                 return NULL;
725         }
726
727         h->ctdb_db = ctdb_db;
728         h->key     = key;
729         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
730         if (h->key.dptr == NULL) {
731                 talloc_free(h);
732                 return NULL;
733         }
734         h->data    = data;
735
736         data->dptr = NULL;
737         data->dsize = 0;
738
739
740 again:
741         talloc_free(roheader);
742         roheader = NULL;
743
744         talloc_free(data->dptr);
745         data->dptr = NULL;
746         data->dsize = 0;
747
748         /* Lock the record/chain */
749         ret = ctdb_ltdb_lock(ctdb_db, key);
750         if (ret != 0) {
751                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
752                 talloc_free(h);
753                 return NULL;
754         }
755
756         talloc_set_destructor(h, fetch_lock_destructor);
757
758         /* Check if record exists yet in the TDB */
759         ret = ctdb_ltdb_fetch_readonly(ctdb_db, key, &h->header, h, data);
760         if (ret != 0) {
761                 ctdb_ltdb_unlock(ctdb_db, key);
762                 ret = ctdb_client_force_migration(ctdb_db, key);
763                 if (ret != 0) {
764                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
765                         talloc_free(h);
766                         return NULL;
767                 }
768                 goto again;
769         }
770
771         /* if this is a request for read/write and we have delegations
772            we have to revoke all delegations first
773         */
774         if ((read_only == 0) 
775         &&  (h->header.dmaster == ctdb_db->ctdb->pnn)
776         &&  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
777                 ctdb_ltdb_unlock(ctdb_db, key);
778                 ret = ctdb_client_force_migration(ctdb_db, key);
779                 if (ret != 0) {
780                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
781                         talloc_free(h);
782                         return NULL;
783                 }
784                 goto again;
785         }
786
787         /* if we are dmaster, just return the handle */
788         if (h->header.dmaster == ctdb_db->ctdb->pnn) {
789                 return h;
790         }
791
792         if (read_only != 0) {
793                 TDB_DATA rodata = {NULL, 0};
794
795                 if ((h->header.flags & CTDB_REC_RO_HAVE_READONLY)
796                 ||  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
797                         return h;
798                 }
799
800                 ctdb_ltdb_unlock(ctdb_db, key);
801                 ret = ctdb_client_fetch_readonly(ctdb_db, key, h, &roheader, &rodata);
802                 if (ret != 0) {
803                         DEBUG(DEBUG_ERR,("ctdb_fetch_readonly_lock:  failed. force migration and try again\n"));
804                         ret = ctdb_client_force_migration(ctdb_db, key);
805                         if (ret != 0) {
806                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
807                                 talloc_free(h);
808                                 return NULL;
809                         }
810
811                         goto again;
812                 }
813
814                 if (!(roheader->flags&CTDB_REC_RO_HAVE_READONLY)) {
815                         ret = ctdb_client_force_migration(ctdb_db, key);
816                         if (ret != 0) {
817                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
818                                 talloc_free(h);
819                                 return NULL;
820                         }
821
822                         goto again;
823                 }
824
825                 ret = ctdb_ltdb_lock(ctdb_db, key);
826                 if (ret != 0) {
827                         DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
828                         talloc_free(h);
829                         return NULL;
830                 }
831
832                 ret = ctdb_ltdb_fetch_readonly(ctdb_db, key, &h->header, h, data);
833                 if (ret != 0) {
834                         ctdb_ltdb_unlock(ctdb_db, key);
835
836                         ret = ctdb_client_force_migration(ctdb_db, key);
837                         if (ret != 0) {
838                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
839                                 talloc_free(h);
840                                 return NULL;
841                         }
842
843                         goto again;
844                 }
845
846                 if (h->header.rsn >= roheader->rsn) {
847                         DEBUG(DEBUG_ERR,("READONLY RECORD: Too small RSN, migrate and try again\n"));
848                         ctdb_ltdb_unlock(ctdb_db, key);
849
850                         ret = ctdb_client_force_migration(ctdb_db, key);
851                         if (ret != 0) {
852                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
853                                 talloc_free(h);
854                                 return NULL;
855                         }
856
857                         goto again;
858                 }
859
860                 if (ctdb_ltdb_store(ctdb_db, key, roheader, rodata) != 0) {
861                         ctdb_ltdb_unlock(ctdb_db, key);
862
863                         ret = ctdb_client_force_migration(ctdb_db, key);
864                         if (ret != 0) {
865                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
866                                 talloc_free(h);
867                                 return NULL;
868                         }
869
870                         goto again;
871                 }
872                 return h;
873         }
874
875         /* we are not dmaster and this was not a request for a readonly lock
876          * so unlock the record, migrate it and try again
877          */
878         ctdb_ltdb_unlock(ctdb_db, key);
879         ret = ctdb_client_force_migration(ctdb_db, key);
880         if (ret != 0) {
881                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
882                 talloc_free(h);
883                 return NULL;
884         }
885         goto again;
886 }
887
888 /*
889   store some data to the record that was locked with ctdb_fetch_lock()
890 */
891 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
892 {
893         if (h->ctdb_db->persistent) {
894                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
895                 return -1;
896         }
897
898         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
899 }
900
901 /*
902   non-locking fetch of a record
903  */
904 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
905                TDB_DATA key, TDB_DATA *data)
906 {
907         struct ctdb_call call;
908         int ret;
909
910         call.call_id = CTDB_FETCH_FUNC;
911         call.call_data.dptr = NULL;
912         call.call_data.dsize = 0;
913         call.key = key;
914
915         ret = ctdb_call(ctdb_db, &call);
916
917         if (ret == 0) {
918                 *data = call.reply_data;
919                 talloc_steal(mem_ctx, data->dptr);
920         }
921
922         return ret;
923 }
924
925
926
927 /*
928    called when a control completes or timesout to invoke the callback
929    function the user provided
930 */
931 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
932         struct timeval t, void *private_data)
933 {
934         struct ctdb_client_control_state *state;
935         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
936         int ret;
937
938         state = talloc_get_type(private_data, struct ctdb_client_control_state);
939         talloc_steal(tmp_ctx, state);
940
941         ret = ctdb_control_recv(state->ctdb, state, state,
942                         NULL, 
943                         NULL, 
944                         NULL);
945
946         talloc_free(tmp_ctx);
947 }
948
949 /*
950   called when a CTDB_REPLY_CONTROL packet comes in in the client
951
952   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
953   contains any reply data from the control
954 */
955 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
956                                       struct ctdb_req_header *hdr)
957 {
958         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
959         struct ctdb_client_control_state *state;
960
961         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
962         if (state == NULL) {
963                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
964                 return;
965         }
966
967         if (hdr->reqid != state->reqid) {
968                 /* we found a record  but it was the wrong one */
969                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
970                 return;
971         }
972
973         state->outdata.dptr = c->data;
974         state->outdata.dsize = c->datalen;
975         state->status = c->status;
976         if (c->errorlen) {
977                 state->errormsg = talloc_strndup(state, 
978                                                  (char *)&c->data[c->datalen], 
979                                                  c->errorlen);
980         }
981
982         /* state->outdata now uses resources from c so we dont want c
983            to just dissappear from under us while state is still alive
984         */
985         talloc_steal(state, c);
986
987         state->state = CTDB_CONTROL_DONE;
988
989         /* if we had a callback registered for this control, pull the response
990            and call the callback.
991         */
992         if (state->async.fn) {
993                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
994         }
995 }
996
997
998 /*
999   destroy a ctdb_control in client
1000 */
1001 static int ctdb_control_destructor(struct ctdb_client_control_state *state)     
1002 {
1003         ctdb_reqid_remove(state->ctdb, state->reqid);
1004         return 0;
1005 }
1006
1007
1008 /* time out handler for ctdb_control */
1009 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
1010         struct timeval t, void *private_data)
1011 {
1012         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
1013
1014         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
1015                          "dstnode:%u\n", state->reqid, state->c->opcode,
1016                          state->c->hdr.destnode));
1017
1018         state->state = CTDB_CONTROL_TIMEOUT;
1019
1020         /* if we had a callback registered for this control, pull the response
1021            and call the callback.
1022         */
1023         if (state->async.fn) {
1024                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
1025         }
1026 }
1027
1028 /* async version of send control request */
1029 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
1030                 uint32_t destnode, uint64_t srvid, 
1031                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
1032                 TALLOC_CTX *mem_ctx,
1033                 struct timeval *timeout,
1034                 char **errormsg)
1035 {
1036         struct ctdb_client_control_state *state;
1037         size_t len;
1038         struct ctdb_req_control *c;
1039         int ret;
1040
1041         if (errormsg) {
1042                 *errormsg = NULL;
1043         }
1044
1045         /* if the domain socket is not yet open, open it */
1046         if (ctdb->daemon.sd==-1) {
1047                 ctdb_socket_connect(ctdb);
1048         }
1049
1050         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
1051         CTDB_NO_MEMORY_NULL(ctdb, state);
1052
1053         state->ctdb       = ctdb;
1054         state->reqid      = ctdb_reqid_new(ctdb, state);
1055         state->state      = CTDB_CONTROL_WAIT;
1056         state->errormsg   = NULL;
1057
1058         talloc_set_destructor(state, ctdb_control_destructor);
1059
1060         len = offsetof(struct ctdb_req_control, data) + data.dsize;
1061         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
1062                                len, struct ctdb_req_control);
1063         state->c            = c;        
1064         CTDB_NO_MEMORY_NULL(ctdb, c);
1065         c->hdr.reqid        = state->reqid;
1066         c->hdr.destnode     = destnode;
1067         c->opcode           = opcode;
1068         c->client_id        = 0;
1069         c->flags            = flags;
1070         c->srvid            = srvid;
1071         c->datalen          = data.dsize;
1072         if (data.dsize) {
1073                 memcpy(&c->data[0], data.dptr, data.dsize);
1074         }
1075
1076         /* timeout */
1077         if (timeout && !timeval_is_zero(timeout)) {
1078                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
1079         }
1080
1081         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
1082         if (ret != 0) {
1083                 talloc_free(state);
1084                 return NULL;
1085         }
1086
1087         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1088                 talloc_free(state);
1089                 return NULL;
1090         }
1091
1092         return state;
1093 }
1094
1095
1096 /* async version of receive control reply */
1097 int ctdb_control_recv(struct ctdb_context *ctdb, 
1098                 struct ctdb_client_control_state *state, 
1099                 TALLOC_CTX *mem_ctx,
1100                 TDB_DATA *outdata, int32_t *status, char **errormsg)
1101 {
1102         TALLOC_CTX *tmp_ctx;
1103
1104         if (status != NULL) {
1105                 *status = -1;
1106         }
1107         if (errormsg != NULL) {
1108                 *errormsg = NULL;
1109         }
1110
1111         if (state == NULL) {
1112                 return -1;
1113         }
1114
1115         /* prevent double free of state */
1116         tmp_ctx = talloc_new(ctdb);
1117         talloc_steal(tmp_ctx, state);
1118
1119         /* loop one event at a time until we either timeout or the control
1120            completes.
1121         */
1122         while (state->state == CTDB_CONTROL_WAIT) {
1123                 event_loop_once(ctdb->ev);
1124         }
1125
1126         if (state->state != CTDB_CONTROL_DONE) {
1127                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
1128                 if (state->async.fn) {
1129                         state->async.fn(state);
1130                 }
1131                 talloc_free(tmp_ctx);
1132                 return -1;
1133         }
1134
1135         if (state->errormsg) {
1136                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
1137                 if (errormsg) {
1138                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
1139                 }
1140                 if (state->async.fn) {
1141                         state->async.fn(state);
1142                 }
1143                 talloc_free(tmp_ctx);
1144                 return -1;
1145         }
1146
1147         if (outdata) {
1148                 *outdata = state->outdata;
1149                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
1150         }
1151
1152         if (status) {
1153                 *status = state->status;
1154         }
1155
1156         if (state->async.fn) {
1157                 state->async.fn(state);
1158         }
1159
1160         talloc_free(tmp_ctx);
1161         return 0;
1162 }
1163
1164
1165
1166 /*
1167   send a ctdb control message
1168   timeout specifies how long we should wait for a reply.
1169   if timeout is NULL we wait indefinitely
1170  */
1171 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
1172                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
1173                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
1174                  struct timeval *timeout,
1175                  char **errormsg)
1176 {
1177         struct ctdb_client_control_state *state;
1178
1179         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
1180                         flags, data, mem_ctx,
1181                         timeout, errormsg);
1182         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
1183                         errormsg);
1184 }
1185
1186
1187
1188
1189 /*
1190   a process exists call. Returns 0 if process exists, -1 otherwise
1191  */
1192 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
1193 {
1194         int ret;
1195         TDB_DATA data;
1196         int32_t status;
1197
1198         data.dptr = (uint8_t*)&pid;
1199         data.dsize = sizeof(pid);
1200
1201         ret = ctdb_control(ctdb, destnode, 0, 
1202                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
1203                            NULL, NULL, &status, NULL, NULL);
1204         if (ret != 0) {
1205                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
1206                 return -1;
1207         }
1208
1209         return status;
1210 }
1211
1212 /*
1213   get remote statistics
1214  */
1215 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
1216 {
1217         int ret;
1218         TDB_DATA data;
1219         int32_t res;
1220
1221         ret = ctdb_control(ctdb, destnode, 0, 
1222                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
1223                            ctdb, &data, &res, NULL, NULL);
1224         if (ret != 0 || res != 0) {
1225                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
1226                 return -1;
1227         }
1228
1229         if (data.dsize != sizeof(struct ctdb_statistics)) {
1230                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1231                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1232                       return -1;
1233         }
1234
1235         *status = *(struct ctdb_statistics *)data.dptr;
1236         talloc_free(data.dptr);
1237                         
1238         return 0;
1239 }
1240
1241 /*
1242   shutdown a remote ctdb node
1243  */
1244 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1245 {
1246         struct ctdb_client_control_state *state;
1247
1248         state = ctdb_control_send(ctdb, destnode, 0, 
1249                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1250                            NULL, &timeout, NULL);
1251         if (state == NULL) {
1252                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1253                 return -1;
1254         }
1255
1256         return 0;
1257 }
1258
1259 /*
1260   get vnn map from a remote node
1261  */
1262 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1263 {
1264         int ret;
1265         TDB_DATA outdata;
1266         int32_t res;
1267         struct ctdb_vnn_map_wire *map;
1268
1269         ret = ctdb_control(ctdb, destnode, 0, 
1270                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1271                            mem_ctx, &outdata, &res, &timeout, NULL);
1272         if (ret != 0 || res != 0) {
1273                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1274                 return -1;
1275         }
1276         
1277         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1278         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1279             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1280                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1281                 return -1;
1282         }
1283
1284         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1285         CTDB_NO_MEMORY(ctdb, *vnnmap);
1286         (*vnnmap)->generation = map->generation;
1287         (*vnnmap)->size       = map->size;
1288         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1289
1290         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1291         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1292         talloc_free(outdata.dptr);
1293                     
1294         return 0;
1295 }
1296
1297
1298 /*
1299   get the recovery mode of a remote node
1300  */
1301 struct ctdb_client_control_state *
1302 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1303 {
1304         return ctdb_control_send(ctdb, destnode, 0, 
1305                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1306                            mem_ctx, &timeout, NULL);
1307 }
1308
1309 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1310 {
1311         int ret;
1312         int32_t res;
1313
1314         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1315         if (ret != 0) {
1316                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1317                 return -1;
1318         }
1319
1320         if (recmode) {
1321                 *recmode = (uint32_t)res;
1322         }
1323
1324         return 0;
1325 }
1326
1327 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1328 {
1329         struct ctdb_client_control_state *state;
1330
1331         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1332         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1333 }
1334
1335
1336
1337
1338 /*
1339   set the recovery mode of a remote node
1340  */
1341 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1342 {
1343         int ret;
1344         TDB_DATA data;
1345         int32_t res;
1346
1347         data.dsize = sizeof(uint32_t);
1348         data.dptr = (unsigned char *)&recmode;
1349
1350         ret = ctdb_control(ctdb, destnode, 0, 
1351                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1352                            NULL, NULL, &res, &timeout, NULL);
1353         if (ret != 0 || res != 0) {
1354                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1355                 return -1;
1356         }
1357
1358         return 0;
1359 }
1360
1361
1362
1363 /*
1364   get the recovery master of a remote node
1365  */
1366 struct ctdb_client_control_state *
1367 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1368                         struct timeval timeout, uint32_t destnode)
1369 {
1370         return ctdb_control_send(ctdb, destnode, 0, 
1371                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1372                            mem_ctx, &timeout, NULL);
1373 }
1374
1375 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1376 {
1377         int ret;
1378         int32_t res;
1379
1380         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1381         if (ret != 0) {
1382                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1383                 return -1;
1384         }
1385
1386         if (recmaster) {
1387                 *recmaster = (uint32_t)res;
1388         }
1389
1390         return 0;
1391 }
1392
1393 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1394 {
1395         struct ctdb_client_control_state *state;
1396
1397         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1398         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1399 }
1400
1401
1402 /*
1403   set the recovery master of a remote node
1404  */
1405 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1406 {
1407         int ret;
1408         TDB_DATA data;
1409         int32_t res;
1410
1411         ZERO_STRUCT(data);
1412         data.dsize = sizeof(uint32_t);
1413         data.dptr = (unsigned char *)&recmaster;
1414
1415         ret = ctdb_control(ctdb, destnode, 0, 
1416                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1417                            NULL, NULL, &res, &timeout, NULL);
1418         if (ret != 0 || res != 0) {
1419                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1420                 return -1;
1421         }
1422
1423         return 0;
1424 }
1425
1426
1427 /*
1428   get a list of databases off a remote node
1429  */
1430 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1431                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1432 {
1433         int ret;
1434         TDB_DATA outdata;
1435         int32_t res;
1436
1437         ret = ctdb_control(ctdb, destnode, 0, 
1438                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1439                            mem_ctx, &outdata, &res, &timeout, NULL);
1440         if (ret != 0 || res != 0) {
1441                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1442                 return -1;
1443         }
1444
1445         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1446         talloc_free(outdata.dptr);
1447                     
1448         return 0;
1449 }
1450
1451 /*
1452   get a list of nodes (vnn and flags ) from a remote node
1453  */
1454 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1455                 struct timeval timeout, uint32_t destnode, 
1456                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1457 {
1458         int ret;
1459         TDB_DATA outdata;
1460         int32_t res;
1461
1462         ret = ctdb_control(ctdb, destnode, 0, 
1463                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1464                            mem_ctx, &outdata, &res, &timeout, NULL);
1465         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1466                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1467                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1468         }
1469         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1470                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1471                 return -1;
1472         }
1473
1474         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1475         talloc_free(outdata.dptr);
1476                     
1477         return 0;
1478 }
1479
1480 /*
1481   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1482  */
1483 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1484                 struct timeval timeout, uint32_t destnode, 
1485                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1486 {
1487         int ret, i, len;
1488         TDB_DATA outdata;
1489         struct ctdb_node_mapv4 *nodemapv4;
1490         int32_t res;
1491
1492         ret = ctdb_control(ctdb, destnode, 0, 
1493                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1494                            mem_ctx, &outdata, &res, &timeout, NULL);
1495         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1496                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1497                 return -1;
1498         }
1499
1500         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1501
1502         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1503         (*nodemap) = talloc_zero_size(mem_ctx, len);
1504         CTDB_NO_MEMORY(ctdb, (*nodemap));
1505
1506         (*nodemap)->num = nodemapv4->num;
1507         for (i=0; i<nodemapv4->num; i++) {
1508                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1509                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1510                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1511                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1512         }
1513                 
1514         talloc_free(outdata.dptr);
1515                     
1516         return 0;
1517 }
1518
1519 /*
1520   drop the transport, reload the nodes file and restart the transport
1521  */
1522 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1523                     struct timeval timeout, uint32_t destnode)
1524 {
1525         int ret;
1526         int32_t res;
1527
1528         ret = ctdb_control(ctdb, destnode, 0, 
1529                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1530                            NULL, NULL, &res, &timeout, NULL);
1531         if (ret != 0 || res != 0) {
1532                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1533                 return -1;
1534         }
1535
1536         return 0;
1537 }
1538
1539
1540 /*
1541   set vnn map on a node
1542  */
1543 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1544                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1545 {
1546         int ret;
1547         TDB_DATA data;
1548         int32_t res;
1549         struct ctdb_vnn_map_wire *map;
1550         size_t len;
1551
1552         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1553         map = talloc_size(mem_ctx, len);
1554         CTDB_NO_MEMORY(ctdb, map);
1555
1556         map->generation = vnnmap->generation;
1557         map->size = vnnmap->size;
1558         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1559         
1560         data.dsize = len;
1561         data.dptr  = (uint8_t *)map;
1562
1563         ret = ctdb_control(ctdb, destnode, 0, 
1564                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1565                            NULL, NULL, &res, &timeout, NULL);
1566         if (ret != 0 || res != 0) {
1567                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1568                 return -1;
1569         }
1570
1571         talloc_free(map);
1572
1573         return 0;
1574 }
1575
1576
1577 /*
1578   async send for pull database
1579  */
1580 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1581         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1582         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1583 {
1584         TDB_DATA indata;
1585         struct ctdb_control_pulldb *pull;
1586         struct ctdb_client_control_state *state;
1587
1588         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1589         CTDB_NO_MEMORY_NULL(ctdb, pull);
1590
1591         pull->db_id   = dbid;
1592         pull->lmaster = lmaster;
1593
1594         indata.dsize = sizeof(struct ctdb_control_pulldb);
1595         indata.dptr  = (unsigned char *)pull;
1596
1597         state = ctdb_control_send(ctdb, destnode, 0, 
1598                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1599                                   mem_ctx, &timeout, NULL);
1600         talloc_free(pull);
1601
1602         return state;
1603 }
1604
1605 /*
1606   async recv for pull database
1607  */
1608 int ctdb_ctrl_pulldb_recv(
1609         struct ctdb_context *ctdb, 
1610         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1611         TDB_DATA *outdata)
1612 {
1613         int ret;
1614         int32_t res;
1615
1616         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1617         if ( (ret != 0) || (res != 0) ){
1618                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1619                 return -1;
1620         }
1621
1622         return 0;
1623 }
1624
1625 /*
1626   pull all keys and records for a specific database on a node
1627  */
1628 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1629                 uint32_t dbid, uint32_t lmaster, 
1630                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1631                 TDB_DATA *outdata)
1632 {
1633         struct ctdb_client_control_state *state;
1634
1635         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1636                                       timeout);
1637         
1638         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1639 }
1640
1641
1642 /*
1643   change dmaster for all keys in the database to the new value
1644  */
1645 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1646                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1647 {
1648         int ret;
1649         TDB_DATA indata;
1650         int32_t res;
1651
1652         indata.dsize = 2*sizeof(uint32_t);
1653         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1654
1655         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1656         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1657
1658         ret = ctdb_control(ctdb, destnode, 0, 
1659                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1660                            NULL, NULL, &res, &timeout, NULL);
1661         if (ret != 0 || res != 0) {
1662                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1663                 return -1;
1664         }
1665
1666         return 0;
1667 }
1668
1669 /*
1670   ping a node, return number of clients connected
1671  */
1672 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1673 {
1674         int ret;
1675         int32_t res;
1676
1677         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1678                            tdb_null, NULL, NULL, &res, NULL, NULL);
1679         if (ret != 0) {
1680                 return -1;
1681         }
1682         return res;
1683 }
1684
1685 /*
1686   find the real path to a ltdb 
1687  */
1688 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1689                    const char **path)
1690 {
1691         int ret;
1692         int32_t res;
1693         TDB_DATA data;
1694
1695         data.dptr = (uint8_t *)&dbid;
1696         data.dsize = sizeof(dbid);
1697
1698         ret = ctdb_control(ctdb, destnode, 0, 
1699                            CTDB_CONTROL_GETDBPATH, 0, data, 
1700                            mem_ctx, &data, &res, &timeout, NULL);
1701         if (ret != 0 || res != 0) {
1702                 return -1;
1703         }
1704
1705         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1706         if ((*path) == NULL) {
1707                 return -1;
1708         }
1709
1710         talloc_free(data.dptr);
1711
1712         return 0;
1713 }
1714
1715 /*
1716   find the name of a db 
1717  */
1718 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1719                    const char **name)
1720 {
1721         int ret;
1722         int32_t res;
1723         TDB_DATA data;
1724
1725         data.dptr = (uint8_t *)&dbid;
1726         data.dsize = sizeof(dbid);
1727
1728         ret = ctdb_control(ctdb, destnode, 0, 
1729                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1730                            mem_ctx, &data, &res, &timeout, NULL);
1731         if (ret != 0 || res != 0) {
1732                 return -1;
1733         }
1734
1735         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1736         if ((*name) == NULL) {
1737                 return -1;
1738         }
1739
1740         talloc_free(data.dptr);
1741
1742         return 0;
1743 }
1744
1745 /*
1746   get the health status of a db
1747  */
1748 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1749                           struct timeval timeout,
1750                           uint32_t destnode,
1751                           uint32_t dbid, TALLOC_CTX *mem_ctx,
1752                           const char **reason)
1753 {
1754         int ret;
1755         int32_t res;
1756         TDB_DATA data;
1757
1758         data.dptr = (uint8_t *)&dbid;
1759         data.dsize = sizeof(dbid);
1760
1761         ret = ctdb_control(ctdb, destnode, 0,
1762                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1763                            mem_ctx, &data, &res, &timeout, NULL);
1764         if (ret != 0 || res != 0) {
1765                 return -1;
1766         }
1767
1768         if (data.dsize == 0) {
1769                 (*reason) = NULL;
1770                 return 0;
1771         }
1772
1773         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1774         if ((*reason) == NULL) {
1775                 return -1;
1776         }
1777
1778         talloc_free(data.dptr);
1779
1780         return 0;
1781 }
1782
1783 /*
1784   create a database
1785  */
1786 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1787                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1788 {
1789         int ret;
1790         int32_t res;
1791         TDB_DATA data;
1792
1793         data.dptr = discard_const(name);
1794         data.dsize = strlen(name)+1;
1795
1796         ret = ctdb_control(ctdb, destnode, 0, 
1797                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1798                            0, data, 
1799                            mem_ctx, &data, &res, &timeout, NULL);
1800
1801         if (ret != 0 || res != 0) {
1802                 return -1;
1803         }
1804
1805         return 0;
1806 }
1807
1808 /*
1809   get debug level on a node
1810  */
1811 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1812 {
1813         int ret;
1814         int32_t res;
1815         TDB_DATA data;
1816
1817         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1818                            ctdb, &data, &res, NULL, NULL);
1819         if (ret != 0 || res != 0) {
1820                 return -1;
1821         }
1822         if (data.dsize != sizeof(int32_t)) {
1823                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1824                          (unsigned)data.dsize));
1825                 return -1;
1826         }
1827         *level = *(int32_t *)data.dptr;
1828         talloc_free(data.dptr);
1829         return 0;
1830 }
1831
1832 /*
1833   set debug level on a node
1834  */
1835 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1836 {
1837         int ret;
1838         int32_t res;
1839         TDB_DATA data;
1840
1841         data.dptr = (uint8_t *)&level;
1842         data.dsize = sizeof(level);
1843
1844         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1845                            NULL, NULL, &res, NULL, NULL);
1846         if (ret != 0 || res != 0) {
1847                 return -1;
1848         }
1849         return 0;
1850 }
1851
1852
1853 /*
1854   get a list of connected nodes
1855  */
1856 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1857                                 struct timeval timeout,
1858                                 TALLOC_CTX *mem_ctx,
1859                                 uint32_t *num_nodes)
1860 {
1861         struct ctdb_node_map *map=NULL;
1862         int ret, i;
1863         uint32_t *nodes;
1864
1865         *num_nodes = 0;
1866
1867         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1868         if (ret != 0) {
1869                 return NULL;
1870         }
1871
1872         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1873         if (nodes == NULL) {
1874                 return NULL;
1875         }
1876
1877         for (i=0;i<map->num;i++) {
1878                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1879                         nodes[*num_nodes] = map->nodes[i].pnn;
1880                         (*num_nodes)++;
1881                 }
1882         }
1883
1884         return nodes;
1885 }
1886
1887
1888 /*
1889   reset remote status
1890  */
1891 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1892 {
1893         int ret;
1894         int32_t res;
1895
1896         ret = ctdb_control(ctdb, destnode, 0, 
1897                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1898                            NULL, NULL, &res, NULL, NULL);
1899         if (ret != 0 || res != 0) {
1900                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1901                 return -1;
1902         }
1903         return 0;
1904 }
1905
1906 /*
1907   this is the dummy null procedure that all databases support
1908 */
1909 static int ctdb_null_func(struct ctdb_call_info *call)
1910 {
1911         return 0;
1912 }
1913
1914 /*
1915   this is a plain fetch procedure that all databases support
1916 */
1917 static int ctdb_fetch_func(struct ctdb_call_info *call)
1918 {
1919         call->reply_data = &call->record_data;
1920         return 0;
1921 }
1922
1923 /*
1924   this is a plain fetch procedure that all databases support
1925   this returns the full record including the ltdb header
1926 */
1927 static int ctdb_fetch_with_header_func(struct ctdb_call_info *call)
1928 {
1929         call->reply_data = talloc(call, TDB_DATA);
1930         if (call->reply_data == NULL) {
1931                 return -1;
1932         }
1933         call->reply_data->dsize = sizeof(struct ctdb_ltdb_header) + call->record_data.dsize;
1934         call->reply_data->dptr  = talloc_size(call->reply_data, call->reply_data->dsize);
1935         if (call->reply_data->dptr == NULL) {
1936                 return -1;
1937         }
1938         memcpy(call->reply_data->dptr, call->header, sizeof(struct ctdb_ltdb_header));
1939         memcpy(&call->reply_data->dptr[sizeof(struct ctdb_ltdb_header)], call->record_data.dptr, call->record_data.dsize);
1940
1941         return 0;
1942 }
1943
1944 /*
1945   attach to a specific database - client call
1946 */
1947 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, bool persistent, uint32_t tdb_flags)
1948 {
1949         struct ctdb_db_context *ctdb_db;
1950         TDB_DATA data;
1951         int ret;
1952         int32_t res;
1953
1954         ctdb_db = ctdb_db_handle(ctdb, name);
1955         if (ctdb_db) {
1956                 return ctdb_db;
1957         }
1958
1959         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1960         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1961
1962         ctdb_db->ctdb = ctdb;
1963         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1964         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1965
1966         data.dptr = discard_const(name);
1967         data.dsize = strlen(name)+1;
1968
1969         /* tell ctdb daemon to attach */
1970         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1971                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1972                            0, data, ctdb_db, &data, &res, NULL, NULL);
1973         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1974                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1975                 talloc_free(ctdb_db);
1976                 return NULL;
1977         }
1978         
1979         ctdb_db->db_id = *(uint32_t *)data.dptr;
1980         talloc_free(data.dptr);
1981
1982         ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1983         if (ret != 0) {
1984                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1985                 talloc_free(ctdb_db);
1986                 return NULL;
1987         }
1988
1989         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1990         if (ctdb->valgrinding) {
1991                 tdb_flags |= TDB_NOMMAP;
1992         }
1993         tdb_flags |= TDB_DISALLOW_NESTING;
1994
1995         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1996         if (ctdb_db->ltdb == NULL) {
1997                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1998                 talloc_free(ctdb_db);
1999                 return NULL;
2000         }
2001
2002         ctdb_db->persistent = persistent;
2003
2004         DLIST_ADD(ctdb->db_list, ctdb_db);
2005
2006         /* add well known functions */
2007         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
2008         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
2009         ctdb_set_call(ctdb_db, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
2010
2011         return ctdb_db;
2012 }
2013
2014
2015 /*
2016   setup a call for a database
2017  */
2018 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
2019 {
2020         struct ctdb_registered_call *call;
2021
2022 #if 0
2023         TDB_DATA data;
2024         int32_t status;
2025         struct ctdb_control_set_call c;
2026         int ret;
2027
2028         /* this is no longer valid with the separate daemon architecture */
2029         c.db_id = ctdb_db->db_id;
2030         c.fn    = fn;
2031         c.id    = id;
2032
2033         data.dptr = (uint8_t *)&c;
2034         data.dsize = sizeof(c);
2035
2036         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
2037                            data, NULL, NULL, &status, NULL, NULL);
2038         if (ret != 0 || status != 0) {
2039                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
2040                 return -1;
2041         }
2042 #endif
2043
2044         /* also register locally */
2045         call = talloc(ctdb_db, struct ctdb_registered_call);
2046         call->fn = fn;
2047         call->id = id;
2048
2049         DLIST_ADD(ctdb_db->calls, call);        
2050         return 0;
2051 }
2052
2053
2054 struct traverse_state {
2055         bool done;
2056         uint32_t count;
2057         ctdb_traverse_func fn;
2058         void *private_data;
2059 };
2060
2061 /*
2062   called on each key during a ctdb_traverse
2063  */
2064 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
2065 {
2066         struct traverse_state *state = (struct traverse_state *)p;
2067         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
2068         TDB_DATA key;
2069
2070         if (data.dsize < sizeof(uint32_t) ||
2071             d->length != data.dsize) {
2072                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
2073                 state->done = True;
2074                 return;
2075         }
2076
2077         key.dsize = d->keylen;
2078         key.dptr  = &d->data[0];
2079         data.dsize = d->datalen;
2080         data.dptr = &d->data[d->keylen];
2081
2082         if (key.dsize == 0 && data.dsize == 0) {
2083                 /* end of traverse */
2084                 state->done = True;
2085                 return;
2086         }
2087
2088         if (data.dsize == sizeof(struct ctdb_ltdb_header)) {
2089                 /* empty records are deleted records in ctdb */
2090                 return;
2091         }
2092
2093         if (state->fn(ctdb, key, data, state->private_data) != 0) {
2094                 state->done = True;
2095         }
2096
2097         state->count++;
2098 }
2099
2100
2101 /*
2102   start a cluster wide traverse, calling the supplied fn on each record
2103   return the number of records traversed, or -1 on error
2104  */
2105 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
2106 {
2107         TDB_DATA data;
2108         struct ctdb_traverse_start t;
2109         int32_t status;
2110         int ret;
2111         uint64_t srvid = (getpid() | 0xFLL<<60);
2112         struct traverse_state state;
2113
2114         state.done = False;
2115         state.count = 0;
2116         state.private_data = private_data;
2117         state.fn = fn;
2118
2119         ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
2120         if (ret != 0) {
2121                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
2122                 return -1;
2123         }
2124
2125         t.db_id = ctdb_db->db_id;
2126         t.srvid = srvid;
2127         t.reqid = 0;
2128
2129         data.dptr = (uint8_t *)&t;
2130         data.dsize = sizeof(t);
2131
2132         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
2133                            data, NULL, NULL, &status, NULL, NULL);
2134         if (ret != 0 || status != 0) {
2135                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
2136                 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2137                 return -1;
2138         }
2139
2140         while (!state.done) {
2141                 event_loop_once(ctdb_db->ctdb->ev);
2142         }
2143
2144         ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2145         if (ret != 0) {
2146                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
2147                 return -1;
2148         }
2149
2150         return state.count;
2151 }
2152
2153 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
2154 /*
2155   called on each key during a catdb
2156  */
2157 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
2158 {
2159         int i;
2160         FILE *f = (FILE *)p;
2161         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
2162
2163         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
2164         for (i=0;i<key.dsize;i++) {
2165                 if (ISASCII(key.dptr[i])) {
2166                         fprintf(f, "%c", key.dptr[i]);
2167                 } else {
2168                         fprintf(f, "\\%02X", key.dptr[i]);
2169                 }
2170         }
2171         fprintf(f, "\"\n");
2172
2173         fprintf(f, "dmaster: %u\n", h->dmaster);
2174         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
2175         fprintf(f, "flags: 0x%08x", h->flags);
2176         if (h->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) printf(" MIGRATED_WITH_DATA");
2177         if (h->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) printf(" VACUUM_MIGRATED");
2178         if (h->flags & CTDB_REC_FLAG_AUTOMATIC) printf(" AUTOMATIC");
2179         if (h->flags & CTDB_REC_RO_HAVE_DELEGATIONS) printf(" RO_HAVE_DELEGATIONS");
2180         if (h->flags & CTDB_REC_RO_HAVE_READONLY) printf(" RO_HAVE_READONLY");
2181         if (h->flags & CTDB_REC_RO_REVOKING_READONLY) printf(" RO_REVOKING_READONLY");
2182         if (h->flags & CTDB_REC_RO_REVOKE_COMPLETE) printf(" RO_REVOKE_COMPLETE");
2183         fprintf(f, "\n");
2184
2185         fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
2186         for (i=sizeof(*h);i<data.dsize;i++) {
2187                 if (ISASCII(data.dptr[i])) {
2188                         fprintf(f, "%c", data.dptr[i]);
2189                 } else {
2190                         fprintf(f, "\\%02X", data.dptr[i]);
2191                 }
2192         }
2193         fprintf(f, "\"\n");
2194
2195         fprintf(f, "\n");
2196
2197         return 0;
2198 }
2199
2200 /*
2201   convenience function to list all keys to stdout
2202  */
2203 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
2204 {
2205         return ctdb_traverse(ctdb_db, ctdb_dumpdb_record, f);
2206 }
2207
2208 /*
2209   get the pid of a ctdb daemon
2210  */
2211 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
2212 {
2213         int ret;
2214         int32_t res;
2215
2216         ret = ctdb_control(ctdb, destnode, 0, 
2217                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
2218                            NULL, NULL, &res, &timeout, NULL);
2219         if (ret != 0) {
2220                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
2221                 return -1;
2222         }
2223
2224         *pid = res;
2225
2226         return 0;
2227 }
2228
2229
2230 /*
2231   async freeze send control
2232  */
2233 struct ctdb_client_control_state *
2234 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
2235 {
2236         return ctdb_control_send(ctdb, destnode, priority, 
2237                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
2238                            mem_ctx, &timeout, NULL);
2239 }
2240
2241 /* 
2242    async freeze recv control
2243 */
2244 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
2245 {
2246         int ret;
2247         int32_t res;
2248
2249         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
2250         if ( (ret != 0) || (res != 0) ){
2251                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
2252                 return -1;
2253         }
2254
2255         return 0;
2256 }
2257
2258 /*
2259   freeze databases of a certain priority
2260  */
2261 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2262 {
2263         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2264         struct ctdb_client_control_state *state;
2265         int ret;
2266
2267         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
2268         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
2269         talloc_free(tmp_ctx);
2270
2271         return ret;
2272 }
2273
2274 /* Freeze all databases */
2275 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2276 {
2277         int i;
2278
2279         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2280                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2281                         return -1;
2282                 }
2283         }
2284         return 0;
2285 }
2286
2287 /*
2288   thaw databases of a certain priority
2289  */
2290 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2291 {
2292         int ret;
2293         int32_t res;
2294
2295         ret = ctdb_control(ctdb, destnode, priority, 
2296                            CTDB_CONTROL_THAW, 0, tdb_null, 
2297                            NULL, NULL, &res, &timeout, NULL);
2298         if (ret != 0 || res != 0) {
2299                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2300                 return -1;
2301         }
2302
2303         return 0;
2304 }
2305
2306 /* thaw all databases */
2307 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2308 {
2309         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2310 }
2311
2312 /*
2313   get pnn of a node, or -1
2314  */
2315 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2316 {
2317         int ret;
2318         int32_t res;
2319
2320         ret = ctdb_control(ctdb, destnode, 0, 
2321                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2322                            NULL, NULL, &res, &timeout, NULL);
2323         if (ret != 0) {
2324                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2325                 return -1;
2326         }
2327
2328         return res;
2329 }
2330
2331 /*
2332   get the monitoring mode of a remote node
2333  */
2334 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2335 {
2336         int ret;
2337         int32_t res;
2338
2339         ret = ctdb_control(ctdb, destnode, 0, 
2340                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2341                            NULL, NULL, &res, &timeout, NULL);
2342         if (ret != 0) {
2343                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2344                 return -1;
2345         }
2346
2347         *monmode = res;
2348
2349         return 0;
2350 }
2351
2352
2353 /*
2354  set the monitoring mode of a remote node to active
2355  */
2356 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2357 {
2358         int ret;
2359         
2360
2361         ret = ctdb_control(ctdb, destnode, 0, 
2362                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2363                            NULL, NULL,NULL, &timeout, NULL);
2364         if (ret != 0) {
2365                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2366                 return -1;
2367         }
2368
2369         
2370
2371         return 0;
2372 }
2373
2374 /*
2375   set the monitoring mode of a remote node to disable
2376  */
2377 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2378 {
2379         int ret;
2380         
2381
2382         ret = ctdb_control(ctdb, destnode, 0, 
2383                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2384                            NULL, NULL, NULL, &timeout, NULL);
2385         if (ret != 0) {
2386                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2387                 return -1;
2388         }
2389
2390         
2391
2392         return 0;
2393 }
2394
2395
2396
2397 /* 
2398   sent to a node to make it take over an ip address
2399 */
2400 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2401                           uint32_t destnode, struct ctdb_public_ip *ip)
2402 {
2403         TDB_DATA data;
2404         struct ctdb_public_ipv4 ipv4;
2405         int ret;
2406         int32_t res;
2407
2408         if (ip->addr.sa.sa_family == AF_INET) {
2409                 ipv4.pnn = ip->pnn;
2410                 ipv4.sin = ip->addr.ip;
2411
2412                 data.dsize = sizeof(ipv4);
2413                 data.dptr  = (uint8_t *)&ipv4;
2414
2415                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2416                            NULL, &res, &timeout, NULL);
2417         } else {
2418                 data.dsize = sizeof(*ip);
2419                 data.dptr  = (uint8_t *)ip;
2420
2421                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2422                            NULL, &res, &timeout, NULL);
2423         }
2424
2425         if (ret != 0 || res != 0) {
2426                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2427                 return -1;
2428         }
2429
2430         return 0;       
2431 }
2432
2433
2434 /* 
2435   sent to a node to make it release an ip address
2436 */
2437 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2438                          uint32_t destnode, struct ctdb_public_ip *ip)
2439 {
2440         TDB_DATA data;
2441         struct ctdb_public_ipv4 ipv4;
2442         int ret;
2443         int32_t res;
2444
2445         if (ip->addr.sa.sa_family == AF_INET) {
2446                 ipv4.pnn = ip->pnn;
2447                 ipv4.sin = ip->addr.ip;
2448
2449                 data.dsize = sizeof(ipv4);
2450                 data.dptr  = (uint8_t *)&ipv4;
2451
2452                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2453                                    NULL, &res, &timeout, NULL);
2454         } else {
2455                 data.dsize = sizeof(*ip);
2456                 data.dptr  = (uint8_t *)ip;
2457
2458                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2459                                    NULL, &res, &timeout, NULL);
2460         }
2461
2462         if (ret != 0 || res != 0) {
2463                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2464                 return -1;
2465         }
2466
2467         return 0;       
2468 }
2469
2470
2471 /*
2472   get a tunable
2473  */
2474 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2475                           struct timeval timeout, 
2476                           uint32_t destnode,
2477                           const char *name, uint32_t *value)
2478 {
2479         struct ctdb_control_get_tunable *t;
2480         TDB_DATA data, outdata;
2481         int32_t res;
2482         int ret;
2483
2484         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2485         data.dptr  = talloc_size(ctdb, data.dsize);
2486         CTDB_NO_MEMORY(ctdb, data.dptr);
2487
2488         t = (struct ctdb_control_get_tunable *)data.dptr;
2489         t->length = strlen(name)+1;
2490         memcpy(t->name, name, t->length);
2491
2492         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2493                            &outdata, &res, &timeout, NULL);
2494         talloc_free(data.dptr);
2495         if (ret != 0 || res != 0) {
2496                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2497                 return -1;
2498         }
2499
2500         if (outdata.dsize != sizeof(uint32_t)) {
2501                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2502                 talloc_free(outdata.dptr);
2503                 return -1;
2504         }
2505         
2506         *value = *(uint32_t *)outdata.dptr;
2507         talloc_free(outdata.dptr);
2508
2509         return 0;
2510 }
2511
2512 /*
2513   set a tunable
2514  */
2515 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2516                           struct timeval timeout, 
2517                           uint32_t destnode,
2518                           const char *name, uint32_t value)
2519 {
2520         struct ctdb_control_set_tunable *t;
2521         TDB_DATA data;
2522         int32_t res;
2523         int ret;
2524
2525         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2526         data.dptr  = talloc_size(ctdb, data.dsize);
2527         CTDB_NO_MEMORY(ctdb, data.dptr);
2528
2529         t = (struct ctdb_control_set_tunable *)data.dptr;
2530         t->length = strlen(name)+1;
2531         memcpy(t->name, name, t->length);
2532         t->value = value;
2533
2534         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2535                            NULL, &res, &timeout, NULL);
2536         talloc_free(data.dptr);
2537         if (ret != 0 || res != 0) {
2538                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2539                 return -1;
2540         }
2541
2542         return 0;
2543 }
2544
2545 /*
2546   list tunables
2547  */
2548 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2549                             struct timeval timeout, 
2550                             uint32_t destnode,
2551                             TALLOC_CTX *mem_ctx,
2552                             const char ***list, uint32_t *count)
2553 {
2554         TDB_DATA outdata;
2555         int32_t res;
2556         int ret;
2557         struct ctdb_control_list_tunable *t;
2558         char *p, *s, *ptr;
2559
2560         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2561                            mem_ctx, &outdata, &res, &timeout, NULL);
2562         if (ret != 0 || res != 0) {
2563                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2564                 return -1;
2565         }
2566
2567         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2568         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2569             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2570                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2571                 talloc_free(outdata.dptr);
2572                 return -1;              
2573         }
2574         
2575         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2576         CTDB_NO_MEMORY(ctdb, p);
2577
2578         talloc_free(outdata.dptr);
2579         
2580         (*list) = NULL;
2581         (*count) = 0;
2582
2583         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2584                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2585                 CTDB_NO_MEMORY(ctdb, *list);
2586                 (*list)[*count] = talloc_strdup(*list, s);
2587                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2588                 (*count)++;
2589         }
2590
2591         talloc_free(p);
2592
2593         return 0;
2594 }
2595
2596
2597 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2598                                    struct timeval timeout, uint32_t destnode,
2599                                    TALLOC_CTX *mem_ctx,
2600                                    uint32_t flags,
2601                                    struct ctdb_all_public_ips **ips)
2602 {
2603         int ret;
2604         TDB_DATA outdata;
2605         int32_t res;
2606
2607         ret = ctdb_control(ctdb, destnode, 0, 
2608                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2609                            mem_ctx, &outdata, &res, &timeout, NULL);
2610         if (ret == 0 && res == -1) {
2611                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2612                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2613         }
2614         if (ret != 0 || res != 0) {
2615           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2616                 return -1;
2617         }
2618
2619         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2620         talloc_free(outdata.dptr);
2621                     
2622         return 0;
2623 }
2624
2625 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2626                              struct timeval timeout, uint32_t destnode,
2627                              TALLOC_CTX *mem_ctx,
2628                              struct ctdb_all_public_ips **ips)
2629 {
2630         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2631                                               destnode, mem_ctx,
2632                                               0, ips);
2633 }
2634
2635 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2636                         struct timeval timeout, uint32_t destnode, 
2637                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2638 {
2639         int ret, i, len;
2640         TDB_DATA outdata;
2641         int32_t res;
2642         struct ctdb_all_public_ipsv4 *ipsv4;
2643
2644         ret = ctdb_control(ctdb, destnode, 0, 
2645                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2646                            mem_ctx, &outdata, &res, &timeout, NULL);
2647         if (ret != 0 || res != 0) {
2648                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2649                 return -1;
2650         }
2651
2652         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2653         len = offsetof(struct ctdb_all_public_ips, ips) +
2654                 ipsv4->num*sizeof(struct ctdb_public_ip);
2655         *ips = talloc_zero_size(mem_ctx, len);
2656         CTDB_NO_MEMORY(ctdb, *ips);
2657         (*ips)->num = ipsv4->num;
2658         for (i=0; i<ipsv4->num; i++) {
2659                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2660                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2661         }
2662
2663         talloc_free(outdata.dptr);
2664                     
2665         return 0;
2666 }
2667
2668 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2669                                  struct timeval timeout, uint32_t destnode,
2670                                  TALLOC_CTX *mem_ctx,
2671                                  const ctdb_sock_addr *addr,
2672                                  struct ctdb_control_public_ip_info **_info)
2673 {
2674         int ret;
2675         TDB_DATA indata;
2676         TDB_DATA outdata;
2677         int32_t res;
2678         struct ctdb_control_public_ip_info *info;
2679         uint32_t len;
2680         uint32_t i;
2681
2682         indata.dptr = discard_const_p(uint8_t, addr);
2683         indata.dsize = sizeof(*addr);
2684
2685         ret = ctdb_control(ctdb, destnode, 0,
2686                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2687                            mem_ctx, &outdata, &res, &timeout, NULL);
2688         if (ret != 0 || res != 0) {
2689                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2690                                 "failed ret:%d res:%d\n",
2691                                 ret, res));
2692                 return -1;
2693         }
2694
2695         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2696         if (len > outdata.dsize) {
2697                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2698                                 "returned invalid data with size %u > %u\n",
2699                                 (unsigned int)outdata.dsize,
2700                                 (unsigned int)len));
2701                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2702                 return -1;
2703         }
2704
2705         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2706         len += info->num*sizeof(struct ctdb_control_iface_info);
2707
2708         if (len > outdata.dsize) {
2709                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2710                                 "returned invalid data with size %u > %u\n",
2711                                 (unsigned int)outdata.dsize,
2712                                 (unsigned int)len));
2713                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2714                 return -1;
2715         }
2716
2717         /* make sure we null terminate the returned strings */
2718         for (i=0; i < info->num; i++) {
2719                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2720         }
2721
2722         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2723                                                                 outdata.dptr,
2724                                                                 outdata.dsize);
2725         talloc_free(outdata.dptr);
2726         if (*_info == NULL) {
2727                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2728                                 "talloc_memdup size %u failed\n",
2729                                 (unsigned int)outdata.dsize));
2730                 return -1;
2731         }
2732
2733         return 0;
2734 }
2735
2736 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2737                          struct timeval timeout, uint32_t destnode,
2738                          TALLOC_CTX *mem_ctx,
2739                          struct ctdb_control_get_ifaces **_ifaces)
2740 {
2741         int ret;
2742         TDB_DATA outdata;
2743         int32_t res;
2744         struct ctdb_control_get_ifaces *ifaces;
2745         uint32_t len;
2746         uint32_t i;
2747
2748         ret = ctdb_control(ctdb, destnode, 0,
2749                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2750                            mem_ctx, &outdata, &res, &timeout, NULL);
2751         if (ret != 0 || res != 0) {
2752                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2753                                 "failed ret:%d res:%d\n",
2754                                 ret, res));
2755                 return -1;
2756         }
2757
2758         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2759         if (len > outdata.dsize) {
2760                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2761                                 "returned invalid data with size %u > %u\n",
2762                                 (unsigned int)outdata.dsize,
2763                                 (unsigned int)len));
2764                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2765                 return -1;
2766         }
2767
2768         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2769         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2770
2771         if (len > outdata.dsize) {
2772                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2773                                 "returned invalid data with size %u > %u\n",
2774                                 (unsigned int)outdata.dsize,
2775                                 (unsigned int)len));
2776                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2777                 return -1;
2778         }
2779
2780         /* make sure we null terminate the returned strings */
2781         for (i=0; i < ifaces->num; i++) {
2782                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2783         }
2784
2785         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2786                                                                   outdata.dptr,
2787                                                                   outdata.dsize);
2788         talloc_free(outdata.dptr);
2789         if (*_ifaces == NULL) {
2790                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2791                                 "talloc_memdup size %u failed\n",
2792                                 (unsigned int)outdata.dsize));
2793                 return -1;
2794         }
2795
2796         return 0;
2797 }
2798
2799 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2800                              struct timeval timeout, uint32_t destnode,
2801                              TALLOC_CTX *mem_ctx,
2802                              const struct ctdb_control_iface_info *info)
2803 {
2804         int ret;
2805         TDB_DATA indata;
2806         int32_t res;
2807
2808         indata.dptr = discard_const_p(uint8_t, info);
2809         indata.dsize = sizeof(*info);
2810
2811         ret = ctdb_control(ctdb, destnode, 0,
2812                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2813                            mem_ctx, NULL, &res, &timeout, NULL);
2814         if (ret != 0 || res != 0) {
2815                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2816                                 "failed ret:%d res:%d\n",
2817                                 ret, res));
2818                 return -1;
2819         }
2820
2821         return 0;
2822 }
2823
2824 /*
2825   set/clear the permanent disabled bit on a remote node
2826  */
2827 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2828                        uint32_t set, uint32_t clear)
2829 {
2830         int ret;
2831         TDB_DATA data;
2832         struct ctdb_node_map *nodemap=NULL;
2833         struct ctdb_node_flag_change c;
2834         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2835         uint32_t recmaster;
2836         uint32_t *nodes;
2837
2838
2839         /* find the recovery master */
2840         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2841         if (ret != 0) {
2842                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2843                 talloc_free(tmp_ctx);
2844                 return ret;
2845         }
2846
2847
2848         /* read the node flags from the recmaster */
2849         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2850         if (ret != 0) {
2851                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2852                 talloc_free(tmp_ctx);
2853                 return -1;
2854         }
2855         if (destnode >= nodemap->num) {
2856                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2857                 talloc_free(tmp_ctx);
2858                 return -1;
2859         }
2860
2861         c.pnn       = destnode;
2862         c.old_flags = nodemap->nodes[destnode].flags;
2863         c.new_flags = c.old_flags;
2864         c.new_flags |= set;
2865         c.new_flags &= ~clear;
2866
2867         data.dsize = sizeof(c);
2868         data.dptr = (unsigned char *)&c;
2869
2870         /* send the flags update to all connected nodes */
2871         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2872
2873         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2874                                         nodes, 0,
2875                                         timeout, false, data,
2876                                         NULL, NULL,
2877                                         NULL) != 0) {
2878                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2879
2880                 talloc_free(tmp_ctx);
2881                 return -1;
2882         }
2883
2884         talloc_free(tmp_ctx);
2885         return 0;
2886 }
2887
2888
2889 /*
2890   get all tunables
2891  */
2892 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
2893                                struct timeval timeout, 
2894                                uint32_t destnode,
2895                                struct ctdb_tunable *tunables)
2896 {
2897         TDB_DATA outdata;
2898         int ret;
2899         int32_t res;
2900
2901         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2902                            &outdata, &res, &timeout, NULL);
2903         if (ret != 0 || res != 0) {
2904                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2905                 return -1;
2906         }
2907
2908         if (outdata.dsize != sizeof(*tunables)) {
2909                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2910                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2911                 return -1;              
2912         }
2913
2914         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2915         talloc_free(outdata.dptr);
2916         return 0;
2917 }
2918
2919 /*
2920   add a public address to a node
2921  */
2922 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
2923                       struct timeval timeout, 
2924                       uint32_t destnode,
2925                       struct ctdb_control_ip_iface *pub)
2926 {
2927         TDB_DATA data;
2928         int32_t res;
2929         int ret;
2930
2931         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2932         data.dptr  = (unsigned char *)pub;
2933
2934         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2935                            NULL, &res, &timeout, NULL);
2936         if (ret != 0 || res != 0) {
2937                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2938                 return -1;
2939         }
2940
2941         return 0;
2942 }
2943
2944 /*
2945   delete a public address from a node
2946  */
2947 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
2948                       struct timeval timeout, 
2949                       uint32_t destnode,
2950                       struct ctdb_control_ip_iface *pub)
2951 {
2952         TDB_DATA data;
2953         int32_t res;
2954         int ret;
2955
2956         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2957         data.dptr  = (unsigned char *)pub;
2958
2959         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2960                            NULL, &res, &timeout, NULL);
2961         if (ret != 0 || res != 0) {
2962                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2963                 return -1;
2964         }
2965
2966         return 0;
2967 }
2968
2969 /*
2970   kill a tcp connection
2971  */
2972 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2973                       struct timeval timeout, 
2974                       uint32_t destnode,
2975                       struct ctdb_control_killtcp *killtcp)
2976 {
2977         TDB_DATA data;
2978         int32_t res;
2979         int ret;
2980
2981         data.dsize = sizeof(struct ctdb_control_killtcp);
2982         data.dptr  = (unsigned char *)killtcp;
2983
2984         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2985                            NULL, &res, &timeout, NULL);
2986         if (ret != 0 || res != 0) {
2987                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2988                 return -1;
2989         }
2990
2991         return 0;
2992 }
2993
2994 /*
2995   send a gratious arp
2996  */
2997 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
2998                       struct timeval timeout, 
2999                       uint32_t destnode,
3000                       ctdb_sock_addr *addr,
3001                       const char *ifname)
3002 {
3003         TDB_DATA data;
3004         int32_t res;
3005         int ret, len;
3006         struct ctdb_control_gratious_arp *gratious_arp;
3007         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3008
3009
3010         len = strlen(ifname)+1;
3011         gratious_arp = talloc_size(tmp_ctx, 
3012                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
3013         CTDB_NO_MEMORY(ctdb, gratious_arp);
3014
3015         gratious_arp->addr = *addr;
3016         gratious_arp->len = len;
3017         memcpy(&gratious_arp->iface[0], ifname, len);
3018
3019
3020         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
3021         data.dptr  = (unsigned char *)gratious_arp;
3022
3023         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
3024                            NULL, &res, &timeout, NULL);
3025         if (ret != 0 || res != 0) {
3026                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
3027                 talloc_free(tmp_ctx);
3028                 return -1;
3029         }
3030
3031         talloc_free(tmp_ctx);
3032         return 0;
3033 }
3034
3035 /*
3036   get a list of all tcp tickles that a node knows about for a particular vnn
3037  */
3038 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
3039                               struct timeval timeout, uint32_t destnode, 
3040                               TALLOC_CTX *mem_ctx, 
3041                               ctdb_sock_addr *addr,
3042                               struct ctdb_control_tcp_tickle_list **list)
3043 {
3044         int ret;
3045         TDB_DATA data, outdata;
3046         int32_t status;
3047
3048         data.dptr = (uint8_t*)addr;
3049         data.dsize = sizeof(ctdb_sock_addr);
3050
3051         ret = ctdb_control(ctdb, destnode, 0, 
3052                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
3053                            mem_ctx, &outdata, &status, NULL, NULL);
3054         if (ret != 0 || status != 0) {
3055                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
3056                 return -1;
3057         }
3058
3059         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
3060
3061         return status;
3062 }
3063
3064 /*
3065   register a server id
3066  */
3067 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
3068                       struct timeval timeout, 
3069                       struct ctdb_server_id *id)
3070 {
3071         TDB_DATA data;
3072         int32_t res;
3073         int ret;
3074
3075         data.dsize = sizeof(struct ctdb_server_id);
3076         data.dptr  = (unsigned char *)id;
3077
3078         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3079                         CTDB_CONTROL_REGISTER_SERVER_ID, 
3080                         0, data, NULL,
3081                         NULL, &res, &timeout, NULL);
3082         if (ret != 0 || res != 0) {
3083                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
3084                 return -1;
3085         }
3086
3087         return 0;
3088 }
3089
3090 /*
3091   unregister a server id
3092  */
3093 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
3094                       struct timeval timeout, 
3095                       struct ctdb_server_id *id)
3096 {
3097         TDB_DATA data;
3098         int32_t res;
3099         int ret;
3100
3101         data.dsize = sizeof(struct ctdb_server_id);
3102         data.dptr  = (unsigned char *)id;
3103
3104         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3105                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
3106                         0, data, NULL,
3107                         NULL, &res, &timeout, NULL);
3108         if (ret != 0 || res != 0) {
3109                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
3110                 return -1;
3111         }
3112
3113         return 0;
3114 }
3115
3116
3117 /*
3118   check if a server id exists
3119
3120   if a server id does exist, return *status == 1, otherwise *status == 0
3121  */
3122 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
3123                       struct timeval timeout, 
3124                       uint32_t destnode,
3125                       struct ctdb_server_id *id,
3126                       uint32_t *status)
3127 {
3128         TDB_DATA data;
3129         int32_t res;
3130         int ret;
3131
3132         data.dsize = sizeof(struct ctdb_server_id);
3133         data.dptr  = (unsigned char *)id;
3134
3135         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
3136                         0, data, NULL,
3137                         NULL, &res, &timeout, NULL);
3138         if (ret != 0) {
3139                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
3140                 return -1;
3141         }
3142
3143         if (res) {
3144                 *status = 1;
3145         } else {
3146                 *status = 0;
3147         }
3148
3149         return 0;
3150 }
3151
3152 /*
3153    get the list of server ids that are registered on a node
3154 */
3155 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
3156                 TALLOC_CTX *mem_ctx,
3157                 struct timeval timeout, uint32_t destnode, 
3158                 struct ctdb_server_id_list **svid_list)
3159 {
3160         int ret;
3161         TDB_DATA outdata;
3162         int32_t res;
3163
3164         ret = ctdb_control(ctdb, destnode, 0, 
3165                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
3166                            mem_ctx, &outdata, &res, &timeout, NULL);
3167         if (ret != 0 || res != 0) {
3168                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
3169                 return -1;
3170         }
3171
3172         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
3173                     
3174         return 0;
3175 }
3176
3177 /*
3178   initialise the ctdb daemon for client applications
3179
3180   NOTE: In current code the daemon does not fork. This is for testing purposes only
3181   and to simplify the code.
3182 */
3183 struct ctdb_context *ctdb_init(struct event_context *ev)
3184 {
3185         int ret;
3186         struct ctdb_context *ctdb;
3187
3188         ctdb = talloc_zero(ev, struct ctdb_context);
3189         if (ctdb == NULL) {
3190                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
3191                 return NULL;
3192         }
3193         ctdb->ev  = ev;
3194         ctdb->idr = idr_init(ctdb);
3195         /* Wrap early to exercise code. */
3196         ctdb->lastid = INT_MAX-200;
3197         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
3198
3199         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
3200         if (ret != 0) {
3201                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
3202                 talloc_free(ctdb);
3203                 return NULL;
3204         }
3205
3206         ctdb->statistics.statistics_start_time = timeval_current();
3207
3208         return ctdb;
3209 }
3210
3211
3212 /*
3213   set some ctdb flags
3214 */
3215 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
3216 {
3217         ctdb->flags |= flags;
3218 }
3219
3220 /*
3221   setup the local socket name
3222 */
3223 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
3224 {
3225         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
3226         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
3227
3228         return 0;
3229 }
3230
3231 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
3232 {
3233         return ctdb->daemon.name;
3234 }
3235
3236 /*
3237   return the pnn of this node
3238 */
3239 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
3240 {
3241         return ctdb->pnn;
3242 }
3243
3244
3245 /*
3246   get the uptime of a remote node
3247  */
3248 struct ctdb_client_control_state *
3249 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3250 {
3251         return ctdb_control_send(ctdb, destnode, 0, 
3252                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
3253                            mem_ctx, &timeout, NULL);
3254 }
3255
3256 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
3257 {
3258         int ret;
3259         int32_t res;
3260         TDB_DATA outdata;
3261
3262         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3263         if (ret != 0 || res != 0) {
3264                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
3265                 return -1;
3266         }
3267
3268         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
3269
3270         return 0;
3271 }
3272
3273 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3274 {
3275         struct ctdb_client_control_state *state;
3276
3277         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3278         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3279 }
3280
3281 /*
3282   send a control to execute the "recovered" event script on a node
3283  */
3284 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3285 {
3286         int ret;
3287         int32_t status;
3288
3289         ret = ctdb_control(ctdb, destnode, 0, 
3290                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
3291                            NULL, NULL, &status, &timeout, NULL);
3292         if (ret != 0 || status != 0) {
3293                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3294                 return -1;
3295         }
3296
3297         return 0;
3298 }
3299
3300 /* 
3301   callback for the async helpers used when sending the same control
3302   to multiple nodes in parallell.
3303 */
3304 static void async_callback(struct ctdb_client_control_state *state)
3305 {
3306         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3307         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3308         int ret;
3309         TDB_DATA outdata;
3310         int32_t res;
3311         uint32_t destnode = state->c->hdr.destnode;
3312
3313         /* one more node has responded with recmode data */
3314         data->count--;
3315
3316         /* if we failed to push the db, then return an error and let
3317            the main loop try again.
3318         */
3319         if (state->state != CTDB_CONTROL_DONE) {
3320                 if ( !data->dont_log_errors) {
3321                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3322                 }
3323                 data->fail_count++;
3324                 if (data->fail_callback) {
3325                         data->fail_callback(ctdb, destnode, res, outdata,
3326                                         data->callback_data);
3327                 }
3328                 return;
3329         }
3330         
3331         state->async.fn = NULL;
3332
3333         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3334         if ((ret != 0) || (res != 0)) {
3335                 if ( !data->dont_log_errors) {
3336                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3337                 }
3338                 data->fail_count++;
3339                 if (data->fail_callback) {
3340                         data->fail_callback(ctdb, destnode, res, outdata,
3341                                         data->callback_data);
3342                 }
3343         }
3344         if ((ret == 0) && (data->callback != NULL)) {
3345                 data->callback(ctdb, destnode, res, outdata,
3346                                         data->callback_data);
3347         }
3348 }
3349
3350
3351 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3352 {
3353         /* set up the callback functions */
3354         state->async.fn = async_callback;
3355         state->async.private_data = data;
3356         
3357         /* one more control to wait for to complete */
3358         data->count++;
3359 }
3360
3361
3362 /* wait for up to the maximum number of seconds allowed
3363    or until all nodes we expect a response from has replied
3364 */
3365 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3366 {
3367         while (data->count > 0) {
3368                 event_loop_once(ctdb->ev);
3369         }
3370         if (data->fail_count != 0) {
3371                 if (!data->dont_log_errors) {
3372                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
3373                                  data->fail_count));
3374                 }
3375                 return -1;
3376         }
3377         return 0;
3378 }
3379
3380
3381 /* 
3382    perform a simple control on the listed nodes
3383    The control cannot return data
3384  */
3385 int ctdb_client_async_control(struct ctdb_context *ctdb,
3386                                 enum ctdb_controls opcode,
3387                                 uint32_t *nodes,
3388                                 uint64_t srvid,
3389                                 struct timeval timeout,
3390                                 bool dont_log_errors,
3391                                 TDB_DATA data,
3392                                 client_async_callback client_callback,
3393                                 client_async_callback fail_callback,
3394                                 void *callback_data)
3395 {
3396         struct client_async_data *async_data;
3397         struct ctdb_client_control_state *state;
3398         int j, num_nodes;
3399
3400         async_data = talloc_zero(ctdb, struct client_async_data);
3401         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3402         async_data->dont_log_errors = dont_log_errors;
3403         async_data->callback = client_callback;
3404         async_data->fail_callback = fail_callback;
3405         async_data->callback_data = callback_data;
3406         async_data->opcode        = opcode;
3407
3408         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3409
3410         /* loop over all nodes and send an async control to each of them */
3411         for (j=0; j<num_nodes; j++) {
3412                 uint32_t pnn = nodes[j];
3413
3414                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
3415                                           0, data, async_data, &timeout, NULL);
3416                 if (state == NULL) {
3417                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3418                         talloc_free(async_data);
3419                         return -1;
3420                 }
3421                 
3422                 ctdb_client_async_add(async_data, state);
3423         }
3424
3425         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3426                 talloc_free(async_data);
3427                 return -1;
3428         }
3429
3430         talloc_free(async_data);
3431         return 0;
3432 }
3433
3434 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3435                                 struct ctdb_vnn_map *vnn_map,
3436                                 TALLOC_CTX *mem_ctx,
3437                                 bool include_self)
3438 {
3439         int i, j, num_nodes;
3440         uint32_t *nodes;
3441
3442         for (i=num_nodes=0;i<vnn_map->size;i++) {
3443                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3444                         continue;
3445                 }
3446                 num_nodes++;
3447         } 
3448
3449         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3450         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3451
3452         for (i=j=0;i<vnn_map->size;i++) {
3453                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3454                         continue;
3455                 }
3456                 nodes[j++] = vnn_map->map[i];
3457         } 
3458
3459         return nodes;
3460 }
3461
3462 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3463                                 struct ctdb_node_map *node_map,
3464                                 TALLOC_CTX *mem_ctx,
3465                                 bool include_self)
3466 {
3467         int i, j, num_nodes;
3468         uint32_t *nodes;
3469
3470         for (i=num_nodes=0;i<node_map->num;i++) {
3471                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3472                         continue;
3473                 }
3474                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3475                         continue;
3476                 }
3477                 num_nodes++;
3478         } 
3479
3480         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3481         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3482
3483         for (i=j=0;i<node_map->num;i++) {
3484                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3485                         continue;
3486                 }
3487                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3488                         continue;
3489                 }
3490                 nodes[j++] = node_map->nodes[i].pnn;
3491         } 
3492
3493         return nodes;
3494 }
3495
3496 uint32_t *list_of_active_nodes_except_pnn(struct ctdb_context *ctdb,
3497                                 struct ctdb_node_map *node_map,
3498                                 TALLOC_CTX *mem_ctx,
3499                                 uint32_t pnn)
3500 {
3501         int i, j, num_nodes;
3502         uint32_t *nodes;
3503
3504         for (i=num_nodes=0;i<node_map->num;i++) {
3505                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3506                         continue;
3507                 }
3508                 if (node_map->nodes[i].pnn == pnn) {
3509                         continue;
3510                 }
3511                 num_nodes++;
3512         } 
3513
3514         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3515         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3516
3517         for (i=j=0;i<node_map->num;i++) {
3518                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3519                         continue;
3520                 }
3521                 if (node_map->nodes[i].pnn == pnn) {
3522                         continue;
3523                 }
3524                 nodes[j++] = node_map->nodes[i].pnn;
3525         } 
3526
3527         return nodes;
3528 }
3529
3530 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3531                                 struct ctdb_node_map *node_map,
3532                                 TALLOC_CTX *mem_ctx,
3533                                 bool include_self)
3534 {
3535         int i, j, num_nodes;
3536         uint32_t *nodes;
3537
3538         for (i=num_nodes=0;i<node_map->num;i++) {
3539                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3540                         continue;
3541                 }
3542                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3543                         continue;
3544                 }
3545                 num_nodes++;
3546         } 
3547
3548         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3549         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3550
3551         for (i=j=0;i<node_map->num;i++) {
3552                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3553                         continue;
3554                 }
3555                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3556                         continue;
3557                 }
3558                 nodes[j++] = node_map->nodes[i].pnn;
3559         } 
3560
3561         return nodes;
3562 }
3563
3564 /* 
3565   this is used to test if a pnn lock exists and if it exists will return
3566   the number of connections that pnn has reported or -1 if that recovery
3567   daemon is not running.
3568 */
3569 int
3570 ctdb_read_pnn_lock(int fd, int32_t pnn)
3571 {
3572         struct flock lock;
3573         char c;
3574
3575         lock.l_type = F_WRLCK;
3576         lock.l_whence = SEEK_SET;
3577         lock.l_start = pnn;
3578         lock.l_len = 1;
3579         lock.l_pid = 0;
3580
3581         if (fcntl(fd, F_GETLK, &lock) != 0) {
3582                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3583                 return -1;
3584         }
3585
3586         if (lock.l_type == F_UNLCK) {
3587                 return -1;
3588         }
3589
3590         if (pread(fd, &c, 1, pnn) == -1) {
3591                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3592                 return -1;
3593         }
3594
3595         return c;
3596 }
3597
3598 /*
3599   get capabilities of a remote node
3600  */
3601 struct ctdb_client_control_state *
3602 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3603 {
3604         return ctdb_control_send(ctdb, destnode, 0, 
3605                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3606                            mem_ctx, &timeout, NULL);
3607 }
3608
3609 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3610 {
3611         int ret;
3612         int32_t res;
3613         TDB_DATA outdata;
3614
3615         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3616         if ( (ret != 0) || (res != 0) ) {
3617                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3618                 return -1;
3619         }
3620
3621         if (capabilities) {
3622                 *capabilities = *((uint32_t *)outdata.dptr);
3623         }
3624
3625         return 0;
3626 }
3627
3628 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3629 {
3630         struct ctdb_client_control_state *state;
3631         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3632         int ret;
3633
3634         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3635         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3636         talloc_free(tmp_ctx);
3637         return ret;
3638 }
3639
3640 /**
3641  * check whether a transaction is active on a given db on a given node
3642  */
3643 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
3644                                      uint32_t destnode,
3645                                      uint32_t db_id)
3646 {
3647         int32_t status;
3648         int ret;
3649         TDB_DATA indata;
3650
3651         indata.dptr = (uint8_t *)&db_id;
3652         indata.dsize = sizeof(db_id);
3653
3654         ret = ctdb_control(ctdb, destnode, 0,
3655                            CTDB_CONTROL_TRANS2_ACTIVE,
3656                            0, indata, NULL, NULL, &status,
3657                            NULL, NULL);
3658
3659         if (ret != 0) {
3660                 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
3661                 return -1;
3662         }
3663
3664         return status;
3665 }
3666
3667
3668 struct ctdb_transaction_handle {
3669         struct ctdb_db_context *ctdb_db;
3670         bool in_replay;
3671         /*
3672          * we store the reads and writes done under a transaction:
3673          * - one list stores both reads and writes (m_all),
3674          * - the other just writes (m_write)
3675          */
3676         struct ctdb_marshall_buffer *m_all;
3677         struct ctdb_marshall_buffer *m_write;
3678 };
3679
3680 /* start a transaction on a database */
3681 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3682 {
3683         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3684         return 0;
3685 }
3686
3687 /* start a transaction on a database */
3688 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
3689 {
3690         struct ctdb_record_handle *rh;
3691         TDB_DATA key;
3692         TDB_DATA data;
3693         struct ctdb_ltdb_header header;
3694         TALLOC_CTX *tmp_ctx;
3695         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
3696         int ret;
3697         struct ctdb_db_context *ctdb_db = h->ctdb_db;
3698         pid_t pid;
3699         int32_t status;
3700
3701         key.dptr = discard_const(keyname);
3702         key.dsize = strlen(keyname);
3703
3704         if (!ctdb_db->persistent) {
3705                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
3706                 return -1;
3707         }
3708
3709 again:
3710         tmp_ctx = talloc_new(h);
3711
3712         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
3713         if (rh == NULL) {
3714                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
3715                 talloc_free(tmp_ctx);
3716                 return -1;
3717         }
3718
3719         status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
3720                                               CTDB_CURRENT_NODE,
3721                                               ctdb_db->db_id);
3722         if (status == 1) {
3723                 unsigned long int usec = (1000 + random()) % 100000;
3724                 DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
3725                                     "on db_id[0x%08x]. waiting for %lu "
3726                                     "microseconds\n",
3727                                     ctdb_db->db_id, usec));
3728                 talloc_free(tmp_ctx);
3729                 usleep(usec);
3730                 goto again;
3731         }
3732
3733         /*
3734          * store the pid in the database:
3735          * it is not enough that the node is dmaster...
3736          */
3737         pid = getpid();
3738         data.dptr = (unsigned char *)&pid;
3739         data.dsize = sizeof(pid_t);
3740         rh->header.rsn++;
3741         rh->header.dmaster = ctdb_db->ctdb->pnn;
3742         ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
3743         if (ret != 0) {
3744                 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
3745                                   "transaction record\n"));
3746                 talloc_free(tmp_ctx);
3747                 return -1;
3748         }
3749
3750         talloc_free(rh);
3751
3752         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3753         if (ret != 0) {
3754                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
3755                 talloc_free(tmp_ctx);
3756                 return -1;
3757         }
3758
3759         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
3760         if (ret != 0) {
3761                 DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
3762                                  "lock record inside transaction\n"));
3763                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3764                 talloc_free(tmp_ctx);
3765                 goto again;
3766         }
3767
3768         if (header.dmaster != ctdb_db->ctdb->pnn) {
3769                 DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
3770                                    "transaction lock record\n"));
3771                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3772                 talloc_free(tmp_ctx);
3773                 goto again;
3774         }
3775
3776         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
3777                 DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
3778                                     "the transaction lock record\n"));
3779                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3780                 talloc_free(tmp_ctx);
3781                 goto again;
3782         }
3783
3784         talloc_free(tmp_ctx);
3785
3786         return 0;
3787 }
3788
3789
3790 /* start a transaction on a database */
3791 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3792                                                        TALLOC_CTX *mem_ctx)
3793 {
3794         struct ctdb_transaction_handle *h;
3795         int ret;
3796
3797         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3798         if (h == NULL) {
3799                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
3800                 return NULL;
3801         }
3802
3803         h->ctdb_db = ctdb_db;
3804
3805         ret = ctdb_transaction_fetch_start(h);
3806         if (ret != 0) {
3807                 talloc_free(h);
3808                 return NULL;
3809         }
3810
3811         talloc_set_destructor(h, ctdb_transaction_destructor);
3812
3813         return h;
3814 }
3815
3816
3817
3818 /*
3819   fetch a record inside a transaction
3820  */
3821 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
3822                            TALLOC_CTX *mem_ctx, 
3823                            TDB_DATA key, TDB_DATA *data)
3824 {
3825         struct ctdb_ltdb_header header;
3826         int ret;
3827
3828         ZERO_STRUCT(header);
3829
3830         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3831         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3832                 /* record doesn't exist yet */
3833                 *data = tdb_null;
3834                 ret = 0;
3835         }
3836         
3837         if (ret != 0) {
3838                 return ret;
3839         }
3840
3841         if (!h->in_replay) {
3842                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3843                 if (h->m_all == NULL) {
3844                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3845                         return -1;
3846                 }
3847         }
3848
3849         return 0;
3850 }
3851
3852 /*
3853   stores a record inside a transaction
3854  */
3855 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
3856                            TDB_DATA key, TDB_DATA data)
3857 {
3858         TALLOC_CTX *tmp_ctx = talloc_new(h);
3859         struct ctdb_ltdb_header header;
3860         TDB_DATA olddata;
3861         int ret;
3862
3863         ZERO_STRUCT(header);
3864
3865         /* we need the header so we can update the RSN */
3866         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3867         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3868                 /* the record doesn't exist - create one with us as dmaster.
3869                    This is only safe because we are in a transaction and this
3870                    is a persistent database */
3871                 ZERO_STRUCT(header);
3872         } else if (ret != 0) {
3873                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3874                 talloc_free(tmp_ctx);
3875                 return ret;
3876         }
3877
3878         if (data.dsize == olddata.dsize &&
3879             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3880                 /* save writing the same data */
3881                 talloc_free(tmp_ctx);
3882                 return 0;
3883         }
3884
3885         header.dmaster = h->ctdb_db->ctdb->pnn;
3886         header.rsn++;
3887
3888         if (!h->in_replay) {
3889                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3890                 if (h->m_all == NULL) {
3891                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3892                         talloc_free(tmp_ctx);
3893                         return -1;
3894                 }
3895         }               
3896
3897         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3898         if (h->m_write == NULL) {
3899                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3900                 talloc_free(tmp_ctx);
3901                 return -1;
3902         }
3903         
3904         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3905
3906         talloc_free(tmp_ctx);
3907         
3908         return ret;
3909 }
3910
3911 /*
3912   replay a transaction
3913  */
3914 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3915 {
3916         int ret, i;
3917         struct ctdb_rec_data *rec = NULL;
3918
3919         h->in_replay = true;
3920         talloc_free(h->m_write);
3921         h->m_write = NULL;
3922
3923         ret = ctdb_transaction_fetch_start(h);
3924         if (ret != 0) {
3925                 return ret;
3926         }
3927
3928         for (i=0;i<h->m_all->count;i++) {
3929                 TDB_DATA key, data;
3930
3931                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3932                 if (rec == NULL) {
3933                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3934                         goto failed;
3935                 }
3936
3937                 if (rec->reqid == 0) {
3938                         /* its a store */
3939                         if (ctdb_transaction_store(h, key, data) != 0) {
3940                                 goto failed;
3941                         }
3942                 } else {
3943                         TDB_DATA data2;
3944                         TALLOC_CTX *tmp_ctx = talloc_new(h);
3945
3946                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3947                                 talloc_free(tmp_ctx);
3948                                 goto failed;
3949                         }
3950                         if (data2.dsize != data.dsize ||
3951                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3952                                 /* the record has changed on us - we have to give up */
3953                                 talloc_free(tmp_ctx);
3954                                 goto failed;
3955                         }
3956                         talloc_free(tmp_ctx);
3957                 }
3958         }
3959         
3960         return 0;
3961
3962 failed:
3963         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3964         return -1;
3965 }
3966
3967
3968 /*
3969   commit a transaction
3970  */
3971 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3972 {
3973         int ret, retries=0;
3974         int32_t status;
3975         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3976         struct timeval timeout;
3977         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3978
3979         talloc_set_destructor(h, NULL);
3980
3981         /* our commit strategy is quite complex.
3982
3983            - we first try to commit the changes to all other nodes
3984
3985            - if that works, then we commit locally and we are done
3986
3987            - if a commit on another node fails, then we need to cancel
3988              the transaction, then restart the transaction (thus
3989              opening a window of time for a pending recovery to
3990              complete), then replay the transaction, checking all the
3991              reads and writes (checking that reads give the same data,
3992              and writes succeed). Then we retry the transaction to the
3993              other nodes
3994         */
3995
3996 again:
3997         if (h->m_write == NULL) {
3998                 /* no changes were made */
3999                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
4000                 talloc_free(h);
4001                 return 0;
4002         }
4003
4004         /* tell ctdbd to commit to the other nodes */
4005         timeout = timeval_current_ofs(1, 0);
4006         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4007                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
4008                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
4009                            &timeout, NULL);
4010         if (ret != 0 || status != 0) {
4011                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
4012                 DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
4013                                      ", retrying after 1 second...\n",
4014                                      (retries==0)?"":"retry "));
4015                 sleep(1);
4016
4017                 if (ret != 0) {
4018                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
4019                 } else {
4020                         /* work out what error code we will give if we 
4021                            have to fail the operation */
4022                         switch ((enum ctdb_trans2_commit_error)status) {
4023                         case CTDB_TRANS2_COMMIT_SUCCESS:
4024                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
4025                         case CTDB_TRANS2_COMMIT_TIMEOUT:
4026                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
4027                                 break;
4028                         case CTDB_TRANS2_COMMIT_ALLFAIL:
4029                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
4030                                 break;
4031                         }
4032                 }
4033
4034                 if (++retries == 100) {
4035                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
4036                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
4037                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4038                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4039                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
4040                         talloc_free(h);
4041                         return -1;
4042                 }               
4043
4044                 if (ctdb_replay_transaction(h) != 0) {
4045                         DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
4046                                           "transaction on db 0x%08x, "
4047                                           "failure control =%u\n",
4048                                           h->ctdb_db->db_id,
4049                                           (unsigned)failure_control));
4050                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4051                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4052                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
4053                         talloc_free(h);
4054                         return -1;
4055                 }
4056                 goto again;
4057         } else {
4058                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
4059         }
4060
4061         /* do the real commit locally */
4062         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
4063         if (ret != 0) {
4064                 DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
4065                                   "on db id 0x%08x locally, "
4066                                   "failure_control=%u\n",
4067                                   h->ctdb_db->db_id,
4068                                   (unsigned)failure_control));
4069                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4070                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4071                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
4072                 talloc_free(h);
4073                 return ret;
4074         }
4075
4076         /* tell ctdbd that we are finished with our local commit */
4077         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4078                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
4079                      tdb_null, NULL, NULL, NULL, NULL, NULL);
4080         talloc_free(h);
4081         return 0;
4082 }
4083
4084 /*
4085   recovery daemon ping to main daemon
4086  */
4087 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
4088 {
4089         int ret;
4090         int32_t res;
4091
4092         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
4093                            ctdb, NULL, &res, NULL, NULL);
4094         if (ret != 0 || res != 0) {
4095                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
4096                 return -1;
4097         }
4098
4099         return 0;
4100 }
4101
4102 /* when forking the main daemon and the child process needs to connect back
4103  * to the daemon as a client process, this function can be used to change
4104  * the ctdb context from daemon into client mode
4105  */
4106 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
4107 {
4108         int ret;
4109         va_list ap;
4110
4111         /* Add extra information so we can identify this in the logs */
4112         va_start(ap, fmt);
4113         debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
4114         va_end(ap);
4115
4116         /* shutdown the transport */
4117         if (ctdb->methods) {
4118                 ctdb->methods->shutdown(ctdb);
4119         }
4120
4121         /* get a new event context */
4122         talloc_free(ctdb->ev);
4123         ctdb->ev = event_context_init(ctdb);
4124         tevent_loop_allow_nesting(ctdb->ev);
4125
4126         close(ctdb->daemon.sd);
4127         ctdb->daemon.sd = -1;
4128
4129         /* the client does not need to be realtime */
4130         if (ctdb->do_setsched) {
4131                 ctdb_restore_scheduler(ctdb);
4132         }
4133
4134         /* initialise ctdb */
4135         ret = ctdb_socket_connect(ctdb);
4136         if (ret != 0) {
4137                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
4138                 return -1;
4139         }
4140
4141          return 0;
4142 }
4143
4144 /*
4145   get the status of running the monitor eventscripts: NULL means never run.
4146  */
4147 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
4148                 struct timeval timeout, uint32_t destnode, 
4149                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
4150                 struct ctdb_scripts_wire **script_status)
4151 {
4152         int ret;
4153         TDB_DATA outdata, indata;
4154         int32_t res;
4155         uint32_t uinttype = type;
4156
4157         indata.dptr = (uint8_t *)&uinttype;
4158         indata.dsize = sizeof(uinttype);
4159
4160         ret = ctdb_control(ctdb, destnode, 0, 
4161                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
4162                            mem_ctx, &outdata, &res, &timeout, NULL);
4163         if (ret != 0 || res != 0) {
4164                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
4165                 return -1;
4166         }
4167
4168         if (outdata.dsize == 0) {
4169                 *script_status = NULL;
4170         } else {
4171                 *script_status = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4172                 talloc_free(outdata.dptr);
4173         }
4174                     
4175         return 0;
4176 }
4177
4178 /*
4179   tell the main daemon how long it took to lock the reclock file
4180  */
4181 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
4182 {
4183         int ret;
4184         int32_t res;
4185         TDB_DATA data;
4186
4187         data.dptr = (uint8_t *)&latency;
4188         data.dsize = sizeof(latency);
4189
4190         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
4191                            ctdb, NULL, &res, NULL, NULL);
4192         if (ret != 0 || res != 0) {
4193                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
4194                 return -1;
4195         }
4196
4197         return 0;
4198 }
4199
4200 /*
4201   get the name of the reclock file
4202  */
4203 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
4204                          uint32_t destnode, TALLOC_CTX *mem_ctx,
4205                          const char **name)
4206 {
4207         int ret;
4208         int32_t res;
4209         TDB_DATA data;
4210
4211         ret = ctdb_control(ctdb, destnode, 0, 
4212                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
4213                            mem_ctx, &data, &res, &timeout, NULL);
4214         if (ret != 0 || res != 0) {
4215                 return -1;
4216         }
4217
4218         if (data.dsize == 0) {
4219                 *name = NULL;
4220         } else {
4221                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
4222         }
4223         talloc_free(data.dptr);
4224
4225         return 0;
4226 }
4227
4228 /*
4229   set the reclock filename for a node
4230  */
4231 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
4232 {
4233         int ret;
4234         TDB_DATA data;
4235         int32_t res;
4236
4237         if (reclock == NULL) {
4238                 data.dsize = 0;
4239                 data.dptr  = NULL;
4240         } else {
4241                 data.dsize = strlen(reclock) + 1;
4242                 data.dptr  = discard_const(reclock);
4243         }
4244
4245         ret = ctdb_control(ctdb, destnode, 0, 
4246                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
4247                            NULL, NULL, &res, &timeout, NULL);
4248         if (ret != 0 || res != 0) {
4249                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
4250                 return -1;
4251         }
4252
4253         return 0;
4254 }
4255
4256 /*
4257   stop a node
4258  */
4259 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4260 {
4261         int ret;
4262         int32_t res;
4263
4264         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
4265                            ctdb, NULL, &res, &timeout, NULL);
4266         if (ret != 0 || res != 0) {
4267                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
4268                 return -1;
4269         }
4270
4271         return 0;
4272 }
4273
4274 /*
4275   continue a node
4276  */
4277 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4278 {
4279         int ret;
4280
4281         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
4282                            ctdb, NULL, NULL, &timeout, NULL);
4283         if (ret != 0) {
4284                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4285                 return -1;
4286         }
4287
4288         return 0;
4289 }
4290
4291 /*
4292   set the natgw state for a node
4293  */
4294 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4295 {
4296         int ret;
4297         TDB_DATA data;
4298         int32_t res;
4299
4300         data.dsize = sizeof(natgwstate);
4301         data.dptr  = (uint8_t *)&natgwstate;
4302
4303         ret = ctdb_control(ctdb, destnode, 0, 
4304                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
4305                            NULL, NULL, &res, &timeout, NULL);
4306         if (ret != 0 || res != 0) {
4307                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4308                 return -1;
4309         }
4310
4311         return 0;
4312 }
4313
4314 /*
4315   set the lmaster role for a node
4316  */
4317 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4318 {
4319         int ret;
4320         TDB_DATA data;
4321         int32_t res;
4322
4323         data.dsize = sizeof(lmasterrole);
4324         data.dptr  = (uint8_t *)&lmasterrole;
4325
4326         ret = ctdb_control(ctdb, destnode, 0, 
4327                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
4328                            NULL, NULL, &res, &timeout, NULL);
4329         if (ret != 0 || res != 0) {
4330                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4331                 return -1;
4332         }
4333
4334         return 0;
4335 }
4336
4337 /*
4338   set the recmaster role for a node
4339  */
4340 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4341 {
4342         int ret;
4343         TDB_DATA data;
4344         int32_t res;
4345
4346         data.dsize = sizeof(recmasterrole);
4347         data.dptr  = (uint8_t *)&recmasterrole;
4348
4349         ret = ctdb_control(ctdb, destnode, 0, 
4350                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
4351                            NULL, NULL, &res, &timeout, NULL);
4352         if (ret != 0 || res != 0) {
4353                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4354                 return -1;
4355         }
4356
4357         return 0;
4358 }
4359
4360 /* enable an eventscript
4361  */
4362 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4363 {
4364         int ret;
4365         TDB_DATA data;
4366         int32_t res;
4367
4368         data.dsize = strlen(script) + 1;
4369         data.dptr  = discard_const(script);
4370
4371         ret = ctdb_control(ctdb, destnode, 0, 
4372                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
4373                            NULL, NULL, &res, &timeout, NULL);
4374         if (ret != 0 || res != 0) {
4375                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4376                 return -1;
4377         }
4378
4379         return 0;
4380 }
4381
4382 /* disable an eventscript
4383  */
4384 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4385 {
4386         int ret;
4387         TDB_DATA data;
4388         int32_t res;
4389
4390         data.dsize = strlen(script) + 1;
4391         data.dptr  = discard_const(script);
4392
4393         ret = ctdb_control(ctdb, destnode, 0, 
4394                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
4395                            NULL, NULL, &res, &timeout, NULL);
4396         if (ret != 0 || res != 0) {
4397                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
4398                 return -1;
4399         }
4400
4401         return 0;
4402 }
4403
4404
4405 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
4406 {
4407         int ret;
4408         TDB_DATA data;
4409         int32_t res;
4410
4411         data.dsize = sizeof(*bantime);
4412         data.dptr  = (uint8_t *)bantime;
4413
4414         ret = ctdb_control(ctdb, destnode, 0, 
4415                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
4416                            NULL, NULL, &res, &timeout, NULL);
4417         if (ret != 0 || res != 0) {
4418                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4419                 return -1;
4420         }
4421
4422         return 0;
4423 }
4424
4425
4426 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4427 {
4428         int ret;
4429         TDB_DATA outdata;
4430         int32_t res;
4431         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4432
4433         ret = ctdb_control(ctdb, destnode, 0, 
4434                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4435                            tmp_ctx, &outdata, &res, &timeout, NULL);
4436         if (ret != 0 || res != 0) {
4437                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4438                 talloc_free(tmp_ctx);
4439                 return -1;
4440         }
4441
4442         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4443         talloc_free(tmp_ctx);
4444
4445         return 0;
4446 }
4447
4448
4449 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4450 {
4451         int ret;
4452         int32_t res;
4453         TDB_DATA data;
4454         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4455
4456         data.dptr = (uint8_t*)db_prio;
4457         data.dsize = sizeof(*db_prio);
4458
4459         ret = ctdb_control(ctdb, destnode, 0, 
4460                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4461                            tmp_ctx, NULL, &res, &timeout, NULL);
4462         if (ret != 0 || res != 0) {
4463                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4464                 talloc_free(tmp_ctx);
4465                 return -1;
4466         }
4467
4468         talloc_free(tmp_ctx);
4469
4470         return 0;
4471 }
4472
4473 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4474 {
4475         int ret;
4476         int32_t res;
4477         TDB_DATA data;
4478         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4479
4480         data.dptr = (uint8_t*)&db_id;
4481         data.dsize = sizeof(db_id);
4482
4483         ret = ctdb_control(ctdb, destnode, 0, 
4484                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4485                            tmp_ctx, NULL, &res, &timeout, NULL);
4486         if (ret != 0 || res < 0) {
4487                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4488                 talloc_free(tmp_ctx);
4489                 return -1;
4490         }
4491
4492         if (priority) {
4493                 *priority = res;
4494         }
4495
4496         talloc_free(tmp_ctx);
4497
4498         return 0;
4499 }
4500
4501 int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_statistics_wire **stats)
4502 {
4503         int ret;
4504         TDB_DATA outdata;
4505         int32_t res;
4506
4507         ret = ctdb_control(ctdb, destnode, 0, 
4508                            CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null, 
4509                            mem_ctx, &outdata, &res, &timeout, NULL);
4510         if (ret != 0 || res != 0 || outdata.dsize == 0) {
4511                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
4512                 return -1;
4513         }
4514
4515         *stats = (struct ctdb_statistics_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4516         talloc_free(outdata.dptr);
4517                     
4518         return 0;
4519 }
4520
4521 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)
4522 {
4523         if (h == NULL) {
4524                 return NULL;
4525         }
4526
4527         return &h->header;
4528 }
4529
4530
4531 struct ctdb_client_control_state *
4532 ctdb_ctrl_updaterecord_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4533 {
4534         struct ctdb_client_control_state *handle;
4535         struct ctdb_marshall_buffer *m;
4536         struct ctdb_rec_data *rec;
4537         TDB_DATA outdata;
4538
4539         m = talloc_zero(mem_ctx, struct ctdb_marshall_buffer);
4540         if (m == NULL) {
4541                 DEBUG(DEBUG_ERR, ("Failed to allocate marshall buffer for update record\n"));
4542                 return NULL;
4543         }
4544
4545         m->db_id = ctdb_db->db_id;
4546
4547         rec = ctdb_marshall_record(m, 0, key, header, data);
4548         if (rec == NULL) {
4549                 DEBUG(DEBUG_ERR,("Failed to marshall record for update record\n"));
4550                 talloc_free(m);
4551                 return NULL;
4552         }
4553         m = talloc_realloc_size(mem_ctx, m, rec->length + offsetof(struct ctdb_marshall_buffer, data));
4554         if (m == NULL) {
4555                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata\n"));
4556                 talloc_free(m);
4557                 return NULL;
4558         }
4559         m->count++;
4560         memcpy((uint8_t *)m + offsetof(struct ctdb_marshall_buffer, data), rec, rec->length);
4561
4562
4563         outdata.dptr = (uint8_t *)m;
4564         outdata.dsize = talloc_get_size(m);
4565
4566         handle = ctdb_control_send(ctdb, destnode, 0, 
4567                            CTDB_CONTROL_UPDATE_RECORD, 0, outdata,
4568                            mem_ctx, &timeout, NULL);
4569         talloc_free(m);
4570         return handle;
4571 }
4572
4573 int ctdb_ctrl_updaterecord_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4574 {
4575         int ret;
4576         int32_t res;
4577
4578         ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL);
4579         if ( (ret != 0) || (res != 0) ){
4580                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_update_record_recv failed\n"));
4581                 return -1;
4582         }
4583
4584         return 0;
4585 }
4586
4587 int
4588 ctdb_ctrl_updaterecord(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4589 {
4590         struct ctdb_client_control_state *state;
4591
4592         state = ctdb_ctrl_updaterecord_send(ctdb, mem_ctx, timeout, destnode, ctdb_db, key, header, data);
4593         return ctdb_ctrl_updaterecord_recv(ctdb, state);
4594 }
4595