Merge remote branch 'martins/ctdb_control_oom'
[obnox/samba/samba-obnox.git] / ctdb / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/tevent/tevent.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
29 #include <stdlib.h>
30 #include "../include/ctdb_private.h"
31 #include "lib/util/dlinklist.h"
32
33 pid_t ctdbd_pid;
34
35 /*
36   allocate a packet for use in client<->daemon communication
37  */
38 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
39                                             TALLOC_CTX *mem_ctx, 
40                                             enum ctdb_operation operation, 
41                                             size_t length, size_t slength,
42                                             const char *type)
43 {
44         int size;
45         struct ctdb_req_header *hdr;
46
47         length = MAX(length, slength);
48         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
49
50         hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
51         if (hdr == NULL) {
52                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
53                          operation, (unsigned)length));
54                 return NULL;
55         }
56         talloc_set_name_const(hdr, type);
57         memset(hdr, 0, slength);
58         hdr->length       = length;
59         hdr->operation    = operation;
60         hdr->ctdb_magic   = CTDB_MAGIC;
61         hdr->ctdb_version = CTDB_VERSION;
62         hdr->srcnode      = ctdb->pnn;
63         if (ctdb->vnn_map) {
64                 hdr->generation = ctdb->vnn_map->generation;
65         }
66
67         return hdr;
68 }
69
70 /*
71   local version of ctdb_call
72 */
73 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
74                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
75                     TDB_DATA *data, bool updatetdb)
76 {
77         struct ctdb_call_info *c;
78         struct ctdb_registered_call *fn;
79         struct ctdb_context *ctdb = ctdb_db->ctdb;
80         
81         c = talloc(ctdb, struct ctdb_call_info);
82         CTDB_NO_MEMORY(ctdb, c);
83
84         c->key = call->key;
85         c->call_data = &call->call_data;
86         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
87         c->record_data.dsize = data->dsize;
88         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
89         c->new_data = NULL;
90         c->reply_data = NULL;
91         c->status = 0;
92         c->header = header;
93
94         for (fn=ctdb_db->calls;fn;fn=fn->next) {
95                 if (fn->id == call->call_id) break;
96         }
97         if (fn == NULL) {
98                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
99                 talloc_free(c);
100                 return -1;
101         }
102
103         if (fn->fn(c) != 0) {
104                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
105                 talloc_free(c);
106                 return -1;
107         }
108
109         /* we need to force the record to be written out if this was a remote access */
110         if (c->new_data == NULL) {
111                 c->new_data = &c->record_data;
112         }
113
114         if (c->new_data && updatetdb) {
115                 /* XXX check that we always have the lock here? */
116                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
117                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
118                         talloc_free(c);
119                         return -1;
120                 }
121         }
122
123         if (c->reply_data) {
124                 call->reply_data = *c->reply_data;
125
126                 talloc_steal(call, call->reply_data.dptr);
127                 talloc_set_name_const(call->reply_data.dptr, __location__);
128         } else {
129                 call->reply_data.dptr = NULL;
130                 call->reply_data.dsize = 0;
131         }
132         call->status = c->status;
133
134         talloc_free(c);
135
136         return 0;
137 }
138
139
140 /*
141   queue a packet for sending from client to daemon
142 */
143 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
144 {
145         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
146 }
147
148
149 /*
150   called when a CTDB_REPLY_CALL packet comes in in the client
151
152   This packet comes in response to a CTDB_REQ_CALL request packet. It
153   contains any reply data from the call
154 */
155 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
156 {
157         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
158         struct ctdb_client_call_state *state;
159
160         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
161         if (state == NULL) {
162                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
163                 return;
164         }
165
166         if (hdr->reqid != state->reqid) {
167                 /* we found a record  but it was the wrong one */
168                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
169                 return;
170         }
171
172         state->call->reply_data.dptr = c->data;
173         state->call->reply_data.dsize = c->datalen;
174         state->call->status = c->status;
175
176         talloc_steal(state, c);
177
178         state->state = CTDB_CALL_DONE;
179
180         if (state->async.fn) {
181                 state->async.fn(state);
182         }
183 }
184
185 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
186
187 /*
188   this is called in the client, when data comes in from the daemon
189  */
190 void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
191 {
192         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
193         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
194         TALLOC_CTX *tmp_ctx;
195
196         /* place the packet as a child of a tmp_ctx. We then use
197            talloc_free() below to free it. If any of the calls want
198            to keep it, then they will steal it somewhere else, and the
199            talloc_free() will be a no-op */
200         tmp_ctx = talloc_new(ctdb);
201         talloc_steal(tmp_ctx, hdr);
202
203         if (cnt == 0) {
204                 DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n"));
205                 exit(0);
206         }
207
208         if (cnt < sizeof(*hdr)) {
209                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
210                 goto done;
211         }
212         if (cnt != hdr->length) {
213                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
214                                (unsigned)hdr->length, (unsigned)cnt);
215                 goto done;
216         }
217
218         if (hdr->ctdb_magic != CTDB_MAGIC) {
219                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
220                 goto done;
221         }
222
223         if (hdr->ctdb_version != CTDB_VERSION) {
224                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
225                 goto done;
226         }
227
228         switch (hdr->operation) {
229         case CTDB_REPLY_CALL:
230                 ctdb_client_reply_call(ctdb, hdr);
231                 break;
232
233         case CTDB_REQ_MESSAGE:
234                 ctdb_request_message(ctdb, hdr);
235                 break;
236
237         case CTDB_REPLY_CONTROL:
238                 ctdb_client_reply_control(ctdb, hdr);
239                 break;
240
241         default:
242                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
243         }
244
245 done:
246         talloc_free(tmp_ctx);
247 }
248
249 /*
250   connect with exponential backoff, thanks Stevens
251 */
252 #define CONNECT_MAXSLEEP 64
253 static int ctdb_connect_retry(struct ctdb_context *ctdb)
254 {
255         struct sockaddr_un addr;
256         int secs;
257         int ret = 0;
258
259         memset(&addr, 0, sizeof(addr));
260         addr.sun_family = AF_UNIX;
261         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
262
263         for (secs = 1; secs <= CONNECT_MAXSLEEP; secs *= 2) {
264                 ret = connect(ctdb->daemon.sd, (struct sockaddr *)&addr,
265                               sizeof(addr));
266                 if ((ret == 0) || (errno != EAGAIN)) {
267                         break;
268                 }
269
270                 if (secs <= (CONNECT_MAXSLEEP / 2)) {
271                         DEBUG(DEBUG_ERR,("connect failed: %s, retry in %d second(s)\n",
272                                          strerror(errno), secs));
273                         sleep(secs);
274                 }
275         }
276
277         return ret;
278 }
279
280 /*
281   connect to a unix domain socket
282 */
283 int ctdb_socket_connect(struct ctdb_context *ctdb)
284 {
285         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
286         if (ctdb->daemon.sd == -1) {
287                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
288                 return -1;
289         }
290
291         set_nonblocking(ctdb->daemon.sd);
292         set_close_on_exec(ctdb->daemon.sd);
293
294         if (ctdb_connect_retry(ctdb) == -1) {
295                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
296                 close(ctdb->daemon.sd);
297                 ctdb->daemon.sd = -1;
298                 return -1;
299         }
300
301         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
302                                               CTDB_DS_ALIGNMENT, 
303                                               ctdb_client_read_cb, ctdb, "to-ctdbd");
304         return 0;
305 }
306
307
308 struct ctdb_record_handle {
309         struct ctdb_db_context *ctdb_db;
310         TDB_DATA key;
311         TDB_DATA *data;
312         struct ctdb_ltdb_header header;
313 };
314
315
316 /*
317   make a recv call to the local ctdb daemon - called from client context
318
319   This is called when the program wants to wait for a ctdb_call to complete and get the 
320   results. This call will block unless the call has already completed.
321 */
322 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
323 {
324         if (state == NULL) {
325                 return -1;
326         }
327
328         while (state->state < CTDB_CALL_DONE) {
329                 event_loop_once(state->ctdb_db->ctdb->ev);
330         }
331         if (state->state != CTDB_CALL_DONE) {
332                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
333                 talloc_free(state);
334                 return -1;
335         }
336
337         if (state->call->reply_data.dsize) {
338                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
339                                                       state->call->reply_data.dptr,
340                                                       state->call->reply_data.dsize);
341                 call->reply_data.dsize = state->call->reply_data.dsize;
342         } else {
343                 call->reply_data.dptr = NULL;
344                 call->reply_data.dsize = 0;
345         }
346         call->status = state->call->status;
347         talloc_free(state);
348
349         return call->status;
350 }
351
352
353
354
355 /*
356   destroy a ctdb_call in client
357 */
358 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
359 {
360         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
361         return 0;
362 }
363
364 /*
365   construct an event driven local ctdb_call
366
367   this is used so that locally processed ctdb_call requests are processed
368   in an event driven manner
369 */
370 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
371                                                                   struct ctdb_call *call,
372                                                                   struct ctdb_ltdb_header *header,
373                                                                   TDB_DATA *data)
374 {
375         struct ctdb_client_call_state *state;
376         struct ctdb_context *ctdb = ctdb_db->ctdb;
377         int ret;
378
379         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
380         CTDB_NO_MEMORY_NULL(ctdb, state);
381         state->call = talloc_zero(state, struct ctdb_call);
382         CTDB_NO_MEMORY_NULL(ctdb, state->call);
383
384         talloc_steal(state, data->dptr);
385
386         state->state   = CTDB_CALL_DONE;
387         *(state->call) = *call;
388         state->ctdb_db = ctdb_db;
389
390         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
391         if (ret != 0) {
392                 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
393         }
394
395         return state;
396 }
397
398 /*
399   make a ctdb call to the local daemon - async send. Called from client context.
400
401   This constructs a ctdb_call request and queues it for processing. 
402   This call never blocks.
403 */
404 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
405                                               struct ctdb_call *call)
406 {
407         struct ctdb_client_call_state *state;
408         struct ctdb_context *ctdb = ctdb_db->ctdb;
409         struct ctdb_ltdb_header header;
410         TDB_DATA data;
411         int ret;
412         size_t len;
413         struct ctdb_req_call *c;
414
415         /* if the domain socket is not yet open, open it */
416         if (ctdb->daemon.sd==-1) {
417                 ctdb_socket_connect(ctdb);
418         }
419
420         ret = ctdb_ltdb_lock(ctdb_db, call->key);
421         if (ret != 0) {
422                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
423                 return NULL;
424         }
425
426         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
427
428         if ((call->flags & CTDB_IMMEDIATE_MIGRATION) && (header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
429                 ret = -1;
430         }
431
432         if (ret == 0 && header.dmaster == ctdb->pnn) {
433                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
434                 talloc_free(data.dptr);
435                 ctdb_ltdb_unlock(ctdb_db, call->key);
436                 return state;
437         }
438
439         ctdb_ltdb_unlock(ctdb_db, call->key);
440         talloc_free(data.dptr);
441
442         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
443         if (state == NULL) {
444                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
445                 return NULL;
446         }
447         state->call = talloc_zero(state, struct ctdb_call);
448         if (state->call == NULL) {
449                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
450                 return NULL;
451         }
452
453         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
454         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
455         if (c == NULL) {
456                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
457                 return NULL;
458         }
459
460         state->reqid     = ctdb_reqid_new(ctdb, state);
461         state->ctdb_db = ctdb_db;
462         talloc_set_destructor(state, ctdb_client_call_destructor);
463
464         c->hdr.reqid     = state->reqid;
465         c->flags         = call->flags;
466         c->db_id         = ctdb_db->db_id;
467         c->callid        = call->call_id;
468         c->hopcount      = 0;
469         c->keylen        = call->key.dsize;
470         c->calldatalen   = call->call_data.dsize;
471         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
472         memcpy(&c->data[call->key.dsize], 
473                call->call_data.dptr, call->call_data.dsize);
474         *(state->call)              = *call;
475         state->call->call_data.dptr = &c->data[call->key.dsize];
476         state->call->key.dptr       = &c->data[0];
477
478         state->state  = CTDB_CALL_WAIT;
479
480
481         ctdb_client_queue_pkt(ctdb, &c->hdr);
482
483         return state;
484 }
485
486
487 /*
488   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
489 */
490 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
491 {
492         struct ctdb_client_call_state *state;
493
494         state = ctdb_call_send(ctdb_db, call);
495         return ctdb_call_recv(state, call);
496 }
497
498
499 /*
500   tell the daemon what messaging srvid we will use, and register the message
501   handler function in the client
502 */
503 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
504                              ctdb_msg_fn_t handler,
505                              void *private_data)
506                                     
507 {
508         int res;
509         int32_t status;
510         
511         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
512                            tdb_null, NULL, NULL, &status, NULL, NULL);
513         if (res != 0 || status != 0) {
514                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
515                 return -1;
516         }
517
518         /* also need to register the handler with our own ctdb structure */
519         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
520 }
521
522 /*
523   tell the daemon we no longer want a srvid
524 */
525 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
526 {
527         int res;
528         int32_t status;
529         
530         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
531                            tdb_null, NULL, NULL, &status, NULL, NULL);
532         if (res != 0 || status != 0) {
533                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
534                 return -1;
535         }
536
537         /* also need to register the handler with our own ctdb structure */
538         ctdb_deregister_message_handler(ctdb, srvid, private_data);
539         return 0;
540 }
541
542
543 /*
544   send a message - from client context
545  */
546 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
547                       uint64_t srvid, TDB_DATA data)
548 {
549         struct ctdb_req_message *r;
550         int len, res;
551
552         len = offsetof(struct ctdb_req_message, data) + data.dsize;
553         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
554                                len, struct ctdb_req_message);
555         CTDB_NO_MEMORY(ctdb, r);
556
557         r->hdr.destnode  = pnn;
558         r->srvid         = srvid;
559         r->datalen       = data.dsize;
560         memcpy(&r->data[0], data.dptr, data.dsize);
561         
562         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
563         if (res != 0) {
564                 return res;
565         }
566
567         talloc_free(r);
568         return 0;
569 }
570
571
572 /*
573   cancel a ctdb_fetch_lock operation, releasing the lock
574  */
575 static int fetch_lock_destructor(struct ctdb_record_handle *h)
576 {
577         ctdb_ltdb_unlock(h->ctdb_db, h->key);
578         return 0;
579 }
580
581 /*
582   force the migration of a record to this node
583  */
584 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
585 {
586         struct ctdb_call call;
587         ZERO_STRUCT(call);
588         call.call_id = CTDB_NULL_FUNC;
589         call.key = key;
590         call.flags = CTDB_IMMEDIATE_MIGRATION;
591         return ctdb_call(ctdb_db, &call);
592 }
593
594 /*
595   try to fetch a readonly copy of a record
596  */
597 static int
598 ctdb_client_fetch_readonly(struct ctdb_db_context *ctdb_db, TDB_DATA key, TALLOC_CTX *mem_ctx, struct ctdb_ltdb_header **hdr, TDB_DATA *data)
599 {
600         int ret;
601
602         struct ctdb_call call;
603         ZERO_STRUCT(call);
604
605         call.call_id = CTDB_FETCH_WITH_HEADER_FUNC;
606         call.call_data.dptr = NULL;
607         call.call_data.dsize = 0;
608         call.key = key;
609         call.flags = CTDB_WANT_READONLY;
610         ret = ctdb_call(ctdb_db, &call);
611
612         if (ret != 0) {
613                 return -1;
614         }
615         if (call.reply_data.dsize < sizeof(struct ctdb_ltdb_header)) {
616                 return -1;
617         }
618
619         *hdr = talloc_memdup(mem_ctx, &call.reply_data.dptr[0], sizeof(struct ctdb_ltdb_header));
620         if (*hdr == NULL) {
621                 talloc_free(call.reply_data.dptr);
622                 return -1;
623         }
624
625         data->dsize = call.reply_data.dsize - sizeof(struct ctdb_ltdb_header);
626         data->dptr  = talloc_memdup(mem_ctx, &call.reply_data.dptr[sizeof(struct ctdb_ltdb_header)], data->dsize);
627         if (data->dptr == NULL) {
628                 talloc_free(call.reply_data.dptr);
629                 talloc_free(hdr);
630                 return -1;
631         }
632
633         return 0;
634 }
635
636 /*
637   get a lock on a record, and return the records data. Blocks until it gets the lock
638  */
639 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
640                                            TDB_DATA key, TDB_DATA *data)
641 {
642         int ret;
643         struct ctdb_record_handle *h;
644
645         /*
646           procedure is as follows:
647
648           1) get the chain lock. 
649           2) check if we are dmaster
650           3) if we are the dmaster then return handle 
651           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
652              reply from ctdbd
653           5) when we get the reply, goto (1)
654          */
655
656         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
657         if (h == NULL) {
658                 return NULL;
659         }
660
661         h->ctdb_db = ctdb_db;
662         h->key     = key;
663         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
664         if (h->key.dptr == NULL) {
665                 talloc_free(h);
666                 return NULL;
667         }
668         h->data    = data;
669
670         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
671                  (const char *)key.dptr));
672
673 again:
674         /* step 1 - get the chain lock */
675         ret = ctdb_ltdb_lock(ctdb_db, key);
676         if (ret != 0) {
677                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
678                 talloc_free(h);
679                 return NULL;
680         }
681
682         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
683
684         talloc_set_destructor(h, fetch_lock_destructor);
685
686         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
687
688         /* when torturing, ensure we test the remote path */
689         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
690             random() % 5 == 0) {
691                 h->header.dmaster = (uint32_t)-1;
692         }
693
694
695         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
696
697         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
698                 ctdb_ltdb_unlock(ctdb_db, key);
699                 ret = ctdb_client_force_migration(ctdb_db, key);
700                 if (ret != 0) {
701                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
702                         talloc_free(h);
703                         return NULL;
704                 }
705                 goto again;
706         }
707
708         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
709         return h;
710 }
711
712 /*
713   get a readonly lock on a record, and return the records data. Blocks until it gets the lock
714  */
715 struct ctdb_record_handle *
716 ctdb_fetch_readonly_lock(
717         struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
718         TDB_DATA key, TDB_DATA *data,
719         int read_only)
720 {
721         int ret;
722         struct ctdb_record_handle *h;
723         struct ctdb_ltdb_header *roheader = NULL;
724
725         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
726         if (h == NULL) {
727                 return NULL;
728         }
729
730         h->ctdb_db = ctdb_db;
731         h->key     = key;
732         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
733         if (h->key.dptr == NULL) {
734                 talloc_free(h);
735                 return NULL;
736         }
737         h->data    = data;
738
739         data->dptr = NULL;
740         data->dsize = 0;
741
742
743 again:
744         talloc_free(roheader);
745         roheader = NULL;
746
747         talloc_free(data->dptr);
748         data->dptr = NULL;
749         data->dsize = 0;
750
751         /* Lock the record/chain */
752         ret = ctdb_ltdb_lock(ctdb_db, key);
753         if (ret != 0) {
754                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
755                 talloc_free(h);
756                 return NULL;
757         }
758
759         talloc_set_destructor(h, fetch_lock_destructor);
760
761         /* Check if record exists yet in the TDB */
762         ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
763         if (ret != 0) {
764                 ctdb_ltdb_unlock(ctdb_db, key);
765                 ret = ctdb_client_force_migration(ctdb_db, key);
766                 if (ret != 0) {
767                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
768                         talloc_free(h);
769                         return NULL;
770                 }
771                 goto again;
772         }
773
774         /* if this is a request for read/write and we have delegations
775            we have to revoke all delegations first
776         */
777         if ((read_only == 0) 
778         &&  (h->header.dmaster == ctdb_db->ctdb->pnn)
779         &&  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
780                 ctdb_ltdb_unlock(ctdb_db, key);
781                 ret = ctdb_client_force_migration(ctdb_db, key);
782                 if (ret != 0) {
783                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
784                         talloc_free(h);
785                         return NULL;
786                 }
787                 goto again;
788         }
789
790         /* if we are dmaster, just return the handle */
791         if (h->header.dmaster == ctdb_db->ctdb->pnn) {
792                 return h;
793         }
794
795         if (read_only != 0) {
796                 TDB_DATA rodata = {NULL, 0};
797
798                 if ((h->header.flags & CTDB_REC_RO_HAVE_READONLY)
799                 ||  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
800                         return h;
801                 }
802
803                 ctdb_ltdb_unlock(ctdb_db, key);
804                 ret = ctdb_client_fetch_readonly(ctdb_db, key, h, &roheader, &rodata);
805                 if (ret != 0) {
806                         DEBUG(DEBUG_ERR,("ctdb_fetch_readonly_lock:  failed. force migration and try again\n"));
807                         ret = ctdb_client_force_migration(ctdb_db, key);
808                         if (ret != 0) {
809                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
810                                 talloc_free(h);
811                                 return NULL;
812                         }
813
814                         goto again;
815                 }
816
817                 if (!(roheader->flags&CTDB_REC_RO_HAVE_READONLY)) {
818                         ret = ctdb_client_force_migration(ctdb_db, key);
819                         if (ret != 0) {
820                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
821                                 talloc_free(h);
822                                 return NULL;
823                         }
824
825                         goto again;
826                 }
827
828                 ret = ctdb_ltdb_lock(ctdb_db, key);
829                 if (ret != 0) {
830                         DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
831                         talloc_free(h);
832                         return NULL;
833                 }
834
835                 ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
836                 if (ret != 0) {
837                         ctdb_ltdb_unlock(ctdb_db, key);
838
839                         ret = ctdb_client_force_migration(ctdb_db, key);
840                         if (ret != 0) {
841                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
842                                 talloc_free(h);
843                                 return NULL;
844                         }
845
846                         goto again;
847                 }
848
849                 return h;
850         }
851
852         /* we are not dmaster and this was not a request for a readonly lock
853          * so unlock the record, migrate it and try again
854          */
855         ctdb_ltdb_unlock(ctdb_db, key);
856         ret = ctdb_client_force_migration(ctdb_db, key);
857         if (ret != 0) {
858                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
859                 talloc_free(h);
860                 return NULL;
861         }
862         goto again;
863 }
864
865 /*
866   store some data to the record that was locked with ctdb_fetch_lock()
867 */
868 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
869 {
870         if (h->ctdb_db->persistent) {
871                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
872                 return -1;
873         }
874
875         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
876 }
877
878 /*
879   non-locking fetch of a record
880  */
881 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
882                TDB_DATA key, TDB_DATA *data)
883 {
884         struct ctdb_call call;
885         int ret;
886
887         call.call_id = CTDB_FETCH_FUNC;
888         call.call_data.dptr = NULL;
889         call.call_data.dsize = 0;
890         call.key = key;
891
892         ret = ctdb_call(ctdb_db, &call);
893
894         if (ret == 0) {
895                 *data = call.reply_data;
896                 talloc_steal(mem_ctx, data->dptr);
897         }
898
899         return ret;
900 }
901
902
903
904 /*
905    called when a control completes or timesout to invoke the callback
906    function the user provided
907 */
908 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
909         struct timeval t, void *private_data)
910 {
911         struct ctdb_client_control_state *state;
912         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
913         int ret;
914
915         state = talloc_get_type(private_data, struct ctdb_client_control_state);
916         talloc_steal(tmp_ctx, state);
917
918         ret = ctdb_control_recv(state->ctdb, state, state,
919                         NULL, 
920                         NULL, 
921                         NULL);
922         if (ret != 0) {
923                 DEBUG(DEBUG_DEBUG,("ctdb_control_recv() failed, ignoring return code %d\n", ret));
924         }
925
926         talloc_free(tmp_ctx);
927 }
928
929 /*
930   called when a CTDB_REPLY_CONTROL packet comes in in the client
931
932   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
933   contains any reply data from the control
934 */
935 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
936                                       struct ctdb_req_header *hdr)
937 {
938         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
939         struct ctdb_client_control_state *state;
940
941         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
942         if (state == NULL) {
943                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
944                 return;
945         }
946
947         if (hdr->reqid != state->reqid) {
948                 /* we found a record  but it was the wrong one */
949                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
950                 return;
951         }
952
953         state->outdata.dptr = c->data;
954         state->outdata.dsize = c->datalen;
955         state->status = c->status;
956         if (c->errorlen) {
957                 state->errormsg = talloc_strndup(state, 
958                                                  (char *)&c->data[c->datalen], 
959                                                  c->errorlen);
960         }
961
962         /* state->outdata now uses resources from c so we dont want c
963            to just dissappear from under us while state is still alive
964         */
965         talloc_steal(state, c);
966
967         state->state = CTDB_CONTROL_DONE;
968
969         /* if we had a callback registered for this control, pull the response
970            and call the callback.
971         */
972         if (state->async.fn) {
973                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
974         }
975 }
976
977
978 /*
979   destroy a ctdb_control in client
980 */
981 static int ctdb_client_control_destructor(struct ctdb_client_control_state *state)
982 {
983         ctdb_reqid_remove(state->ctdb, state->reqid);
984         return 0;
985 }
986
987
988 /* time out handler for ctdb_control */
989 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
990         struct timeval t, void *private_data)
991 {
992         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
993
994         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
995                          "dstnode:%u\n", state->reqid, state->c->opcode,
996                          state->c->hdr.destnode));
997
998         state->state = CTDB_CONTROL_TIMEOUT;
999
1000         /* if we had a callback registered for this control, pull the response
1001            and call the callback.
1002         */
1003         if (state->async.fn) {
1004                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
1005         }
1006 }
1007
1008 /* async version of send control request */
1009 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
1010                 uint32_t destnode, uint64_t srvid, 
1011                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
1012                 TALLOC_CTX *mem_ctx,
1013                 struct timeval *timeout,
1014                 char **errormsg)
1015 {
1016         struct ctdb_client_control_state *state;
1017         size_t len;
1018         struct ctdb_req_control *c;
1019         int ret;
1020
1021         if (errormsg) {
1022                 *errormsg = NULL;
1023         }
1024
1025         /* if the domain socket is not yet open, open it */
1026         if (ctdb->daemon.sd==-1) {
1027                 ctdb_socket_connect(ctdb);
1028         }
1029
1030         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
1031         CTDB_NO_MEMORY_NULL(ctdb, state);
1032
1033         state->ctdb       = ctdb;
1034         state->reqid      = ctdb_reqid_new(ctdb, state);
1035         state->state      = CTDB_CONTROL_WAIT;
1036         state->errormsg   = NULL;
1037
1038         talloc_set_destructor(state, ctdb_client_control_destructor);
1039
1040         len = offsetof(struct ctdb_req_control, data) + data.dsize;
1041         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
1042                                len, struct ctdb_req_control);
1043         state->c            = c;        
1044         CTDB_NO_MEMORY_NULL(ctdb, c);
1045         c->hdr.reqid        = state->reqid;
1046         c->hdr.destnode     = destnode;
1047         c->opcode           = opcode;
1048         c->client_id        = 0;
1049         c->flags            = flags;
1050         c->srvid            = srvid;
1051         c->datalen          = data.dsize;
1052         if (data.dsize) {
1053                 memcpy(&c->data[0], data.dptr, data.dsize);
1054         }
1055
1056         /* timeout */
1057         if (timeout && !timeval_is_zero(timeout)) {
1058                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
1059         }
1060
1061         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
1062         if (ret != 0) {
1063                 talloc_free(state);
1064                 return NULL;
1065         }
1066
1067         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1068                 talloc_free(state);
1069                 return NULL;
1070         }
1071
1072         return state;
1073 }
1074
1075
1076 /* async version of receive control reply */
1077 int ctdb_control_recv(struct ctdb_context *ctdb, 
1078                 struct ctdb_client_control_state *state, 
1079                 TALLOC_CTX *mem_ctx,
1080                 TDB_DATA *outdata, int32_t *status, char **errormsg)
1081 {
1082         TALLOC_CTX *tmp_ctx;
1083
1084         if (status != NULL) {
1085                 *status = -1;
1086         }
1087         if (errormsg != NULL) {
1088                 *errormsg = NULL;
1089         }
1090
1091         if (state == NULL) {
1092                 return -1;
1093         }
1094
1095         /* prevent double free of state */
1096         tmp_ctx = talloc_new(ctdb);
1097         talloc_steal(tmp_ctx, state);
1098
1099         /* loop one event at a time until we either timeout or the control
1100            completes.
1101         */
1102         while (state->state == CTDB_CONTROL_WAIT) {
1103                 event_loop_once(ctdb->ev);
1104         }
1105
1106         if (state->state != CTDB_CONTROL_DONE) {
1107                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
1108                 if (state->async.fn) {
1109                         state->async.fn(state);
1110                 }
1111                 talloc_free(tmp_ctx);
1112                 return -1;
1113         }
1114
1115         if (state->errormsg) {
1116                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
1117                 if (errormsg) {
1118                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
1119                 }
1120                 if (state->async.fn) {
1121                         state->async.fn(state);
1122                 }
1123                 talloc_free(tmp_ctx);
1124                 return -1;
1125         }
1126
1127         if (outdata) {
1128                 *outdata = state->outdata;
1129                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
1130         }
1131
1132         if (status) {
1133                 *status = state->status;
1134         }
1135
1136         if (state->async.fn) {
1137                 state->async.fn(state);
1138         }
1139
1140         talloc_free(tmp_ctx);
1141         return 0;
1142 }
1143
1144
1145
1146 /*
1147   send a ctdb control message
1148   timeout specifies how long we should wait for a reply.
1149   if timeout is NULL we wait indefinitely
1150  */
1151 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
1152                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
1153                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
1154                  struct timeval *timeout,
1155                  char **errormsg)
1156 {
1157         struct ctdb_client_control_state *state;
1158
1159         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
1160                         flags, data, mem_ctx,
1161                         timeout, errormsg);
1162         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
1163                         errormsg);
1164 }
1165
1166
1167
1168
1169 /*
1170   a process exists call. Returns 0 if process exists, -1 otherwise
1171  */
1172 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
1173 {
1174         int ret;
1175         TDB_DATA data;
1176         int32_t status;
1177
1178         data.dptr = (uint8_t*)&pid;
1179         data.dsize = sizeof(pid);
1180
1181         ret = ctdb_control(ctdb, destnode, 0, 
1182                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
1183                            NULL, NULL, &status, NULL, NULL);
1184         if (ret != 0) {
1185                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
1186                 return -1;
1187         }
1188
1189         return status;
1190 }
1191
1192 /*
1193   get remote statistics
1194  */
1195 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
1196 {
1197         int ret;
1198         TDB_DATA data;
1199         int32_t res;
1200
1201         ret = ctdb_control(ctdb, destnode, 0, 
1202                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
1203                            ctdb, &data, &res, NULL, NULL);
1204         if (ret != 0 || res != 0) {
1205                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
1206                 return -1;
1207         }
1208
1209         if (data.dsize != sizeof(struct ctdb_statistics)) {
1210                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1211                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1212                       return -1;
1213         }
1214
1215         *status = *(struct ctdb_statistics *)data.dptr;
1216         talloc_free(data.dptr);
1217                         
1218         return 0;
1219 }
1220
1221 /*
1222   shutdown a remote ctdb node
1223  */
1224 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1225 {
1226         struct ctdb_client_control_state *state;
1227
1228         state = ctdb_control_send(ctdb, destnode, 0, 
1229                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1230                            NULL, &timeout, NULL);
1231         if (state == NULL) {
1232                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1233                 return -1;
1234         }
1235
1236         return 0;
1237 }
1238
1239 /*
1240   get vnn map from a remote node
1241  */
1242 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1243 {
1244         int ret;
1245         TDB_DATA outdata;
1246         int32_t res;
1247         struct ctdb_vnn_map_wire *map;
1248
1249         ret = ctdb_control(ctdb, destnode, 0, 
1250                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1251                            mem_ctx, &outdata, &res, &timeout, NULL);
1252         if (ret != 0 || res != 0) {
1253                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1254                 return -1;
1255         }
1256         
1257         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1258         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1259             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1260                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1261                 return -1;
1262         }
1263
1264         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1265         CTDB_NO_MEMORY(ctdb, *vnnmap);
1266         (*vnnmap)->generation = map->generation;
1267         (*vnnmap)->size       = map->size;
1268         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1269
1270         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1271         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1272         talloc_free(outdata.dptr);
1273                     
1274         return 0;
1275 }
1276
1277
1278 /*
1279   get the recovery mode of a remote node
1280  */
1281 struct ctdb_client_control_state *
1282 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1283 {
1284         return ctdb_control_send(ctdb, destnode, 0, 
1285                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1286                            mem_ctx, &timeout, NULL);
1287 }
1288
1289 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1290 {
1291         int ret;
1292         int32_t res;
1293
1294         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1295         if (ret != 0) {
1296                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1297                 return -1;
1298         }
1299
1300         if (recmode) {
1301                 *recmode = (uint32_t)res;
1302         }
1303
1304         return 0;
1305 }
1306
1307 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1308 {
1309         struct ctdb_client_control_state *state;
1310
1311         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1312         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1313 }
1314
1315
1316
1317
1318 /*
1319   set the recovery mode of a remote node
1320  */
1321 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1322 {
1323         int ret;
1324         TDB_DATA data;
1325         int32_t res;
1326
1327         data.dsize = sizeof(uint32_t);
1328         data.dptr = (unsigned char *)&recmode;
1329
1330         ret = ctdb_control(ctdb, destnode, 0, 
1331                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1332                            NULL, NULL, &res, &timeout, NULL);
1333         if (ret != 0 || res != 0) {
1334                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1335                 return -1;
1336         }
1337
1338         return 0;
1339 }
1340
1341
1342
1343 /*
1344   get the recovery master of a remote node
1345  */
1346 struct ctdb_client_control_state *
1347 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1348                         struct timeval timeout, uint32_t destnode)
1349 {
1350         return ctdb_control_send(ctdb, destnode, 0, 
1351                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1352                            mem_ctx, &timeout, NULL);
1353 }
1354
1355 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1356 {
1357         int ret;
1358         int32_t res;
1359
1360         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1361         if (ret != 0) {
1362                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1363                 return -1;
1364         }
1365
1366         if (recmaster) {
1367                 *recmaster = (uint32_t)res;
1368         }
1369
1370         return 0;
1371 }
1372
1373 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1374 {
1375         struct ctdb_client_control_state *state;
1376
1377         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1378         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1379 }
1380
1381
1382 /*
1383   set the recovery master of a remote node
1384  */
1385 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1386 {
1387         int ret;
1388         TDB_DATA data;
1389         int32_t res;
1390
1391         ZERO_STRUCT(data);
1392         data.dsize = sizeof(uint32_t);
1393         data.dptr = (unsigned char *)&recmaster;
1394
1395         ret = ctdb_control(ctdb, destnode, 0, 
1396                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1397                            NULL, NULL, &res, &timeout, NULL);
1398         if (ret != 0 || res != 0) {
1399                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1400                 return -1;
1401         }
1402
1403         return 0;
1404 }
1405
1406
1407 /*
1408   get a list of databases off a remote node
1409  */
1410 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1411                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1412 {
1413         int ret;
1414         TDB_DATA outdata;
1415         int32_t res;
1416
1417         ret = ctdb_control(ctdb, destnode, 0, 
1418                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1419                            mem_ctx, &outdata, &res, &timeout, NULL);
1420         if (ret != 0 || res != 0) {
1421                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1422                 return -1;
1423         }
1424
1425         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1426         talloc_free(outdata.dptr);
1427                     
1428         return 0;
1429 }
1430
1431 /*
1432   get a list of nodes (vnn and flags ) from a remote node
1433  */
1434 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1435                 struct timeval timeout, uint32_t destnode, 
1436                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1437 {
1438         int ret;
1439         TDB_DATA outdata;
1440         int32_t res;
1441
1442         ret = ctdb_control(ctdb, destnode, 0, 
1443                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1444                            mem_ctx, &outdata, &res, &timeout, NULL);
1445         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1446                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1447                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1448         }
1449         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1450                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1451                 return -1;
1452         }
1453
1454         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1455         talloc_free(outdata.dptr);
1456                     
1457         return 0;
1458 }
1459
1460 /*
1461   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1462  */
1463 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1464                 struct timeval timeout, uint32_t destnode, 
1465                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1466 {
1467         int ret, i, len;
1468         TDB_DATA outdata;
1469         struct ctdb_node_mapv4 *nodemapv4;
1470         int32_t res;
1471
1472         ret = ctdb_control(ctdb, destnode, 0, 
1473                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1474                            mem_ctx, &outdata, &res, &timeout, NULL);
1475         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1476                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1477                 return -1;
1478         }
1479
1480         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1481
1482         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1483         (*nodemap) = talloc_zero_size(mem_ctx, len);
1484         CTDB_NO_MEMORY(ctdb, (*nodemap));
1485
1486         (*nodemap)->num = nodemapv4->num;
1487         for (i=0; i<nodemapv4->num; i++) {
1488                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1489                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1490                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1491                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1492         }
1493                 
1494         talloc_free(outdata.dptr);
1495                     
1496         return 0;
1497 }
1498
1499 /*
1500   drop the transport, reload the nodes file and restart the transport
1501  */
1502 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1503                     struct timeval timeout, uint32_t destnode)
1504 {
1505         int ret;
1506         int32_t res;
1507
1508         ret = ctdb_control(ctdb, destnode, 0, 
1509                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1510                            NULL, NULL, &res, &timeout, NULL);
1511         if (ret != 0 || res != 0) {
1512                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1513                 return -1;
1514         }
1515
1516         return 0;
1517 }
1518
1519
1520 /*
1521   set vnn map on a node
1522  */
1523 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1524                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1525 {
1526         int ret;
1527         TDB_DATA data;
1528         int32_t res;
1529         struct ctdb_vnn_map_wire *map;
1530         size_t len;
1531
1532         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1533         map = talloc_size(mem_ctx, len);
1534         CTDB_NO_MEMORY(ctdb, map);
1535
1536         map->generation = vnnmap->generation;
1537         map->size = vnnmap->size;
1538         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1539         
1540         data.dsize = len;
1541         data.dptr  = (uint8_t *)map;
1542
1543         ret = ctdb_control(ctdb, destnode, 0, 
1544                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1545                            NULL, NULL, &res, &timeout, NULL);
1546         if (ret != 0 || res != 0) {
1547                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1548                 return -1;
1549         }
1550
1551         talloc_free(map);
1552
1553         return 0;
1554 }
1555
1556
1557 /*
1558   async send for pull database
1559  */
1560 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1561         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1562         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1563 {
1564         TDB_DATA indata;
1565         struct ctdb_control_pulldb *pull;
1566         struct ctdb_client_control_state *state;
1567
1568         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1569         CTDB_NO_MEMORY_NULL(ctdb, pull);
1570
1571         pull->db_id   = dbid;
1572         pull->lmaster = lmaster;
1573
1574         indata.dsize = sizeof(struct ctdb_control_pulldb);
1575         indata.dptr  = (unsigned char *)pull;
1576
1577         state = ctdb_control_send(ctdb, destnode, 0, 
1578                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1579                                   mem_ctx, &timeout, NULL);
1580         talloc_free(pull);
1581
1582         return state;
1583 }
1584
1585 /*
1586   async recv for pull database
1587  */
1588 int ctdb_ctrl_pulldb_recv(
1589         struct ctdb_context *ctdb, 
1590         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1591         TDB_DATA *outdata)
1592 {
1593         int ret;
1594         int32_t res;
1595
1596         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1597         if ( (ret != 0) || (res != 0) ){
1598                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1599                 return -1;
1600         }
1601
1602         return 0;
1603 }
1604
1605 /*
1606   pull all keys and records for a specific database on a node
1607  */
1608 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1609                 uint32_t dbid, uint32_t lmaster, 
1610                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1611                 TDB_DATA *outdata)
1612 {
1613         struct ctdb_client_control_state *state;
1614
1615         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1616                                       timeout);
1617         
1618         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1619 }
1620
1621
1622 /*
1623   change dmaster for all keys in the database to the new value
1624  */
1625 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1626                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1627 {
1628         int ret;
1629         TDB_DATA indata;
1630         int32_t res;
1631
1632         indata.dsize = 2*sizeof(uint32_t);
1633         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1634
1635         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1636         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1637
1638         ret = ctdb_control(ctdb, destnode, 0, 
1639                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1640                            NULL, NULL, &res, &timeout, NULL);
1641         if (ret != 0 || res != 0) {
1642                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1643                 return -1;
1644         }
1645
1646         return 0;
1647 }
1648
1649 /*
1650   ping a node, return number of clients connected
1651  */
1652 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1653 {
1654         int ret;
1655         int32_t res;
1656
1657         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1658                            tdb_null, NULL, NULL, &res, NULL, NULL);
1659         if (ret != 0) {
1660                 return -1;
1661         }
1662         return res;
1663 }
1664
1665 /*
1666   find the real path to a ltdb 
1667  */
1668 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1669                    const char **path)
1670 {
1671         int ret;
1672         int32_t res;
1673         TDB_DATA data;
1674
1675         data.dptr = (uint8_t *)&dbid;
1676         data.dsize = sizeof(dbid);
1677
1678         ret = ctdb_control(ctdb, destnode, 0, 
1679                            CTDB_CONTROL_GETDBPATH, 0, data, 
1680                            mem_ctx, &data, &res, &timeout, NULL);
1681         if (ret != 0 || res != 0) {
1682                 return -1;
1683         }
1684
1685         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1686         if ((*path) == NULL) {
1687                 return -1;
1688         }
1689
1690         talloc_free(data.dptr);
1691
1692         return 0;
1693 }
1694
1695 /*
1696   find the name of a db 
1697  */
1698 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1699                    const char **name)
1700 {
1701         int ret;
1702         int32_t res;
1703         TDB_DATA data;
1704
1705         data.dptr = (uint8_t *)&dbid;
1706         data.dsize = sizeof(dbid);
1707
1708         ret = ctdb_control(ctdb, destnode, 0, 
1709                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1710                            mem_ctx, &data, &res, &timeout, NULL);
1711         if (ret != 0 || res != 0) {
1712                 return -1;
1713         }
1714
1715         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1716         if ((*name) == NULL) {
1717                 return -1;
1718         }
1719
1720         talloc_free(data.dptr);
1721
1722         return 0;
1723 }
1724
1725 /*
1726   get the health status of a db
1727  */
1728 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1729                           struct timeval timeout,
1730                           uint32_t destnode,
1731                           uint32_t dbid, TALLOC_CTX *mem_ctx,
1732                           const char **reason)
1733 {
1734         int ret;
1735         int32_t res;
1736         TDB_DATA data;
1737
1738         data.dptr = (uint8_t *)&dbid;
1739         data.dsize = sizeof(dbid);
1740
1741         ret = ctdb_control(ctdb, destnode, 0,
1742                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1743                            mem_ctx, &data, &res, &timeout, NULL);
1744         if (ret != 0 || res != 0) {
1745                 return -1;
1746         }
1747
1748         if (data.dsize == 0) {
1749                 (*reason) = NULL;
1750                 return 0;
1751         }
1752
1753         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1754         if ((*reason) == NULL) {
1755                 return -1;
1756         }
1757
1758         talloc_free(data.dptr);
1759
1760         return 0;
1761 }
1762
1763 /*
1764   create a database
1765  */
1766 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1767                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1768 {
1769         int ret;
1770         int32_t res;
1771         TDB_DATA data;
1772
1773         data.dptr = discard_const(name);
1774         data.dsize = strlen(name)+1;
1775
1776         ret = ctdb_control(ctdb, destnode, 0, 
1777                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1778                            0, data, 
1779                            mem_ctx, &data, &res, &timeout, NULL);
1780
1781         if (ret != 0 || res != 0) {
1782                 return -1;
1783         }
1784
1785         return 0;
1786 }
1787
1788 /*
1789   get debug level on a node
1790  */
1791 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1792 {
1793         int ret;
1794         int32_t res;
1795         TDB_DATA data;
1796
1797         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1798                            ctdb, &data, &res, NULL, NULL);
1799         if (ret != 0 || res != 0) {
1800                 return -1;
1801         }
1802         if (data.dsize != sizeof(int32_t)) {
1803                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1804                          (unsigned)data.dsize));
1805                 return -1;
1806         }
1807         *level = *(int32_t *)data.dptr;
1808         talloc_free(data.dptr);
1809         return 0;
1810 }
1811
1812 /*
1813   set debug level on a node
1814  */
1815 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1816 {
1817         int ret;
1818         int32_t res;
1819         TDB_DATA data;
1820
1821         data.dptr = (uint8_t *)&level;
1822         data.dsize = sizeof(level);
1823
1824         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1825                            NULL, NULL, &res, NULL, NULL);
1826         if (ret != 0 || res != 0) {
1827                 return -1;
1828         }
1829         return 0;
1830 }
1831
1832
1833 /*
1834   get a list of connected nodes
1835  */
1836 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1837                                 struct timeval timeout,
1838                                 TALLOC_CTX *mem_ctx,
1839                                 uint32_t *num_nodes)
1840 {
1841         struct ctdb_node_map *map=NULL;
1842         int ret, i;
1843         uint32_t *nodes;
1844
1845         *num_nodes = 0;
1846
1847         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1848         if (ret != 0) {
1849                 return NULL;
1850         }
1851
1852         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1853         if (nodes == NULL) {
1854                 return NULL;
1855         }
1856
1857         for (i=0;i<map->num;i++) {
1858                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1859                         nodes[*num_nodes] = map->nodes[i].pnn;
1860                         (*num_nodes)++;
1861                 }
1862         }
1863
1864         return nodes;
1865 }
1866
1867
1868 /*
1869   reset remote status
1870  */
1871 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1872 {
1873         int ret;
1874         int32_t res;
1875
1876         ret = ctdb_control(ctdb, destnode, 0, 
1877                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1878                            NULL, NULL, &res, NULL, NULL);
1879         if (ret != 0 || res != 0) {
1880                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1881                 return -1;
1882         }
1883         return 0;
1884 }
1885
1886 /*
1887   attach to a specific database - client call
1888 */
1889 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
1890                                     struct timeval timeout,
1891                                     const char *name,
1892                                     bool persistent,
1893                                     uint32_t tdb_flags)
1894 {
1895         struct ctdb_db_context *ctdb_db;
1896         TDB_DATA data;
1897         int ret;
1898         int32_t res;
1899
1900         ctdb_db = ctdb_db_handle(ctdb, name);
1901         if (ctdb_db) {
1902                 return ctdb_db;
1903         }
1904
1905         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1906         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1907
1908         ctdb_db->ctdb = ctdb;
1909         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1910         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1911
1912         data.dptr = discard_const(name);
1913         data.dsize = strlen(name)+1;
1914
1915         /* tell ctdb daemon to attach */
1916         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1917                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1918                            0, data, ctdb_db, &data, &res, NULL, NULL);
1919         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1920                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1921                 talloc_free(ctdb_db);
1922                 return NULL;
1923         }
1924         
1925         ctdb_db->db_id = *(uint32_t *)data.dptr;
1926         talloc_free(data.dptr);
1927
1928         ret = ctdb_ctrl_getdbpath(ctdb, timeout, CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1929         if (ret != 0) {
1930                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1931                 talloc_free(ctdb_db);
1932                 return NULL;
1933         }
1934
1935         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1936         if (ctdb->valgrinding) {
1937                 tdb_flags |= TDB_NOMMAP;
1938         }
1939         tdb_flags |= TDB_DISALLOW_NESTING;
1940
1941         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1942         if (ctdb_db->ltdb == NULL) {
1943                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1944                 talloc_free(ctdb_db);
1945                 return NULL;
1946         }
1947
1948         ctdb_db->persistent = persistent;
1949
1950         DLIST_ADD(ctdb->db_list, ctdb_db);
1951
1952         /* add well known functions */
1953         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1954         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1955         ctdb_set_call(ctdb_db, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1956
1957         return ctdb_db;
1958 }
1959
1960
1961 /*
1962   setup a call for a database
1963  */
1964 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1965 {
1966         struct ctdb_registered_call *call;
1967
1968 #if 0
1969         TDB_DATA data;
1970         int32_t status;
1971         struct ctdb_control_set_call c;
1972         int ret;
1973
1974         /* this is no longer valid with the separate daemon architecture */
1975         c.db_id = ctdb_db->db_id;
1976         c.fn    = fn;
1977         c.id    = id;
1978
1979         data.dptr = (uint8_t *)&c;
1980         data.dsize = sizeof(c);
1981
1982         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1983                            data, NULL, NULL, &status, NULL, NULL);
1984         if (ret != 0 || status != 0) {
1985                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
1986                 return -1;
1987         }
1988 #endif
1989
1990         /* also register locally */
1991         call = talloc(ctdb_db, struct ctdb_registered_call);
1992         call->fn = fn;
1993         call->id = id;
1994
1995         DLIST_ADD(ctdb_db->calls, call);        
1996         return 0;
1997 }
1998
1999
2000 struct traverse_state {
2001         bool done;
2002         uint32_t count;
2003         ctdb_traverse_func fn;
2004         void *private_data;
2005 };
2006
2007 /*
2008   called on each key during a ctdb_traverse
2009  */
2010 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
2011 {
2012         struct traverse_state *state = (struct traverse_state *)p;
2013         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
2014         TDB_DATA key;
2015
2016         if (data.dsize < sizeof(uint32_t) ||
2017             d->length != data.dsize) {
2018                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
2019                 state->done = True;
2020                 return;
2021         }
2022
2023         key.dsize = d->keylen;
2024         key.dptr  = &d->data[0];
2025         data.dsize = d->datalen;
2026         data.dptr = &d->data[d->keylen];
2027
2028         if (key.dsize == 0 && data.dsize == 0) {
2029                 /* end of traverse */
2030                 state->done = True;
2031                 return;
2032         }
2033
2034         if (data.dsize == sizeof(struct ctdb_ltdb_header)) {
2035                 /* empty records are deleted records in ctdb */
2036                 return;
2037         }
2038
2039         if (state->fn(ctdb, key, data, state->private_data) != 0) {
2040                 state->done = True;
2041         }
2042
2043         state->count++;
2044 }
2045
2046
2047 /*
2048   start a cluster wide traverse, calling the supplied fn on each record
2049   return the number of records traversed, or -1 on error
2050  */
2051 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
2052 {
2053         TDB_DATA data;
2054         struct ctdb_traverse_start t;
2055         int32_t status;
2056         int ret;
2057         uint64_t srvid = (getpid() | 0xFLL<<60);
2058         struct traverse_state state;
2059
2060         state.done = False;
2061         state.count = 0;
2062         state.private_data = private_data;
2063         state.fn = fn;
2064
2065         ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
2066         if (ret != 0) {
2067                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
2068                 return -1;
2069         }
2070
2071         t.db_id = ctdb_db->db_id;
2072         t.srvid = srvid;
2073         t.reqid = 0;
2074
2075         data.dptr = (uint8_t *)&t;
2076         data.dsize = sizeof(t);
2077
2078         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
2079                            data, NULL, NULL, &status, NULL, NULL);
2080         if (ret != 0 || status != 0) {
2081                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
2082                 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2083                 return -1;
2084         }
2085
2086         while (!state.done) {
2087                 event_loop_once(ctdb_db->ctdb->ev);
2088         }
2089
2090         ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2091         if (ret != 0) {
2092                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
2093                 return -1;
2094         }
2095
2096         return state.count;
2097 }
2098
2099 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
2100 /*
2101   called on each key during a catdb
2102  */
2103 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
2104 {
2105         int i;
2106         FILE *f = (FILE *)p;
2107         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
2108
2109         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
2110         for (i=0;i<key.dsize;i++) {
2111                 if (ISASCII(key.dptr[i])) {
2112                         fprintf(f, "%c", key.dptr[i]);
2113                 } else {
2114                         fprintf(f, "\\%02X", key.dptr[i]);
2115                 }
2116         }
2117         fprintf(f, "\"\n");
2118
2119         fprintf(f, "dmaster: %u\n", h->dmaster);
2120         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
2121         fprintf(f, "flags: 0x%08x", h->flags);
2122         if (h->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) printf(" MIGRATED_WITH_DATA");
2123         if (h->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) printf(" VACUUM_MIGRATED");
2124         if (h->flags & CTDB_REC_FLAG_AUTOMATIC) printf(" AUTOMATIC");
2125         if (h->flags & CTDB_REC_RO_HAVE_DELEGATIONS) printf(" RO_HAVE_DELEGATIONS");
2126         if (h->flags & CTDB_REC_RO_HAVE_READONLY) printf(" RO_HAVE_READONLY");
2127         if (h->flags & CTDB_REC_RO_REVOKING_READONLY) printf(" RO_REVOKING_READONLY");
2128         if (h->flags & CTDB_REC_RO_REVOKE_COMPLETE) printf(" RO_REVOKE_COMPLETE");
2129         fprintf(f, "\n");
2130
2131         fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
2132         for (i=sizeof(*h);i<data.dsize;i++) {
2133                 if (ISASCII(data.dptr[i])) {
2134                         fprintf(f, "%c", data.dptr[i]);
2135                 } else {
2136                         fprintf(f, "\\%02X", data.dptr[i]);
2137                 }
2138         }
2139         fprintf(f, "\"\n");
2140
2141         fprintf(f, "\n");
2142
2143         return 0;
2144 }
2145
2146 /*
2147   convenience function to list all keys to stdout
2148  */
2149 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
2150 {
2151         return ctdb_traverse(ctdb_db, ctdb_dumpdb_record, f);
2152 }
2153
2154 /*
2155   get the pid of a ctdb daemon
2156  */
2157 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
2158 {
2159         int ret;
2160         int32_t res;
2161
2162         ret = ctdb_control(ctdb, destnode, 0, 
2163                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
2164                            NULL, NULL, &res, &timeout, NULL);
2165         if (ret != 0) {
2166                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
2167                 return -1;
2168         }
2169
2170         *pid = res;
2171
2172         return 0;
2173 }
2174
2175
2176 /*
2177   async freeze send control
2178  */
2179 struct ctdb_client_control_state *
2180 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
2181 {
2182         return ctdb_control_send(ctdb, destnode, priority, 
2183                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
2184                            mem_ctx, &timeout, NULL);
2185 }
2186
2187 /* 
2188    async freeze recv control
2189 */
2190 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
2191 {
2192         int ret;
2193         int32_t res;
2194
2195         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
2196         if ( (ret != 0) || (res != 0) ){
2197                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
2198                 return -1;
2199         }
2200
2201         return 0;
2202 }
2203
2204 /*
2205   freeze databases of a certain priority
2206  */
2207 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2208 {
2209         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2210         struct ctdb_client_control_state *state;
2211         int ret;
2212
2213         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
2214         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
2215         talloc_free(tmp_ctx);
2216
2217         return ret;
2218 }
2219
2220 /* Freeze all databases */
2221 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2222 {
2223         int i;
2224
2225         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2226                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2227                         return -1;
2228                 }
2229         }
2230         return 0;
2231 }
2232
2233 /*
2234   thaw databases of a certain priority
2235  */
2236 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2237 {
2238         int ret;
2239         int32_t res;
2240
2241         ret = ctdb_control(ctdb, destnode, priority, 
2242                            CTDB_CONTROL_THAW, 0, tdb_null, 
2243                            NULL, NULL, &res, &timeout, NULL);
2244         if (ret != 0 || res != 0) {
2245                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2246                 return -1;
2247         }
2248
2249         return 0;
2250 }
2251
2252 /* thaw all databases */
2253 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2254 {
2255         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2256 }
2257
2258 /*
2259   get pnn of a node, or -1
2260  */
2261 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2262 {
2263         int ret;
2264         int32_t res;
2265
2266         ret = ctdb_control(ctdb, destnode, 0, 
2267                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2268                            NULL, NULL, &res, &timeout, NULL);
2269         if (ret != 0) {
2270                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2271                 return -1;
2272         }
2273
2274         return res;
2275 }
2276
2277 /*
2278   get the monitoring mode of a remote node
2279  */
2280 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2281 {
2282         int ret;
2283         int32_t res;
2284
2285         ret = ctdb_control(ctdb, destnode, 0, 
2286                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2287                            NULL, NULL, &res, &timeout, NULL);
2288         if (ret != 0) {
2289                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2290                 return -1;
2291         }
2292
2293         *monmode = res;
2294
2295         return 0;
2296 }
2297
2298
2299 /*
2300  set the monitoring mode of a remote node to active
2301  */
2302 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2303 {
2304         int ret;
2305         
2306
2307         ret = ctdb_control(ctdb, destnode, 0, 
2308                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2309                            NULL, NULL,NULL, &timeout, NULL);
2310         if (ret != 0) {
2311                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2312                 return -1;
2313         }
2314
2315         
2316
2317         return 0;
2318 }
2319
2320 /*
2321   set the monitoring mode of a remote node to disable
2322  */
2323 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2324 {
2325         int ret;
2326         
2327
2328         ret = ctdb_control(ctdb, destnode, 0, 
2329                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2330                            NULL, NULL, NULL, &timeout, NULL);
2331         if (ret != 0) {
2332                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2333                 return -1;
2334         }
2335
2336         
2337
2338         return 0;
2339 }
2340
2341
2342
2343 /* 
2344   sent to a node to make it take over an ip address
2345 */
2346 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2347                           uint32_t destnode, struct ctdb_public_ip *ip)
2348 {
2349         TDB_DATA data;
2350         struct ctdb_public_ipv4 ipv4;
2351         int ret;
2352         int32_t res;
2353
2354         if (ip->addr.sa.sa_family == AF_INET) {
2355                 ipv4.pnn = ip->pnn;
2356                 ipv4.sin = ip->addr.ip;
2357
2358                 data.dsize = sizeof(ipv4);
2359                 data.dptr  = (uint8_t *)&ipv4;
2360
2361                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2362                            NULL, &res, &timeout, NULL);
2363         } else {
2364                 data.dsize = sizeof(*ip);
2365                 data.dptr  = (uint8_t *)ip;
2366
2367                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2368                            NULL, &res, &timeout, NULL);
2369         }
2370
2371         if (ret != 0 || res != 0) {
2372                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2373                 return -1;
2374         }
2375
2376         return 0;       
2377 }
2378
2379
2380 /* 
2381   sent to a node to make it release an ip address
2382 */
2383 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2384                          uint32_t destnode, struct ctdb_public_ip *ip)
2385 {
2386         TDB_DATA data;
2387         struct ctdb_public_ipv4 ipv4;
2388         int ret;
2389         int32_t res;
2390
2391         if (ip->addr.sa.sa_family == AF_INET) {
2392                 ipv4.pnn = ip->pnn;
2393                 ipv4.sin = ip->addr.ip;
2394
2395                 data.dsize = sizeof(ipv4);
2396                 data.dptr  = (uint8_t *)&ipv4;
2397
2398                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2399                                    NULL, &res, &timeout, NULL);
2400         } else {
2401                 data.dsize = sizeof(*ip);
2402                 data.dptr  = (uint8_t *)ip;
2403
2404                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2405                                    NULL, &res, &timeout, NULL);
2406         }
2407
2408         if (ret != 0 || res != 0) {
2409                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2410                 return -1;
2411         }
2412
2413         return 0;       
2414 }
2415
2416
2417 /*
2418   get a tunable
2419  */
2420 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2421                           struct timeval timeout, 
2422                           uint32_t destnode,
2423                           const char *name, uint32_t *value)
2424 {
2425         struct ctdb_control_get_tunable *t;
2426         TDB_DATA data, outdata;
2427         int32_t res;
2428         int ret;
2429
2430         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2431         data.dptr  = talloc_size(ctdb, data.dsize);
2432         CTDB_NO_MEMORY(ctdb, data.dptr);
2433
2434         t = (struct ctdb_control_get_tunable *)data.dptr;
2435         t->length = strlen(name)+1;
2436         memcpy(t->name, name, t->length);
2437
2438         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2439                            &outdata, &res, &timeout, NULL);
2440         talloc_free(data.dptr);
2441         if (ret != 0 || res != 0) {
2442                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2443                 return -1;
2444         }
2445
2446         if (outdata.dsize != sizeof(uint32_t)) {
2447                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2448                 talloc_free(outdata.dptr);
2449                 return -1;
2450         }
2451         
2452         *value = *(uint32_t *)outdata.dptr;
2453         talloc_free(outdata.dptr);
2454
2455         return 0;
2456 }
2457
2458 /*
2459   set a tunable
2460  */
2461 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2462                           struct timeval timeout, 
2463                           uint32_t destnode,
2464                           const char *name, uint32_t value)
2465 {
2466         struct ctdb_control_set_tunable *t;
2467         TDB_DATA data;
2468         int32_t res;
2469         int ret;
2470
2471         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2472         data.dptr  = talloc_size(ctdb, data.dsize);
2473         CTDB_NO_MEMORY(ctdb, data.dptr);
2474
2475         t = (struct ctdb_control_set_tunable *)data.dptr;
2476         t->length = strlen(name)+1;
2477         memcpy(t->name, name, t->length);
2478         t->value = value;
2479
2480         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2481                            NULL, &res, &timeout, NULL);
2482         talloc_free(data.dptr);
2483         if (ret != 0 || res != 0) {
2484                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2485                 return -1;
2486         }
2487
2488         return 0;
2489 }
2490
2491 /*
2492   list tunables
2493  */
2494 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2495                             struct timeval timeout, 
2496                             uint32_t destnode,
2497                             TALLOC_CTX *mem_ctx,
2498                             const char ***list, uint32_t *count)
2499 {
2500         TDB_DATA outdata;
2501         int32_t res;
2502         int ret;
2503         struct ctdb_control_list_tunable *t;
2504         char *p, *s, *ptr;
2505
2506         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2507                            mem_ctx, &outdata, &res, &timeout, NULL);
2508         if (ret != 0 || res != 0) {
2509                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2510                 return -1;
2511         }
2512
2513         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2514         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2515             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2516                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2517                 talloc_free(outdata.dptr);
2518                 return -1;              
2519         }
2520         
2521         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2522         CTDB_NO_MEMORY(ctdb, p);
2523
2524         talloc_free(outdata.dptr);
2525         
2526         (*list) = NULL;
2527         (*count) = 0;
2528
2529         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2530                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2531                 CTDB_NO_MEMORY(ctdb, *list);
2532                 (*list)[*count] = talloc_strdup(*list, s);
2533                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2534                 (*count)++;
2535         }
2536
2537         talloc_free(p);
2538
2539         return 0;
2540 }
2541
2542
2543 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2544                                    struct timeval timeout, uint32_t destnode,
2545                                    TALLOC_CTX *mem_ctx,
2546                                    uint32_t flags,
2547                                    struct ctdb_all_public_ips **ips)
2548 {
2549         int ret;
2550         TDB_DATA outdata;
2551         int32_t res;
2552
2553         ret = ctdb_control(ctdb, destnode, 0, 
2554                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2555                            mem_ctx, &outdata, &res, &timeout, NULL);
2556         if (ret == 0 && res == -1) {
2557                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2558                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2559         }
2560         if (ret != 0 || res != 0) {
2561           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2562                 return -1;
2563         }
2564
2565         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2566         talloc_free(outdata.dptr);
2567                     
2568         return 0;
2569 }
2570
2571 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2572                              struct timeval timeout, uint32_t destnode,
2573                              TALLOC_CTX *mem_ctx,
2574                              struct ctdb_all_public_ips **ips)
2575 {
2576         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2577                                               destnode, mem_ctx,
2578                                               0, ips);
2579 }
2580
2581 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2582                         struct timeval timeout, uint32_t destnode, 
2583                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2584 {
2585         int ret, i, len;
2586         TDB_DATA outdata;
2587         int32_t res;
2588         struct ctdb_all_public_ipsv4 *ipsv4;
2589
2590         ret = ctdb_control(ctdb, destnode, 0, 
2591                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2592                            mem_ctx, &outdata, &res, &timeout, NULL);
2593         if (ret != 0 || res != 0) {
2594                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2595                 return -1;
2596         }
2597
2598         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2599         len = offsetof(struct ctdb_all_public_ips, ips) +
2600                 ipsv4->num*sizeof(struct ctdb_public_ip);
2601         *ips = talloc_zero_size(mem_ctx, len);
2602         CTDB_NO_MEMORY(ctdb, *ips);
2603         (*ips)->num = ipsv4->num;
2604         for (i=0; i<ipsv4->num; i++) {
2605                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2606                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2607         }
2608
2609         talloc_free(outdata.dptr);
2610                     
2611         return 0;
2612 }
2613
2614 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2615                                  struct timeval timeout, uint32_t destnode,
2616                                  TALLOC_CTX *mem_ctx,
2617                                  const ctdb_sock_addr *addr,
2618                                  struct ctdb_control_public_ip_info **_info)
2619 {
2620         int ret;
2621         TDB_DATA indata;
2622         TDB_DATA outdata;
2623         int32_t res;
2624         struct ctdb_control_public_ip_info *info;
2625         uint32_t len;
2626         uint32_t i;
2627
2628         indata.dptr = discard_const_p(uint8_t, addr);
2629         indata.dsize = sizeof(*addr);
2630
2631         ret = ctdb_control(ctdb, destnode, 0,
2632                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2633                            mem_ctx, &outdata, &res, &timeout, NULL);
2634         if (ret != 0 || res != 0) {
2635                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2636                                 "failed ret:%d res:%d\n",
2637                                 ret, res));
2638                 return -1;
2639         }
2640
2641         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2642         if (len > outdata.dsize) {
2643                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2644                                 "returned invalid data with size %u > %u\n",
2645                                 (unsigned int)outdata.dsize,
2646                                 (unsigned int)len));
2647                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2648                 return -1;
2649         }
2650
2651         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2652         len += info->num*sizeof(struct ctdb_control_iface_info);
2653
2654         if (len > outdata.dsize) {
2655                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2656                                 "returned invalid data with size %u > %u\n",
2657                                 (unsigned int)outdata.dsize,
2658                                 (unsigned int)len));
2659                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2660                 return -1;
2661         }
2662
2663         /* make sure we null terminate the returned strings */
2664         for (i=0; i < info->num; i++) {
2665                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2666         }
2667
2668         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2669                                                                 outdata.dptr,
2670                                                                 outdata.dsize);
2671         talloc_free(outdata.dptr);
2672         if (*_info == NULL) {
2673                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2674                                 "talloc_memdup size %u failed\n",
2675                                 (unsigned int)outdata.dsize));
2676                 return -1;
2677         }
2678
2679         return 0;
2680 }
2681
2682 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2683                          struct timeval timeout, uint32_t destnode,
2684                          TALLOC_CTX *mem_ctx,
2685                          struct ctdb_control_get_ifaces **_ifaces)
2686 {
2687         int ret;
2688         TDB_DATA outdata;
2689         int32_t res;
2690         struct ctdb_control_get_ifaces *ifaces;
2691         uint32_t len;
2692         uint32_t i;
2693
2694         ret = ctdb_control(ctdb, destnode, 0,
2695                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2696                            mem_ctx, &outdata, &res, &timeout, NULL);
2697         if (ret != 0 || res != 0) {
2698                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2699                                 "failed ret:%d res:%d\n",
2700                                 ret, res));
2701                 return -1;
2702         }
2703
2704         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2705         if (len > outdata.dsize) {
2706                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2707                                 "returned invalid data with size %u > %u\n",
2708                                 (unsigned int)outdata.dsize,
2709                                 (unsigned int)len));
2710                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2711                 return -1;
2712         }
2713
2714         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2715         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2716
2717         if (len > outdata.dsize) {
2718                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2719                                 "returned invalid data with size %u > %u\n",
2720                                 (unsigned int)outdata.dsize,
2721                                 (unsigned int)len));
2722                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2723                 return -1;
2724         }
2725
2726         /* make sure we null terminate the returned strings */
2727         for (i=0; i < ifaces->num; i++) {
2728                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2729         }
2730
2731         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2732                                                                   outdata.dptr,
2733                                                                   outdata.dsize);
2734         talloc_free(outdata.dptr);
2735         if (*_ifaces == NULL) {
2736                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2737                                 "talloc_memdup size %u failed\n",
2738                                 (unsigned int)outdata.dsize));
2739                 return -1;
2740         }
2741
2742         return 0;
2743 }
2744
2745 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2746                              struct timeval timeout, uint32_t destnode,
2747                              TALLOC_CTX *mem_ctx,
2748                              const struct ctdb_control_iface_info *info)
2749 {
2750         int ret;
2751         TDB_DATA indata;
2752         int32_t res;
2753
2754         indata.dptr = discard_const_p(uint8_t, info);
2755         indata.dsize = sizeof(*info);
2756
2757         ret = ctdb_control(ctdb, destnode, 0,
2758                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2759                            mem_ctx, NULL, &res, &timeout, NULL);
2760         if (ret != 0 || res != 0) {
2761                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2762                                 "failed ret:%d res:%d\n",
2763                                 ret, res));
2764                 return -1;
2765         }
2766
2767         return 0;
2768 }
2769
2770 /*
2771   set/clear the permanent disabled bit on a remote node
2772  */
2773 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2774                        uint32_t set, uint32_t clear)
2775 {
2776         int ret;
2777         TDB_DATA data;
2778         struct ctdb_node_map *nodemap=NULL;
2779         struct ctdb_node_flag_change c;
2780         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2781         uint32_t recmaster;
2782         uint32_t *nodes;
2783
2784
2785         /* find the recovery master */
2786         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2787         if (ret != 0) {
2788                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2789                 talloc_free(tmp_ctx);
2790                 return ret;
2791         }
2792
2793
2794         /* read the node flags from the recmaster */
2795         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2796         if (ret != 0) {
2797                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2798                 talloc_free(tmp_ctx);
2799                 return -1;
2800         }
2801         if (destnode >= nodemap->num) {
2802                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2803                 talloc_free(tmp_ctx);
2804                 return -1;
2805         }
2806
2807         c.pnn       = destnode;
2808         c.old_flags = nodemap->nodes[destnode].flags;
2809         c.new_flags = c.old_flags;
2810         c.new_flags |= set;
2811         c.new_flags &= ~clear;
2812
2813         data.dsize = sizeof(c);
2814         data.dptr = (unsigned char *)&c;
2815
2816         /* send the flags update to all connected nodes */
2817         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2818
2819         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2820                                         nodes, 0,
2821                                         timeout, false, data,
2822                                         NULL, NULL,
2823                                         NULL) != 0) {
2824                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2825
2826                 talloc_free(tmp_ctx);
2827                 return -1;
2828         }
2829
2830         talloc_free(tmp_ctx);
2831         return 0;
2832 }
2833
2834
2835 /*
2836   get all tunables
2837  */
2838 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
2839                                struct timeval timeout, 
2840                                uint32_t destnode,
2841                                struct ctdb_tunable *tunables)
2842 {
2843         TDB_DATA outdata;
2844         int ret;
2845         int32_t res;
2846
2847         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2848                            &outdata, &res, &timeout, NULL);
2849         if (ret != 0 || res != 0) {
2850                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2851                 return -1;
2852         }
2853
2854         if (outdata.dsize != sizeof(*tunables)) {
2855                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2856                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2857                 return -1;              
2858         }
2859
2860         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2861         talloc_free(outdata.dptr);
2862         return 0;
2863 }
2864
2865 /*
2866   add a public address to a node
2867  */
2868 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
2869                       struct timeval timeout, 
2870                       uint32_t destnode,
2871                       struct ctdb_control_ip_iface *pub)
2872 {
2873         TDB_DATA data;
2874         int32_t res;
2875         int ret;
2876
2877         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2878         data.dptr  = (unsigned char *)pub;
2879
2880         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2881                            NULL, &res, &timeout, NULL);
2882         if (ret != 0 || res != 0) {
2883                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2884                 return -1;
2885         }
2886
2887         return 0;
2888 }
2889
2890 /*
2891   delete a public address from a node
2892  */
2893 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
2894                       struct timeval timeout, 
2895                       uint32_t destnode,
2896                       struct ctdb_control_ip_iface *pub)
2897 {
2898         TDB_DATA data;
2899         int32_t res;
2900         int ret;
2901
2902         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2903         data.dptr  = (unsigned char *)pub;
2904
2905         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2906                            NULL, &res, &timeout, NULL);
2907         if (ret != 0 || res != 0) {
2908                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2909                 return -1;
2910         }
2911
2912         return 0;
2913 }
2914
2915 /*
2916   kill a tcp connection
2917  */
2918 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2919                       struct timeval timeout, 
2920                       uint32_t destnode,
2921                       struct ctdb_control_killtcp *killtcp)
2922 {
2923         TDB_DATA data;
2924         int32_t res;
2925         int ret;
2926
2927         data.dsize = sizeof(struct ctdb_control_killtcp);
2928         data.dptr  = (unsigned char *)killtcp;
2929
2930         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2931                            NULL, &res, &timeout, NULL);
2932         if (ret != 0 || res != 0) {
2933                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2934                 return -1;
2935         }
2936
2937         return 0;
2938 }
2939
2940 /*
2941   send a gratious arp
2942  */
2943 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
2944                       struct timeval timeout, 
2945                       uint32_t destnode,
2946                       ctdb_sock_addr *addr,
2947                       const char *ifname)
2948 {
2949         TDB_DATA data;
2950         int32_t res;
2951         int ret, len;
2952         struct ctdb_control_gratious_arp *gratious_arp;
2953         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2954
2955
2956         len = strlen(ifname)+1;
2957         gratious_arp = talloc_size(tmp_ctx, 
2958                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2959         CTDB_NO_MEMORY(ctdb, gratious_arp);
2960
2961         gratious_arp->addr = *addr;
2962         gratious_arp->len = len;
2963         memcpy(&gratious_arp->iface[0], ifname, len);
2964
2965
2966         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
2967         data.dptr  = (unsigned char *)gratious_arp;
2968
2969         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
2970                            NULL, &res, &timeout, NULL);
2971         if (ret != 0 || res != 0) {
2972                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
2973                 talloc_free(tmp_ctx);
2974                 return -1;
2975         }
2976
2977         talloc_free(tmp_ctx);
2978         return 0;
2979 }
2980
2981 /*
2982   get a list of all tcp tickles that a node knows about for a particular vnn
2983  */
2984 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
2985                               struct timeval timeout, uint32_t destnode, 
2986                               TALLOC_CTX *mem_ctx, 
2987                               ctdb_sock_addr *addr,
2988                               struct ctdb_control_tcp_tickle_list **list)
2989 {
2990         int ret;
2991         TDB_DATA data, outdata;
2992         int32_t status;
2993
2994         data.dptr = (uint8_t*)addr;
2995         data.dsize = sizeof(ctdb_sock_addr);
2996
2997         ret = ctdb_control(ctdb, destnode, 0, 
2998                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
2999                            mem_ctx, &outdata, &status, NULL, NULL);
3000         if (ret != 0 || status != 0) {
3001                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
3002                 return -1;
3003         }
3004
3005         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
3006
3007         return status;
3008 }
3009
3010 /*
3011   register a server id
3012  */
3013 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
3014                       struct timeval timeout, 
3015                       struct ctdb_server_id *id)
3016 {
3017         TDB_DATA data;
3018         int32_t res;
3019         int ret;
3020
3021         data.dsize = sizeof(struct ctdb_server_id);
3022         data.dptr  = (unsigned char *)id;
3023
3024         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3025                         CTDB_CONTROL_REGISTER_SERVER_ID, 
3026                         0, data, NULL,
3027                         NULL, &res, &timeout, NULL);
3028         if (ret != 0 || res != 0) {
3029                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
3030                 return -1;
3031         }
3032
3033         return 0;
3034 }
3035
3036 /*
3037   unregister a server id
3038  */
3039 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
3040                       struct timeval timeout, 
3041                       struct ctdb_server_id *id)
3042 {
3043         TDB_DATA data;
3044         int32_t res;
3045         int ret;
3046
3047         data.dsize = sizeof(struct ctdb_server_id);
3048         data.dptr  = (unsigned char *)id;
3049
3050         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3051                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
3052                         0, data, NULL,
3053                         NULL, &res, &timeout, NULL);
3054         if (ret != 0 || res != 0) {
3055                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
3056                 return -1;
3057         }
3058
3059         return 0;
3060 }
3061
3062
3063 /*
3064   check if a server id exists
3065
3066   if a server id does exist, return *status == 1, otherwise *status == 0
3067  */
3068 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
3069                       struct timeval timeout, 
3070                       uint32_t destnode,
3071                       struct ctdb_server_id *id,
3072                       uint32_t *status)
3073 {
3074         TDB_DATA data;
3075         int32_t res;
3076         int ret;
3077
3078         data.dsize = sizeof(struct ctdb_server_id);
3079         data.dptr  = (unsigned char *)id;
3080
3081         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
3082                         0, data, NULL,
3083                         NULL, &res, &timeout, NULL);
3084         if (ret != 0) {
3085                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
3086                 return -1;
3087         }
3088
3089         if (res) {
3090                 *status = 1;
3091         } else {
3092                 *status = 0;
3093         }
3094
3095         return 0;
3096 }
3097
3098 /*
3099    get the list of server ids that are registered on a node
3100 */
3101 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
3102                 TALLOC_CTX *mem_ctx,
3103                 struct timeval timeout, uint32_t destnode, 
3104                 struct ctdb_server_id_list **svid_list)
3105 {
3106         int ret;
3107         TDB_DATA outdata;
3108         int32_t res;
3109
3110         ret = ctdb_control(ctdb, destnode, 0, 
3111                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
3112                            mem_ctx, &outdata, &res, &timeout, NULL);
3113         if (ret != 0 || res != 0) {
3114                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
3115                 return -1;
3116         }
3117
3118         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
3119                     
3120         return 0;
3121 }
3122
3123 /*
3124   initialise the ctdb daemon for client applications
3125
3126   NOTE: In current code the daemon does not fork. This is for testing purposes only
3127   and to simplify the code.
3128 */
3129 struct ctdb_context *ctdb_init(struct event_context *ev)
3130 {
3131         int ret;
3132         struct ctdb_context *ctdb;
3133
3134         ctdb = talloc_zero(ev, struct ctdb_context);
3135         if (ctdb == NULL) {
3136                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
3137                 return NULL;
3138         }
3139         ctdb->ev  = ev;
3140         ctdb->idr = idr_init(ctdb);
3141         /* Wrap early to exercise code. */
3142         ctdb->lastid = INT_MAX-200;
3143         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
3144
3145         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
3146         if (ret != 0) {
3147                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
3148                 talloc_free(ctdb);
3149                 return NULL;
3150         }
3151
3152         ctdb->statistics.statistics_start_time = timeval_current();
3153
3154         return ctdb;
3155 }
3156
3157
3158 /*
3159   set some ctdb flags
3160 */
3161 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
3162 {
3163         ctdb->flags |= flags;
3164 }
3165
3166 /*
3167   setup the local socket name
3168 */
3169 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
3170 {
3171         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
3172         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
3173
3174         return 0;
3175 }
3176
3177 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
3178 {
3179         return ctdb->daemon.name;
3180 }
3181
3182 /*
3183   return the pnn of this node
3184 */
3185 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
3186 {
3187         return ctdb->pnn;
3188 }
3189
3190
3191 /*
3192   get the uptime of a remote node
3193  */
3194 struct ctdb_client_control_state *
3195 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3196 {
3197         return ctdb_control_send(ctdb, destnode, 0, 
3198                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
3199                            mem_ctx, &timeout, NULL);
3200 }
3201
3202 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
3203 {
3204         int ret;
3205         int32_t res;
3206         TDB_DATA outdata;
3207
3208         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3209         if (ret != 0 || res != 0) {
3210                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
3211                 return -1;
3212         }
3213
3214         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
3215
3216         return 0;
3217 }
3218
3219 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3220 {
3221         struct ctdb_client_control_state *state;
3222
3223         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3224         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3225 }
3226
3227 /*
3228   send a control to execute the "recovered" event script on a node
3229  */
3230 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3231 {
3232         int ret;
3233         int32_t status;
3234
3235         ret = ctdb_control(ctdb, destnode, 0, 
3236                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
3237                            NULL, NULL, &status, &timeout, NULL);
3238         if (ret != 0 || status != 0) {
3239                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3240                 return -1;
3241         }
3242
3243         return 0;
3244 }
3245
3246 /* 
3247   callback for the async helpers used when sending the same control
3248   to multiple nodes in parallell.
3249 */
3250 static void async_callback(struct ctdb_client_control_state *state)
3251 {
3252         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3253         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3254         int ret;
3255         TDB_DATA outdata;
3256         int32_t res;
3257         uint32_t destnode = state->c->hdr.destnode;
3258
3259         /* one more node has responded with recmode data */
3260         data->count--;
3261
3262         /* if we failed to push the db, then return an error and let
3263            the main loop try again.
3264         */
3265         if (state->state != CTDB_CONTROL_DONE) {
3266                 if ( !data->dont_log_errors) {
3267                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3268                 }
3269                 data->fail_count++;
3270                 if (data->fail_callback) {
3271                         data->fail_callback(ctdb, destnode, res, outdata,
3272                                         data->callback_data);
3273                 }
3274                 return;
3275         }
3276         
3277         state->async.fn = NULL;
3278
3279         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3280         if ((ret != 0) || (res != 0)) {
3281                 if ( !data->dont_log_errors) {
3282                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3283                 }
3284                 data->fail_count++;
3285                 if (data->fail_callback) {
3286                         data->fail_callback(ctdb, destnode, res, outdata,
3287                                         data->callback_data);
3288                 }
3289         }
3290         if ((ret == 0) && (data->callback != NULL)) {
3291                 data->callback(ctdb, destnode, res, outdata,
3292                                         data->callback_data);
3293         }
3294 }
3295
3296
3297 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3298 {
3299         /* set up the callback functions */
3300         state->async.fn = async_callback;
3301         state->async.private_data = data;
3302         
3303         /* one more control to wait for to complete */
3304         data->count++;
3305 }
3306
3307
3308 /* wait for up to the maximum number of seconds allowed
3309    or until all nodes we expect a response from has replied
3310 */
3311 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3312 {
3313         while (data->count > 0) {
3314                 event_loop_once(ctdb->ev);
3315         }
3316         if (data->fail_count != 0) {
3317                 if (!data->dont_log_errors) {
3318                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
3319                                  data->fail_count));
3320                 }
3321                 return -1;
3322         }
3323         return 0;
3324 }
3325
3326
3327 /* 
3328    perform a simple control on the listed nodes
3329    The control cannot return data
3330  */
3331 int ctdb_client_async_control(struct ctdb_context *ctdb,
3332                                 enum ctdb_controls opcode,
3333                                 uint32_t *nodes,
3334                                 uint64_t srvid,
3335                                 struct timeval timeout,
3336                                 bool dont_log_errors,
3337                                 TDB_DATA data,
3338                                 client_async_callback client_callback,
3339                                 client_async_callback fail_callback,
3340                                 void *callback_data)
3341 {
3342         struct client_async_data *async_data;
3343         struct ctdb_client_control_state *state;
3344         int j, num_nodes;
3345
3346         async_data = talloc_zero(ctdb, struct client_async_data);
3347         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3348         async_data->dont_log_errors = dont_log_errors;
3349         async_data->callback = client_callback;
3350         async_data->fail_callback = fail_callback;
3351         async_data->callback_data = callback_data;
3352         async_data->opcode        = opcode;
3353
3354         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3355
3356         /* loop over all nodes and send an async control to each of them */
3357         for (j=0; j<num_nodes; j++) {
3358                 uint32_t pnn = nodes[j];
3359
3360                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
3361                                           0, data, async_data, &timeout, NULL);
3362                 if (state == NULL) {
3363                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3364                         talloc_free(async_data);
3365                         return -1;
3366                 }
3367                 
3368                 ctdb_client_async_add(async_data, state);
3369         }
3370
3371         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3372                 talloc_free(async_data);
3373                 return -1;
3374         }
3375
3376         talloc_free(async_data);
3377         return 0;
3378 }
3379
3380 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3381                                 struct ctdb_vnn_map *vnn_map,
3382                                 TALLOC_CTX *mem_ctx,
3383                                 bool include_self)
3384 {
3385         int i, j, num_nodes;
3386         uint32_t *nodes;
3387
3388         for (i=num_nodes=0;i<vnn_map->size;i++) {
3389                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3390                         continue;
3391                 }
3392                 num_nodes++;
3393         } 
3394
3395         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3396         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3397
3398         for (i=j=0;i<vnn_map->size;i++) {
3399                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3400                         continue;
3401                 }
3402                 nodes[j++] = vnn_map->map[i];
3403         } 
3404
3405         return nodes;
3406 }
3407
3408 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3409                                 struct ctdb_node_map *node_map,
3410                                 TALLOC_CTX *mem_ctx,
3411                                 bool include_self)
3412 {
3413         int i, j, num_nodes;
3414         uint32_t *nodes;
3415
3416         for (i=num_nodes=0;i<node_map->num;i++) {
3417                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3418                         continue;
3419                 }
3420                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3421                         continue;
3422                 }
3423                 num_nodes++;
3424         } 
3425
3426         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3427         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3428
3429         for (i=j=0;i<node_map->num;i++) {
3430                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3431                         continue;
3432                 }
3433                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3434                         continue;
3435                 }
3436                 nodes[j++] = node_map->nodes[i].pnn;
3437         } 
3438
3439         return nodes;
3440 }
3441
3442 uint32_t *list_of_active_nodes_except_pnn(struct ctdb_context *ctdb,
3443                                 struct ctdb_node_map *node_map,
3444                                 TALLOC_CTX *mem_ctx,
3445                                 uint32_t pnn)
3446 {
3447         int i, j, num_nodes;
3448         uint32_t *nodes;
3449
3450         for (i=num_nodes=0;i<node_map->num;i++) {
3451                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3452                         continue;
3453                 }
3454                 if (node_map->nodes[i].pnn == pnn) {
3455                         continue;
3456                 }
3457                 num_nodes++;
3458         } 
3459
3460         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3461         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3462
3463         for (i=j=0;i<node_map->num;i++) {
3464                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3465                         continue;
3466                 }
3467                 if (node_map->nodes[i].pnn == pnn) {
3468                         continue;
3469                 }
3470                 nodes[j++] = node_map->nodes[i].pnn;
3471         } 
3472
3473         return nodes;
3474 }
3475
3476 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3477                                 struct ctdb_node_map *node_map,
3478                                 TALLOC_CTX *mem_ctx,
3479                                 bool include_self)
3480 {
3481         int i, j, num_nodes;
3482         uint32_t *nodes;
3483
3484         for (i=num_nodes=0;i<node_map->num;i++) {
3485                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3486                         continue;
3487                 }
3488                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3489                         continue;
3490                 }
3491                 num_nodes++;
3492         } 
3493
3494         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3495         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3496
3497         for (i=j=0;i<node_map->num;i++) {
3498                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3499                         continue;
3500                 }
3501                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3502                         continue;
3503                 }
3504                 nodes[j++] = node_map->nodes[i].pnn;
3505         } 
3506
3507         return nodes;
3508 }
3509
3510 /* 
3511   this is used to test if a pnn lock exists and if it exists will return
3512   the number of connections that pnn has reported or -1 if that recovery
3513   daemon is not running.
3514 */
3515 int
3516 ctdb_read_pnn_lock(int fd, int32_t pnn)
3517 {
3518         struct flock lock;
3519         char c;
3520
3521         lock.l_type = F_WRLCK;
3522         lock.l_whence = SEEK_SET;
3523         lock.l_start = pnn;
3524         lock.l_len = 1;
3525         lock.l_pid = 0;
3526
3527         if (fcntl(fd, F_GETLK, &lock) != 0) {
3528                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3529                 return -1;
3530         }
3531
3532         if (lock.l_type == F_UNLCK) {
3533                 return -1;
3534         }
3535
3536         if (pread(fd, &c, 1, pnn) == -1) {
3537                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3538                 return -1;
3539         }
3540
3541         return c;
3542 }
3543
3544 /*
3545   get capabilities of a remote node
3546  */
3547 struct ctdb_client_control_state *
3548 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3549 {
3550         return ctdb_control_send(ctdb, destnode, 0, 
3551                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3552                            mem_ctx, &timeout, NULL);
3553 }
3554
3555 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3556 {
3557         int ret;
3558         int32_t res;
3559         TDB_DATA outdata;
3560
3561         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3562         if ( (ret != 0) || (res != 0) ) {
3563                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3564                 return -1;
3565         }
3566
3567         if (capabilities) {
3568                 *capabilities = *((uint32_t *)outdata.dptr);
3569         }
3570
3571         return 0;
3572 }
3573
3574 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3575 {
3576         struct ctdb_client_control_state *state;
3577         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3578         int ret;
3579
3580         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3581         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3582         talloc_free(tmp_ctx);
3583         return ret;
3584 }
3585
3586 /**
3587  * check whether a transaction is active on a given db on a given node
3588  */
3589 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
3590                                      uint32_t destnode,
3591                                      uint32_t db_id)
3592 {
3593         int32_t status;
3594         int ret;
3595         TDB_DATA indata;
3596
3597         indata.dptr = (uint8_t *)&db_id;
3598         indata.dsize = sizeof(db_id);
3599
3600         ret = ctdb_control(ctdb, destnode, 0,
3601                            CTDB_CONTROL_TRANS2_ACTIVE,
3602                            0, indata, NULL, NULL, &status,
3603                            NULL, NULL);
3604
3605         if (ret != 0) {
3606                 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
3607                 return -1;
3608         }
3609
3610         return status;
3611 }
3612
3613
3614 struct ctdb_transaction_handle {
3615         struct ctdb_db_context *ctdb_db;
3616         bool in_replay;
3617         /*
3618          * we store the reads and writes done under a transaction:
3619          * - one list stores both reads and writes (m_all),
3620          * - the other just writes (m_write)
3621          */
3622         struct ctdb_marshall_buffer *m_all;
3623         struct ctdb_marshall_buffer *m_write;
3624 };
3625
3626 /* start a transaction on a database */
3627 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3628 {
3629         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3630         return 0;
3631 }
3632
3633 /* start a transaction on a database */
3634 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
3635 {
3636         struct ctdb_record_handle *rh;
3637         TDB_DATA key;
3638         TDB_DATA data;
3639         struct ctdb_ltdb_header header;
3640         TALLOC_CTX *tmp_ctx;
3641         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
3642         int ret;
3643         struct ctdb_db_context *ctdb_db = h->ctdb_db;
3644         pid_t pid;
3645         int32_t status;
3646
3647         key.dptr = discard_const(keyname);
3648         key.dsize = strlen(keyname);
3649
3650         if (!ctdb_db->persistent) {
3651                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
3652                 return -1;
3653         }
3654
3655 again:
3656         tmp_ctx = talloc_new(h);
3657
3658         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
3659         if (rh == NULL) {
3660                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
3661                 talloc_free(tmp_ctx);
3662                 return -1;
3663         }
3664
3665         status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
3666                                               CTDB_CURRENT_NODE,
3667                                               ctdb_db->db_id);
3668         if (status == 1) {
3669                 unsigned long int usec = (1000 + random()) % 100000;
3670                 DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
3671                                     "on db_id[0x%08x]. waiting for %lu "
3672                                     "microseconds\n",
3673                                     ctdb_db->db_id, usec));
3674                 talloc_free(tmp_ctx);
3675                 usleep(usec);
3676                 goto again;
3677         }
3678
3679         /*
3680          * store the pid in the database:
3681          * it is not enough that the node is dmaster...
3682          */
3683         pid = getpid();
3684         data.dptr = (unsigned char *)&pid;
3685         data.dsize = sizeof(pid_t);
3686         rh->header.rsn++;
3687         rh->header.dmaster = ctdb_db->ctdb->pnn;
3688         ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
3689         if (ret != 0) {
3690                 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
3691                                   "transaction record\n"));
3692                 talloc_free(tmp_ctx);
3693                 return -1;
3694         }
3695
3696         talloc_free(rh);
3697
3698         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3699         if (ret != 0) {
3700                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
3701                 talloc_free(tmp_ctx);
3702                 return -1;
3703         }
3704
3705         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
3706         if (ret != 0) {
3707                 DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
3708                                  "lock record inside transaction\n"));
3709                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3710                 talloc_free(tmp_ctx);
3711                 goto again;
3712         }
3713
3714         if (header.dmaster != ctdb_db->ctdb->pnn) {
3715                 DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
3716                                    "transaction lock record\n"));
3717                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3718                 talloc_free(tmp_ctx);
3719                 goto again;
3720         }
3721
3722         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
3723                 DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
3724                                     "the transaction lock record\n"));
3725                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3726                 talloc_free(tmp_ctx);
3727                 goto again;
3728         }
3729
3730         talloc_free(tmp_ctx);
3731
3732         return 0;
3733 }
3734
3735
3736 /* start a transaction on a database */
3737 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3738                                                        TALLOC_CTX *mem_ctx)
3739 {
3740         struct ctdb_transaction_handle *h;
3741         int ret;
3742
3743         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3744         if (h == NULL) {
3745                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
3746                 return NULL;
3747         }
3748
3749         h->ctdb_db = ctdb_db;
3750
3751         ret = ctdb_transaction_fetch_start(h);
3752         if (ret != 0) {
3753                 talloc_free(h);
3754                 return NULL;
3755         }
3756
3757         talloc_set_destructor(h, ctdb_transaction_destructor);
3758
3759         return h;
3760 }
3761
3762
3763
3764 /*
3765   fetch a record inside a transaction
3766  */
3767 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
3768                            TALLOC_CTX *mem_ctx, 
3769                            TDB_DATA key, TDB_DATA *data)
3770 {
3771         struct ctdb_ltdb_header header;
3772         int ret;
3773
3774         ZERO_STRUCT(header);
3775
3776         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3777         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3778                 /* record doesn't exist yet */
3779                 *data = tdb_null;
3780                 ret = 0;
3781         }
3782         
3783         if (ret != 0) {
3784                 return ret;
3785         }
3786
3787         if (!h->in_replay) {
3788                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3789                 if (h->m_all == NULL) {
3790                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3791                         return -1;
3792                 }
3793         }
3794
3795         return 0;
3796 }
3797
3798 /*
3799   stores a record inside a transaction
3800  */
3801 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
3802                            TDB_DATA key, TDB_DATA data)
3803 {
3804         TALLOC_CTX *tmp_ctx = talloc_new(h);
3805         struct ctdb_ltdb_header header;
3806         TDB_DATA olddata;
3807         int ret;
3808
3809         ZERO_STRUCT(header);
3810
3811         /* we need the header so we can update the RSN */
3812         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3813         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3814                 /* the record doesn't exist - create one with us as dmaster.
3815                    This is only safe because we are in a transaction and this
3816                    is a persistent database */
3817                 ZERO_STRUCT(header);
3818         } else if (ret != 0) {
3819                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3820                 talloc_free(tmp_ctx);
3821                 return ret;
3822         }
3823
3824         if (data.dsize == olddata.dsize &&
3825             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3826                 /* save writing the same data */
3827                 talloc_free(tmp_ctx);
3828                 return 0;
3829         }
3830
3831         header.dmaster = h->ctdb_db->ctdb->pnn;
3832         header.rsn++;
3833
3834         if (!h->in_replay) {
3835                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3836                 if (h->m_all == NULL) {
3837                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3838                         talloc_free(tmp_ctx);
3839                         return -1;
3840                 }
3841         }               
3842
3843         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3844         if (h->m_write == NULL) {
3845                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3846                 talloc_free(tmp_ctx);
3847                 return -1;
3848         }
3849         
3850         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3851
3852         talloc_free(tmp_ctx);
3853         
3854         return ret;
3855 }
3856
3857 /*
3858   replay a transaction
3859  */
3860 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3861 {
3862         int ret, i;
3863         struct ctdb_rec_data *rec = NULL;
3864
3865         h->in_replay = true;
3866         talloc_free(h->m_write);
3867         h->m_write = NULL;
3868
3869         ret = ctdb_transaction_fetch_start(h);
3870         if (ret != 0) {
3871                 return ret;
3872         }
3873
3874         for (i=0;i<h->m_all->count;i++) {
3875                 TDB_DATA key, data;
3876
3877                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3878                 if (rec == NULL) {
3879                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3880                         goto failed;
3881                 }
3882
3883                 if (rec->reqid == 0) {
3884                         /* its a store */
3885                         if (ctdb_transaction_store(h, key, data) != 0) {
3886                                 goto failed;
3887                         }
3888                 } else {
3889                         TDB_DATA data2;
3890                         TALLOC_CTX *tmp_ctx = talloc_new(h);
3891
3892                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3893                                 talloc_free(tmp_ctx);
3894                                 goto failed;
3895                         }
3896                         if (data2.dsize != data.dsize ||
3897                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3898                                 /* the record has changed on us - we have to give up */
3899                                 talloc_free(tmp_ctx);
3900                                 goto failed;
3901                         }
3902                         talloc_free(tmp_ctx);
3903                 }
3904         }
3905         
3906         return 0;
3907
3908 failed:
3909         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3910         return -1;
3911 }
3912
3913
3914 /*
3915   commit a transaction
3916  */
3917 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3918 {
3919         int ret, retries=0;
3920         int32_t status;
3921         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3922         struct timeval timeout;
3923         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3924
3925         talloc_set_destructor(h, NULL);
3926
3927         /* our commit strategy is quite complex.
3928
3929            - we first try to commit the changes to all other nodes
3930
3931            - if that works, then we commit locally and we are done
3932
3933            - if a commit on another node fails, then we need to cancel
3934              the transaction, then restart the transaction (thus
3935              opening a window of time for a pending recovery to
3936              complete), then replay the transaction, checking all the
3937              reads and writes (checking that reads give the same data,
3938              and writes succeed). Then we retry the transaction to the
3939              other nodes
3940         */
3941
3942 again:
3943         if (h->m_write == NULL) {
3944                 /* no changes were made */
3945                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3946                 talloc_free(h);
3947                 return 0;
3948         }
3949
3950         /* tell ctdbd to commit to the other nodes */
3951         timeout = timeval_current_ofs(1, 0);
3952         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3953                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
3954                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
3955                            &timeout, NULL);
3956         if (ret != 0 || status != 0) {
3957                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3958                 DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
3959                                      ", retrying after 1 second...\n",
3960                                      (retries==0)?"":"retry "));
3961                 sleep(1);
3962
3963                 if (ret != 0) {
3964                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
3965                 } else {
3966                         /* work out what error code we will give if we 
3967                            have to fail the operation */
3968                         switch ((enum ctdb_trans2_commit_error)status) {
3969                         case CTDB_TRANS2_COMMIT_SUCCESS:
3970                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
3971                         case CTDB_TRANS2_COMMIT_TIMEOUT:
3972                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3973                                 break;
3974                         case CTDB_TRANS2_COMMIT_ALLFAIL:
3975                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
3976                                 break;
3977                         }
3978                 }
3979
3980                 if (++retries == 100) {
3981                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
3982                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
3983                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3984                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3985                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3986                         talloc_free(h);
3987                         return -1;
3988                 }               
3989
3990                 if (ctdb_replay_transaction(h) != 0) {
3991                         DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
3992                                           "transaction on db 0x%08x, "
3993                                           "failure control =%u\n",
3994                                           h->ctdb_db->db_id,
3995                                           (unsigned)failure_control));
3996                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3997                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3998                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3999                         talloc_free(h);
4000                         return -1;
4001                 }
4002                 goto again;
4003         } else {
4004                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
4005         }
4006
4007         /* do the real commit locally */
4008         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
4009         if (ret != 0) {
4010                 DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
4011                                   "on db id 0x%08x locally, "
4012                                   "failure_control=%u\n",
4013                                   h->ctdb_db->db_id,
4014                                   (unsigned)failure_control));
4015                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4016                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4017                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
4018                 talloc_free(h);
4019                 return ret;
4020         }
4021
4022         /* tell ctdbd that we are finished with our local commit */
4023         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4024                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
4025                      tdb_null, NULL, NULL, NULL, NULL, NULL);
4026         talloc_free(h);
4027         return 0;
4028 }
4029
4030 /*
4031   recovery daemon ping to main daemon
4032  */
4033 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
4034 {
4035         int ret;
4036         int32_t res;
4037
4038         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
4039                            ctdb, NULL, &res, NULL, NULL);
4040         if (ret != 0 || res != 0) {
4041                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
4042                 return -1;
4043         }
4044
4045         return 0;
4046 }
4047
4048 /* when forking the main daemon and the child process needs to connect back
4049  * to the daemon as a client process, this function can be used to change
4050  * the ctdb context from daemon into client mode
4051  */
4052 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
4053 {
4054         int ret;
4055         va_list ap;
4056
4057         /* Add extra information so we can identify this in the logs */
4058         va_start(ap, fmt);
4059         debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
4060         va_end(ap);
4061
4062         /* shutdown the transport */
4063         if (ctdb->methods) {
4064                 ctdb->methods->shutdown(ctdb);
4065         }
4066
4067         /* get a new event context */
4068         talloc_free(ctdb->ev);
4069         ctdb->ev = event_context_init(ctdb);
4070         tevent_loop_allow_nesting(ctdb->ev);
4071
4072         close(ctdb->daemon.sd);
4073         ctdb->daemon.sd = -1;
4074
4075         /* the client does not need to be realtime */
4076         if (ctdb->do_setsched) {
4077                 ctdb_restore_scheduler(ctdb);
4078         }
4079
4080         /* initialise ctdb */
4081         ret = ctdb_socket_connect(ctdb);
4082         if (ret != 0) {
4083                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
4084                 return -1;
4085         }
4086
4087          return 0;
4088 }
4089
4090 /*
4091   get the status of running the monitor eventscripts: NULL means never run.
4092  */
4093 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
4094                 struct timeval timeout, uint32_t destnode, 
4095                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
4096                 struct ctdb_scripts_wire **scripts)
4097 {
4098         int ret;
4099         TDB_DATA outdata, indata;
4100         int32_t res;
4101         uint32_t uinttype = type;
4102
4103         indata.dptr = (uint8_t *)&uinttype;
4104         indata.dsize = sizeof(uinttype);
4105
4106         ret = ctdb_control(ctdb, destnode, 0, 
4107                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
4108                            mem_ctx, &outdata, &res, &timeout, NULL);
4109         if (ret != 0 || res != 0) {
4110                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
4111                 return -1;
4112         }
4113
4114         if (outdata.dsize == 0) {
4115                 *scripts = NULL;
4116         } else {
4117                 *scripts = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4118                 talloc_free(outdata.dptr);
4119         }
4120                     
4121         return 0;
4122 }
4123
4124 /*
4125   tell the main daemon how long it took to lock the reclock file
4126  */
4127 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
4128 {
4129         int ret;
4130         int32_t res;
4131         TDB_DATA data;
4132
4133         data.dptr = (uint8_t *)&latency;
4134         data.dsize = sizeof(latency);
4135
4136         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
4137                            ctdb, NULL, &res, NULL, NULL);
4138         if (ret != 0 || res != 0) {
4139                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
4140                 return -1;
4141         }
4142
4143         return 0;
4144 }
4145
4146 /*
4147   get the name of the reclock file
4148  */
4149 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
4150                          uint32_t destnode, TALLOC_CTX *mem_ctx,
4151                          const char **name)
4152 {
4153         int ret;
4154         int32_t res;
4155         TDB_DATA data;
4156
4157         ret = ctdb_control(ctdb, destnode, 0, 
4158                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
4159                            mem_ctx, &data, &res, &timeout, NULL);
4160         if (ret != 0 || res != 0) {
4161                 return -1;
4162         }
4163
4164         if (data.dsize == 0) {
4165                 *name = NULL;
4166         } else {
4167                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
4168         }
4169         talloc_free(data.dptr);
4170
4171         return 0;
4172 }
4173
4174 /*
4175   set the reclock filename for a node
4176  */
4177 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
4178 {
4179         int ret;
4180         TDB_DATA data;
4181         int32_t res;
4182
4183         if (reclock == NULL) {
4184                 data.dsize = 0;
4185                 data.dptr  = NULL;
4186         } else {
4187                 data.dsize = strlen(reclock) + 1;
4188                 data.dptr  = discard_const(reclock);
4189         }
4190
4191         ret = ctdb_control(ctdb, destnode, 0, 
4192                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
4193                            NULL, NULL, &res, &timeout, NULL);
4194         if (ret != 0 || res != 0) {
4195                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
4196                 return -1;
4197         }
4198
4199         return 0;
4200 }
4201
4202 /*
4203   stop a node
4204  */
4205 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4206 {
4207         int ret;
4208         int32_t res;
4209
4210         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
4211                            ctdb, NULL, &res, &timeout, NULL);
4212         if (ret != 0 || res != 0) {
4213                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
4214                 return -1;
4215         }
4216
4217         return 0;
4218 }
4219
4220 /*
4221   continue a node
4222  */
4223 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4224 {
4225         int ret;
4226
4227         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
4228                            ctdb, NULL, NULL, &timeout, NULL);
4229         if (ret != 0) {
4230                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4231                 return -1;
4232         }
4233
4234         return 0;
4235 }
4236
4237 /*
4238   set the natgw state for a node
4239  */
4240 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4241 {
4242         int ret;
4243         TDB_DATA data;
4244         int32_t res;
4245
4246         data.dsize = sizeof(natgwstate);
4247         data.dptr  = (uint8_t *)&natgwstate;
4248
4249         ret = ctdb_control(ctdb, destnode, 0, 
4250                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
4251                            NULL, NULL, &res, &timeout, NULL);
4252         if (ret != 0 || res != 0) {
4253                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4254                 return -1;
4255         }
4256
4257         return 0;
4258 }
4259
4260 /*
4261   set the lmaster role for a node
4262  */
4263 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4264 {
4265         int ret;
4266         TDB_DATA data;
4267         int32_t res;
4268
4269         data.dsize = sizeof(lmasterrole);
4270         data.dptr  = (uint8_t *)&lmasterrole;
4271
4272         ret = ctdb_control(ctdb, destnode, 0, 
4273                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
4274                            NULL, NULL, &res, &timeout, NULL);
4275         if (ret != 0 || res != 0) {
4276                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4277                 return -1;
4278         }
4279
4280         return 0;
4281 }
4282
4283 /*
4284   set the recmaster role for a node
4285  */
4286 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4287 {
4288         int ret;
4289         TDB_DATA data;
4290         int32_t res;
4291
4292         data.dsize = sizeof(recmasterrole);
4293         data.dptr  = (uint8_t *)&recmasterrole;
4294
4295         ret = ctdb_control(ctdb, destnode, 0, 
4296                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
4297                            NULL, NULL, &res, &timeout, NULL);
4298         if (ret != 0 || res != 0) {
4299                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4300                 return -1;
4301         }
4302
4303         return 0;
4304 }
4305
4306 /* enable an eventscript
4307  */
4308 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4309 {
4310         int ret;
4311         TDB_DATA data;
4312         int32_t res;
4313
4314         data.dsize = strlen(script) + 1;
4315         data.dptr  = discard_const(script);
4316
4317         ret = ctdb_control(ctdb, destnode, 0, 
4318                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
4319                            NULL, NULL, &res, &timeout, NULL);
4320         if (ret != 0 || res != 0) {
4321                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4322                 return -1;
4323         }
4324
4325         return 0;
4326 }
4327
4328 /* disable an eventscript
4329  */
4330 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4331 {
4332         int ret;
4333         TDB_DATA data;
4334         int32_t res;
4335
4336         data.dsize = strlen(script) + 1;
4337         data.dptr  = discard_const(script);
4338
4339         ret = ctdb_control(ctdb, destnode, 0, 
4340                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
4341                            NULL, NULL, &res, &timeout, NULL);
4342         if (ret != 0 || res != 0) {
4343                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
4344                 return -1;
4345         }
4346
4347         return 0;
4348 }
4349
4350
4351 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
4352 {
4353         int ret;
4354         TDB_DATA data;
4355         int32_t res;
4356
4357         data.dsize = sizeof(*bantime);
4358         data.dptr  = (uint8_t *)bantime;
4359
4360         ret = ctdb_control(ctdb, destnode, 0, 
4361                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
4362                            NULL, NULL, &res, &timeout, NULL);
4363         if (ret != 0 || res != 0) {
4364                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4365                 return -1;
4366         }
4367
4368         return 0;
4369 }
4370
4371
4372 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4373 {
4374         int ret;
4375         TDB_DATA outdata;
4376         int32_t res;
4377         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4378
4379         ret = ctdb_control(ctdb, destnode, 0, 
4380                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4381                            tmp_ctx, &outdata, &res, &timeout, NULL);
4382         if (ret != 0 || res != 0) {
4383                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4384                 talloc_free(tmp_ctx);
4385                 return -1;
4386         }
4387
4388         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4389         talloc_free(tmp_ctx);
4390
4391         return 0;
4392 }
4393
4394
4395 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4396 {
4397         int ret;
4398         int32_t res;
4399         TDB_DATA data;
4400         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4401
4402         data.dptr = (uint8_t*)db_prio;
4403         data.dsize = sizeof(*db_prio);
4404
4405         ret = ctdb_control(ctdb, destnode, 0, 
4406                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4407                            tmp_ctx, NULL, &res, &timeout, NULL);
4408         if (ret != 0 || res != 0) {
4409                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4410                 talloc_free(tmp_ctx);
4411                 return -1;
4412         }
4413
4414         talloc_free(tmp_ctx);
4415
4416         return 0;
4417 }
4418
4419 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4420 {
4421         int ret;
4422         int32_t res;
4423         TDB_DATA data;
4424         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4425
4426         data.dptr = (uint8_t*)&db_id;
4427         data.dsize = sizeof(db_id);
4428
4429         ret = ctdb_control(ctdb, destnode, 0, 
4430                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4431                            tmp_ctx, NULL, &res, &timeout, NULL);
4432         if (ret != 0 || res < 0) {
4433                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4434                 talloc_free(tmp_ctx);
4435                 return -1;
4436         }
4437
4438         if (priority) {
4439                 *priority = res;
4440         }
4441
4442         talloc_free(tmp_ctx);
4443
4444         return 0;
4445 }
4446
4447 int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_statistics_wire **stats)
4448 {
4449         int ret;
4450         TDB_DATA outdata;
4451         int32_t res;
4452
4453         ret = ctdb_control(ctdb, destnode, 0, 
4454                            CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null, 
4455                            mem_ctx, &outdata, &res, &timeout, NULL);
4456         if (ret != 0 || res != 0 || outdata.dsize == 0) {
4457                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
4458                 return -1;
4459         }
4460
4461         *stats = (struct ctdb_statistics_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4462         talloc_free(outdata.dptr);
4463                     
4464         return 0;
4465 }
4466
4467 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)
4468 {
4469         if (h == NULL) {
4470                 return NULL;
4471         }
4472
4473         return &h->header;
4474 }
4475
4476
4477 struct ctdb_client_control_state *
4478 ctdb_ctrl_updaterecord_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4479 {
4480         struct ctdb_client_control_state *handle;
4481         struct ctdb_marshall_buffer *m;
4482         struct ctdb_rec_data *rec;
4483         TDB_DATA outdata;
4484
4485         m = talloc_zero(mem_ctx, struct ctdb_marshall_buffer);
4486         if (m == NULL) {
4487                 DEBUG(DEBUG_ERR, ("Failed to allocate marshall buffer for update record\n"));
4488                 return NULL;
4489         }
4490
4491         m->db_id = ctdb_db->db_id;
4492
4493         rec = ctdb_marshall_record(m, 0, key, header, data);
4494         if (rec == NULL) {
4495                 DEBUG(DEBUG_ERR,("Failed to marshall record for update record\n"));
4496                 talloc_free(m);
4497                 return NULL;
4498         }
4499         m = talloc_realloc_size(mem_ctx, m, rec->length + offsetof(struct ctdb_marshall_buffer, data));
4500         if (m == NULL) {
4501                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata\n"));
4502                 talloc_free(m);
4503                 return NULL;
4504         }
4505         m->count++;
4506         memcpy((uint8_t *)m + offsetof(struct ctdb_marshall_buffer, data), rec, rec->length);
4507
4508
4509         outdata.dptr = (uint8_t *)m;
4510         outdata.dsize = talloc_get_size(m);
4511
4512         handle = ctdb_control_send(ctdb, destnode, 0, 
4513                            CTDB_CONTROL_UPDATE_RECORD, 0, outdata,
4514                            mem_ctx, &timeout, NULL);
4515         talloc_free(m);
4516         return handle;
4517 }
4518
4519 int ctdb_ctrl_updaterecord_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4520 {
4521         int ret;
4522         int32_t res;
4523
4524         ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL);
4525         if ( (ret != 0) || (res != 0) ){
4526                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_update_record_recv failed\n"));
4527                 return -1;
4528         }
4529
4530         return 0;
4531 }
4532
4533 int
4534 ctdb_ctrl_updaterecord(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4535 {
4536         struct ctdb_client_control_state *state;
4537
4538         state = ctdb_ctrl_updaterecord_send(ctdb, mem_ctx, timeout, destnode, ctdb_db, key, header, data);
4539         return ctdb_ctrl_updaterecord_recv(ctdb, state);
4540 }
4541
4542
4543
4544
4545
4546
4547 /*
4548   set a database to be readonly
4549  */
4550 struct ctdb_client_control_state *
4551 ctdb_ctrl_set_db_readonly_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4552 {
4553         TDB_DATA data;
4554
4555         data.dptr = (uint8_t *)&dbid;
4556         data.dsize = sizeof(dbid);
4557
4558         return ctdb_control_send(ctdb, destnode, 0, 
4559                            CTDB_CONTROL_SET_DB_READONLY, 0, data, 
4560                            ctdb, NULL, NULL);
4561 }
4562
4563 int ctdb_ctrl_set_db_readonly_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4564 {
4565         int ret;
4566         int32_t res;
4567
4568         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4569         if (ret != 0 || res != 0) {
4570           DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_readonly_recv failed  ret:%d res:%d\n", ret, res));
4571                 return -1;
4572         }
4573
4574         return 0;
4575 }
4576
4577 int ctdb_ctrl_set_db_readonly(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4578 {
4579         struct ctdb_client_control_state *state;
4580
4581         state = ctdb_ctrl_set_db_readonly_send(ctdb, destnode, dbid);
4582         return ctdb_ctrl_set_db_readonly_recv(ctdb, state);
4583 }