traverse: add a flag to enable transferring empty records in cluster wide traverse
[obnox/samba/samba-obnox.git] / ctdb / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/tevent/tevent.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
29 #include <stdlib.h>
30 #include "../include/ctdb_private.h"
31 #include "lib/util/dlinklist.h"
32
33 pid_t ctdbd_pid;
34
35 /*
36   allocate a packet for use in client<->daemon communication
37  */
38 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
39                                             TALLOC_CTX *mem_ctx, 
40                                             enum ctdb_operation operation, 
41                                             size_t length, size_t slength,
42                                             const char *type)
43 {
44         int size;
45         struct ctdb_req_header *hdr;
46
47         length = MAX(length, slength);
48         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
49
50         hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
51         if (hdr == NULL) {
52                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
53                          operation, (unsigned)length));
54                 return NULL;
55         }
56         talloc_set_name_const(hdr, type);
57         memset(hdr, 0, slength);
58         hdr->length       = length;
59         hdr->operation    = operation;
60         hdr->ctdb_magic   = CTDB_MAGIC;
61         hdr->ctdb_version = CTDB_VERSION;
62         hdr->srcnode      = ctdb->pnn;
63         if (ctdb->vnn_map) {
64                 hdr->generation = ctdb->vnn_map->generation;
65         }
66
67         return hdr;
68 }
69
70 /*
71   local version of ctdb_call
72 */
73 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
74                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
75                     TDB_DATA *data, bool updatetdb)
76 {
77         struct ctdb_call_info *c;
78         struct ctdb_registered_call *fn;
79         struct ctdb_context *ctdb = ctdb_db->ctdb;
80         
81         c = talloc(ctdb, struct ctdb_call_info);
82         CTDB_NO_MEMORY(ctdb, c);
83
84         c->key = call->key;
85         c->call_data = &call->call_data;
86         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
87         c->record_data.dsize = data->dsize;
88         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
89         c->new_data = NULL;
90         c->reply_data = NULL;
91         c->status = 0;
92         c->header = header;
93
94         for (fn=ctdb_db->calls;fn;fn=fn->next) {
95                 if (fn->id == call->call_id) break;
96         }
97         if (fn == NULL) {
98                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
99                 talloc_free(c);
100                 return -1;
101         }
102
103         if (fn->fn(c) != 0) {
104                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
105                 talloc_free(c);
106                 return -1;
107         }
108
109         /* we need to force the record to be written out if this was a remote access */
110         if (c->new_data == NULL) {
111                 c->new_data = &c->record_data;
112         }
113
114         if (c->new_data && updatetdb) {
115                 /* XXX check that we always have the lock here? */
116                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
117                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
118                         talloc_free(c);
119                         return -1;
120                 }
121         }
122
123         if (c->reply_data) {
124                 call->reply_data = *c->reply_data;
125
126                 talloc_steal(call, call->reply_data.dptr);
127                 talloc_set_name_const(call->reply_data.dptr, __location__);
128         } else {
129                 call->reply_data.dptr = NULL;
130                 call->reply_data.dsize = 0;
131         }
132         call->status = c->status;
133
134         talloc_free(c);
135
136         return 0;
137 }
138
139
140 /*
141   queue a packet for sending from client to daemon
142 */
143 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
144 {
145         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
146 }
147
148
149 /*
150   called when a CTDB_REPLY_CALL packet comes in in the client
151
152   This packet comes in response to a CTDB_REQ_CALL request packet. It
153   contains any reply data from the call
154 */
155 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
156 {
157         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
158         struct ctdb_client_call_state *state;
159
160         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
161         if (state == NULL) {
162                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
163                 return;
164         }
165
166         if (hdr->reqid != state->reqid) {
167                 /* we found a record  but it was the wrong one */
168                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
169                 return;
170         }
171
172         state->call->reply_data.dptr = c->data;
173         state->call->reply_data.dsize = c->datalen;
174         state->call->status = c->status;
175
176         talloc_steal(state, c);
177
178         state->state = CTDB_CALL_DONE;
179
180         if (state->async.fn) {
181                 state->async.fn(state);
182         }
183 }
184
185 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
186
187 /*
188   this is called in the client, when data comes in from the daemon
189  */
190 void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
191 {
192         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
193         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
194         TALLOC_CTX *tmp_ctx;
195
196         /* place the packet as a child of a tmp_ctx. We then use
197            talloc_free() below to free it. If any of the calls want
198            to keep it, then they will steal it somewhere else, and the
199            talloc_free() will be a no-op */
200         tmp_ctx = talloc_new(ctdb);
201         talloc_steal(tmp_ctx, hdr);
202
203         if (cnt == 0) {
204                 DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n"));
205                 exit(0);
206         }
207
208         if (cnt < sizeof(*hdr)) {
209                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
210                 goto done;
211         }
212         if (cnt != hdr->length) {
213                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
214                                (unsigned)hdr->length, (unsigned)cnt);
215                 goto done;
216         }
217
218         if (hdr->ctdb_magic != CTDB_MAGIC) {
219                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
220                 goto done;
221         }
222
223         if (hdr->ctdb_version != CTDB_VERSION) {
224                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
225                 goto done;
226         }
227
228         switch (hdr->operation) {
229         case CTDB_REPLY_CALL:
230                 ctdb_client_reply_call(ctdb, hdr);
231                 break;
232
233         case CTDB_REQ_MESSAGE:
234                 ctdb_request_message(ctdb, hdr);
235                 break;
236
237         case CTDB_REPLY_CONTROL:
238                 ctdb_client_reply_control(ctdb, hdr);
239                 break;
240
241         default:
242                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
243         }
244
245 done:
246         talloc_free(tmp_ctx);
247 }
248
249 /*
250   connect with exponential backoff, thanks Stevens
251 */
252 #define CONNECT_MAXSLEEP 64
253 static int ctdb_connect_retry(struct ctdb_context *ctdb)
254 {
255         struct sockaddr_un addr;
256         int secs;
257         int ret = 0;
258
259         memset(&addr, 0, sizeof(addr));
260         addr.sun_family = AF_UNIX;
261         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
262
263         for (secs = 1; secs <= CONNECT_MAXSLEEP; secs *= 2) {
264                 ret = connect(ctdb->daemon.sd, (struct sockaddr *)&addr,
265                               sizeof(addr));
266                 if ((ret == 0) || (errno != EAGAIN)) {
267                         break;
268                 }
269
270                 if (secs <= (CONNECT_MAXSLEEP / 2)) {
271                         DEBUG(DEBUG_ERR,("connect failed: %s, retry in %d second(s)\n",
272                                          strerror(errno), secs));
273                         sleep(secs);
274                 }
275         }
276
277         return ret;
278 }
279
280 /*
281   connect to a unix domain socket
282 */
283 int ctdb_socket_connect(struct ctdb_context *ctdb)
284 {
285         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
286         if (ctdb->daemon.sd == -1) {
287                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
288                 return -1;
289         }
290
291         set_nonblocking(ctdb->daemon.sd);
292         set_close_on_exec(ctdb->daemon.sd);
293
294         if (ctdb_connect_retry(ctdb) == -1) {
295                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
296                 close(ctdb->daemon.sd);
297                 ctdb->daemon.sd = -1;
298                 return -1;
299         }
300
301         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
302                                               CTDB_DS_ALIGNMENT, 
303                                               ctdb_client_read_cb, ctdb, "to-ctdbd");
304         return 0;
305 }
306
307
308 struct ctdb_record_handle {
309         struct ctdb_db_context *ctdb_db;
310         TDB_DATA key;
311         TDB_DATA *data;
312         struct ctdb_ltdb_header header;
313 };
314
315
316 /*
317   make a recv call to the local ctdb daemon - called from client context
318
319   This is called when the program wants to wait for a ctdb_call to complete and get the 
320   results. This call will block unless the call has already completed.
321 */
322 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
323 {
324         if (state == NULL) {
325                 return -1;
326         }
327
328         while (state->state < CTDB_CALL_DONE) {
329                 event_loop_once(state->ctdb_db->ctdb->ev);
330         }
331         if (state->state != CTDB_CALL_DONE) {
332                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
333                 talloc_free(state);
334                 return -1;
335         }
336
337         if (state->call->reply_data.dsize) {
338                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
339                                                       state->call->reply_data.dptr,
340                                                       state->call->reply_data.dsize);
341                 call->reply_data.dsize = state->call->reply_data.dsize;
342         } else {
343                 call->reply_data.dptr = NULL;
344                 call->reply_data.dsize = 0;
345         }
346         call->status = state->call->status;
347         talloc_free(state);
348
349         return call->status;
350 }
351
352
353
354
355 /*
356   destroy a ctdb_call in client
357 */
358 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
359 {
360         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
361         return 0;
362 }
363
364 /*
365   construct an event driven local ctdb_call
366
367   this is used so that locally processed ctdb_call requests are processed
368   in an event driven manner
369 */
370 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
371                                                                   struct ctdb_call *call,
372                                                                   struct ctdb_ltdb_header *header,
373                                                                   TDB_DATA *data)
374 {
375         struct ctdb_client_call_state *state;
376         struct ctdb_context *ctdb = ctdb_db->ctdb;
377         int ret;
378
379         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
380         CTDB_NO_MEMORY_NULL(ctdb, state);
381         state->call = talloc_zero(state, struct ctdb_call);
382         CTDB_NO_MEMORY_NULL(ctdb, state->call);
383
384         talloc_steal(state, data->dptr);
385
386         state->state   = CTDB_CALL_DONE;
387         *(state->call) = *call;
388         state->ctdb_db = ctdb_db;
389
390         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
391         if (ret != 0) {
392                 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
393         }
394
395         return state;
396 }
397
398 /*
399   make a ctdb call to the local daemon - async send. Called from client context.
400
401   This constructs a ctdb_call request and queues it for processing. 
402   This call never blocks.
403 */
404 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
405                                               struct ctdb_call *call)
406 {
407         struct ctdb_client_call_state *state;
408         struct ctdb_context *ctdb = ctdb_db->ctdb;
409         struct ctdb_ltdb_header header;
410         TDB_DATA data;
411         int ret;
412         size_t len;
413         struct ctdb_req_call *c;
414
415         /* if the domain socket is not yet open, open it */
416         if (ctdb->daemon.sd==-1) {
417                 ctdb_socket_connect(ctdb);
418         }
419
420         ret = ctdb_ltdb_lock(ctdb_db, call->key);
421         if (ret != 0) {
422                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
423                 return NULL;
424         }
425
426         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
427
428         if ((call->flags & CTDB_IMMEDIATE_MIGRATION) && (header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
429                 ret = -1;
430         }
431
432         if (ret == 0 && header.dmaster == ctdb->pnn) {
433                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
434                 talloc_free(data.dptr);
435                 ctdb_ltdb_unlock(ctdb_db, call->key);
436                 return state;
437         }
438
439         ctdb_ltdb_unlock(ctdb_db, call->key);
440         talloc_free(data.dptr);
441
442         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
443         if (state == NULL) {
444                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
445                 return NULL;
446         }
447         state->call = talloc_zero(state, struct ctdb_call);
448         if (state->call == NULL) {
449                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
450                 return NULL;
451         }
452
453         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
454         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
455         if (c == NULL) {
456                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
457                 return NULL;
458         }
459
460         state->reqid     = ctdb_reqid_new(ctdb, state);
461         state->ctdb_db = ctdb_db;
462         talloc_set_destructor(state, ctdb_client_call_destructor);
463
464         c->hdr.reqid     = state->reqid;
465         c->flags         = call->flags;
466         c->db_id         = ctdb_db->db_id;
467         c->callid        = call->call_id;
468         c->hopcount      = 0;
469         c->keylen        = call->key.dsize;
470         c->calldatalen   = call->call_data.dsize;
471         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
472         memcpy(&c->data[call->key.dsize], 
473                call->call_data.dptr, call->call_data.dsize);
474         *(state->call)              = *call;
475         state->call->call_data.dptr = &c->data[call->key.dsize];
476         state->call->key.dptr       = &c->data[0];
477
478         state->state  = CTDB_CALL_WAIT;
479
480
481         ctdb_client_queue_pkt(ctdb, &c->hdr);
482
483         return state;
484 }
485
486
487 /*
488   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
489 */
490 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
491 {
492         struct ctdb_client_call_state *state;
493
494         state = ctdb_call_send(ctdb_db, call);
495         return ctdb_call_recv(state, call);
496 }
497
498
499 /*
500   tell the daemon what messaging srvid we will use, and register the message
501   handler function in the client
502 */
503 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
504                              ctdb_msg_fn_t handler,
505                              void *private_data)
506                                     
507 {
508         int res;
509         int32_t status;
510         
511         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
512                            tdb_null, NULL, NULL, &status, NULL, NULL);
513         if (res != 0 || status != 0) {
514                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
515                 return -1;
516         }
517
518         /* also need to register the handler with our own ctdb structure */
519         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
520 }
521
522 /*
523   tell the daemon we no longer want a srvid
524 */
525 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
526 {
527         int res;
528         int32_t status;
529         
530         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
531                            tdb_null, NULL, NULL, &status, NULL, NULL);
532         if (res != 0 || status != 0) {
533                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
534                 return -1;
535         }
536
537         /* also need to register the handler with our own ctdb structure */
538         ctdb_deregister_message_handler(ctdb, srvid, private_data);
539         return 0;
540 }
541
542
543 /*
544   send a message - from client context
545  */
546 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
547                       uint64_t srvid, TDB_DATA data)
548 {
549         struct ctdb_req_message *r;
550         int len, res;
551
552         len = offsetof(struct ctdb_req_message, data) + data.dsize;
553         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
554                                len, struct ctdb_req_message);
555         CTDB_NO_MEMORY(ctdb, r);
556
557         r->hdr.destnode  = pnn;
558         r->srvid         = srvid;
559         r->datalen       = data.dsize;
560         memcpy(&r->data[0], data.dptr, data.dsize);
561         
562         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
563         if (res != 0) {
564                 return res;
565         }
566
567         talloc_free(r);
568         return 0;
569 }
570
571
572 /*
573   cancel a ctdb_fetch_lock operation, releasing the lock
574  */
575 static int fetch_lock_destructor(struct ctdb_record_handle *h)
576 {
577         ctdb_ltdb_unlock(h->ctdb_db, h->key);
578         return 0;
579 }
580
581 /*
582   force the migration of a record to this node
583  */
584 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
585 {
586         struct ctdb_call call;
587         ZERO_STRUCT(call);
588         call.call_id = CTDB_NULL_FUNC;
589         call.key = key;
590         call.flags = CTDB_IMMEDIATE_MIGRATION;
591         return ctdb_call(ctdb_db, &call);
592 }
593
594 /*
595   try to fetch a readonly copy of a record
596  */
597 static int
598 ctdb_client_fetch_readonly(struct ctdb_db_context *ctdb_db, TDB_DATA key, TALLOC_CTX *mem_ctx, struct ctdb_ltdb_header **hdr, TDB_DATA *data)
599 {
600         int ret;
601
602         struct ctdb_call call;
603         ZERO_STRUCT(call);
604
605         call.call_id = CTDB_FETCH_WITH_HEADER_FUNC;
606         call.call_data.dptr = NULL;
607         call.call_data.dsize = 0;
608         call.key = key;
609         call.flags = CTDB_WANT_READONLY;
610         ret = ctdb_call(ctdb_db, &call);
611
612         if (ret != 0) {
613                 return -1;
614         }
615         if (call.reply_data.dsize < sizeof(struct ctdb_ltdb_header)) {
616                 return -1;
617         }
618
619         *hdr = talloc_memdup(mem_ctx, &call.reply_data.dptr[0], sizeof(struct ctdb_ltdb_header));
620         if (*hdr == NULL) {
621                 talloc_free(call.reply_data.dptr);
622                 return -1;
623         }
624
625         data->dsize = call.reply_data.dsize - sizeof(struct ctdb_ltdb_header);
626         data->dptr  = talloc_memdup(mem_ctx, &call.reply_data.dptr[sizeof(struct ctdb_ltdb_header)], data->dsize);
627         if (data->dptr == NULL) {
628                 talloc_free(call.reply_data.dptr);
629                 talloc_free(hdr);
630                 return -1;
631         }
632
633         return 0;
634 }
635
636 /*
637   get a lock on a record, and return the records data. Blocks until it gets the lock
638  */
639 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
640                                            TDB_DATA key, TDB_DATA *data)
641 {
642         int ret;
643         struct ctdb_record_handle *h;
644
645         /*
646           procedure is as follows:
647
648           1) get the chain lock. 
649           2) check if we are dmaster
650           3) if we are the dmaster then return handle 
651           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
652              reply from ctdbd
653           5) when we get the reply, goto (1)
654          */
655
656         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
657         if (h == NULL) {
658                 return NULL;
659         }
660
661         h->ctdb_db = ctdb_db;
662         h->key     = key;
663         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
664         if (h->key.dptr == NULL) {
665                 talloc_free(h);
666                 return NULL;
667         }
668         h->data    = data;
669
670         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
671                  (const char *)key.dptr));
672
673 again:
674         /* step 1 - get the chain lock */
675         ret = ctdb_ltdb_lock(ctdb_db, key);
676         if (ret != 0) {
677                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
678                 talloc_free(h);
679                 return NULL;
680         }
681
682         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
683
684         talloc_set_destructor(h, fetch_lock_destructor);
685
686         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
687
688         /* when torturing, ensure we test the remote path */
689         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
690             random() % 5 == 0) {
691                 h->header.dmaster = (uint32_t)-1;
692         }
693
694
695         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
696
697         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
698                 ctdb_ltdb_unlock(ctdb_db, key);
699                 ret = ctdb_client_force_migration(ctdb_db, key);
700                 if (ret != 0) {
701                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
702                         talloc_free(h);
703                         return NULL;
704                 }
705                 goto again;
706         }
707
708         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
709         return h;
710 }
711
712 /*
713   get a readonly lock on a record, and return the records data. Blocks until it gets the lock
714  */
715 struct ctdb_record_handle *
716 ctdb_fetch_readonly_lock(
717         struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
718         TDB_DATA key, TDB_DATA *data,
719         int read_only)
720 {
721         int ret;
722         struct ctdb_record_handle *h;
723         struct ctdb_ltdb_header *roheader = NULL;
724
725         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
726         if (h == NULL) {
727                 return NULL;
728         }
729
730         h->ctdb_db = ctdb_db;
731         h->key     = key;
732         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
733         if (h->key.dptr == NULL) {
734                 talloc_free(h);
735                 return NULL;
736         }
737         h->data    = data;
738
739         data->dptr = NULL;
740         data->dsize = 0;
741
742
743 again:
744         talloc_free(roheader);
745         roheader = NULL;
746
747         talloc_free(data->dptr);
748         data->dptr = NULL;
749         data->dsize = 0;
750
751         /* Lock the record/chain */
752         ret = ctdb_ltdb_lock(ctdb_db, key);
753         if (ret != 0) {
754                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
755                 talloc_free(h);
756                 return NULL;
757         }
758
759         talloc_set_destructor(h, fetch_lock_destructor);
760
761         /* Check if record exists yet in the TDB */
762         ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
763         if (ret != 0) {
764                 ctdb_ltdb_unlock(ctdb_db, key);
765                 ret = ctdb_client_force_migration(ctdb_db, key);
766                 if (ret != 0) {
767                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
768                         talloc_free(h);
769                         return NULL;
770                 }
771                 goto again;
772         }
773
774         /* if this is a request for read/write and we have delegations
775            we have to revoke all delegations first
776         */
777         if ((read_only == 0) 
778         &&  (h->header.dmaster == ctdb_db->ctdb->pnn)
779         &&  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
780                 ctdb_ltdb_unlock(ctdb_db, key);
781                 ret = ctdb_client_force_migration(ctdb_db, key);
782                 if (ret != 0) {
783                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
784                         talloc_free(h);
785                         return NULL;
786                 }
787                 goto again;
788         }
789
790         /* if we are dmaster, just return the handle */
791         if (h->header.dmaster == ctdb_db->ctdb->pnn) {
792                 return h;
793         }
794
795         if (read_only != 0) {
796                 TDB_DATA rodata = {NULL, 0};
797
798                 if ((h->header.flags & CTDB_REC_RO_HAVE_READONLY)
799                 ||  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
800                         return h;
801                 }
802
803                 ctdb_ltdb_unlock(ctdb_db, key);
804                 ret = ctdb_client_fetch_readonly(ctdb_db, key, h, &roheader, &rodata);
805                 if (ret != 0) {
806                         DEBUG(DEBUG_ERR,("ctdb_fetch_readonly_lock:  failed. force migration and try again\n"));
807                         ret = ctdb_client_force_migration(ctdb_db, key);
808                         if (ret != 0) {
809                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
810                                 talloc_free(h);
811                                 return NULL;
812                         }
813
814                         goto again;
815                 }
816
817                 if (!(roheader->flags&CTDB_REC_RO_HAVE_READONLY)) {
818                         ret = ctdb_client_force_migration(ctdb_db, key);
819                         if (ret != 0) {
820                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
821                                 talloc_free(h);
822                                 return NULL;
823                         }
824
825                         goto again;
826                 }
827
828                 ret = ctdb_ltdb_lock(ctdb_db, key);
829                 if (ret != 0) {
830                         DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
831                         talloc_free(h);
832                         return NULL;
833                 }
834
835                 ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
836                 if (ret != 0) {
837                         ctdb_ltdb_unlock(ctdb_db, key);
838
839                         ret = ctdb_client_force_migration(ctdb_db, key);
840                         if (ret != 0) {
841                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
842                                 talloc_free(h);
843                                 return NULL;
844                         }
845
846                         goto again;
847                 }
848
849                 return h;
850         }
851
852         /* we are not dmaster and this was not a request for a readonly lock
853          * so unlock the record, migrate it and try again
854          */
855         ctdb_ltdb_unlock(ctdb_db, key);
856         ret = ctdb_client_force_migration(ctdb_db, key);
857         if (ret != 0) {
858                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
859                 talloc_free(h);
860                 return NULL;
861         }
862         goto again;
863 }
864
865 /*
866   store some data to the record that was locked with ctdb_fetch_lock()
867 */
868 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
869 {
870         if (h->ctdb_db->persistent) {
871                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
872                 return -1;
873         }
874
875         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
876 }
877
878 /*
879   non-locking fetch of a record
880  */
881 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
882                TDB_DATA key, TDB_DATA *data)
883 {
884         struct ctdb_call call;
885         int ret;
886
887         call.call_id = CTDB_FETCH_FUNC;
888         call.call_data.dptr = NULL;
889         call.call_data.dsize = 0;
890         call.key = key;
891
892         ret = ctdb_call(ctdb_db, &call);
893
894         if (ret == 0) {
895                 *data = call.reply_data;
896                 talloc_steal(mem_ctx, data->dptr);
897         }
898
899         return ret;
900 }
901
902
903
904 /*
905    called when a control completes or timesout to invoke the callback
906    function the user provided
907 */
908 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
909         struct timeval t, void *private_data)
910 {
911         struct ctdb_client_control_state *state;
912         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
913         int ret;
914
915         state = talloc_get_type(private_data, struct ctdb_client_control_state);
916         talloc_steal(tmp_ctx, state);
917
918         ret = ctdb_control_recv(state->ctdb, state, state,
919                         NULL, 
920                         NULL, 
921                         NULL);
922         if (ret != 0) {
923                 DEBUG(DEBUG_DEBUG,("ctdb_control_recv() failed, ignoring return code %d\n", ret));
924         }
925
926         talloc_free(tmp_ctx);
927 }
928
929 /*
930   called when a CTDB_REPLY_CONTROL packet comes in in the client
931
932   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
933   contains any reply data from the control
934 */
935 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
936                                       struct ctdb_req_header *hdr)
937 {
938         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
939         struct ctdb_client_control_state *state;
940
941         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
942         if (state == NULL) {
943                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
944                 return;
945         }
946
947         if (hdr->reqid != state->reqid) {
948                 /* we found a record  but it was the wrong one */
949                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
950                 return;
951         }
952
953         state->outdata.dptr = c->data;
954         state->outdata.dsize = c->datalen;
955         state->status = c->status;
956         if (c->errorlen) {
957                 state->errormsg = talloc_strndup(state, 
958                                                  (char *)&c->data[c->datalen], 
959                                                  c->errorlen);
960         }
961
962         /* state->outdata now uses resources from c so we dont want c
963            to just dissappear from under us while state is still alive
964         */
965         talloc_steal(state, c);
966
967         state->state = CTDB_CONTROL_DONE;
968
969         /* if we had a callback registered for this control, pull the response
970            and call the callback.
971         */
972         if (state->async.fn) {
973                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
974         }
975 }
976
977
978 /*
979   destroy a ctdb_control in client
980 */
981 static int ctdb_client_control_destructor(struct ctdb_client_control_state *state)
982 {
983         ctdb_reqid_remove(state->ctdb, state->reqid);
984         return 0;
985 }
986
987
988 /* time out handler for ctdb_control */
989 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
990         struct timeval t, void *private_data)
991 {
992         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
993
994         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
995                          "dstnode:%u\n", state->reqid, state->c->opcode,
996                          state->c->hdr.destnode));
997
998         state->state = CTDB_CONTROL_TIMEOUT;
999
1000         /* if we had a callback registered for this control, pull the response
1001            and call the callback.
1002         */
1003         if (state->async.fn) {
1004                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
1005         }
1006 }
1007
1008 /* async version of send control request */
1009 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
1010                 uint32_t destnode, uint64_t srvid, 
1011                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
1012                 TALLOC_CTX *mem_ctx,
1013                 struct timeval *timeout,
1014                 char **errormsg)
1015 {
1016         struct ctdb_client_control_state *state;
1017         size_t len;
1018         struct ctdb_req_control *c;
1019         int ret;
1020
1021         if (errormsg) {
1022                 *errormsg = NULL;
1023         }
1024
1025         /* if the domain socket is not yet open, open it */
1026         if (ctdb->daemon.sd==-1) {
1027                 ctdb_socket_connect(ctdb);
1028         }
1029
1030         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
1031         CTDB_NO_MEMORY_NULL(ctdb, state);
1032
1033         state->ctdb       = ctdb;
1034         state->reqid      = ctdb_reqid_new(ctdb, state);
1035         state->state      = CTDB_CONTROL_WAIT;
1036         state->errormsg   = NULL;
1037
1038         talloc_set_destructor(state, ctdb_client_control_destructor);
1039
1040         len = offsetof(struct ctdb_req_control, data) + data.dsize;
1041         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
1042                                len, struct ctdb_req_control);
1043         state->c            = c;        
1044         CTDB_NO_MEMORY_NULL(ctdb, c);
1045         c->hdr.reqid        = state->reqid;
1046         c->hdr.destnode     = destnode;
1047         c->opcode           = opcode;
1048         c->client_id        = 0;
1049         c->flags            = flags;
1050         c->srvid            = srvid;
1051         c->datalen          = data.dsize;
1052         if (data.dsize) {
1053                 memcpy(&c->data[0], data.dptr, data.dsize);
1054         }
1055
1056         /* timeout */
1057         if (timeout && !timeval_is_zero(timeout)) {
1058                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
1059         }
1060
1061         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
1062         if (ret != 0) {
1063                 talloc_free(state);
1064                 return NULL;
1065         }
1066
1067         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1068                 talloc_free(state);
1069                 return NULL;
1070         }
1071
1072         return state;
1073 }
1074
1075
1076 /* async version of receive control reply */
1077 int ctdb_control_recv(struct ctdb_context *ctdb, 
1078                 struct ctdb_client_control_state *state, 
1079                 TALLOC_CTX *mem_ctx,
1080                 TDB_DATA *outdata, int32_t *status, char **errormsg)
1081 {
1082         TALLOC_CTX *tmp_ctx;
1083
1084         if (status != NULL) {
1085                 *status = -1;
1086         }
1087         if (errormsg != NULL) {
1088                 *errormsg = NULL;
1089         }
1090
1091         if (state == NULL) {
1092                 return -1;
1093         }
1094
1095         /* prevent double free of state */
1096         tmp_ctx = talloc_new(ctdb);
1097         talloc_steal(tmp_ctx, state);
1098
1099         /* loop one event at a time until we either timeout or the control
1100            completes.
1101         */
1102         while (state->state == CTDB_CONTROL_WAIT) {
1103                 event_loop_once(ctdb->ev);
1104         }
1105
1106         if (state->state != CTDB_CONTROL_DONE) {
1107                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
1108                 if (state->async.fn) {
1109                         state->async.fn(state);
1110                 }
1111                 talloc_free(tmp_ctx);
1112                 return -1;
1113         }
1114
1115         if (state->errormsg) {
1116                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
1117                 if (errormsg) {
1118                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
1119                 }
1120                 if (state->async.fn) {
1121                         state->async.fn(state);
1122                 }
1123                 talloc_free(tmp_ctx);
1124                 return -1;
1125         }
1126
1127         if (outdata) {
1128                 *outdata = state->outdata;
1129                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
1130         }
1131
1132         if (status) {
1133                 *status = state->status;
1134         }
1135
1136         if (state->async.fn) {
1137                 state->async.fn(state);
1138         }
1139
1140         talloc_free(tmp_ctx);
1141         return 0;
1142 }
1143
1144
1145
1146 /*
1147   send a ctdb control message
1148   timeout specifies how long we should wait for a reply.
1149   if timeout is NULL we wait indefinitely
1150  */
1151 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
1152                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
1153                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
1154                  struct timeval *timeout,
1155                  char **errormsg)
1156 {
1157         struct ctdb_client_control_state *state;
1158
1159         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
1160                         flags, data, mem_ctx,
1161                         timeout, errormsg);
1162         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
1163                         errormsg);
1164 }
1165
1166
1167
1168
1169 /*
1170   a process exists call. Returns 0 if process exists, -1 otherwise
1171  */
1172 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
1173 {
1174         int ret;
1175         TDB_DATA data;
1176         int32_t status;
1177
1178         data.dptr = (uint8_t*)&pid;
1179         data.dsize = sizeof(pid);
1180
1181         ret = ctdb_control(ctdb, destnode, 0, 
1182                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
1183                            NULL, NULL, &status, NULL, NULL);
1184         if (ret != 0) {
1185                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
1186                 return -1;
1187         }
1188
1189         return status;
1190 }
1191
1192 /*
1193   get remote statistics
1194  */
1195 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
1196 {
1197         int ret;
1198         TDB_DATA data;
1199         int32_t res;
1200
1201         ret = ctdb_control(ctdb, destnode, 0, 
1202                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
1203                            ctdb, &data, &res, NULL, NULL);
1204         if (ret != 0 || res != 0) {
1205                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
1206                 return -1;
1207         }
1208
1209         if (data.dsize != sizeof(struct ctdb_statistics)) {
1210                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1211                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1212                       return -1;
1213         }
1214
1215         *status = *(struct ctdb_statistics *)data.dptr;
1216         talloc_free(data.dptr);
1217                         
1218         return 0;
1219 }
1220
1221 /*
1222   shutdown a remote ctdb node
1223  */
1224 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1225 {
1226         struct ctdb_client_control_state *state;
1227
1228         state = ctdb_control_send(ctdb, destnode, 0, 
1229                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1230                            NULL, &timeout, NULL);
1231         if (state == NULL) {
1232                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1233                 return -1;
1234         }
1235
1236         return 0;
1237 }
1238
1239 /*
1240   get vnn map from a remote node
1241  */
1242 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1243 {
1244         int ret;
1245         TDB_DATA outdata;
1246         int32_t res;
1247         struct ctdb_vnn_map_wire *map;
1248
1249         ret = ctdb_control(ctdb, destnode, 0, 
1250                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1251                            mem_ctx, &outdata, &res, &timeout, NULL);
1252         if (ret != 0 || res != 0) {
1253                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1254                 return -1;
1255         }
1256         
1257         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1258         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1259             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1260                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1261                 return -1;
1262         }
1263
1264         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1265         CTDB_NO_MEMORY(ctdb, *vnnmap);
1266         (*vnnmap)->generation = map->generation;
1267         (*vnnmap)->size       = map->size;
1268         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1269
1270         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1271         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1272         talloc_free(outdata.dptr);
1273                     
1274         return 0;
1275 }
1276
1277
1278 /*
1279   get the recovery mode of a remote node
1280  */
1281 struct ctdb_client_control_state *
1282 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1283 {
1284         return ctdb_control_send(ctdb, destnode, 0, 
1285                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1286                            mem_ctx, &timeout, NULL);
1287 }
1288
1289 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1290 {
1291         int ret;
1292         int32_t res;
1293
1294         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1295         if (ret != 0) {
1296                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1297                 return -1;
1298         }
1299
1300         if (recmode) {
1301                 *recmode = (uint32_t)res;
1302         }
1303
1304         return 0;
1305 }
1306
1307 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1308 {
1309         struct ctdb_client_control_state *state;
1310
1311         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1312         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1313 }
1314
1315
1316
1317
1318 /*
1319   set the recovery mode of a remote node
1320  */
1321 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1322 {
1323         int ret;
1324         TDB_DATA data;
1325         int32_t res;
1326
1327         data.dsize = sizeof(uint32_t);
1328         data.dptr = (unsigned char *)&recmode;
1329
1330         ret = ctdb_control(ctdb, destnode, 0, 
1331                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1332                            NULL, NULL, &res, &timeout, NULL);
1333         if (ret != 0 || res != 0) {
1334                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1335                 return -1;
1336         }
1337
1338         return 0;
1339 }
1340
1341
1342
1343 /*
1344   get the recovery master of a remote node
1345  */
1346 struct ctdb_client_control_state *
1347 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1348                         struct timeval timeout, uint32_t destnode)
1349 {
1350         return ctdb_control_send(ctdb, destnode, 0, 
1351                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1352                            mem_ctx, &timeout, NULL);
1353 }
1354
1355 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1356 {
1357         int ret;
1358         int32_t res;
1359
1360         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1361         if (ret != 0) {
1362                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1363                 return -1;
1364         }
1365
1366         if (recmaster) {
1367                 *recmaster = (uint32_t)res;
1368         }
1369
1370         return 0;
1371 }
1372
1373 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1374 {
1375         struct ctdb_client_control_state *state;
1376
1377         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1378         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1379 }
1380
1381
1382 /*
1383   set the recovery master of a remote node
1384  */
1385 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1386 {
1387         int ret;
1388         TDB_DATA data;
1389         int32_t res;
1390
1391         ZERO_STRUCT(data);
1392         data.dsize = sizeof(uint32_t);
1393         data.dptr = (unsigned char *)&recmaster;
1394
1395         ret = ctdb_control(ctdb, destnode, 0, 
1396                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1397                            NULL, NULL, &res, &timeout, NULL);
1398         if (ret != 0 || res != 0) {
1399                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1400                 return -1;
1401         }
1402
1403         return 0;
1404 }
1405
1406
1407 /*
1408   get a list of databases off a remote node
1409  */
1410 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1411                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1412 {
1413         int ret;
1414         TDB_DATA outdata;
1415         int32_t res;
1416
1417         ret = ctdb_control(ctdb, destnode, 0, 
1418                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1419                            mem_ctx, &outdata, &res, &timeout, NULL);
1420         if (ret != 0 || res != 0) {
1421                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1422                 return -1;
1423         }
1424
1425         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1426         talloc_free(outdata.dptr);
1427                     
1428         return 0;
1429 }
1430
1431 /*
1432   get a list of nodes (vnn and flags ) from a remote node
1433  */
1434 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1435                 struct timeval timeout, uint32_t destnode, 
1436                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1437 {
1438         int ret;
1439         TDB_DATA outdata;
1440         int32_t res;
1441
1442         ret = ctdb_control(ctdb, destnode, 0, 
1443                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1444                            mem_ctx, &outdata, &res, &timeout, NULL);
1445         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1446                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1447                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1448         }
1449         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1450                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1451                 return -1;
1452         }
1453
1454         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1455         talloc_free(outdata.dptr);
1456                     
1457         return 0;
1458 }
1459
1460 /*
1461   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1462  */
1463 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1464                 struct timeval timeout, uint32_t destnode, 
1465                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1466 {
1467         int ret, i, len;
1468         TDB_DATA outdata;
1469         struct ctdb_node_mapv4 *nodemapv4;
1470         int32_t res;
1471
1472         ret = ctdb_control(ctdb, destnode, 0, 
1473                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1474                            mem_ctx, &outdata, &res, &timeout, NULL);
1475         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1476                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1477                 return -1;
1478         }
1479
1480         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1481
1482         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1483         (*nodemap) = talloc_zero_size(mem_ctx, len);
1484         CTDB_NO_MEMORY(ctdb, (*nodemap));
1485
1486         (*nodemap)->num = nodemapv4->num;
1487         for (i=0; i<nodemapv4->num; i++) {
1488                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1489                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1490                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1491                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1492         }
1493                 
1494         talloc_free(outdata.dptr);
1495                     
1496         return 0;
1497 }
1498
1499 /*
1500   drop the transport, reload the nodes file and restart the transport
1501  */
1502 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1503                     struct timeval timeout, uint32_t destnode)
1504 {
1505         int ret;
1506         int32_t res;
1507
1508         ret = ctdb_control(ctdb, destnode, 0, 
1509                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1510                            NULL, NULL, &res, &timeout, NULL);
1511         if (ret != 0 || res != 0) {
1512                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1513                 return -1;
1514         }
1515
1516         return 0;
1517 }
1518
1519
1520 /*
1521   set vnn map on a node
1522  */
1523 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1524                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1525 {
1526         int ret;
1527         TDB_DATA data;
1528         int32_t res;
1529         struct ctdb_vnn_map_wire *map;
1530         size_t len;
1531
1532         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1533         map = talloc_size(mem_ctx, len);
1534         CTDB_NO_MEMORY(ctdb, map);
1535
1536         map->generation = vnnmap->generation;
1537         map->size = vnnmap->size;
1538         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1539         
1540         data.dsize = len;
1541         data.dptr  = (uint8_t *)map;
1542
1543         ret = ctdb_control(ctdb, destnode, 0, 
1544                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1545                            NULL, NULL, &res, &timeout, NULL);
1546         if (ret != 0 || res != 0) {
1547                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1548                 return -1;
1549         }
1550
1551         talloc_free(map);
1552
1553         return 0;
1554 }
1555
1556
1557 /*
1558   async send for pull database
1559  */
1560 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1561         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1562         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1563 {
1564         TDB_DATA indata;
1565         struct ctdb_control_pulldb *pull;
1566         struct ctdb_client_control_state *state;
1567
1568         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1569         CTDB_NO_MEMORY_NULL(ctdb, pull);
1570
1571         pull->db_id   = dbid;
1572         pull->lmaster = lmaster;
1573
1574         indata.dsize = sizeof(struct ctdb_control_pulldb);
1575         indata.dptr  = (unsigned char *)pull;
1576
1577         state = ctdb_control_send(ctdb, destnode, 0, 
1578                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1579                                   mem_ctx, &timeout, NULL);
1580         talloc_free(pull);
1581
1582         return state;
1583 }
1584
1585 /*
1586   async recv for pull database
1587  */
1588 int ctdb_ctrl_pulldb_recv(
1589         struct ctdb_context *ctdb, 
1590         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1591         TDB_DATA *outdata)
1592 {
1593         int ret;
1594         int32_t res;
1595
1596         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1597         if ( (ret != 0) || (res != 0) ){
1598                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1599                 return -1;
1600         }
1601
1602         return 0;
1603 }
1604
1605 /*
1606   pull all keys and records for a specific database on a node
1607  */
1608 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1609                 uint32_t dbid, uint32_t lmaster, 
1610                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1611                 TDB_DATA *outdata)
1612 {
1613         struct ctdb_client_control_state *state;
1614
1615         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1616                                       timeout);
1617         
1618         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1619 }
1620
1621
1622 /*
1623   change dmaster for all keys in the database to the new value
1624  */
1625 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1626                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1627 {
1628         int ret;
1629         TDB_DATA indata;
1630         int32_t res;
1631
1632         indata.dsize = 2*sizeof(uint32_t);
1633         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1634
1635         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1636         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1637
1638         ret = ctdb_control(ctdb, destnode, 0, 
1639                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1640                            NULL, NULL, &res, &timeout, NULL);
1641         if (ret != 0 || res != 0) {
1642                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1643                 return -1;
1644         }
1645
1646         return 0;
1647 }
1648
1649 /*
1650   ping a node, return number of clients connected
1651  */
1652 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1653 {
1654         int ret;
1655         int32_t res;
1656
1657         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1658                            tdb_null, NULL, NULL, &res, NULL, NULL);
1659         if (ret != 0) {
1660                 return -1;
1661         }
1662         return res;
1663 }
1664
1665 /*
1666   find the real path to a ltdb 
1667  */
1668 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1669                    const char **path)
1670 {
1671         int ret;
1672         int32_t res;
1673         TDB_DATA data;
1674
1675         data.dptr = (uint8_t *)&dbid;
1676         data.dsize = sizeof(dbid);
1677
1678         ret = ctdb_control(ctdb, destnode, 0, 
1679                            CTDB_CONTROL_GETDBPATH, 0, data, 
1680                            mem_ctx, &data, &res, &timeout, NULL);
1681         if (ret != 0 || res != 0) {
1682                 return -1;
1683         }
1684
1685         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1686         if ((*path) == NULL) {
1687                 return -1;
1688         }
1689
1690         talloc_free(data.dptr);
1691
1692         return 0;
1693 }
1694
1695 /*
1696   find the name of a db 
1697  */
1698 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1699                    const char **name)
1700 {
1701         int ret;
1702         int32_t res;
1703         TDB_DATA data;
1704
1705         data.dptr = (uint8_t *)&dbid;
1706         data.dsize = sizeof(dbid);
1707
1708         ret = ctdb_control(ctdb, destnode, 0, 
1709                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1710                            mem_ctx, &data, &res, &timeout, NULL);
1711         if (ret != 0 || res != 0) {
1712                 return -1;
1713         }
1714
1715         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1716         if ((*name) == NULL) {
1717                 return -1;
1718         }
1719
1720         talloc_free(data.dptr);
1721
1722         return 0;
1723 }
1724
1725 /*
1726   get the health status of a db
1727  */
1728 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1729                           struct timeval timeout,
1730                           uint32_t destnode,
1731                           uint32_t dbid, TALLOC_CTX *mem_ctx,
1732                           const char **reason)
1733 {
1734         int ret;
1735         int32_t res;
1736         TDB_DATA data;
1737
1738         data.dptr = (uint8_t *)&dbid;
1739         data.dsize = sizeof(dbid);
1740
1741         ret = ctdb_control(ctdb, destnode, 0,
1742                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1743                            mem_ctx, &data, &res, &timeout, NULL);
1744         if (ret != 0 || res != 0) {
1745                 return -1;
1746         }
1747
1748         if (data.dsize == 0) {
1749                 (*reason) = NULL;
1750                 return 0;
1751         }
1752
1753         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1754         if ((*reason) == NULL) {
1755                 return -1;
1756         }
1757
1758         talloc_free(data.dptr);
1759
1760         return 0;
1761 }
1762
1763 /*
1764   create a database
1765  */
1766 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1767                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1768 {
1769         int ret;
1770         int32_t res;
1771         TDB_DATA data;
1772
1773         data.dptr = discard_const(name);
1774         data.dsize = strlen(name)+1;
1775
1776         ret = ctdb_control(ctdb, destnode, 0, 
1777                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1778                            0, data, 
1779                            mem_ctx, &data, &res, &timeout, NULL);
1780
1781         if (ret != 0 || res != 0) {
1782                 return -1;
1783         }
1784
1785         return 0;
1786 }
1787
1788 /*
1789   get debug level on a node
1790  */
1791 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1792 {
1793         int ret;
1794         int32_t res;
1795         TDB_DATA data;
1796
1797         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1798                            ctdb, &data, &res, NULL, NULL);
1799         if (ret != 0 || res != 0) {
1800                 return -1;
1801         }
1802         if (data.dsize != sizeof(int32_t)) {
1803                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1804                          (unsigned)data.dsize));
1805                 return -1;
1806         }
1807         *level = *(int32_t *)data.dptr;
1808         talloc_free(data.dptr);
1809         return 0;
1810 }
1811
1812 /*
1813   set debug level on a node
1814  */
1815 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1816 {
1817         int ret;
1818         int32_t res;
1819         TDB_DATA data;
1820
1821         data.dptr = (uint8_t *)&level;
1822         data.dsize = sizeof(level);
1823
1824         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1825                            NULL, NULL, &res, NULL, NULL);
1826         if (ret != 0 || res != 0) {
1827                 return -1;
1828         }
1829         return 0;
1830 }
1831
1832
1833 /*
1834   get a list of connected nodes
1835  */
1836 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1837                                 struct timeval timeout,
1838                                 TALLOC_CTX *mem_ctx,
1839                                 uint32_t *num_nodes)
1840 {
1841         struct ctdb_node_map *map=NULL;
1842         int ret, i;
1843         uint32_t *nodes;
1844
1845         *num_nodes = 0;
1846
1847         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1848         if (ret != 0) {
1849                 return NULL;
1850         }
1851
1852         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1853         if (nodes == NULL) {
1854                 return NULL;
1855         }
1856
1857         for (i=0;i<map->num;i++) {
1858                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1859                         nodes[*num_nodes] = map->nodes[i].pnn;
1860                         (*num_nodes)++;
1861                 }
1862         }
1863
1864         return nodes;
1865 }
1866
1867
1868 /*
1869   reset remote status
1870  */
1871 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1872 {
1873         int ret;
1874         int32_t res;
1875
1876         ret = ctdb_control(ctdb, destnode, 0, 
1877                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1878                            NULL, NULL, &res, NULL, NULL);
1879         if (ret != 0 || res != 0) {
1880                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1881                 return -1;
1882         }
1883         return 0;
1884 }
1885
1886 /*
1887   attach to a specific database - client call
1888 */
1889 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
1890                                     struct timeval timeout,
1891                                     const char *name,
1892                                     bool persistent,
1893                                     uint32_t tdb_flags)
1894 {
1895         struct ctdb_db_context *ctdb_db;
1896         TDB_DATA data;
1897         int ret;
1898         int32_t res;
1899
1900         ctdb_db = ctdb_db_handle(ctdb, name);
1901         if (ctdb_db) {
1902                 return ctdb_db;
1903         }
1904
1905         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1906         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1907
1908         ctdb_db->ctdb = ctdb;
1909         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1910         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1911
1912         data.dptr = discard_const(name);
1913         data.dsize = strlen(name)+1;
1914
1915         /* tell ctdb daemon to attach */
1916         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1917                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1918                            0, data, ctdb_db, &data, &res, NULL, NULL);
1919         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1920                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1921                 talloc_free(ctdb_db);
1922                 return NULL;
1923         }
1924         
1925         ctdb_db->db_id = *(uint32_t *)data.dptr;
1926         talloc_free(data.dptr);
1927
1928         ret = ctdb_ctrl_getdbpath(ctdb, timeout, CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1929         if (ret != 0) {
1930                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1931                 talloc_free(ctdb_db);
1932                 return NULL;
1933         }
1934
1935         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1936         if (ctdb->valgrinding) {
1937                 tdb_flags |= TDB_NOMMAP;
1938         }
1939         tdb_flags |= TDB_DISALLOW_NESTING;
1940
1941         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1942         if (ctdb_db->ltdb == NULL) {
1943                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1944                 talloc_free(ctdb_db);
1945                 return NULL;
1946         }
1947
1948         ctdb_db->persistent = persistent;
1949
1950         DLIST_ADD(ctdb->db_list, ctdb_db);
1951
1952         /* add well known functions */
1953         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1954         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1955         ctdb_set_call(ctdb_db, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1956
1957         return ctdb_db;
1958 }
1959
1960
1961 /*
1962   setup a call for a database
1963  */
1964 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1965 {
1966         struct ctdb_registered_call *call;
1967
1968 #if 0
1969         TDB_DATA data;
1970         int32_t status;
1971         struct ctdb_control_set_call c;
1972         int ret;
1973
1974         /* this is no longer valid with the separate daemon architecture */
1975         c.db_id = ctdb_db->db_id;
1976         c.fn    = fn;
1977         c.id    = id;
1978
1979         data.dptr = (uint8_t *)&c;
1980         data.dsize = sizeof(c);
1981
1982         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1983                            data, NULL, NULL, &status, NULL, NULL);
1984         if (ret != 0 || status != 0) {
1985                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
1986                 return -1;
1987         }
1988 #endif
1989
1990         /* also register locally */
1991         call = talloc(ctdb_db, struct ctdb_registered_call);
1992         call->fn = fn;
1993         call->id = id;
1994
1995         DLIST_ADD(ctdb_db->calls, call);        
1996         return 0;
1997 }
1998
1999
2000 struct traverse_state {
2001         bool done;
2002         uint32_t count;
2003         ctdb_traverse_func fn;
2004         void *private_data;
2005 };
2006
2007 /*
2008   called on each key during a ctdb_traverse
2009  */
2010 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
2011 {
2012         struct traverse_state *state = (struct traverse_state *)p;
2013         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
2014         TDB_DATA key;
2015
2016         if (data.dsize < sizeof(uint32_t) ||
2017             d->length != data.dsize) {
2018                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
2019                 state->done = True;
2020                 return;
2021         }
2022
2023         key.dsize = d->keylen;
2024         key.dptr  = &d->data[0];
2025         data.dsize = d->datalen;
2026         data.dptr = &d->data[d->keylen];
2027
2028         if (key.dsize == 0 && data.dsize == 0) {
2029                 /* end of traverse */
2030                 state->done = True;
2031                 return;
2032         }
2033
2034         if (data.dsize == sizeof(struct ctdb_ltdb_header)) {
2035                 /* empty records are deleted records in ctdb */
2036                 return;
2037         }
2038
2039         if (state->fn(ctdb, key, data, state->private_data) != 0) {
2040                 state->done = True;
2041         }
2042
2043         state->count++;
2044 }
2045
2046
2047 /*
2048   start a cluster wide traverse, calling the supplied fn on each record
2049   return the number of records traversed, or -1 on error
2050  */
2051 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
2052 {
2053         TDB_DATA data;
2054         struct ctdb_traverse_start t;
2055         int32_t status;
2056         int ret;
2057         uint64_t srvid = (getpid() | 0xFLL<<60);
2058         struct traverse_state state;
2059
2060         state.done = False;
2061         state.count = 0;
2062         state.private_data = private_data;
2063         state.fn = fn;
2064
2065         ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
2066         if (ret != 0) {
2067                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
2068                 return -1;
2069         }
2070
2071         t.db_id = ctdb_db->db_id;
2072         t.srvid = srvid;
2073         t.reqid = 0;
2074         t.withemptyrecords = false;
2075
2076         data.dptr = (uint8_t *)&t;
2077         data.dsize = sizeof(t);
2078
2079         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
2080                            data, NULL, NULL, &status, NULL, NULL);
2081         if (ret != 0 || status != 0) {
2082                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
2083                 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2084                 return -1;
2085         }
2086
2087         while (!state.done) {
2088                 event_loop_once(ctdb_db->ctdb->ev);
2089         }
2090
2091         ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2092         if (ret != 0) {
2093                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
2094                 return -1;
2095         }
2096
2097         return state.count;
2098 }
2099
2100 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
2101 /*
2102   called on each key during a catdb
2103  */
2104 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
2105 {
2106         int i;
2107         FILE *f = (FILE *)p;
2108         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
2109
2110         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
2111         for (i=0;i<key.dsize;i++) {
2112                 if (ISASCII(key.dptr[i])) {
2113                         fprintf(f, "%c", key.dptr[i]);
2114                 } else {
2115                         fprintf(f, "\\%02X", key.dptr[i]);
2116                 }
2117         }
2118         fprintf(f, "\"\n");
2119
2120         fprintf(f, "dmaster: %u\n", h->dmaster);
2121         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
2122         fprintf(f, "flags: 0x%08x", h->flags);
2123         if (h->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) printf(" MIGRATED_WITH_DATA");
2124         if (h->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) printf(" VACUUM_MIGRATED");
2125         if (h->flags & CTDB_REC_FLAG_AUTOMATIC) printf(" AUTOMATIC");
2126         if (h->flags & CTDB_REC_RO_HAVE_DELEGATIONS) printf(" RO_HAVE_DELEGATIONS");
2127         if (h->flags & CTDB_REC_RO_HAVE_READONLY) printf(" RO_HAVE_READONLY");
2128         if (h->flags & CTDB_REC_RO_REVOKING_READONLY) printf(" RO_REVOKING_READONLY");
2129         if (h->flags & CTDB_REC_RO_REVOKE_COMPLETE) printf(" RO_REVOKE_COMPLETE");
2130         fprintf(f, "\n");
2131
2132         fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
2133         for (i=sizeof(*h);i<data.dsize;i++) {
2134                 if (ISASCII(data.dptr[i])) {
2135                         fprintf(f, "%c", data.dptr[i]);
2136                 } else {
2137                         fprintf(f, "\\%02X", data.dptr[i]);
2138                 }
2139         }
2140         fprintf(f, "\"\n");
2141
2142         fprintf(f, "\n");
2143
2144         return 0;
2145 }
2146
2147 /*
2148   convenience function to list all keys to stdout
2149  */
2150 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
2151 {
2152         return ctdb_traverse(ctdb_db, ctdb_dumpdb_record, f);
2153 }
2154
2155 /*
2156   get the pid of a ctdb daemon
2157  */
2158 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
2159 {
2160         int ret;
2161         int32_t res;
2162
2163         ret = ctdb_control(ctdb, destnode, 0, 
2164                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
2165                            NULL, NULL, &res, &timeout, NULL);
2166         if (ret != 0) {
2167                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
2168                 return -1;
2169         }
2170
2171         *pid = res;
2172
2173         return 0;
2174 }
2175
2176
2177 /*
2178   async freeze send control
2179  */
2180 struct ctdb_client_control_state *
2181 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
2182 {
2183         return ctdb_control_send(ctdb, destnode, priority, 
2184                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
2185                            mem_ctx, &timeout, NULL);
2186 }
2187
2188 /* 
2189    async freeze recv control
2190 */
2191 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
2192 {
2193         int ret;
2194         int32_t res;
2195
2196         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
2197         if ( (ret != 0) || (res != 0) ){
2198                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
2199                 return -1;
2200         }
2201
2202         return 0;
2203 }
2204
2205 /*
2206   freeze databases of a certain priority
2207  */
2208 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2209 {
2210         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2211         struct ctdb_client_control_state *state;
2212         int ret;
2213
2214         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
2215         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
2216         talloc_free(tmp_ctx);
2217
2218         return ret;
2219 }
2220
2221 /* Freeze all databases */
2222 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2223 {
2224         int i;
2225
2226         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2227                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2228                         return -1;
2229                 }
2230         }
2231         return 0;
2232 }
2233
2234 /*
2235   thaw databases of a certain priority
2236  */
2237 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2238 {
2239         int ret;
2240         int32_t res;
2241
2242         ret = ctdb_control(ctdb, destnode, priority, 
2243                            CTDB_CONTROL_THAW, 0, tdb_null, 
2244                            NULL, NULL, &res, &timeout, NULL);
2245         if (ret != 0 || res != 0) {
2246                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2247                 return -1;
2248         }
2249
2250         return 0;
2251 }
2252
2253 /* thaw all databases */
2254 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2255 {
2256         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2257 }
2258
2259 /*
2260   get pnn of a node, or -1
2261  */
2262 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2263 {
2264         int ret;
2265         int32_t res;
2266
2267         ret = ctdb_control(ctdb, destnode, 0, 
2268                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2269                            NULL, NULL, &res, &timeout, NULL);
2270         if (ret != 0) {
2271                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2272                 return -1;
2273         }
2274
2275         return res;
2276 }
2277
2278 /*
2279   get the monitoring mode of a remote node
2280  */
2281 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2282 {
2283         int ret;
2284         int32_t res;
2285
2286         ret = ctdb_control(ctdb, destnode, 0, 
2287                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2288                            NULL, NULL, &res, &timeout, NULL);
2289         if (ret != 0) {
2290                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2291                 return -1;
2292         }
2293
2294         *monmode = res;
2295
2296         return 0;
2297 }
2298
2299
2300 /*
2301  set the monitoring mode of a remote node to active
2302  */
2303 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2304 {
2305         int ret;
2306         
2307
2308         ret = ctdb_control(ctdb, destnode, 0, 
2309                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2310                            NULL, NULL,NULL, &timeout, NULL);
2311         if (ret != 0) {
2312                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2313                 return -1;
2314         }
2315
2316         
2317
2318         return 0;
2319 }
2320
2321 /*
2322   set the monitoring mode of a remote node to disable
2323  */
2324 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2325 {
2326         int ret;
2327         
2328
2329         ret = ctdb_control(ctdb, destnode, 0, 
2330                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2331                            NULL, NULL, NULL, &timeout, NULL);
2332         if (ret != 0) {
2333                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2334                 return -1;
2335         }
2336
2337         
2338
2339         return 0;
2340 }
2341
2342
2343
2344 /* 
2345   sent to a node to make it take over an ip address
2346 */
2347 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2348                           uint32_t destnode, struct ctdb_public_ip *ip)
2349 {
2350         TDB_DATA data;
2351         struct ctdb_public_ipv4 ipv4;
2352         int ret;
2353         int32_t res;
2354
2355         if (ip->addr.sa.sa_family == AF_INET) {
2356                 ipv4.pnn = ip->pnn;
2357                 ipv4.sin = ip->addr.ip;
2358
2359                 data.dsize = sizeof(ipv4);
2360                 data.dptr  = (uint8_t *)&ipv4;
2361
2362                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2363                            NULL, &res, &timeout, NULL);
2364         } else {
2365                 data.dsize = sizeof(*ip);
2366                 data.dptr  = (uint8_t *)ip;
2367
2368                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2369                            NULL, &res, &timeout, NULL);
2370         }
2371
2372         if (ret != 0 || res != 0) {
2373                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2374                 return -1;
2375         }
2376
2377         return 0;       
2378 }
2379
2380
2381 /* 
2382   sent to a node to make it release an ip address
2383 */
2384 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2385                          uint32_t destnode, struct ctdb_public_ip *ip)
2386 {
2387         TDB_DATA data;
2388         struct ctdb_public_ipv4 ipv4;
2389         int ret;
2390         int32_t res;
2391
2392         if (ip->addr.sa.sa_family == AF_INET) {
2393                 ipv4.pnn = ip->pnn;
2394                 ipv4.sin = ip->addr.ip;
2395
2396                 data.dsize = sizeof(ipv4);
2397                 data.dptr  = (uint8_t *)&ipv4;
2398
2399                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2400                                    NULL, &res, &timeout, NULL);
2401         } else {
2402                 data.dsize = sizeof(*ip);
2403                 data.dptr  = (uint8_t *)ip;
2404
2405                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2406                                    NULL, &res, &timeout, NULL);
2407         }
2408
2409         if (ret != 0 || res != 0) {
2410                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2411                 return -1;
2412         }
2413
2414         return 0;       
2415 }
2416
2417
2418 /*
2419   get a tunable
2420  */
2421 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2422                           struct timeval timeout, 
2423                           uint32_t destnode,
2424                           const char *name, uint32_t *value)
2425 {
2426         struct ctdb_control_get_tunable *t;
2427         TDB_DATA data, outdata;
2428         int32_t res;
2429         int ret;
2430
2431         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2432         data.dptr  = talloc_size(ctdb, data.dsize);
2433         CTDB_NO_MEMORY(ctdb, data.dptr);
2434
2435         t = (struct ctdb_control_get_tunable *)data.dptr;
2436         t->length = strlen(name)+1;
2437         memcpy(t->name, name, t->length);
2438
2439         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2440                            &outdata, &res, &timeout, NULL);
2441         talloc_free(data.dptr);
2442         if (ret != 0 || res != 0) {
2443                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2444                 return -1;
2445         }
2446
2447         if (outdata.dsize != sizeof(uint32_t)) {
2448                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2449                 talloc_free(outdata.dptr);
2450                 return -1;
2451         }
2452         
2453         *value = *(uint32_t *)outdata.dptr;
2454         talloc_free(outdata.dptr);
2455
2456         return 0;
2457 }
2458
2459 /*
2460   set a tunable
2461  */
2462 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2463                           struct timeval timeout, 
2464                           uint32_t destnode,
2465                           const char *name, uint32_t value)
2466 {
2467         struct ctdb_control_set_tunable *t;
2468         TDB_DATA data;
2469         int32_t res;
2470         int ret;
2471
2472         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2473         data.dptr  = talloc_size(ctdb, data.dsize);
2474         CTDB_NO_MEMORY(ctdb, data.dptr);
2475
2476         t = (struct ctdb_control_set_tunable *)data.dptr;
2477         t->length = strlen(name)+1;
2478         memcpy(t->name, name, t->length);
2479         t->value = value;
2480
2481         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2482                            NULL, &res, &timeout, NULL);
2483         talloc_free(data.dptr);
2484         if (ret != 0 || res != 0) {
2485                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2486                 return -1;
2487         }
2488
2489         return 0;
2490 }
2491
2492 /*
2493   list tunables
2494  */
2495 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2496                             struct timeval timeout, 
2497                             uint32_t destnode,
2498                             TALLOC_CTX *mem_ctx,
2499                             const char ***list, uint32_t *count)
2500 {
2501         TDB_DATA outdata;
2502         int32_t res;
2503         int ret;
2504         struct ctdb_control_list_tunable *t;
2505         char *p, *s, *ptr;
2506
2507         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2508                            mem_ctx, &outdata, &res, &timeout, NULL);
2509         if (ret != 0 || res != 0) {
2510                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2511                 return -1;
2512         }
2513
2514         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2515         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2516             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2517                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2518                 talloc_free(outdata.dptr);
2519                 return -1;              
2520         }
2521         
2522         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2523         CTDB_NO_MEMORY(ctdb, p);
2524
2525         talloc_free(outdata.dptr);
2526         
2527         (*list) = NULL;
2528         (*count) = 0;
2529
2530         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2531                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2532                 CTDB_NO_MEMORY(ctdb, *list);
2533                 (*list)[*count] = talloc_strdup(*list, s);
2534                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2535                 (*count)++;
2536         }
2537
2538         talloc_free(p);
2539
2540         return 0;
2541 }
2542
2543
2544 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2545                                    struct timeval timeout, uint32_t destnode,
2546                                    TALLOC_CTX *mem_ctx,
2547                                    uint32_t flags,
2548                                    struct ctdb_all_public_ips **ips)
2549 {
2550         int ret;
2551         TDB_DATA outdata;
2552         int32_t res;
2553
2554         ret = ctdb_control(ctdb, destnode, 0, 
2555                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2556                            mem_ctx, &outdata, &res, &timeout, NULL);
2557         if (ret == 0 && res == -1) {
2558                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2559                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2560         }
2561         if (ret != 0 || res != 0) {
2562           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2563                 return -1;
2564         }
2565
2566         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2567         talloc_free(outdata.dptr);
2568                     
2569         return 0;
2570 }
2571
2572 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2573                              struct timeval timeout, uint32_t destnode,
2574                              TALLOC_CTX *mem_ctx,
2575                              struct ctdb_all_public_ips **ips)
2576 {
2577         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2578                                               destnode, mem_ctx,
2579                                               0, ips);
2580 }
2581
2582 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2583                         struct timeval timeout, uint32_t destnode, 
2584                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2585 {
2586         int ret, i, len;
2587         TDB_DATA outdata;
2588         int32_t res;
2589         struct ctdb_all_public_ipsv4 *ipsv4;
2590
2591         ret = ctdb_control(ctdb, destnode, 0, 
2592                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2593                            mem_ctx, &outdata, &res, &timeout, NULL);
2594         if (ret != 0 || res != 0) {
2595                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2596                 return -1;
2597         }
2598
2599         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2600         len = offsetof(struct ctdb_all_public_ips, ips) +
2601                 ipsv4->num*sizeof(struct ctdb_public_ip);
2602         *ips = talloc_zero_size(mem_ctx, len);
2603         CTDB_NO_MEMORY(ctdb, *ips);
2604         (*ips)->num = ipsv4->num;
2605         for (i=0; i<ipsv4->num; i++) {
2606                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2607                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2608         }
2609
2610         talloc_free(outdata.dptr);
2611                     
2612         return 0;
2613 }
2614
2615 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2616                                  struct timeval timeout, uint32_t destnode,
2617                                  TALLOC_CTX *mem_ctx,
2618                                  const ctdb_sock_addr *addr,
2619                                  struct ctdb_control_public_ip_info **_info)
2620 {
2621         int ret;
2622         TDB_DATA indata;
2623         TDB_DATA outdata;
2624         int32_t res;
2625         struct ctdb_control_public_ip_info *info;
2626         uint32_t len;
2627         uint32_t i;
2628
2629         indata.dptr = discard_const_p(uint8_t, addr);
2630         indata.dsize = sizeof(*addr);
2631
2632         ret = ctdb_control(ctdb, destnode, 0,
2633                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2634                            mem_ctx, &outdata, &res, &timeout, NULL);
2635         if (ret != 0 || res != 0) {
2636                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2637                                 "failed ret:%d res:%d\n",
2638                                 ret, res));
2639                 return -1;
2640         }
2641
2642         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2643         if (len > outdata.dsize) {
2644                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2645                                 "returned invalid data with size %u > %u\n",
2646                                 (unsigned int)outdata.dsize,
2647                                 (unsigned int)len));
2648                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2649                 return -1;
2650         }
2651
2652         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2653         len += info->num*sizeof(struct ctdb_control_iface_info);
2654
2655         if (len > outdata.dsize) {
2656                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2657                                 "returned invalid data with size %u > %u\n",
2658                                 (unsigned int)outdata.dsize,
2659                                 (unsigned int)len));
2660                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2661                 return -1;
2662         }
2663
2664         /* make sure we null terminate the returned strings */
2665         for (i=0; i < info->num; i++) {
2666                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2667         }
2668
2669         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2670                                                                 outdata.dptr,
2671                                                                 outdata.dsize);
2672         talloc_free(outdata.dptr);
2673         if (*_info == NULL) {
2674                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2675                                 "talloc_memdup size %u failed\n",
2676                                 (unsigned int)outdata.dsize));
2677                 return -1;
2678         }
2679
2680         return 0;
2681 }
2682
2683 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2684                          struct timeval timeout, uint32_t destnode,
2685                          TALLOC_CTX *mem_ctx,
2686                          struct ctdb_control_get_ifaces **_ifaces)
2687 {
2688         int ret;
2689         TDB_DATA outdata;
2690         int32_t res;
2691         struct ctdb_control_get_ifaces *ifaces;
2692         uint32_t len;
2693         uint32_t i;
2694
2695         ret = ctdb_control(ctdb, destnode, 0,
2696                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2697                            mem_ctx, &outdata, &res, &timeout, NULL);
2698         if (ret != 0 || res != 0) {
2699                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2700                                 "failed ret:%d res:%d\n",
2701                                 ret, res));
2702                 return -1;
2703         }
2704
2705         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2706         if (len > outdata.dsize) {
2707                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2708                                 "returned invalid data with size %u > %u\n",
2709                                 (unsigned int)outdata.dsize,
2710                                 (unsigned int)len));
2711                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2712                 return -1;
2713         }
2714
2715         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2716         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2717
2718         if (len > outdata.dsize) {
2719                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2720                                 "returned invalid data with size %u > %u\n",
2721                                 (unsigned int)outdata.dsize,
2722                                 (unsigned int)len));
2723                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2724                 return -1;
2725         }
2726
2727         /* make sure we null terminate the returned strings */
2728         for (i=0; i < ifaces->num; i++) {
2729                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2730         }
2731
2732         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2733                                                                   outdata.dptr,
2734                                                                   outdata.dsize);
2735         talloc_free(outdata.dptr);
2736         if (*_ifaces == NULL) {
2737                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2738                                 "talloc_memdup size %u failed\n",
2739                                 (unsigned int)outdata.dsize));
2740                 return -1;
2741         }
2742
2743         return 0;
2744 }
2745
2746 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2747                              struct timeval timeout, uint32_t destnode,
2748                              TALLOC_CTX *mem_ctx,
2749                              const struct ctdb_control_iface_info *info)
2750 {
2751         int ret;
2752         TDB_DATA indata;
2753         int32_t res;
2754
2755         indata.dptr = discard_const_p(uint8_t, info);
2756         indata.dsize = sizeof(*info);
2757
2758         ret = ctdb_control(ctdb, destnode, 0,
2759                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2760                            mem_ctx, NULL, &res, &timeout, NULL);
2761         if (ret != 0 || res != 0) {
2762                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2763                                 "failed ret:%d res:%d\n",
2764                                 ret, res));
2765                 return -1;
2766         }
2767
2768         return 0;
2769 }
2770
2771 /*
2772   set/clear the permanent disabled bit on a remote node
2773  */
2774 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2775                        uint32_t set, uint32_t clear)
2776 {
2777         int ret;
2778         TDB_DATA data;
2779         struct ctdb_node_map *nodemap=NULL;
2780         struct ctdb_node_flag_change c;
2781         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2782         uint32_t recmaster;
2783         uint32_t *nodes;
2784
2785
2786         /* find the recovery master */
2787         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2788         if (ret != 0) {
2789                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2790                 talloc_free(tmp_ctx);
2791                 return ret;
2792         }
2793
2794
2795         /* read the node flags from the recmaster */
2796         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2797         if (ret != 0) {
2798                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2799                 talloc_free(tmp_ctx);
2800                 return -1;
2801         }
2802         if (destnode >= nodemap->num) {
2803                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2804                 talloc_free(tmp_ctx);
2805                 return -1;
2806         }
2807
2808         c.pnn       = destnode;
2809         c.old_flags = nodemap->nodes[destnode].flags;
2810         c.new_flags = c.old_flags;
2811         c.new_flags |= set;
2812         c.new_flags &= ~clear;
2813
2814         data.dsize = sizeof(c);
2815         data.dptr = (unsigned char *)&c;
2816
2817         /* send the flags update to all connected nodes */
2818         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2819
2820         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2821                                         nodes, 0,
2822                                         timeout, false, data,
2823                                         NULL, NULL,
2824                                         NULL) != 0) {
2825                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2826
2827                 talloc_free(tmp_ctx);
2828                 return -1;
2829         }
2830
2831         talloc_free(tmp_ctx);
2832         return 0;
2833 }
2834
2835
2836 /*
2837   get all tunables
2838  */
2839 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
2840                                struct timeval timeout, 
2841                                uint32_t destnode,
2842                                struct ctdb_tunable *tunables)
2843 {
2844         TDB_DATA outdata;
2845         int ret;
2846         int32_t res;
2847
2848         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2849                            &outdata, &res, &timeout, NULL);
2850         if (ret != 0 || res != 0) {
2851                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2852                 return -1;
2853         }
2854
2855         if (outdata.dsize != sizeof(*tunables)) {
2856                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2857                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2858                 return -1;              
2859         }
2860
2861         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2862         talloc_free(outdata.dptr);
2863         return 0;
2864 }
2865
2866 /*
2867   add a public address to a node
2868  */
2869 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
2870                       struct timeval timeout, 
2871                       uint32_t destnode,
2872                       struct ctdb_control_ip_iface *pub)
2873 {
2874         TDB_DATA data;
2875         int32_t res;
2876         int ret;
2877
2878         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2879         data.dptr  = (unsigned char *)pub;
2880
2881         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2882                            NULL, &res, &timeout, NULL);
2883         if (ret != 0 || res != 0) {
2884                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2885                 return -1;
2886         }
2887
2888         return 0;
2889 }
2890
2891 /*
2892   delete a public address from a node
2893  */
2894 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
2895                       struct timeval timeout, 
2896                       uint32_t destnode,
2897                       struct ctdb_control_ip_iface *pub)
2898 {
2899         TDB_DATA data;
2900         int32_t res;
2901         int ret;
2902
2903         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2904         data.dptr  = (unsigned char *)pub;
2905
2906         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2907                            NULL, &res, &timeout, NULL);
2908         if (ret != 0 || res != 0) {
2909                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2910                 return -1;
2911         }
2912
2913         return 0;
2914 }
2915
2916 /*
2917   kill a tcp connection
2918  */
2919 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2920                       struct timeval timeout, 
2921                       uint32_t destnode,
2922                       struct ctdb_control_killtcp *killtcp)
2923 {
2924         TDB_DATA data;
2925         int32_t res;
2926         int ret;
2927
2928         data.dsize = sizeof(struct ctdb_control_killtcp);
2929         data.dptr  = (unsigned char *)killtcp;
2930
2931         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2932                            NULL, &res, &timeout, NULL);
2933         if (ret != 0 || res != 0) {
2934                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2935                 return -1;
2936         }
2937
2938         return 0;
2939 }
2940
2941 /*
2942   send a gratious arp
2943  */
2944 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
2945                       struct timeval timeout, 
2946                       uint32_t destnode,
2947                       ctdb_sock_addr *addr,
2948                       const char *ifname)
2949 {
2950         TDB_DATA data;
2951         int32_t res;
2952         int ret, len;
2953         struct ctdb_control_gratious_arp *gratious_arp;
2954         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2955
2956
2957         len = strlen(ifname)+1;
2958         gratious_arp = talloc_size(tmp_ctx, 
2959                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2960         CTDB_NO_MEMORY(ctdb, gratious_arp);
2961
2962         gratious_arp->addr = *addr;
2963         gratious_arp->len = len;
2964         memcpy(&gratious_arp->iface[0], ifname, len);
2965
2966
2967         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
2968         data.dptr  = (unsigned char *)gratious_arp;
2969
2970         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
2971                            NULL, &res, &timeout, NULL);
2972         if (ret != 0 || res != 0) {
2973                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
2974                 talloc_free(tmp_ctx);
2975                 return -1;
2976         }
2977
2978         talloc_free(tmp_ctx);
2979         return 0;
2980 }
2981
2982 /*
2983   get a list of all tcp tickles that a node knows about for a particular vnn
2984  */
2985 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
2986                               struct timeval timeout, uint32_t destnode, 
2987                               TALLOC_CTX *mem_ctx, 
2988                               ctdb_sock_addr *addr,
2989                               struct ctdb_control_tcp_tickle_list **list)
2990 {
2991         int ret;
2992         TDB_DATA data, outdata;
2993         int32_t status;
2994
2995         data.dptr = (uint8_t*)addr;
2996         data.dsize = sizeof(ctdb_sock_addr);
2997
2998         ret = ctdb_control(ctdb, destnode, 0, 
2999                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
3000                            mem_ctx, &outdata, &status, NULL, NULL);
3001         if (ret != 0 || status != 0) {
3002                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
3003                 return -1;
3004         }
3005
3006         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
3007
3008         return status;
3009 }
3010
3011 /*
3012   register a server id
3013  */
3014 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
3015                       struct timeval timeout, 
3016                       struct ctdb_server_id *id)
3017 {
3018         TDB_DATA data;
3019         int32_t res;
3020         int ret;
3021
3022         data.dsize = sizeof(struct ctdb_server_id);
3023         data.dptr  = (unsigned char *)id;
3024
3025         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3026                         CTDB_CONTROL_REGISTER_SERVER_ID, 
3027                         0, data, NULL,
3028                         NULL, &res, &timeout, NULL);
3029         if (ret != 0 || res != 0) {
3030                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
3031                 return -1;
3032         }
3033
3034         return 0;
3035 }
3036
3037 /*
3038   unregister a server id
3039  */
3040 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
3041                       struct timeval timeout, 
3042                       struct ctdb_server_id *id)
3043 {
3044         TDB_DATA data;
3045         int32_t res;
3046         int ret;
3047
3048         data.dsize = sizeof(struct ctdb_server_id);
3049         data.dptr  = (unsigned char *)id;
3050
3051         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3052                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
3053                         0, data, NULL,
3054                         NULL, &res, &timeout, NULL);
3055         if (ret != 0 || res != 0) {
3056                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
3057                 return -1;
3058         }
3059
3060         return 0;
3061 }
3062
3063
3064 /*
3065   check if a server id exists
3066
3067   if a server id does exist, return *status == 1, otherwise *status == 0
3068  */
3069 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
3070                       struct timeval timeout, 
3071                       uint32_t destnode,
3072                       struct ctdb_server_id *id,
3073                       uint32_t *status)
3074 {
3075         TDB_DATA data;
3076         int32_t res;
3077         int ret;
3078
3079         data.dsize = sizeof(struct ctdb_server_id);
3080         data.dptr  = (unsigned char *)id;
3081
3082         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
3083                         0, data, NULL,
3084                         NULL, &res, &timeout, NULL);
3085         if (ret != 0) {
3086                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
3087                 return -1;
3088         }
3089
3090         if (res) {
3091                 *status = 1;
3092         } else {
3093                 *status = 0;
3094         }
3095
3096         return 0;
3097 }
3098
3099 /*
3100    get the list of server ids that are registered on a node
3101 */
3102 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
3103                 TALLOC_CTX *mem_ctx,
3104                 struct timeval timeout, uint32_t destnode, 
3105                 struct ctdb_server_id_list **svid_list)
3106 {
3107         int ret;
3108         TDB_DATA outdata;
3109         int32_t res;
3110
3111         ret = ctdb_control(ctdb, destnode, 0, 
3112                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
3113                            mem_ctx, &outdata, &res, &timeout, NULL);
3114         if (ret != 0 || res != 0) {
3115                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
3116                 return -1;
3117         }
3118
3119         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
3120                     
3121         return 0;
3122 }
3123
3124 /*
3125   initialise the ctdb daemon for client applications
3126
3127   NOTE: In current code the daemon does not fork. This is for testing purposes only
3128   and to simplify the code.
3129 */
3130 struct ctdb_context *ctdb_init(struct event_context *ev)
3131 {
3132         int ret;
3133         struct ctdb_context *ctdb;
3134
3135         ctdb = talloc_zero(ev, struct ctdb_context);
3136         if (ctdb == NULL) {
3137                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
3138                 return NULL;
3139         }
3140         ctdb->ev  = ev;
3141         ctdb->idr = idr_init(ctdb);
3142         /* Wrap early to exercise code. */
3143         ctdb->lastid = INT_MAX-200;
3144         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
3145
3146         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
3147         if (ret != 0) {
3148                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
3149                 talloc_free(ctdb);
3150                 return NULL;
3151         }
3152
3153         ctdb->statistics.statistics_start_time = timeval_current();
3154
3155         return ctdb;
3156 }
3157
3158
3159 /*
3160   set some ctdb flags
3161 */
3162 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
3163 {
3164         ctdb->flags |= flags;
3165 }
3166
3167 /*
3168   setup the local socket name
3169 */
3170 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
3171 {
3172         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
3173         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
3174
3175         return 0;
3176 }
3177
3178 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
3179 {
3180         return ctdb->daemon.name;
3181 }
3182
3183 /*
3184   return the pnn of this node
3185 */
3186 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
3187 {
3188         return ctdb->pnn;
3189 }
3190
3191
3192 /*
3193   get the uptime of a remote node
3194  */
3195 struct ctdb_client_control_state *
3196 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3197 {
3198         return ctdb_control_send(ctdb, destnode, 0, 
3199                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
3200                            mem_ctx, &timeout, NULL);
3201 }
3202
3203 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
3204 {
3205         int ret;
3206         int32_t res;
3207         TDB_DATA outdata;
3208
3209         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3210         if (ret != 0 || res != 0) {
3211                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
3212                 return -1;
3213         }
3214
3215         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
3216
3217         return 0;
3218 }
3219
3220 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3221 {
3222         struct ctdb_client_control_state *state;
3223
3224         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3225         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3226 }
3227
3228 /*
3229   send a control to execute the "recovered" event script on a node
3230  */
3231 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3232 {
3233         int ret;
3234         int32_t status;
3235
3236         ret = ctdb_control(ctdb, destnode, 0, 
3237                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
3238                            NULL, NULL, &status, &timeout, NULL);
3239         if (ret != 0 || status != 0) {
3240                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3241                 return -1;
3242         }
3243
3244         return 0;
3245 }
3246
3247 /* 
3248   callback for the async helpers used when sending the same control
3249   to multiple nodes in parallell.
3250 */
3251 static void async_callback(struct ctdb_client_control_state *state)
3252 {
3253         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3254         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3255         int ret;
3256         TDB_DATA outdata;
3257         int32_t res;
3258         uint32_t destnode = state->c->hdr.destnode;
3259
3260         /* one more node has responded with recmode data */
3261         data->count--;
3262
3263         /* if we failed to push the db, then return an error and let
3264            the main loop try again.
3265         */
3266         if (state->state != CTDB_CONTROL_DONE) {
3267                 if ( !data->dont_log_errors) {
3268                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3269                 }
3270                 data->fail_count++;
3271                 if (data->fail_callback) {
3272                         data->fail_callback(ctdb, destnode, res, outdata,
3273                                         data->callback_data);
3274                 }
3275                 return;
3276         }
3277         
3278         state->async.fn = NULL;
3279
3280         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3281         if ((ret != 0) || (res != 0)) {
3282                 if ( !data->dont_log_errors) {
3283                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3284                 }
3285                 data->fail_count++;
3286                 if (data->fail_callback) {
3287                         data->fail_callback(ctdb, destnode, res, outdata,
3288                                         data->callback_data);
3289                 }
3290         }
3291         if ((ret == 0) && (data->callback != NULL)) {
3292                 data->callback(ctdb, destnode, res, outdata,
3293                                         data->callback_data);
3294         }
3295 }
3296
3297
3298 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3299 {
3300         /* set up the callback functions */
3301         state->async.fn = async_callback;
3302         state->async.private_data = data;
3303         
3304         /* one more control to wait for to complete */
3305         data->count++;
3306 }
3307
3308
3309 /* wait for up to the maximum number of seconds allowed
3310    or until all nodes we expect a response from has replied
3311 */
3312 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3313 {
3314         while (data->count > 0) {
3315                 event_loop_once(ctdb->ev);
3316         }
3317         if (data->fail_count != 0) {
3318                 if (!data->dont_log_errors) {
3319                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
3320                                  data->fail_count));
3321                 }
3322                 return -1;
3323         }
3324         return 0;
3325 }
3326
3327
3328 /* 
3329    perform a simple control on the listed nodes
3330    The control cannot return data
3331  */
3332 int ctdb_client_async_control(struct ctdb_context *ctdb,
3333                                 enum ctdb_controls opcode,
3334                                 uint32_t *nodes,
3335                                 uint64_t srvid,
3336                                 struct timeval timeout,
3337                                 bool dont_log_errors,
3338                                 TDB_DATA data,
3339                                 client_async_callback client_callback,
3340                                 client_async_callback fail_callback,
3341                                 void *callback_data)
3342 {
3343         struct client_async_data *async_data;
3344         struct ctdb_client_control_state *state;
3345         int j, num_nodes;
3346
3347         async_data = talloc_zero(ctdb, struct client_async_data);
3348         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3349         async_data->dont_log_errors = dont_log_errors;
3350         async_data->callback = client_callback;
3351         async_data->fail_callback = fail_callback;
3352         async_data->callback_data = callback_data;
3353         async_data->opcode        = opcode;
3354
3355         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3356
3357         /* loop over all nodes and send an async control to each of them */
3358         for (j=0; j<num_nodes; j++) {
3359                 uint32_t pnn = nodes[j];
3360
3361                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
3362                                           0, data, async_data, &timeout, NULL);
3363                 if (state == NULL) {
3364                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3365                         talloc_free(async_data);
3366                         return -1;
3367                 }
3368                 
3369                 ctdb_client_async_add(async_data, state);
3370         }
3371
3372         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3373                 talloc_free(async_data);
3374                 return -1;
3375         }
3376
3377         talloc_free(async_data);
3378         return 0;
3379 }
3380
3381 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3382                                 struct ctdb_vnn_map *vnn_map,
3383                                 TALLOC_CTX *mem_ctx,
3384                                 bool include_self)
3385 {
3386         int i, j, num_nodes;
3387         uint32_t *nodes;
3388
3389         for (i=num_nodes=0;i<vnn_map->size;i++) {
3390                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3391                         continue;
3392                 }
3393                 num_nodes++;
3394         } 
3395
3396         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3397         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3398
3399         for (i=j=0;i<vnn_map->size;i++) {
3400                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3401                         continue;
3402                 }
3403                 nodes[j++] = vnn_map->map[i];
3404         } 
3405
3406         return nodes;
3407 }
3408
3409 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3410                                 struct ctdb_node_map *node_map,
3411                                 TALLOC_CTX *mem_ctx,
3412                                 bool include_self)
3413 {
3414         int i, j, num_nodes;
3415         uint32_t *nodes;
3416
3417         for (i=num_nodes=0;i<node_map->num;i++) {
3418                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3419                         continue;
3420                 }
3421                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3422                         continue;
3423                 }
3424                 num_nodes++;
3425         } 
3426
3427         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3428         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3429
3430         for (i=j=0;i<node_map->num;i++) {
3431                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3432                         continue;
3433                 }
3434                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3435                         continue;
3436                 }
3437                 nodes[j++] = node_map->nodes[i].pnn;
3438         } 
3439
3440         return nodes;
3441 }
3442
3443 uint32_t *list_of_active_nodes_except_pnn(struct ctdb_context *ctdb,
3444                                 struct ctdb_node_map *node_map,
3445                                 TALLOC_CTX *mem_ctx,
3446                                 uint32_t pnn)
3447 {
3448         int i, j, num_nodes;
3449         uint32_t *nodes;
3450
3451         for (i=num_nodes=0;i<node_map->num;i++) {
3452                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3453                         continue;
3454                 }
3455                 if (node_map->nodes[i].pnn == pnn) {
3456                         continue;
3457                 }
3458                 num_nodes++;
3459         } 
3460
3461         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3462         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3463
3464         for (i=j=0;i<node_map->num;i++) {
3465                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3466                         continue;
3467                 }
3468                 if (node_map->nodes[i].pnn == pnn) {
3469                         continue;
3470                 }
3471                 nodes[j++] = node_map->nodes[i].pnn;
3472         } 
3473
3474         return nodes;
3475 }
3476
3477 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3478                                 struct ctdb_node_map *node_map,
3479                                 TALLOC_CTX *mem_ctx,
3480                                 bool include_self)
3481 {
3482         int i, j, num_nodes;
3483         uint32_t *nodes;
3484
3485         for (i=num_nodes=0;i<node_map->num;i++) {
3486                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3487                         continue;
3488                 }
3489                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3490                         continue;
3491                 }
3492                 num_nodes++;
3493         } 
3494
3495         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3496         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3497
3498         for (i=j=0;i<node_map->num;i++) {
3499                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3500                         continue;
3501                 }
3502                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3503                         continue;
3504                 }
3505                 nodes[j++] = node_map->nodes[i].pnn;
3506         } 
3507
3508         return nodes;
3509 }
3510
3511 /* 
3512   this is used to test if a pnn lock exists and if it exists will return
3513   the number of connections that pnn has reported or -1 if that recovery
3514   daemon is not running.
3515 */
3516 int
3517 ctdb_read_pnn_lock(int fd, int32_t pnn)
3518 {
3519         struct flock lock;
3520         char c;
3521
3522         lock.l_type = F_WRLCK;
3523         lock.l_whence = SEEK_SET;
3524         lock.l_start = pnn;
3525         lock.l_len = 1;
3526         lock.l_pid = 0;
3527
3528         if (fcntl(fd, F_GETLK, &lock) != 0) {
3529                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3530                 return -1;
3531         }
3532
3533         if (lock.l_type == F_UNLCK) {
3534                 return -1;
3535         }
3536
3537         if (pread(fd, &c, 1, pnn) == -1) {
3538                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3539                 return -1;
3540         }
3541
3542         return c;
3543 }
3544
3545 /*
3546   get capabilities of a remote node
3547  */
3548 struct ctdb_client_control_state *
3549 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3550 {
3551         return ctdb_control_send(ctdb, destnode, 0, 
3552                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3553                            mem_ctx, &timeout, NULL);
3554 }
3555
3556 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3557 {
3558         int ret;
3559         int32_t res;
3560         TDB_DATA outdata;
3561
3562         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3563         if ( (ret != 0) || (res != 0) ) {
3564                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3565                 return -1;
3566         }
3567
3568         if (capabilities) {
3569                 *capabilities = *((uint32_t *)outdata.dptr);
3570         }
3571
3572         return 0;
3573 }
3574
3575 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3576 {
3577         struct ctdb_client_control_state *state;
3578         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3579         int ret;
3580
3581         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3582         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3583         talloc_free(tmp_ctx);
3584         return ret;
3585 }
3586
3587 /**
3588  * check whether a transaction is active on a given db on a given node
3589  */
3590 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
3591                                      uint32_t destnode,
3592                                      uint32_t db_id)
3593 {
3594         int32_t status;
3595         int ret;
3596         TDB_DATA indata;
3597
3598         indata.dptr = (uint8_t *)&db_id;
3599         indata.dsize = sizeof(db_id);
3600
3601         ret = ctdb_control(ctdb, destnode, 0,
3602                            CTDB_CONTROL_TRANS2_ACTIVE,
3603                            0, indata, NULL, NULL, &status,
3604                            NULL, NULL);
3605
3606         if (ret != 0) {
3607                 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
3608                 return -1;
3609         }
3610
3611         return status;
3612 }
3613
3614
3615 struct ctdb_transaction_handle {
3616         struct ctdb_db_context *ctdb_db;
3617         bool in_replay;
3618         /*
3619          * we store the reads and writes done under a transaction:
3620          * - one list stores both reads and writes (m_all),
3621          * - the other just writes (m_write)
3622          */
3623         struct ctdb_marshall_buffer *m_all;
3624         struct ctdb_marshall_buffer *m_write;
3625 };
3626
3627 /* start a transaction on a database */
3628 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3629 {
3630         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3631         return 0;
3632 }
3633
3634 /* start a transaction on a database */
3635 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
3636 {
3637         struct ctdb_record_handle *rh;
3638         TDB_DATA key;
3639         TDB_DATA data;
3640         struct ctdb_ltdb_header header;
3641         TALLOC_CTX *tmp_ctx;
3642         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
3643         int ret;
3644         struct ctdb_db_context *ctdb_db = h->ctdb_db;
3645         pid_t pid;
3646         int32_t status;
3647
3648         key.dptr = discard_const(keyname);
3649         key.dsize = strlen(keyname);
3650
3651         if (!ctdb_db->persistent) {
3652                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
3653                 return -1;
3654         }
3655
3656 again:
3657         tmp_ctx = talloc_new(h);
3658
3659         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
3660         if (rh == NULL) {
3661                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
3662                 talloc_free(tmp_ctx);
3663                 return -1;
3664         }
3665
3666         status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
3667                                               CTDB_CURRENT_NODE,
3668                                               ctdb_db->db_id);
3669         if (status == 1) {
3670                 unsigned long int usec = (1000 + random()) % 100000;
3671                 DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
3672                                     "on db_id[0x%08x]. waiting for %lu "
3673                                     "microseconds\n",
3674                                     ctdb_db->db_id, usec));
3675                 talloc_free(tmp_ctx);
3676                 usleep(usec);
3677                 goto again;
3678         }
3679
3680         /*
3681          * store the pid in the database:
3682          * it is not enough that the node is dmaster...
3683          */
3684         pid = getpid();
3685         data.dptr = (unsigned char *)&pid;
3686         data.dsize = sizeof(pid_t);
3687         rh->header.rsn++;
3688         rh->header.dmaster = ctdb_db->ctdb->pnn;
3689         ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
3690         if (ret != 0) {
3691                 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
3692                                   "transaction record\n"));
3693                 talloc_free(tmp_ctx);
3694                 return -1;
3695         }
3696
3697         talloc_free(rh);
3698
3699         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3700         if (ret != 0) {
3701                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
3702                 talloc_free(tmp_ctx);
3703                 return -1;
3704         }
3705
3706         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
3707         if (ret != 0) {
3708                 DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
3709                                  "lock record inside transaction\n"));
3710                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3711                 talloc_free(tmp_ctx);
3712                 goto again;
3713         }
3714
3715         if (header.dmaster != ctdb_db->ctdb->pnn) {
3716                 DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
3717                                    "transaction lock record\n"));
3718                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3719                 talloc_free(tmp_ctx);
3720                 goto again;
3721         }
3722
3723         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
3724                 DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
3725                                     "the transaction lock record\n"));
3726                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3727                 talloc_free(tmp_ctx);
3728                 goto again;
3729         }
3730
3731         talloc_free(tmp_ctx);
3732
3733         return 0;
3734 }
3735
3736
3737 /* start a transaction on a database */
3738 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3739                                                        TALLOC_CTX *mem_ctx)
3740 {
3741         struct ctdb_transaction_handle *h;
3742         int ret;
3743
3744         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3745         if (h == NULL) {
3746                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
3747                 return NULL;
3748         }
3749
3750         h->ctdb_db = ctdb_db;
3751
3752         ret = ctdb_transaction_fetch_start(h);
3753         if (ret != 0) {
3754                 talloc_free(h);
3755                 return NULL;
3756         }
3757
3758         talloc_set_destructor(h, ctdb_transaction_destructor);
3759
3760         return h;
3761 }
3762
3763
3764
3765 /*
3766   fetch a record inside a transaction
3767  */
3768 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
3769                            TALLOC_CTX *mem_ctx, 
3770                            TDB_DATA key, TDB_DATA *data)
3771 {
3772         struct ctdb_ltdb_header header;
3773         int ret;
3774
3775         ZERO_STRUCT(header);
3776
3777         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3778         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3779                 /* record doesn't exist yet */
3780                 *data = tdb_null;
3781                 ret = 0;
3782         }
3783         
3784         if (ret != 0) {
3785                 return ret;
3786         }
3787
3788         if (!h->in_replay) {
3789                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3790                 if (h->m_all == NULL) {
3791                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3792                         return -1;
3793                 }
3794         }
3795
3796         return 0;
3797 }
3798
3799 /*
3800   stores a record inside a transaction
3801  */
3802 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
3803                            TDB_DATA key, TDB_DATA data)
3804 {
3805         TALLOC_CTX *tmp_ctx = talloc_new(h);
3806         struct ctdb_ltdb_header header;
3807         TDB_DATA olddata;
3808         int ret;
3809
3810         ZERO_STRUCT(header);
3811
3812         /* we need the header so we can update the RSN */
3813         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3814         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3815                 /* the record doesn't exist - create one with us as dmaster.
3816                    This is only safe because we are in a transaction and this
3817                    is a persistent database */
3818                 ZERO_STRUCT(header);
3819         } else if (ret != 0) {
3820                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3821                 talloc_free(tmp_ctx);
3822                 return ret;
3823         }
3824
3825         if (data.dsize == olddata.dsize &&
3826             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3827                 /* save writing the same data */
3828                 talloc_free(tmp_ctx);
3829                 return 0;
3830         }
3831
3832         header.dmaster = h->ctdb_db->ctdb->pnn;
3833         header.rsn++;
3834
3835         if (!h->in_replay) {
3836                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3837                 if (h->m_all == NULL) {
3838                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3839                         talloc_free(tmp_ctx);
3840                         return -1;
3841                 }
3842         }               
3843
3844         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3845         if (h->m_write == NULL) {
3846                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3847                 talloc_free(tmp_ctx);
3848                 return -1;
3849         }
3850         
3851         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3852
3853         talloc_free(tmp_ctx);
3854         
3855         return ret;
3856 }
3857
3858 /*
3859   replay a transaction
3860  */
3861 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3862 {
3863         int ret, i;
3864         struct ctdb_rec_data *rec = NULL;
3865
3866         h->in_replay = true;
3867         talloc_free(h->m_write);
3868         h->m_write = NULL;
3869
3870         ret = ctdb_transaction_fetch_start(h);
3871         if (ret != 0) {
3872                 return ret;
3873         }
3874
3875         for (i=0;i<h->m_all->count;i++) {
3876                 TDB_DATA key, data;
3877
3878                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3879                 if (rec == NULL) {
3880                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3881                         goto failed;
3882                 }
3883
3884                 if (rec->reqid == 0) {
3885                         /* its a store */
3886                         if (ctdb_transaction_store(h, key, data) != 0) {
3887                                 goto failed;
3888                         }
3889                 } else {
3890                         TDB_DATA data2;
3891                         TALLOC_CTX *tmp_ctx = talloc_new(h);
3892
3893                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3894                                 talloc_free(tmp_ctx);
3895                                 goto failed;
3896                         }
3897                         if (data2.dsize != data.dsize ||
3898                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3899                                 /* the record has changed on us - we have to give up */
3900                                 talloc_free(tmp_ctx);
3901                                 goto failed;
3902                         }
3903                         talloc_free(tmp_ctx);
3904                 }
3905         }
3906         
3907         return 0;
3908
3909 failed:
3910         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3911         return -1;
3912 }
3913
3914
3915 /*
3916   commit a transaction
3917  */
3918 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3919 {
3920         int ret, retries=0;
3921         int32_t status;
3922         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3923         struct timeval timeout;
3924         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3925
3926         talloc_set_destructor(h, NULL);
3927
3928         /* our commit strategy is quite complex.
3929
3930            - we first try to commit the changes to all other nodes
3931
3932            - if that works, then we commit locally and we are done
3933
3934            - if a commit on another node fails, then we need to cancel
3935              the transaction, then restart the transaction (thus
3936              opening a window of time for a pending recovery to
3937              complete), then replay the transaction, checking all the
3938              reads and writes (checking that reads give the same data,
3939              and writes succeed). Then we retry the transaction to the
3940              other nodes
3941         */
3942
3943 again:
3944         if (h->m_write == NULL) {
3945                 /* no changes were made */
3946                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3947                 talloc_free(h);
3948                 return 0;
3949         }
3950
3951         /* tell ctdbd to commit to the other nodes */
3952         timeout = timeval_current_ofs(1, 0);
3953         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3954                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
3955                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
3956                            &timeout, NULL);
3957         if (ret != 0 || status != 0) {
3958                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3959                 DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
3960                                      ", retrying after 1 second...\n",
3961                                      (retries==0)?"":"retry "));
3962                 sleep(1);
3963
3964                 if (ret != 0) {
3965                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
3966                 } else {
3967                         /* work out what error code we will give if we 
3968                            have to fail the operation */
3969                         switch ((enum ctdb_trans2_commit_error)status) {
3970                         case CTDB_TRANS2_COMMIT_SUCCESS:
3971                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
3972                         case CTDB_TRANS2_COMMIT_TIMEOUT:
3973                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3974                                 break;
3975                         case CTDB_TRANS2_COMMIT_ALLFAIL:
3976                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
3977                                 break;
3978                         }
3979                 }
3980
3981                 if (++retries == 100) {
3982                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
3983                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
3984                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3985                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3986                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3987                         talloc_free(h);
3988                         return -1;
3989                 }               
3990
3991                 if (ctdb_replay_transaction(h) != 0) {
3992                         DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
3993                                           "transaction on db 0x%08x, "
3994                                           "failure control =%u\n",
3995                                           h->ctdb_db->db_id,
3996                                           (unsigned)failure_control));
3997                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3998                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3999                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
4000                         talloc_free(h);
4001                         return -1;
4002                 }
4003                 goto again;
4004         } else {
4005                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
4006         }
4007
4008         /* do the real commit locally */
4009         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
4010         if (ret != 0) {
4011                 DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
4012                                   "on db id 0x%08x locally, "
4013                                   "failure_control=%u\n",
4014                                   h->ctdb_db->db_id,
4015                                   (unsigned)failure_control));
4016                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4017                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4018                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
4019                 talloc_free(h);
4020                 return ret;
4021         }
4022
4023         /* tell ctdbd that we are finished with our local commit */
4024         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4025                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
4026                      tdb_null, NULL, NULL, NULL, NULL, NULL);
4027         talloc_free(h);
4028         return 0;
4029 }
4030
4031 /*
4032   recovery daemon ping to main daemon
4033  */
4034 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
4035 {
4036         int ret;
4037         int32_t res;
4038
4039         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
4040                            ctdb, NULL, &res, NULL, NULL);
4041         if (ret != 0 || res != 0) {
4042                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
4043                 return -1;
4044         }
4045
4046         return 0;
4047 }
4048
4049 /* when forking the main daemon and the child process needs to connect back
4050  * to the daemon as a client process, this function can be used to change
4051  * the ctdb context from daemon into client mode
4052  */
4053 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
4054 {
4055         int ret;
4056         va_list ap;
4057
4058         /* Add extra information so we can identify this in the logs */
4059         va_start(ap, fmt);
4060         debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
4061         va_end(ap);
4062
4063         /* shutdown the transport */
4064         if (ctdb->methods) {
4065                 ctdb->methods->shutdown(ctdb);
4066         }
4067
4068         /* get a new event context */
4069         talloc_free(ctdb->ev);
4070         ctdb->ev = event_context_init(ctdb);
4071         tevent_loop_allow_nesting(ctdb->ev);
4072
4073         close(ctdb->daemon.sd);
4074         ctdb->daemon.sd = -1;
4075
4076         /* the client does not need to be realtime */
4077         if (ctdb->do_setsched) {
4078                 ctdb_restore_scheduler(ctdb);
4079         }
4080
4081         /* initialise ctdb */
4082         ret = ctdb_socket_connect(ctdb);
4083         if (ret != 0) {
4084                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
4085                 return -1;
4086         }
4087
4088          return 0;
4089 }
4090
4091 /*
4092   get the status of running the monitor eventscripts: NULL means never run.
4093  */
4094 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
4095                 struct timeval timeout, uint32_t destnode, 
4096                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
4097                 struct ctdb_scripts_wire **scripts)
4098 {
4099         int ret;
4100         TDB_DATA outdata, indata;
4101         int32_t res;
4102         uint32_t uinttype = type;
4103
4104         indata.dptr = (uint8_t *)&uinttype;
4105         indata.dsize = sizeof(uinttype);
4106
4107         ret = ctdb_control(ctdb, destnode, 0, 
4108                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
4109                            mem_ctx, &outdata, &res, &timeout, NULL);
4110         if (ret != 0 || res != 0) {
4111                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
4112                 return -1;
4113         }
4114
4115         if (outdata.dsize == 0) {
4116                 *scripts = NULL;
4117         } else {
4118                 *scripts = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4119                 talloc_free(outdata.dptr);
4120         }
4121                     
4122         return 0;
4123 }
4124
4125 /*
4126   tell the main daemon how long it took to lock the reclock file
4127  */
4128 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
4129 {
4130         int ret;
4131         int32_t res;
4132         TDB_DATA data;
4133
4134         data.dptr = (uint8_t *)&latency;
4135         data.dsize = sizeof(latency);
4136
4137         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
4138                            ctdb, NULL, &res, NULL, NULL);
4139         if (ret != 0 || res != 0) {
4140                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
4141                 return -1;
4142         }
4143
4144         return 0;
4145 }
4146
4147 /*
4148   get the name of the reclock file
4149  */
4150 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
4151                          uint32_t destnode, TALLOC_CTX *mem_ctx,
4152                          const char **name)
4153 {
4154         int ret;
4155         int32_t res;
4156         TDB_DATA data;
4157
4158         ret = ctdb_control(ctdb, destnode, 0, 
4159                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
4160                            mem_ctx, &data, &res, &timeout, NULL);
4161         if (ret != 0 || res != 0) {
4162                 return -1;
4163         }
4164
4165         if (data.dsize == 0) {
4166                 *name = NULL;
4167         } else {
4168                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
4169         }
4170         talloc_free(data.dptr);
4171
4172         return 0;
4173 }
4174
4175 /*
4176   set the reclock filename for a node
4177  */
4178 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
4179 {
4180         int ret;
4181         TDB_DATA data;
4182         int32_t res;
4183
4184         if (reclock == NULL) {
4185                 data.dsize = 0;
4186                 data.dptr  = NULL;
4187         } else {
4188                 data.dsize = strlen(reclock) + 1;
4189                 data.dptr  = discard_const(reclock);
4190         }
4191
4192         ret = ctdb_control(ctdb, destnode, 0, 
4193                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
4194                            NULL, NULL, &res, &timeout, NULL);
4195         if (ret != 0 || res != 0) {
4196                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
4197                 return -1;
4198         }
4199
4200         return 0;
4201 }
4202
4203 /*
4204   stop a node
4205  */
4206 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4207 {
4208         int ret;
4209         int32_t res;
4210
4211         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
4212                            ctdb, NULL, &res, &timeout, NULL);
4213         if (ret != 0 || res != 0) {
4214                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
4215                 return -1;
4216         }
4217
4218         return 0;
4219 }
4220
4221 /*
4222   continue a node
4223  */
4224 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4225 {
4226         int ret;
4227
4228         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
4229                            ctdb, NULL, NULL, &timeout, NULL);
4230         if (ret != 0) {
4231                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4232                 return -1;
4233         }
4234
4235         return 0;
4236 }
4237
4238 /*
4239   set the natgw state for a node
4240  */
4241 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4242 {
4243         int ret;
4244         TDB_DATA data;
4245         int32_t res;
4246
4247         data.dsize = sizeof(natgwstate);
4248         data.dptr  = (uint8_t *)&natgwstate;
4249
4250         ret = ctdb_control(ctdb, destnode, 0, 
4251                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
4252                            NULL, NULL, &res, &timeout, NULL);
4253         if (ret != 0 || res != 0) {
4254                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4255                 return -1;
4256         }
4257
4258         return 0;
4259 }
4260
4261 /*
4262   set the lmaster role for a node
4263  */
4264 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4265 {
4266         int ret;
4267         TDB_DATA data;
4268         int32_t res;
4269
4270         data.dsize = sizeof(lmasterrole);
4271         data.dptr  = (uint8_t *)&lmasterrole;
4272
4273         ret = ctdb_control(ctdb, destnode, 0, 
4274                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
4275                            NULL, NULL, &res, &timeout, NULL);
4276         if (ret != 0 || res != 0) {
4277                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4278                 return -1;
4279         }
4280
4281         return 0;
4282 }
4283
4284 /*
4285   set the recmaster role for a node
4286  */
4287 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4288 {
4289         int ret;
4290         TDB_DATA data;
4291         int32_t res;
4292
4293         data.dsize = sizeof(recmasterrole);
4294         data.dptr  = (uint8_t *)&recmasterrole;
4295
4296         ret = ctdb_control(ctdb, destnode, 0, 
4297                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
4298                            NULL, NULL, &res, &timeout, NULL);
4299         if (ret != 0 || res != 0) {
4300                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4301                 return -1;
4302         }
4303
4304         return 0;
4305 }
4306
4307 /* enable an eventscript
4308  */
4309 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4310 {
4311         int ret;
4312         TDB_DATA data;
4313         int32_t res;
4314
4315         data.dsize = strlen(script) + 1;
4316         data.dptr  = discard_const(script);
4317
4318         ret = ctdb_control(ctdb, destnode, 0, 
4319                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
4320                            NULL, NULL, &res, &timeout, NULL);
4321         if (ret != 0 || res != 0) {
4322                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4323                 return -1;
4324         }
4325
4326         return 0;
4327 }
4328
4329 /* disable an eventscript
4330  */
4331 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4332 {
4333         int ret;
4334         TDB_DATA data;
4335         int32_t res;
4336
4337         data.dsize = strlen(script) + 1;
4338         data.dptr  = discard_const(script);
4339
4340         ret = ctdb_control(ctdb, destnode, 0, 
4341                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
4342                            NULL, NULL, &res, &timeout, NULL);
4343         if (ret != 0 || res != 0) {
4344                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
4345                 return -1;
4346         }
4347
4348         return 0;
4349 }
4350
4351
4352 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
4353 {
4354         int ret;
4355         TDB_DATA data;
4356         int32_t res;
4357
4358         data.dsize = sizeof(*bantime);
4359         data.dptr  = (uint8_t *)bantime;
4360
4361         ret = ctdb_control(ctdb, destnode, 0, 
4362                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
4363                            NULL, NULL, &res, &timeout, NULL);
4364         if (ret != 0 || res != 0) {
4365                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4366                 return -1;
4367         }
4368
4369         return 0;
4370 }
4371
4372
4373 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4374 {
4375         int ret;
4376         TDB_DATA outdata;
4377         int32_t res;
4378         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4379
4380         ret = ctdb_control(ctdb, destnode, 0, 
4381                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4382                            tmp_ctx, &outdata, &res, &timeout, NULL);
4383         if (ret != 0 || res != 0) {
4384                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4385                 talloc_free(tmp_ctx);
4386                 return -1;
4387         }
4388
4389         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4390         talloc_free(tmp_ctx);
4391
4392         return 0;
4393 }
4394
4395
4396 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4397 {
4398         int ret;
4399         int32_t res;
4400         TDB_DATA data;
4401         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4402
4403         data.dptr = (uint8_t*)db_prio;
4404         data.dsize = sizeof(*db_prio);
4405
4406         ret = ctdb_control(ctdb, destnode, 0, 
4407                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4408                            tmp_ctx, NULL, &res, &timeout, NULL);
4409         if (ret != 0 || res != 0) {
4410                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4411                 talloc_free(tmp_ctx);
4412                 return -1;
4413         }
4414
4415         talloc_free(tmp_ctx);
4416
4417         return 0;
4418 }
4419
4420 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4421 {
4422         int ret;
4423         int32_t res;
4424         TDB_DATA data;
4425         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4426
4427         data.dptr = (uint8_t*)&db_id;
4428         data.dsize = sizeof(db_id);
4429
4430         ret = ctdb_control(ctdb, destnode, 0, 
4431                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4432                            tmp_ctx, NULL, &res, &timeout, NULL);
4433         if (ret != 0 || res < 0) {
4434                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4435                 talloc_free(tmp_ctx);
4436                 return -1;
4437         }
4438
4439         if (priority) {
4440                 *priority = res;
4441         }
4442
4443         talloc_free(tmp_ctx);
4444
4445         return 0;
4446 }
4447
4448 int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_statistics_wire **stats)
4449 {
4450         int ret;
4451         TDB_DATA outdata;
4452         int32_t res;
4453
4454         ret = ctdb_control(ctdb, destnode, 0, 
4455                            CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null, 
4456                            mem_ctx, &outdata, &res, &timeout, NULL);
4457         if (ret != 0 || res != 0 || outdata.dsize == 0) {
4458                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
4459                 return -1;
4460         }
4461
4462         *stats = (struct ctdb_statistics_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4463         talloc_free(outdata.dptr);
4464                     
4465         return 0;
4466 }
4467
4468 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)
4469 {
4470         if (h == NULL) {
4471                 return NULL;
4472         }
4473
4474         return &h->header;
4475 }
4476
4477
4478 struct ctdb_client_control_state *
4479 ctdb_ctrl_updaterecord_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4480 {
4481         struct ctdb_client_control_state *handle;
4482         struct ctdb_marshall_buffer *m;
4483         struct ctdb_rec_data *rec;
4484         TDB_DATA outdata;
4485
4486         m = talloc_zero(mem_ctx, struct ctdb_marshall_buffer);
4487         if (m == NULL) {
4488                 DEBUG(DEBUG_ERR, ("Failed to allocate marshall buffer for update record\n"));
4489                 return NULL;
4490         }
4491
4492         m->db_id = ctdb_db->db_id;
4493
4494         rec = ctdb_marshall_record(m, 0, key, header, data);
4495         if (rec == NULL) {
4496                 DEBUG(DEBUG_ERR,("Failed to marshall record for update record\n"));
4497                 talloc_free(m);
4498                 return NULL;
4499         }
4500         m = talloc_realloc_size(mem_ctx, m, rec->length + offsetof(struct ctdb_marshall_buffer, data));
4501         if (m == NULL) {
4502                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata\n"));
4503                 talloc_free(m);
4504                 return NULL;
4505         }
4506         m->count++;
4507         memcpy((uint8_t *)m + offsetof(struct ctdb_marshall_buffer, data), rec, rec->length);
4508
4509
4510         outdata.dptr = (uint8_t *)m;
4511         outdata.dsize = talloc_get_size(m);
4512
4513         handle = ctdb_control_send(ctdb, destnode, 0, 
4514                            CTDB_CONTROL_UPDATE_RECORD, 0, outdata,
4515                            mem_ctx, &timeout, NULL);
4516         talloc_free(m);
4517         return handle;
4518 }
4519
4520 int ctdb_ctrl_updaterecord_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4521 {
4522         int ret;
4523         int32_t res;
4524
4525         ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL);
4526         if ( (ret != 0) || (res != 0) ){
4527                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_update_record_recv failed\n"));
4528                 return -1;
4529         }
4530
4531         return 0;
4532 }
4533
4534 int
4535 ctdb_ctrl_updaterecord(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4536 {
4537         struct ctdb_client_control_state *state;
4538
4539         state = ctdb_ctrl_updaterecord_send(ctdb, mem_ctx, timeout, destnode, ctdb_db, key, header, data);
4540         return ctdb_ctrl_updaterecord_recv(ctdb, state);
4541 }
4542
4543
4544
4545
4546
4547
4548 /*
4549   set a database to be readonly
4550  */
4551 struct ctdb_client_control_state *
4552 ctdb_ctrl_set_db_readonly_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4553 {
4554         TDB_DATA data;
4555
4556         data.dptr = (uint8_t *)&dbid;
4557         data.dsize = sizeof(dbid);
4558
4559         return ctdb_control_send(ctdb, destnode, 0, 
4560                            CTDB_CONTROL_SET_DB_READONLY, 0, data, 
4561                            ctdb, NULL, NULL);
4562 }
4563
4564 int ctdb_ctrl_set_db_readonly_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4565 {
4566         int ret;
4567         int32_t res;
4568
4569         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4570         if (ret != 0 || res != 0) {
4571           DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_readonly_recv failed  ret:%d res:%d\n", ret, res));
4572                 return -1;
4573         }
4574
4575         return 0;
4576 }
4577
4578 int ctdb_ctrl_set_db_readonly(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4579 {
4580         struct ctdb_client_control_state *state;
4581
4582         state = ctdb_ctrl_set_db_readonly_send(ctdb, destnode, dbid);
4583         return ctdb_ctrl_set_db_readonly_recv(ctdb, state);
4584 }