ctdb: add an option --print-lmaster to enable printing of lmaster in "ctdb catdb"
[amitay/samba.git] / ctdb / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/tevent/tevent.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
29 #include <stdlib.h>
30 #include "../include/ctdb_private.h"
31 #include "lib/util/dlinklist.h"
32
33 pid_t ctdbd_pid;
34
35 /*
36   allocate a packet for use in client<->daemon communication
37  */
38 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
39                                             TALLOC_CTX *mem_ctx, 
40                                             enum ctdb_operation operation, 
41                                             size_t length, size_t slength,
42                                             const char *type)
43 {
44         int size;
45         struct ctdb_req_header *hdr;
46
47         length = MAX(length, slength);
48         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
49
50         hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
51         if (hdr == NULL) {
52                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
53                          operation, (unsigned)length));
54                 return NULL;
55         }
56         talloc_set_name_const(hdr, type);
57         memset(hdr, 0, slength);
58         hdr->length       = length;
59         hdr->operation    = operation;
60         hdr->ctdb_magic   = CTDB_MAGIC;
61         hdr->ctdb_version = CTDB_VERSION;
62         hdr->srcnode      = ctdb->pnn;
63         if (ctdb->vnn_map) {
64                 hdr->generation = ctdb->vnn_map->generation;
65         }
66
67         return hdr;
68 }
69
70 /*
71   local version of ctdb_call
72 */
73 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
74                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
75                     TDB_DATA *data, bool updatetdb)
76 {
77         struct ctdb_call_info *c;
78         struct ctdb_registered_call *fn;
79         struct ctdb_context *ctdb = ctdb_db->ctdb;
80         
81         c = talloc(ctdb, struct ctdb_call_info);
82         CTDB_NO_MEMORY(ctdb, c);
83
84         c->key = call->key;
85         c->call_data = &call->call_data;
86         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
87         c->record_data.dsize = data->dsize;
88         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
89         c->new_data = NULL;
90         c->reply_data = NULL;
91         c->status = 0;
92         c->header = header;
93
94         for (fn=ctdb_db->calls;fn;fn=fn->next) {
95                 if (fn->id == call->call_id) break;
96         }
97         if (fn == NULL) {
98                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
99                 talloc_free(c);
100                 return -1;
101         }
102
103         if (fn->fn(c) != 0) {
104                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
105                 talloc_free(c);
106                 return -1;
107         }
108
109         /* we need to force the record to be written out if this was a remote access */
110         if (c->new_data == NULL) {
111                 c->new_data = &c->record_data;
112         }
113
114         if (c->new_data && updatetdb) {
115                 /* XXX check that we always have the lock here? */
116                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
117                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
118                         talloc_free(c);
119                         return -1;
120                 }
121         }
122
123         if (c->reply_data) {
124                 call->reply_data = *c->reply_data;
125
126                 talloc_steal(call, call->reply_data.dptr);
127                 talloc_set_name_const(call->reply_data.dptr, __location__);
128         } else {
129                 call->reply_data.dptr = NULL;
130                 call->reply_data.dsize = 0;
131         }
132         call->status = c->status;
133
134         talloc_free(c);
135
136         return 0;
137 }
138
139
140 /*
141   queue a packet for sending from client to daemon
142 */
143 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
144 {
145         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
146 }
147
148
149 /*
150   called when a CTDB_REPLY_CALL packet comes in in the client
151
152   This packet comes in response to a CTDB_REQ_CALL request packet. It
153   contains any reply data from the call
154 */
155 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
156 {
157         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
158         struct ctdb_client_call_state *state;
159
160         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
161         if (state == NULL) {
162                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
163                 return;
164         }
165
166         if (hdr->reqid != state->reqid) {
167                 /* we found a record  but it was the wrong one */
168                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
169                 return;
170         }
171
172         state->call->reply_data.dptr = c->data;
173         state->call->reply_data.dsize = c->datalen;
174         state->call->status = c->status;
175
176         talloc_steal(state, c);
177
178         state->state = CTDB_CALL_DONE;
179
180         if (state->async.fn) {
181                 state->async.fn(state);
182         }
183 }
184
185 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
186
187 /*
188   this is called in the client, when data comes in from the daemon
189  */
190 void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
191 {
192         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
193         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
194         TALLOC_CTX *tmp_ctx;
195
196         /* place the packet as a child of a tmp_ctx. We then use
197            talloc_free() below to free it. If any of the calls want
198            to keep it, then they will steal it somewhere else, and the
199            talloc_free() will be a no-op */
200         tmp_ctx = talloc_new(ctdb);
201         talloc_steal(tmp_ctx, hdr);
202
203         if (cnt == 0) {
204                 DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n"));
205                 exit(0);
206         }
207
208         if (cnt < sizeof(*hdr)) {
209                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
210                 goto done;
211         }
212         if (cnt != hdr->length) {
213                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
214                                (unsigned)hdr->length, (unsigned)cnt);
215                 goto done;
216         }
217
218         if (hdr->ctdb_magic != CTDB_MAGIC) {
219                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
220                 goto done;
221         }
222
223         if (hdr->ctdb_version != CTDB_VERSION) {
224                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
225                 goto done;
226         }
227
228         switch (hdr->operation) {
229         case CTDB_REPLY_CALL:
230                 ctdb_client_reply_call(ctdb, hdr);
231                 break;
232
233         case CTDB_REQ_MESSAGE:
234                 ctdb_request_message(ctdb, hdr);
235                 break;
236
237         case CTDB_REPLY_CONTROL:
238                 ctdb_client_reply_control(ctdb, hdr);
239                 break;
240
241         default:
242                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
243         }
244
245 done:
246         talloc_free(tmp_ctx);
247 }
248
249 /*
250   connect with exponential backoff, thanks Stevens
251 */
252 #define CONNECT_MAXSLEEP 64
253 static int ctdb_connect_retry(struct ctdb_context *ctdb)
254 {
255         struct sockaddr_un addr;
256         int secs;
257         int ret = 0;
258
259         memset(&addr, 0, sizeof(addr));
260         addr.sun_family = AF_UNIX;
261         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
262
263         for (secs = 1; secs <= CONNECT_MAXSLEEP; secs *= 2) {
264                 ret = connect(ctdb->daemon.sd, (struct sockaddr *)&addr,
265                               sizeof(addr));
266                 if ((ret == 0) || (errno != EAGAIN)) {
267                         break;
268                 }
269
270                 if (secs <= (CONNECT_MAXSLEEP / 2)) {
271                         DEBUG(DEBUG_ERR,("connect failed: %s, retry in %d second(s)\n",
272                                          strerror(errno), secs));
273                         sleep(secs);
274                 }
275         }
276
277         return ret;
278 }
279
280 /*
281   connect to a unix domain socket
282 */
283 int ctdb_socket_connect(struct ctdb_context *ctdb)
284 {
285         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
286         if (ctdb->daemon.sd == -1) {
287                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
288                 return -1;
289         }
290
291         set_nonblocking(ctdb->daemon.sd);
292         set_close_on_exec(ctdb->daemon.sd);
293
294         if (ctdb_connect_retry(ctdb) == -1) {
295                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
296                 close(ctdb->daemon.sd);
297                 ctdb->daemon.sd = -1;
298                 return -1;
299         }
300
301         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
302                                               CTDB_DS_ALIGNMENT, 
303                                               ctdb_client_read_cb, ctdb, "to-ctdbd");
304         return 0;
305 }
306
307
308 struct ctdb_record_handle {
309         struct ctdb_db_context *ctdb_db;
310         TDB_DATA key;
311         TDB_DATA *data;
312         struct ctdb_ltdb_header header;
313 };
314
315
316 /*
317   make a recv call to the local ctdb daemon - called from client context
318
319   This is called when the program wants to wait for a ctdb_call to complete and get the 
320   results. This call will block unless the call has already completed.
321 */
322 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
323 {
324         if (state == NULL) {
325                 return -1;
326         }
327
328         while (state->state < CTDB_CALL_DONE) {
329                 event_loop_once(state->ctdb_db->ctdb->ev);
330         }
331         if (state->state != CTDB_CALL_DONE) {
332                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
333                 talloc_free(state);
334                 return -1;
335         }
336
337         if (state->call->reply_data.dsize) {
338                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
339                                                       state->call->reply_data.dptr,
340                                                       state->call->reply_data.dsize);
341                 call->reply_data.dsize = state->call->reply_data.dsize;
342         } else {
343                 call->reply_data.dptr = NULL;
344                 call->reply_data.dsize = 0;
345         }
346         call->status = state->call->status;
347         talloc_free(state);
348
349         return call->status;
350 }
351
352
353
354
355 /*
356   destroy a ctdb_call in client
357 */
358 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
359 {
360         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
361         return 0;
362 }
363
364 /*
365   construct an event driven local ctdb_call
366
367   this is used so that locally processed ctdb_call requests are processed
368   in an event driven manner
369 */
370 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
371                                                                   struct ctdb_call *call,
372                                                                   struct ctdb_ltdb_header *header,
373                                                                   TDB_DATA *data)
374 {
375         struct ctdb_client_call_state *state;
376         struct ctdb_context *ctdb = ctdb_db->ctdb;
377         int ret;
378
379         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
380         CTDB_NO_MEMORY_NULL(ctdb, state);
381         state->call = talloc_zero(state, struct ctdb_call);
382         CTDB_NO_MEMORY_NULL(ctdb, state->call);
383
384         talloc_steal(state, data->dptr);
385
386         state->state   = CTDB_CALL_DONE;
387         *(state->call) = *call;
388         state->ctdb_db = ctdb_db;
389
390         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
391         if (ret != 0) {
392                 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
393         }
394
395         return state;
396 }
397
398 /*
399   make a ctdb call to the local daemon - async send. Called from client context.
400
401   This constructs a ctdb_call request and queues it for processing. 
402   This call never blocks.
403 */
404 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
405                                               struct ctdb_call *call)
406 {
407         struct ctdb_client_call_state *state;
408         struct ctdb_context *ctdb = ctdb_db->ctdb;
409         struct ctdb_ltdb_header header;
410         TDB_DATA data;
411         int ret;
412         size_t len;
413         struct ctdb_req_call *c;
414
415         /* if the domain socket is not yet open, open it */
416         if (ctdb->daemon.sd==-1) {
417                 ctdb_socket_connect(ctdb);
418         }
419
420         ret = ctdb_ltdb_lock(ctdb_db, call->key);
421         if (ret != 0) {
422                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
423                 return NULL;
424         }
425
426         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
427
428         if ((call->flags & CTDB_IMMEDIATE_MIGRATION) && (header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
429                 ret = -1;
430         }
431
432         if (ret == 0 && header.dmaster == ctdb->pnn) {
433                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
434                 talloc_free(data.dptr);
435                 ctdb_ltdb_unlock(ctdb_db, call->key);
436                 return state;
437         }
438
439         ctdb_ltdb_unlock(ctdb_db, call->key);
440         talloc_free(data.dptr);
441
442         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
443         if (state == NULL) {
444                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
445                 return NULL;
446         }
447         state->call = talloc_zero(state, struct ctdb_call);
448         if (state->call == NULL) {
449                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
450                 return NULL;
451         }
452
453         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
454         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
455         if (c == NULL) {
456                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
457                 return NULL;
458         }
459
460         state->reqid     = ctdb_reqid_new(ctdb, state);
461         state->ctdb_db = ctdb_db;
462         talloc_set_destructor(state, ctdb_client_call_destructor);
463
464         c->hdr.reqid     = state->reqid;
465         c->flags         = call->flags;
466         c->db_id         = ctdb_db->db_id;
467         c->callid        = call->call_id;
468         c->hopcount      = 0;
469         c->keylen        = call->key.dsize;
470         c->calldatalen   = call->call_data.dsize;
471         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
472         memcpy(&c->data[call->key.dsize], 
473                call->call_data.dptr, call->call_data.dsize);
474         *(state->call)              = *call;
475         state->call->call_data.dptr = &c->data[call->key.dsize];
476         state->call->key.dptr       = &c->data[0];
477
478         state->state  = CTDB_CALL_WAIT;
479
480
481         ctdb_client_queue_pkt(ctdb, &c->hdr);
482
483         return state;
484 }
485
486
487 /*
488   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
489 */
490 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
491 {
492         struct ctdb_client_call_state *state;
493
494         state = ctdb_call_send(ctdb_db, call);
495         return ctdb_call_recv(state, call);
496 }
497
498
499 /*
500   tell the daemon what messaging srvid we will use, and register the message
501   handler function in the client
502 */
503 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
504                              ctdb_msg_fn_t handler,
505                              void *private_data)
506                                     
507 {
508         int res;
509         int32_t status;
510         
511         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
512                            tdb_null, NULL, NULL, &status, NULL, NULL);
513         if (res != 0 || status != 0) {
514                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
515                 return -1;
516         }
517
518         /* also need to register the handler with our own ctdb structure */
519         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
520 }
521
522 /*
523   tell the daemon we no longer want a srvid
524 */
525 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
526 {
527         int res;
528         int32_t status;
529         
530         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
531                            tdb_null, NULL, NULL, &status, NULL, NULL);
532         if (res != 0 || status != 0) {
533                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
534                 return -1;
535         }
536
537         /* also need to register the handler with our own ctdb structure */
538         ctdb_deregister_message_handler(ctdb, srvid, private_data);
539         return 0;
540 }
541
542
543 /*
544   send a message - from client context
545  */
546 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
547                       uint64_t srvid, TDB_DATA data)
548 {
549         struct ctdb_req_message *r;
550         int len, res;
551
552         len = offsetof(struct ctdb_req_message, data) + data.dsize;
553         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
554                                len, struct ctdb_req_message);
555         CTDB_NO_MEMORY(ctdb, r);
556
557         r->hdr.destnode  = pnn;
558         r->srvid         = srvid;
559         r->datalen       = data.dsize;
560         memcpy(&r->data[0], data.dptr, data.dsize);
561         
562         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
563         if (res != 0) {
564                 return res;
565         }
566
567         talloc_free(r);
568         return 0;
569 }
570
571
572 /*
573   cancel a ctdb_fetch_lock operation, releasing the lock
574  */
575 static int fetch_lock_destructor(struct ctdb_record_handle *h)
576 {
577         ctdb_ltdb_unlock(h->ctdb_db, h->key);
578         return 0;
579 }
580
581 /*
582   force the migration of a record to this node
583  */
584 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
585 {
586         struct ctdb_call call;
587         ZERO_STRUCT(call);
588         call.call_id = CTDB_NULL_FUNC;
589         call.key = key;
590         call.flags = CTDB_IMMEDIATE_MIGRATION;
591         return ctdb_call(ctdb_db, &call);
592 }
593
594 /*
595   try to fetch a readonly copy of a record
596  */
597 static int
598 ctdb_client_fetch_readonly(struct ctdb_db_context *ctdb_db, TDB_DATA key, TALLOC_CTX *mem_ctx, struct ctdb_ltdb_header **hdr, TDB_DATA *data)
599 {
600         int ret;
601
602         struct ctdb_call call;
603         ZERO_STRUCT(call);
604
605         call.call_id = CTDB_FETCH_WITH_HEADER_FUNC;
606         call.call_data.dptr = NULL;
607         call.call_data.dsize = 0;
608         call.key = key;
609         call.flags = CTDB_WANT_READONLY;
610         ret = ctdb_call(ctdb_db, &call);
611
612         if (ret != 0) {
613                 return -1;
614         }
615         if (call.reply_data.dsize < sizeof(struct ctdb_ltdb_header)) {
616                 return -1;
617         }
618
619         *hdr = talloc_memdup(mem_ctx, &call.reply_data.dptr[0], sizeof(struct ctdb_ltdb_header));
620         if (*hdr == NULL) {
621                 talloc_free(call.reply_data.dptr);
622                 return -1;
623         }
624
625         data->dsize = call.reply_data.dsize - sizeof(struct ctdb_ltdb_header);
626         data->dptr  = talloc_memdup(mem_ctx, &call.reply_data.dptr[sizeof(struct ctdb_ltdb_header)], data->dsize);
627         if (data->dptr == NULL) {
628                 talloc_free(call.reply_data.dptr);
629                 talloc_free(hdr);
630                 return -1;
631         }
632
633         return 0;
634 }
635
636 /*
637   get a lock on a record, and return the records data. Blocks until it gets the lock
638  */
639 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
640                                            TDB_DATA key, TDB_DATA *data)
641 {
642         int ret;
643         struct ctdb_record_handle *h;
644
645         /*
646           procedure is as follows:
647
648           1) get the chain lock. 
649           2) check if we are dmaster
650           3) if we are the dmaster then return handle 
651           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
652              reply from ctdbd
653           5) when we get the reply, goto (1)
654          */
655
656         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
657         if (h == NULL) {
658                 return NULL;
659         }
660
661         h->ctdb_db = ctdb_db;
662         h->key     = key;
663         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
664         if (h->key.dptr == NULL) {
665                 talloc_free(h);
666                 return NULL;
667         }
668         h->data    = data;
669
670         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
671                  (const char *)key.dptr));
672
673 again:
674         /* step 1 - get the chain lock */
675         ret = ctdb_ltdb_lock(ctdb_db, key);
676         if (ret != 0) {
677                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
678                 talloc_free(h);
679                 return NULL;
680         }
681
682         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
683
684         talloc_set_destructor(h, fetch_lock_destructor);
685
686         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
687
688         /* when torturing, ensure we test the remote path */
689         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
690             random() % 5 == 0) {
691                 h->header.dmaster = (uint32_t)-1;
692         }
693
694
695         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
696
697         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
698                 ctdb_ltdb_unlock(ctdb_db, key);
699                 ret = ctdb_client_force_migration(ctdb_db, key);
700                 if (ret != 0) {
701                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
702                         talloc_free(h);
703                         return NULL;
704                 }
705                 goto again;
706         }
707
708         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
709         return h;
710 }
711
712 /*
713   get a readonly lock on a record, and return the records data. Blocks until it gets the lock
714  */
715 struct ctdb_record_handle *
716 ctdb_fetch_readonly_lock(
717         struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
718         TDB_DATA key, TDB_DATA *data,
719         int read_only)
720 {
721         int ret;
722         struct ctdb_record_handle *h;
723         struct ctdb_ltdb_header *roheader = NULL;
724
725         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
726         if (h == NULL) {
727                 return NULL;
728         }
729
730         h->ctdb_db = ctdb_db;
731         h->key     = key;
732         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
733         if (h->key.dptr == NULL) {
734                 talloc_free(h);
735                 return NULL;
736         }
737         h->data    = data;
738
739         data->dptr = NULL;
740         data->dsize = 0;
741
742
743 again:
744         talloc_free(roheader);
745         roheader = NULL;
746
747         talloc_free(data->dptr);
748         data->dptr = NULL;
749         data->dsize = 0;
750
751         /* Lock the record/chain */
752         ret = ctdb_ltdb_lock(ctdb_db, key);
753         if (ret != 0) {
754                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
755                 talloc_free(h);
756                 return NULL;
757         }
758
759         talloc_set_destructor(h, fetch_lock_destructor);
760
761         /* Check if record exists yet in the TDB */
762         ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
763         if (ret != 0) {
764                 ctdb_ltdb_unlock(ctdb_db, key);
765                 ret = ctdb_client_force_migration(ctdb_db, key);
766                 if (ret != 0) {
767                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
768                         talloc_free(h);
769                         return NULL;
770                 }
771                 goto again;
772         }
773
774         /* if this is a request for read/write and we have delegations
775            we have to revoke all delegations first
776         */
777         if ((read_only == 0) 
778         &&  (h->header.dmaster == ctdb_db->ctdb->pnn)
779         &&  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
780                 ctdb_ltdb_unlock(ctdb_db, key);
781                 ret = ctdb_client_force_migration(ctdb_db, key);
782                 if (ret != 0) {
783                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
784                         talloc_free(h);
785                         return NULL;
786                 }
787                 goto again;
788         }
789
790         /* if we are dmaster, just return the handle */
791         if (h->header.dmaster == ctdb_db->ctdb->pnn) {
792                 return h;
793         }
794
795         if (read_only != 0) {
796                 TDB_DATA rodata = {NULL, 0};
797
798                 if ((h->header.flags & CTDB_REC_RO_HAVE_READONLY)
799                 ||  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
800                         return h;
801                 }
802
803                 ctdb_ltdb_unlock(ctdb_db, key);
804                 ret = ctdb_client_fetch_readonly(ctdb_db, key, h, &roheader, &rodata);
805                 if (ret != 0) {
806                         DEBUG(DEBUG_ERR,("ctdb_fetch_readonly_lock:  failed. force migration and try again\n"));
807                         ret = ctdb_client_force_migration(ctdb_db, key);
808                         if (ret != 0) {
809                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
810                                 talloc_free(h);
811                                 return NULL;
812                         }
813
814                         goto again;
815                 }
816
817                 if (!(roheader->flags&CTDB_REC_RO_HAVE_READONLY)) {
818                         ret = ctdb_client_force_migration(ctdb_db, key);
819                         if (ret != 0) {
820                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
821                                 talloc_free(h);
822                                 return NULL;
823                         }
824
825                         goto again;
826                 }
827
828                 ret = ctdb_ltdb_lock(ctdb_db, key);
829                 if (ret != 0) {
830                         DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
831                         talloc_free(h);
832                         return NULL;
833                 }
834
835                 ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
836                 if (ret != 0) {
837                         ctdb_ltdb_unlock(ctdb_db, key);
838
839                         ret = ctdb_client_force_migration(ctdb_db, key);
840                         if (ret != 0) {
841                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
842                                 talloc_free(h);
843                                 return NULL;
844                         }
845
846                         goto again;
847                 }
848
849                 return h;
850         }
851
852         /* we are not dmaster and this was not a request for a readonly lock
853          * so unlock the record, migrate it and try again
854          */
855         ctdb_ltdb_unlock(ctdb_db, key);
856         ret = ctdb_client_force_migration(ctdb_db, key);
857         if (ret != 0) {
858                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
859                 talloc_free(h);
860                 return NULL;
861         }
862         goto again;
863 }
864
865 /*
866   store some data to the record that was locked with ctdb_fetch_lock()
867 */
868 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
869 {
870         if (h->ctdb_db->persistent) {
871                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
872                 return -1;
873         }
874
875         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
876 }
877
878 /*
879   non-locking fetch of a record
880  */
881 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
882                TDB_DATA key, TDB_DATA *data)
883 {
884         struct ctdb_call call;
885         int ret;
886
887         call.call_id = CTDB_FETCH_FUNC;
888         call.call_data.dptr = NULL;
889         call.call_data.dsize = 0;
890         call.key = key;
891
892         ret = ctdb_call(ctdb_db, &call);
893
894         if (ret == 0) {
895                 *data = call.reply_data;
896                 talloc_steal(mem_ctx, data->dptr);
897         }
898
899         return ret;
900 }
901
902
903
904 /*
905    called when a control completes or timesout to invoke the callback
906    function the user provided
907 */
908 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
909         struct timeval t, void *private_data)
910 {
911         struct ctdb_client_control_state *state;
912         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
913         int ret;
914
915         state = talloc_get_type(private_data, struct ctdb_client_control_state);
916         talloc_steal(tmp_ctx, state);
917
918         ret = ctdb_control_recv(state->ctdb, state, state,
919                         NULL, 
920                         NULL, 
921                         NULL);
922         if (ret != 0) {
923                 DEBUG(DEBUG_DEBUG,("ctdb_control_recv() failed, ignoring return code %d\n", ret));
924         }
925
926         talloc_free(tmp_ctx);
927 }
928
929 /*
930   called when a CTDB_REPLY_CONTROL packet comes in in the client
931
932   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
933   contains any reply data from the control
934 */
935 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
936                                       struct ctdb_req_header *hdr)
937 {
938         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
939         struct ctdb_client_control_state *state;
940
941         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
942         if (state == NULL) {
943                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
944                 return;
945         }
946
947         if (hdr->reqid != state->reqid) {
948                 /* we found a record  but it was the wrong one */
949                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
950                 return;
951         }
952
953         state->outdata.dptr = c->data;
954         state->outdata.dsize = c->datalen;
955         state->status = c->status;
956         if (c->errorlen) {
957                 state->errormsg = talloc_strndup(state, 
958                                                  (char *)&c->data[c->datalen], 
959                                                  c->errorlen);
960         }
961
962         /* state->outdata now uses resources from c so we dont want c
963            to just dissappear from under us while state is still alive
964         */
965         talloc_steal(state, c);
966
967         state->state = CTDB_CONTROL_DONE;
968
969         /* if we had a callback registered for this control, pull the response
970            and call the callback.
971         */
972         if (state->async.fn) {
973                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
974         }
975 }
976
977
978 /*
979   destroy a ctdb_control in client
980 */
981 static int ctdb_client_control_destructor(struct ctdb_client_control_state *state)
982 {
983         ctdb_reqid_remove(state->ctdb, state->reqid);
984         return 0;
985 }
986
987
988 /* time out handler for ctdb_control */
989 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
990         struct timeval t, void *private_data)
991 {
992         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
993
994         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
995                          "dstnode:%u\n", state->reqid, state->c->opcode,
996                          state->c->hdr.destnode));
997
998         state->state = CTDB_CONTROL_TIMEOUT;
999
1000         /* if we had a callback registered for this control, pull the response
1001            and call the callback.
1002         */
1003         if (state->async.fn) {
1004                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
1005         }
1006 }
1007
1008 /* async version of send control request */
1009 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
1010                 uint32_t destnode, uint64_t srvid, 
1011                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
1012                 TALLOC_CTX *mem_ctx,
1013                 struct timeval *timeout,
1014                 char **errormsg)
1015 {
1016         struct ctdb_client_control_state *state;
1017         size_t len;
1018         struct ctdb_req_control *c;
1019         int ret;
1020
1021         if (errormsg) {
1022                 *errormsg = NULL;
1023         }
1024
1025         /* if the domain socket is not yet open, open it */
1026         if (ctdb->daemon.sd==-1) {
1027                 ctdb_socket_connect(ctdb);
1028         }
1029
1030         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
1031         CTDB_NO_MEMORY_NULL(ctdb, state);
1032
1033         state->ctdb       = ctdb;
1034         state->reqid      = ctdb_reqid_new(ctdb, state);
1035         state->state      = CTDB_CONTROL_WAIT;
1036         state->errormsg   = NULL;
1037
1038         talloc_set_destructor(state, ctdb_client_control_destructor);
1039
1040         len = offsetof(struct ctdb_req_control, data) + data.dsize;
1041         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
1042                                len, struct ctdb_req_control);
1043         state->c            = c;        
1044         CTDB_NO_MEMORY_NULL(ctdb, c);
1045         c->hdr.reqid        = state->reqid;
1046         c->hdr.destnode     = destnode;
1047         c->opcode           = opcode;
1048         c->client_id        = 0;
1049         c->flags            = flags;
1050         c->srvid            = srvid;
1051         c->datalen          = data.dsize;
1052         if (data.dsize) {
1053                 memcpy(&c->data[0], data.dptr, data.dsize);
1054         }
1055
1056         /* timeout */
1057         if (timeout && !timeval_is_zero(timeout)) {
1058                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
1059         }
1060
1061         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
1062         if (ret != 0) {
1063                 talloc_free(state);
1064                 return NULL;
1065         }
1066
1067         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1068                 talloc_free(state);
1069                 return NULL;
1070         }
1071
1072         return state;
1073 }
1074
1075
1076 /* async version of receive control reply */
1077 int ctdb_control_recv(struct ctdb_context *ctdb, 
1078                 struct ctdb_client_control_state *state, 
1079                 TALLOC_CTX *mem_ctx,
1080                 TDB_DATA *outdata, int32_t *status, char **errormsg)
1081 {
1082         TALLOC_CTX *tmp_ctx;
1083
1084         if (status != NULL) {
1085                 *status = -1;
1086         }
1087         if (errormsg != NULL) {
1088                 *errormsg = NULL;
1089         }
1090
1091         if (state == NULL) {
1092                 return -1;
1093         }
1094
1095         /* prevent double free of state */
1096         tmp_ctx = talloc_new(ctdb);
1097         talloc_steal(tmp_ctx, state);
1098
1099         /* loop one event at a time until we either timeout or the control
1100            completes.
1101         */
1102         while (state->state == CTDB_CONTROL_WAIT) {
1103                 event_loop_once(ctdb->ev);
1104         }
1105
1106         if (state->state != CTDB_CONTROL_DONE) {
1107                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
1108                 if (state->async.fn) {
1109                         state->async.fn(state);
1110                 }
1111                 talloc_free(tmp_ctx);
1112                 return -1;
1113         }
1114
1115         if (state->errormsg) {
1116                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
1117                 if (errormsg) {
1118                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
1119                 }
1120                 if (state->async.fn) {
1121                         state->async.fn(state);
1122                 }
1123                 talloc_free(tmp_ctx);
1124                 return -1;
1125         }
1126
1127         if (outdata) {
1128                 *outdata = state->outdata;
1129                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
1130         }
1131
1132         if (status) {
1133                 *status = state->status;
1134         }
1135
1136         if (state->async.fn) {
1137                 state->async.fn(state);
1138         }
1139
1140         talloc_free(tmp_ctx);
1141         return 0;
1142 }
1143
1144
1145
1146 /*
1147   send a ctdb control message
1148   timeout specifies how long we should wait for a reply.
1149   if timeout is NULL we wait indefinitely
1150  */
1151 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
1152                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
1153                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
1154                  struct timeval *timeout,
1155                  char **errormsg)
1156 {
1157         struct ctdb_client_control_state *state;
1158
1159         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
1160                         flags, data, mem_ctx,
1161                         timeout, errormsg);
1162         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
1163                         errormsg);
1164 }
1165
1166
1167
1168
1169 /*
1170   a process exists call. Returns 0 if process exists, -1 otherwise
1171  */
1172 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
1173 {
1174         int ret;
1175         TDB_DATA data;
1176         int32_t status;
1177
1178         data.dptr = (uint8_t*)&pid;
1179         data.dsize = sizeof(pid);
1180
1181         ret = ctdb_control(ctdb, destnode, 0, 
1182                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
1183                            NULL, NULL, &status, NULL, NULL);
1184         if (ret != 0) {
1185                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
1186                 return -1;
1187         }
1188
1189         return status;
1190 }
1191
1192 /*
1193   get remote statistics
1194  */
1195 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
1196 {
1197         int ret;
1198         TDB_DATA data;
1199         int32_t res;
1200
1201         ret = ctdb_control(ctdb, destnode, 0, 
1202                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
1203                            ctdb, &data, &res, NULL, NULL);
1204         if (ret != 0 || res != 0) {
1205                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
1206                 return -1;
1207         }
1208
1209         if (data.dsize != sizeof(struct ctdb_statistics)) {
1210                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1211                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1212                       return -1;
1213         }
1214
1215         *status = *(struct ctdb_statistics *)data.dptr;
1216         talloc_free(data.dptr);
1217                         
1218         return 0;
1219 }
1220
1221 /*
1222   shutdown a remote ctdb node
1223  */
1224 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1225 {
1226         struct ctdb_client_control_state *state;
1227
1228         state = ctdb_control_send(ctdb, destnode, 0, 
1229                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1230                            NULL, &timeout, NULL);
1231         if (state == NULL) {
1232                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1233                 return -1;
1234         }
1235
1236         return 0;
1237 }
1238
1239 /*
1240   get vnn map from a remote node
1241  */
1242 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1243 {
1244         int ret;
1245         TDB_DATA outdata;
1246         int32_t res;
1247         struct ctdb_vnn_map_wire *map;
1248
1249         ret = ctdb_control(ctdb, destnode, 0, 
1250                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1251                            mem_ctx, &outdata, &res, &timeout, NULL);
1252         if (ret != 0 || res != 0) {
1253                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1254                 return -1;
1255         }
1256         
1257         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1258         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1259             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1260                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1261                 return -1;
1262         }
1263
1264         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1265         CTDB_NO_MEMORY(ctdb, *vnnmap);
1266         (*vnnmap)->generation = map->generation;
1267         (*vnnmap)->size       = map->size;
1268         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1269
1270         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1271         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1272         talloc_free(outdata.dptr);
1273                     
1274         return 0;
1275 }
1276
1277
1278 /*
1279   get the recovery mode of a remote node
1280  */
1281 struct ctdb_client_control_state *
1282 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1283 {
1284         return ctdb_control_send(ctdb, destnode, 0, 
1285                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1286                            mem_ctx, &timeout, NULL);
1287 }
1288
1289 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1290 {
1291         int ret;
1292         int32_t res;
1293
1294         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1295         if (ret != 0) {
1296                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1297                 return -1;
1298         }
1299
1300         if (recmode) {
1301                 *recmode = (uint32_t)res;
1302         }
1303
1304         return 0;
1305 }
1306
1307 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1308 {
1309         struct ctdb_client_control_state *state;
1310
1311         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1312         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1313 }
1314
1315
1316
1317
1318 /*
1319   set the recovery mode of a remote node
1320  */
1321 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1322 {
1323         int ret;
1324         TDB_DATA data;
1325         int32_t res;
1326
1327         data.dsize = sizeof(uint32_t);
1328         data.dptr = (unsigned char *)&recmode;
1329
1330         ret = ctdb_control(ctdb, destnode, 0, 
1331                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1332                            NULL, NULL, &res, &timeout, NULL);
1333         if (ret != 0 || res != 0) {
1334                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1335                 return -1;
1336         }
1337
1338         return 0;
1339 }
1340
1341
1342
1343 /*
1344   get the recovery master of a remote node
1345  */
1346 struct ctdb_client_control_state *
1347 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1348                         struct timeval timeout, uint32_t destnode)
1349 {
1350         return ctdb_control_send(ctdb, destnode, 0, 
1351                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1352                            mem_ctx, &timeout, NULL);
1353 }
1354
1355 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1356 {
1357         int ret;
1358         int32_t res;
1359
1360         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1361         if (ret != 0) {
1362                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1363                 return -1;
1364         }
1365
1366         if (recmaster) {
1367                 *recmaster = (uint32_t)res;
1368         }
1369
1370         return 0;
1371 }
1372
1373 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1374 {
1375         struct ctdb_client_control_state *state;
1376
1377         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1378         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1379 }
1380
1381
1382 /*
1383   set the recovery master of a remote node
1384  */
1385 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1386 {
1387         int ret;
1388         TDB_DATA data;
1389         int32_t res;
1390
1391         ZERO_STRUCT(data);
1392         data.dsize = sizeof(uint32_t);
1393         data.dptr = (unsigned char *)&recmaster;
1394
1395         ret = ctdb_control(ctdb, destnode, 0, 
1396                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1397                            NULL, NULL, &res, &timeout, NULL);
1398         if (ret != 0 || res != 0) {
1399                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1400                 return -1;
1401         }
1402
1403         return 0;
1404 }
1405
1406
1407 /*
1408   get a list of databases off a remote node
1409  */
1410 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1411                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1412 {
1413         int ret;
1414         TDB_DATA outdata;
1415         int32_t res;
1416
1417         ret = ctdb_control(ctdb, destnode, 0, 
1418                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1419                            mem_ctx, &outdata, &res, &timeout, NULL);
1420         if (ret != 0 || res != 0) {
1421                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1422                 return -1;
1423         }
1424
1425         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1426         talloc_free(outdata.dptr);
1427                     
1428         return 0;
1429 }
1430
1431 /*
1432   get a list of nodes (vnn and flags ) from a remote node
1433  */
1434 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1435                 struct timeval timeout, uint32_t destnode, 
1436                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1437 {
1438         int ret;
1439         TDB_DATA outdata;
1440         int32_t res;
1441
1442         ret = ctdb_control(ctdb, destnode, 0, 
1443                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1444                            mem_ctx, &outdata, &res, &timeout, NULL);
1445         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1446                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1447                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1448         }
1449         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1450                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1451                 return -1;
1452         }
1453
1454         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1455         talloc_free(outdata.dptr);
1456                     
1457         return 0;
1458 }
1459
1460 /*
1461   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1462  */
1463 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1464                 struct timeval timeout, uint32_t destnode, 
1465                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1466 {
1467         int ret, i, len;
1468         TDB_DATA outdata;
1469         struct ctdb_node_mapv4 *nodemapv4;
1470         int32_t res;
1471
1472         ret = ctdb_control(ctdb, destnode, 0, 
1473                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1474                            mem_ctx, &outdata, &res, &timeout, NULL);
1475         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1476                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1477                 return -1;
1478         }
1479
1480         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1481
1482         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1483         (*nodemap) = talloc_zero_size(mem_ctx, len);
1484         CTDB_NO_MEMORY(ctdb, (*nodemap));
1485
1486         (*nodemap)->num = nodemapv4->num;
1487         for (i=0; i<nodemapv4->num; i++) {
1488                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1489                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1490                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1491                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1492         }
1493                 
1494         talloc_free(outdata.dptr);
1495                     
1496         return 0;
1497 }
1498
1499 /*
1500   drop the transport, reload the nodes file and restart the transport
1501  */
1502 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1503                     struct timeval timeout, uint32_t destnode)
1504 {
1505         int ret;
1506         int32_t res;
1507
1508         ret = ctdb_control(ctdb, destnode, 0, 
1509                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1510                            NULL, NULL, &res, &timeout, NULL);
1511         if (ret != 0 || res != 0) {
1512                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1513                 return -1;
1514         }
1515
1516         return 0;
1517 }
1518
1519
1520 /*
1521   set vnn map on a node
1522  */
1523 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1524                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1525 {
1526         int ret;
1527         TDB_DATA data;
1528         int32_t res;
1529         struct ctdb_vnn_map_wire *map;
1530         size_t len;
1531
1532         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1533         map = talloc_size(mem_ctx, len);
1534         CTDB_NO_MEMORY(ctdb, map);
1535
1536         map->generation = vnnmap->generation;
1537         map->size = vnnmap->size;
1538         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1539         
1540         data.dsize = len;
1541         data.dptr  = (uint8_t *)map;
1542
1543         ret = ctdb_control(ctdb, destnode, 0, 
1544                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1545                            NULL, NULL, &res, &timeout, NULL);
1546         if (ret != 0 || res != 0) {
1547                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1548                 return -1;
1549         }
1550
1551         talloc_free(map);
1552
1553         return 0;
1554 }
1555
1556
1557 /*
1558   async send for pull database
1559  */
1560 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1561         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1562         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1563 {
1564         TDB_DATA indata;
1565         struct ctdb_control_pulldb *pull;
1566         struct ctdb_client_control_state *state;
1567
1568         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1569         CTDB_NO_MEMORY_NULL(ctdb, pull);
1570
1571         pull->db_id   = dbid;
1572         pull->lmaster = lmaster;
1573
1574         indata.dsize = sizeof(struct ctdb_control_pulldb);
1575         indata.dptr  = (unsigned char *)pull;
1576
1577         state = ctdb_control_send(ctdb, destnode, 0, 
1578                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1579                                   mem_ctx, &timeout, NULL);
1580         talloc_free(pull);
1581
1582         return state;
1583 }
1584
1585 /*
1586   async recv for pull database
1587  */
1588 int ctdb_ctrl_pulldb_recv(
1589         struct ctdb_context *ctdb, 
1590         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1591         TDB_DATA *outdata)
1592 {
1593         int ret;
1594         int32_t res;
1595
1596         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1597         if ( (ret != 0) || (res != 0) ){
1598                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1599                 return -1;
1600         }
1601
1602         return 0;
1603 }
1604
1605 /*
1606   pull all keys and records for a specific database on a node
1607  */
1608 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1609                 uint32_t dbid, uint32_t lmaster, 
1610                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1611                 TDB_DATA *outdata)
1612 {
1613         struct ctdb_client_control_state *state;
1614
1615         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1616                                       timeout);
1617         
1618         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1619 }
1620
1621
1622 /*
1623   change dmaster for all keys in the database to the new value
1624  */
1625 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1626                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1627 {
1628         int ret;
1629         TDB_DATA indata;
1630         int32_t res;
1631
1632         indata.dsize = 2*sizeof(uint32_t);
1633         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1634
1635         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1636         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1637
1638         ret = ctdb_control(ctdb, destnode, 0, 
1639                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1640                            NULL, NULL, &res, &timeout, NULL);
1641         if (ret != 0 || res != 0) {
1642                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1643                 return -1;
1644         }
1645
1646         return 0;
1647 }
1648
1649 /*
1650   ping a node, return number of clients connected
1651  */
1652 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1653 {
1654         int ret;
1655         int32_t res;
1656
1657         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1658                            tdb_null, NULL, NULL, &res, NULL, NULL);
1659         if (ret != 0) {
1660                 return -1;
1661         }
1662         return res;
1663 }
1664
1665 /*
1666   find the real path to a ltdb 
1667  */
1668 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1669                    const char **path)
1670 {
1671         int ret;
1672         int32_t res;
1673         TDB_DATA data;
1674
1675         data.dptr = (uint8_t *)&dbid;
1676         data.dsize = sizeof(dbid);
1677
1678         ret = ctdb_control(ctdb, destnode, 0, 
1679                            CTDB_CONTROL_GETDBPATH, 0, data, 
1680                            mem_ctx, &data, &res, &timeout, NULL);
1681         if (ret != 0 || res != 0) {
1682                 return -1;
1683         }
1684
1685         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1686         if ((*path) == NULL) {
1687                 return -1;
1688         }
1689
1690         talloc_free(data.dptr);
1691
1692         return 0;
1693 }
1694
1695 /*
1696   find the name of a db 
1697  */
1698 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1699                    const char **name)
1700 {
1701         int ret;
1702         int32_t res;
1703         TDB_DATA data;
1704
1705         data.dptr = (uint8_t *)&dbid;
1706         data.dsize = sizeof(dbid);
1707
1708         ret = ctdb_control(ctdb, destnode, 0, 
1709                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1710                            mem_ctx, &data, &res, &timeout, NULL);
1711         if (ret != 0 || res != 0) {
1712                 return -1;
1713         }
1714
1715         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1716         if ((*name) == NULL) {
1717                 return -1;
1718         }
1719
1720         talloc_free(data.dptr);
1721
1722         return 0;
1723 }
1724
1725 /*
1726   get the health status of a db
1727  */
1728 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1729                           struct timeval timeout,
1730                           uint32_t destnode,
1731                           uint32_t dbid, TALLOC_CTX *mem_ctx,
1732                           const char **reason)
1733 {
1734         int ret;
1735         int32_t res;
1736         TDB_DATA data;
1737
1738         data.dptr = (uint8_t *)&dbid;
1739         data.dsize = sizeof(dbid);
1740
1741         ret = ctdb_control(ctdb, destnode, 0,
1742                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1743                            mem_ctx, &data, &res, &timeout, NULL);
1744         if (ret != 0 || res != 0) {
1745                 return -1;
1746         }
1747
1748         if (data.dsize == 0) {
1749                 (*reason) = NULL;
1750                 return 0;
1751         }
1752
1753         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1754         if ((*reason) == NULL) {
1755                 return -1;
1756         }
1757
1758         talloc_free(data.dptr);
1759
1760         return 0;
1761 }
1762
1763 /*
1764   create a database
1765  */
1766 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1767                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1768 {
1769         int ret;
1770         int32_t res;
1771         TDB_DATA data;
1772
1773         data.dptr = discard_const(name);
1774         data.dsize = strlen(name)+1;
1775
1776         ret = ctdb_control(ctdb, destnode, 0, 
1777                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1778                            0, data, 
1779                            mem_ctx, &data, &res, &timeout, NULL);
1780
1781         if (ret != 0 || res != 0) {
1782                 return -1;
1783         }
1784
1785         return 0;
1786 }
1787
1788 /*
1789   get debug level on a node
1790  */
1791 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1792 {
1793         int ret;
1794         int32_t res;
1795         TDB_DATA data;
1796
1797         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1798                            ctdb, &data, &res, NULL, NULL);
1799         if (ret != 0 || res != 0) {
1800                 return -1;
1801         }
1802         if (data.dsize != sizeof(int32_t)) {
1803                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1804                          (unsigned)data.dsize));
1805                 return -1;
1806         }
1807         *level = *(int32_t *)data.dptr;
1808         talloc_free(data.dptr);
1809         return 0;
1810 }
1811
1812 /*
1813   set debug level on a node
1814  */
1815 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1816 {
1817         int ret;
1818         int32_t res;
1819         TDB_DATA data;
1820
1821         data.dptr = (uint8_t *)&level;
1822         data.dsize = sizeof(level);
1823
1824         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1825                            NULL, NULL, &res, NULL, NULL);
1826         if (ret != 0 || res != 0) {
1827                 return -1;
1828         }
1829         return 0;
1830 }
1831
1832
1833 /*
1834   get a list of connected nodes
1835  */
1836 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1837                                 struct timeval timeout,
1838                                 TALLOC_CTX *mem_ctx,
1839                                 uint32_t *num_nodes)
1840 {
1841         struct ctdb_node_map *map=NULL;
1842         int ret, i;
1843         uint32_t *nodes;
1844
1845         *num_nodes = 0;
1846
1847         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1848         if (ret != 0) {
1849                 return NULL;
1850         }
1851
1852         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1853         if (nodes == NULL) {
1854                 return NULL;
1855         }
1856
1857         for (i=0;i<map->num;i++) {
1858                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1859                         nodes[*num_nodes] = map->nodes[i].pnn;
1860                         (*num_nodes)++;
1861                 }
1862         }
1863
1864         return nodes;
1865 }
1866
1867
1868 /*
1869   reset remote status
1870  */
1871 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1872 {
1873         int ret;
1874         int32_t res;
1875
1876         ret = ctdb_control(ctdb, destnode, 0, 
1877                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1878                            NULL, NULL, &res, NULL, NULL);
1879         if (ret != 0 || res != 0) {
1880                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1881                 return -1;
1882         }
1883         return 0;
1884 }
1885
1886 /*
1887   attach to a specific database - client call
1888 */
1889 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
1890                                     struct timeval timeout,
1891                                     const char *name,
1892                                     bool persistent,
1893                                     uint32_t tdb_flags)
1894 {
1895         struct ctdb_db_context *ctdb_db;
1896         TDB_DATA data;
1897         int ret;
1898         int32_t res;
1899
1900         ctdb_db = ctdb_db_handle(ctdb, name);
1901         if (ctdb_db) {
1902                 return ctdb_db;
1903         }
1904
1905         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1906         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1907
1908         ctdb_db->ctdb = ctdb;
1909         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1910         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1911
1912         data.dptr = discard_const(name);
1913         data.dsize = strlen(name)+1;
1914
1915         /* tell ctdb daemon to attach */
1916         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1917                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1918                            0, data, ctdb_db, &data, &res, NULL, NULL);
1919         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1920                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1921                 talloc_free(ctdb_db);
1922                 return NULL;
1923         }
1924         
1925         ctdb_db->db_id = *(uint32_t *)data.dptr;
1926         talloc_free(data.dptr);
1927
1928         ret = ctdb_ctrl_getdbpath(ctdb, timeout, CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1929         if (ret != 0) {
1930                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1931                 talloc_free(ctdb_db);
1932                 return NULL;
1933         }
1934
1935         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1936         if (ctdb->valgrinding) {
1937                 tdb_flags |= TDB_NOMMAP;
1938         }
1939         tdb_flags |= TDB_DISALLOW_NESTING;
1940
1941         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1942         if (ctdb_db->ltdb == NULL) {
1943                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1944                 talloc_free(ctdb_db);
1945                 return NULL;
1946         }
1947
1948         ctdb_db->persistent = persistent;
1949
1950         DLIST_ADD(ctdb->db_list, ctdb_db);
1951
1952         /* add well known functions */
1953         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1954         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1955         ctdb_set_call(ctdb_db, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1956
1957         return ctdb_db;
1958 }
1959
1960
1961 /*
1962   setup a call for a database
1963  */
1964 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1965 {
1966         struct ctdb_registered_call *call;
1967
1968 #if 0
1969         TDB_DATA data;
1970         int32_t status;
1971         struct ctdb_control_set_call c;
1972         int ret;
1973
1974         /* this is no longer valid with the separate daemon architecture */
1975         c.db_id = ctdb_db->db_id;
1976         c.fn    = fn;
1977         c.id    = id;
1978
1979         data.dptr = (uint8_t *)&c;
1980         data.dsize = sizeof(c);
1981
1982         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1983                            data, NULL, NULL, &status, NULL, NULL);
1984         if (ret != 0 || status != 0) {
1985                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
1986                 return -1;
1987         }
1988 #endif
1989
1990         /* also register locally */
1991         call = talloc(ctdb_db, struct ctdb_registered_call);
1992         call->fn = fn;
1993         call->id = id;
1994
1995         DLIST_ADD(ctdb_db->calls, call);        
1996         return 0;
1997 }
1998
1999
2000 struct traverse_state {
2001         bool done;
2002         uint32_t count;
2003         ctdb_traverse_func fn;
2004         void *private_data;
2005         bool listemptyrecords;
2006 };
2007
2008 /*
2009   called on each key during a ctdb_traverse
2010  */
2011 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
2012 {
2013         struct traverse_state *state = (struct traverse_state *)p;
2014         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
2015         TDB_DATA key;
2016
2017         if (data.dsize < sizeof(uint32_t) ||
2018             d->length != data.dsize) {
2019                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
2020                 state->done = True;
2021                 return;
2022         }
2023
2024         key.dsize = d->keylen;
2025         key.dptr  = &d->data[0];
2026         data.dsize = d->datalen;
2027         data.dptr = &d->data[d->keylen];
2028
2029         if (key.dsize == 0 && data.dsize == 0) {
2030                 /* end of traverse */
2031                 state->done = True;
2032                 return;
2033         }
2034
2035         if (!state->listemptyrecords &&
2036             data.dsize == sizeof(struct ctdb_ltdb_header))
2037         {
2038                 /* empty records are deleted records in ctdb */
2039                 return;
2040         }
2041
2042         if (state->fn(ctdb, key, data, state->private_data) != 0) {
2043                 state->done = True;
2044         }
2045
2046         state->count++;
2047 }
2048
2049 /**
2050  * start a cluster wide traverse, calling the supplied fn on each record
2051  * return the number of records traversed, or -1 on error
2052  *
2053  * Extendet variant with a flag to signal whether empty records should
2054  * be listed.
2055  */
2056 static int ctdb_traverse_ext(struct ctdb_db_context *ctdb_db,
2057                              ctdb_traverse_func fn,
2058                              bool withemptyrecords,
2059                              void *private_data)
2060 {
2061         TDB_DATA data;
2062         struct ctdb_traverse_start t;
2063         int32_t status;
2064         int ret;
2065         uint64_t srvid = (getpid() | 0xFLL<<60);
2066         struct traverse_state state;
2067
2068         state.done = False;
2069         state.count = 0;
2070         state.private_data = private_data;
2071         state.fn = fn;
2072         state.listemptyrecords = withemptyrecords;
2073
2074         ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
2075         if (ret != 0) {
2076                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
2077                 return -1;
2078         }
2079
2080         t.db_id = ctdb_db->db_id;
2081         t.srvid = srvid;
2082         t.reqid = 0;
2083         t.withemptyrecords = withemptyrecords;
2084
2085         data.dptr = (uint8_t *)&t;
2086         data.dsize = sizeof(t);
2087
2088         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
2089                            data, NULL, NULL, &status, NULL, NULL);
2090         if (ret != 0 || status != 0) {
2091                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
2092                 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2093                 return -1;
2094         }
2095
2096         while (!state.done) {
2097                 event_loop_once(ctdb_db->ctdb->ev);
2098         }
2099
2100         ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2101         if (ret != 0) {
2102                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
2103                 return -1;
2104         }
2105
2106         return state.count;
2107 }
2108
2109 /**
2110  * start a cluster wide traverse, calling the supplied fn on each record
2111  * return the number of records traversed, or -1 on error
2112  *
2113  * Standard version which does not list the empty records:
2114  * These are considered deleted.
2115  */
2116 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
2117 {
2118         return ctdb_traverse_ext(ctdb_db, fn, false, private_data);
2119 }
2120
2121 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
2122 /*
2123   called on each key during a catdb
2124  */
2125 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
2126 {
2127         int i;
2128         struct ctdb_dump_db_context *c = (struct ctdb_dump_db_context *)p;
2129         FILE *f = c->f;
2130         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
2131
2132         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
2133         for (i=0;i<key.dsize;i++) {
2134                 if (ISASCII(key.dptr[i])) {
2135                         fprintf(f, "%c", key.dptr[i]);
2136                 } else {
2137                         fprintf(f, "\\%02X", key.dptr[i]);
2138                 }
2139         }
2140         fprintf(f, "\"\n");
2141
2142         fprintf(f, "dmaster: %u\n", h->dmaster);
2143         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
2144
2145         if (c->printlmaster && ctdb->vnn_map != NULL) {
2146                 fprintf(f, "lmaster: %u\n", ctdb_lmaster(ctdb, &key));
2147         }
2148
2149         fprintf(f, "flags: 0x%08x", h->flags);
2150         if (h->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) printf(" MIGRATED_WITH_DATA");
2151         if (h->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) printf(" VACUUM_MIGRATED");
2152         if (h->flags & CTDB_REC_FLAG_AUTOMATIC) printf(" AUTOMATIC");
2153         if (h->flags & CTDB_REC_RO_HAVE_DELEGATIONS) printf(" RO_HAVE_DELEGATIONS");
2154         if (h->flags & CTDB_REC_RO_HAVE_READONLY) printf(" RO_HAVE_READONLY");
2155         if (h->flags & CTDB_REC_RO_REVOKING_READONLY) printf(" RO_REVOKING_READONLY");
2156         if (h->flags & CTDB_REC_RO_REVOKE_COMPLETE) printf(" RO_REVOKE_COMPLETE");
2157         fprintf(f, "\n");
2158
2159         if (c->printdatasize) {
2160                 fprintf(f, "data size: %u\n", (unsigned)data.dsize);
2161         } else {
2162                 fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
2163                 for (i=sizeof(*h);i<data.dsize;i++) {
2164                         if (ISASCII(data.dptr[i])) {
2165                                 fprintf(f, "%c", data.dptr[i]);
2166                         } else {
2167                                 fprintf(f, "\\%02X", data.dptr[i]);
2168                         }
2169                 }
2170                 fprintf(f, "\"\n");
2171         }
2172
2173         fprintf(f, "\n");
2174
2175         return 0;
2176 }
2177
2178 /*
2179   convenience function to list all keys to stdout
2180  */
2181 int ctdb_dump_db(struct ctdb_db_context *ctdb_db,
2182                  struct ctdb_dump_db_context *ctx)
2183 {
2184         return ctdb_traverse_ext(ctdb_db, ctdb_dumpdb_record,
2185                                  ctx->printemptyrecords, ctx);
2186 }
2187
2188 /*
2189   get the pid of a ctdb daemon
2190  */
2191 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
2192 {
2193         int ret;
2194         int32_t res;
2195
2196         ret = ctdb_control(ctdb, destnode, 0, 
2197                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
2198                            NULL, NULL, &res, &timeout, NULL);
2199         if (ret != 0) {
2200                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
2201                 return -1;
2202         }
2203
2204         *pid = res;
2205
2206         return 0;
2207 }
2208
2209
2210 /*
2211   async freeze send control
2212  */
2213 struct ctdb_client_control_state *
2214 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
2215 {
2216         return ctdb_control_send(ctdb, destnode, priority, 
2217                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
2218                            mem_ctx, &timeout, NULL);
2219 }
2220
2221 /* 
2222    async freeze recv control
2223 */
2224 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
2225 {
2226         int ret;
2227         int32_t res;
2228
2229         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
2230         if ( (ret != 0) || (res != 0) ){
2231                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
2232                 return -1;
2233         }
2234
2235         return 0;
2236 }
2237
2238 /*
2239   freeze databases of a certain priority
2240  */
2241 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2242 {
2243         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2244         struct ctdb_client_control_state *state;
2245         int ret;
2246
2247         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
2248         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
2249         talloc_free(tmp_ctx);
2250
2251         return ret;
2252 }
2253
2254 /* Freeze all databases */
2255 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2256 {
2257         int i;
2258
2259         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2260                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2261                         return -1;
2262                 }
2263         }
2264         return 0;
2265 }
2266
2267 /*
2268   thaw databases of a certain priority
2269  */
2270 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2271 {
2272         int ret;
2273         int32_t res;
2274
2275         ret = ctdb_control(ctdb, destnode, priority, 
2276                            CTDB_CONTROL_THAW, 0, tdb_null, 
2277                            NULL, NULL, &res, &timeout, NULL);
2278         if (ret != 0 || res != 0) {
2279                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2280                 return -1;
2281         }
2282
2283         return 0;
2284 }
2285
2286 /* thaw all databases */
2287 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2288 {
2289         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2290 }
2291
2292 /*
2293   get pnn of a node, or -1
2294  */
2295 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2296 {
2297         int ret;
2298         int32_t res;
2299
2300         ret = ctdb_control(ctdb, destnode, 0, 
2301                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2302                            NULL, NULL, &res, &timeout, NULL);
2303         if (ret != 0) {
2304                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2305                 return -1;
2306         }
2307
2308         return res;
2309 }
2310
2311 /*
2312   get the monitoring mode of a remote node
2313  */
2314 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2315 {
2316         int ret;
2317         int32_t res;
2318
2319         ret = ctdb_control(ctdb, destnode, 0, 
2320                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2321                            NULL, NULL, &res, &timeout, NULL);
2322         if (ret != 0) {
2323                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2324                 return -1;
2325         }
2326
2327         *monmode = res;
2328
2329         return 0;
2330 }
2331
2332
2333 /*
2334  set the monitoring mode of a remote node to active
2335  */
2336 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2337 {
2338         int ret;
2339         
2340
2341         ret = ctdb_control(ctdb, destnode, 0, 
2342                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2343                            NULL, NULL,NULL, &timeout, NULL);
2344         if (ret != 0) {
2345                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2346                 return -1;
2347         }
2348
2349         
2350
2351         return 0;
2352 }
2353
2354 /*
2355   set the monitoring mode of a remote node to disable
2356  */
2357 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2358 {
2359         int ret;
2360         
2361
2362         ret = ctdb_control(ctdb, destnode, 0, 
2363                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2364                            NULL, NULL, NULL, &timeout, NULL);
2365         if (ret != 0) {
2366                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2367                 return -1;
2368         }
2369
2370         
2371
2372         return 0;
2373 }
2374
2375
2376
2377 /* 
2378   sent to a node to make it take over an ip address
2379 */
2380 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2381                           uint32_t destnode, struct ctdb_public_ip *ip)
2382 {
2383         TDB_DATA data;
2384         struct ctdb_public_ipv4 ipv4;
2385         int ret;
2386         int32_t res;
2387
2388         if (ip->addr.sa.sa_family == AF_INET) {
2389                 ipv4.pnn = ip->pnn;
2390                 ipv4.sin = ip->addr.ip;
2391
2392                 data.dsize = sizeof(ipv4);
2393                 data.dptr  = (uint8_t *)&ipv4;
2394
2395                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2396                            NULL, &res, &timeout, NULL);
2397         } else {
2398                 data.dsize = sizeof(*ip);
2399                 data.dptr  = (uint8_t *)ip;
2400
2401                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2402                            NULL, &res, &timeout, NULL);
2403         }
2404
2405         if (ret != 0 || res != 0) {
2406                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2407                 return -1;
2408         }
2409
2410         return 0;       
2411 }
2412
2413
2414 /* 
2415   sent to a node to make it release an ip address
2416 */
2417 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2418                          uint32_t destnode, struct ctdb_public_ip *ip)
2419 {
2420         TDB_DATA data;
2421         struct ctdb_public_ipv4 ipv4;
2422         int ret;
2423         int32_t res;
2424
2425         if (ip->addr.sa.sa_family == AF_INET) {
2426                 ipv4.pnn = ip->pnn;
2427                 ipv4.sin = ip->addr.ip;
2428
2429                 data.dsize = sizeof(ipv4);
2430                 data.dptr  = (uint8_t *)&ipv4;
2431
2432                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2433                                    NULL, &res, &timeout, NULL);
2434         } else {
2435                 data.dsize = sizeof(*ip);
2436                 data.dptr  = (uint8_t *)ip;
2437
2438                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2439                                    NULL, &res, &timeout, NULL);
2440         }
2441
2442         if (ret != 0 || res != 0) {
2443                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2444                 return -1;
2445         }
2446
2447         return 0;       
2448 }
2449
2450
2451 /*
2452   get a tunable
2453  */
2454 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2455                           struct timeval timeout, 
2456                           uint32_t destnode,
2457                           const char *name, uint32_t *value)
2458 {
2459         struct ctdb_control_get_tunable *t;
2460         TDB_DATA data, outdata;
2461         int32_t res;
2462         int ret;
2463
2464         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2465         data.dptr  = talloc_size(ctdb, data.dsize);
2466         CTDB_NO_MEMORY(ctdb, data.dptr);
2467
2468         t = (struct ctdb_control_get_tunable *)data.dptr;
2469         t->length = strlen(name)+1;
2470         memcpy(t->name, name, t->length);
2471
2472         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2473                            &outdata, &res, &timeout, NULL);
2474         talloc_free(data.dptr);
2475         if (ret != 0 || res != 0) {
2476                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2477                 return -1;
2478         }
2479
2480         if (outdata.dsize != sizeof(uint32_t)) {
2481                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2482                 talloc_free(outdata.dptr);
2483                 return -1;
2484         }
2485         
2486         *value = *(uint32_t *)outdata.dptr;
2487         talloc_free(outdata.dptr);
2488
2489         return 0;
2490 }
2491
2492 /*
2493   set a tunable
2494  */
2495 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2496                           struct timeval timeout, 
2497                           uint32_t destnode,
2498                           const char *name, uint32_t value)
2499 {
2500         struct ctdb_control_set_tunable *t;
2501         TDB_DATA data;
2502         int32_t res;
2503         int ret;
2504
2505         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2506         data.dptr  = talloc_size(ctdb, data.dsize);
2507         CTDB_NO_MEMORY(ctdb, data.dptr);
2508
2509         t = (struct ctdb_control_set_tunable *)data.dptr;
2510         t->length = strlen(name)+1;
2511         memcpy(t->name, name, t->length);
2512         t->value = value;
2513
2514         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2515                            NULL, &res, &timeout, NULL);
2516         talloc_free(data.dptr);
2517         if (ret != 0 || res != 0) {
2518                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2519                 return -1;
2520         }
2521
2522         return 0;
2523 }
2524
2525 /*
2526   list tunables
2527  */
2528 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2529                             struct timeval timeout, 
2530                             uint32_t destnode,
2531                             TALLOC_CTX *mem_ctx,
2532                             const char ***list, uint32_t *count)
2533 {
2534         TDB_DATA outdata;
2535         int32_t res;
2536         int ret;
2537         struct ctdb_control_list_tunable *t;
2538         char *p, *s, *ptr;
2539
2540         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2541                            mem_ctx, &outdata, &res, &timeout, NULL);
2542         if (ret != 0 || res != 0) {
2543                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2544                 return -1;
2545         }
2546
2547         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2548         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2549             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2550                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2551                 talloc_free(outdata.dptr);
2552                 return -1;              
2553         }
2554         
2555         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2556         CTDB_NO_MEMORY(ctdb, p);
2557
2558         talloc_free(outdata.dptr);
2559         
2560         (*list) = NULL;
2561         (*count) = 0;
2562
2563         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2564                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2565                 CTDB_NO_MEMORY(ctdb, *list);
2566                 (*list)[*count] = talloc_strdup(*list, s);
2567                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2568                 (*count)++;
2569         }
2570
2571         talloc_free(p);
2572
2573         return 0;
2574 }
2575
2576
2577 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2578                                    struct timeval timeout, uint32_t destnode,
2579                                    TALLOC_CTX *mem_ctx,
2580                                    uint32_t flags,
2581                                    struct ctdb_all_public_ips **ips)
2582 {
2583         int ret;
2584         TDB_DATA outdata;
2585         int32_t res;
2586
2587         ret = ctdb_control(ctdb, destnode, 0, 
2588                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2589                            mem_ctx, &outdata, &res, &timeout, NULL);
2590         if (ret == 0 && res == -1) {
2591                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2592                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2593         }
2594         if (ret != 0 || res != 0) {
2595           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2596                 return -1;
2597         }
2598
2599         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2600         talloc_free(outdata.dptr);
2601                     
2602         return 0;
2603 }
2604
2605 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2606                              struct timeval timeout, uint32_t destnode,
2607                              TALLOC_CTX *mem_ctx,
2608                              struct ctdb_all_public_ips **ips)
2609 {
2610         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2611                                               destnode, mem_ctx,
2612                                               0, ips);
2613 }
2614
2615 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2616                         struct timeval timeout, uint32_t destnode, 
2617                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2618 {
2619         int ret, i, len;
2620         TDB_DATA outdata;
2621         int32_t res;
2622         struct ctdb_all_public_ipsv4 *ipsv4;
2623
2624         ret = ctdb_control(ctdb, destnode, 0, 
2625                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2626                            mem_ctx, &outdata, &res, &timeout, NULL);
2627         if (ret != 0 || res != 0) {
2628                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2629                 return -1;
2630         }
2631
2632         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2633         len = offsetof(struct ctdb_all_public_ips, ips) +
2634                 ipsv4->num*sizeof(struct ctdb_public_ip);
2635         *ips = talloc_zero_size(mem_ctx, len);
2636         CTDB_NO_MEMORY(ctdb, *ips);
2637         (*ips)->num = ipsv4->num;
2638         for (i=0; i<ipsv4->num; i++) {
2639                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2640                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2641         }
2642
2643         talloc_free(outdata.dptr);
2644                     
2645         return 0;
2646 }
2647
2648 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2649                                  struct timeval timeout, uint32_t destnode,
2650                                  TALLOC_CTX *mem_ctx,
2651                                  const ctdb_sock_addr *addr,
2652                                  struct ctdb_control_public_ip_info **_info)
2653 {
2654         int ret;
2655         TDB_DATA indata;
2656         TDB_DATA outdata;
2657         int32_t res;
2658         struct ctdb_control_public_ip_info *info;
2659         uint32_t len;
2660         uint32_t i;
2661
2662         indata.dptr = discard_const_p(uint8_t, addr);
2663         indata.dsize = sizeof(*addr);
2664
2665         ret = ctdb_control(ctdb, destnode, 0,
2666                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2667                            mem_ctx, &outdata, &res, &timeout, NULL);
2668         if (ret != 0 || res != 0) {
2669                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2670                                 "failed ret:%d res:%d\n",
2671                                 ret, res));
2672                 return -1;
2673         }
2674
2675         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2676         if (len > outdata.dsize) {
2677                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2678                                 "returned invalid data with size %u > %u\n",
2679                                 (unsigned int)outdata.dsize,
2680                                 (unsigned int)len));
2681                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2682                 return -1;
2683         }
2684
2685         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2686         len += info->num*sizeof(struct ctdb_control_iface_info);
2687
2688         if (len > outdata.dsize) {
2689                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2690                                 "returned invalid data with size %u > %u\n",
2691                                 (unsigned int)outdata.dsize,
2692                                 (unsigned int)len));
2693                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2694                 return -1;
2695         }
2696
2697         /* make sure we null terminate the returned strings */
2698         for (i=0; i < info->num; i++) {
2699                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2700         }
2701
2702         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2703                                                                 outdata.dptr,
2704                                                                 outdata.dsize);
2705         talloc_free(outdata.dptr);
2706         if (*_info == NULL) {
2707                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2708                                 "talloc_memdup size %u failed\n",
2709                                 (unsigned int)outdata.dsize));
2710                 return -1;
2711         }
2712
2713         return 0;
2714 }
2715
2716 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2717                          struct timeval timeout, uint32_t destnode,
2718                          TALLOC_CTX *mem_ctx,
2719                          struct ctdb_control_get_ifaces **_ifaces)
2720 {
2721         int ret;
2722         TDB_DATA outdata;
2723         int32_t res;
2724         struct ctdb_control_get_ifaces *ifaces;
2725         uint32_t len;
2726         uint32_t i;
2727
2728         ret = ctdb_control(ctdb, destnode, 0,
2729                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2730                            mem_ctx, &outdata, &res, &timeout, NULL);
2731         if (ret != 0 || res != 0) {
2732                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2733                                 "failed ret:%d res:%d\n",
2734                                 ret, res));
2735                 return -1;
2736         }
2737
2738         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2739         if (len > outdata.dsize) {
2740                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2741                                 "returned invalid data with size %u > %u\n",
2742                                 (unsigned int)outdata.dsize,
2743                                 (unsigned int)len));
2744                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2745                 return -1;
2746         }
2747
2748         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2749         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2750
2751         if (len > outdata.dsize) {
2752                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2753                                 "returned invalid data with size %u > %u\n",
2754                                 (unsigned int)outdata.dsize,
2755                                 (unsigned int)len));
2756                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2757                 return -1;
2758         }
2759
2760         /* make sure we null terminate the returned strings */
2761         for (i=0; i < ifaces->num; i++) {
2762                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2763         }
2764
2765         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2766                                                                   outdata.dptr,
2767                                                                   outdata.dsize);
2768         talloc_free(outdata.dptr);
2769         if (*_ifaces == NULL) {
2770                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2771                                 "talloc_memdup size %u failed\n",
2772                                 (unsigned int)outdata.dsize));
2773                 return -1;
2774         }
2775
2776         return 0;
2777 }
2778
2779 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2780                              struct timeval timeout, uint32_t destnode,
2781                              TALLOC_CTX *mem_ctx,
2782                              const struct ctdb_control_iface_info *info)
2783 {
2784         int ret;
2785         TDB_DATA indata;
2786         int32_t res;
2787
2788         indata.dptr = discard_const_p(uint8_t, info);
2789         indata.dsize = sizeof(*info);
2790
2791         ret = ctdb_control(ctdb, destnode, 0,
2792                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2793                            mem_ctx, NULL, &res, &timeout, NULL);
2794         if (ret != 0 || res != 0) {
2795                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2796                                 "failed ret:%d res:%d\n",
2797                                 ret, res));
2798                 return -1;
2799         }
2800
2801         return 0;
2802 }
2803
2804 /*
2805   set/clear the permanent disabled bit on a remote node
2806  */
2807 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2808                        uint32_t set, uint32_t clear)
2809 {
2810         int ret;
2811         TDB_DATA data;
2812         struct ctdb_node_map *nodemap=NULL;
2813         struct ctdb_node_flag_change c;
2814         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2815         uint32_t recmaster;
2816         uint32_t *nodes;
2817
2818
2819         /* find the recovery master */
2820         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2821         if (ret != 0) {
2822                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2823                 talloc_free(tmp_ctx);
2824                 return ret;
2825         }
2826
2827
2828         /* read the node flags from the recmaster */
2829         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2830         if (ret != 0) {
2831                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2832                 talloc_free(tmp_ctx);
2833                 return -1;
2834         }
2835         if (destnode >= nodemap->num) {
2836                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2837                 talloc_free(tmp_ctx);
2838                 return -1;
2839         }
2840
2841         c.pnn       = destnode;
2842         c.old_flags = nodemap->nodes[destnode].flags;
2843         c.new_flags = c.old_flags;
2844         c.new_flags |= set;
2845         c.new_flags &= ~clear;
2846
2847         data.dsize = sizeof(c);
2848         data.dptr = (unsigned char *)&c;
2849
2850         /* send the flags update to all connected nodes */
2851         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2852
2853         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2854                                         nodes, 0,
2855                                         timeout, false, data,
2856                                         NULL, NULL,
2857                                         NULL) != 0) {
2858                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2859
2860                 talloc_free(tmp_ctx);
2861                 return -1;
2862         }
2863
2864         talloc_free(tmp_ctx);
2865         return 0;
2866 }
2867
2868
2869 /*
2870   get all tunables
2871  */
2872 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
2873                                struct timeval timeout, 
2874                                uint32_t destnode,
2875                                struct ctdb_tunable *tunables)
2876 {
2877         TDB_DATA outdata;
2878         int ret;
2879         int32_t res;
2880
2881         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2882                            &outdata, &res, &timeout, NULL);
2883         if (ret != 0 || res != 0) {
2884                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2885                 return -1;
2886         }
2887
2888         if (outdata.dsize != sizeof(*tunables)) {
2889                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2890                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2891                 return -1;              
2892         }
2893
2894         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2895         talloc_free(outdata.dptr);
2896         return 0;
2897 }
2898
2899 /*
2900   add a public address to a node
2901  */
2902 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
2903                       struct timeval timeout, 
2904                       uint32_t destnode,
2905                       struct ctdb_control_ip_iface *pub)
2906 {
2907         TDB_DATA data;
2908         int32_t res;
2909         int ret;
2910
2911         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2912         data.dptr  = (unsigned char *)pub;
2913
2914         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2915                            NULL, &res, &timeout, NULL);
2916         if (ret != 0 || res != 0) {
2917                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2918                 return -1;
2919         }
2920
2921         return 0;
2922 }
2923
2924 /*
2925   delete a public address from a node
2926  */
2927 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
2928                       struct timeval timeout, 
2929                       uint32_t destnode,
2930                       struct ctdb_control_ip_iface *pub)
2931 {
2932         TDB_DATA data;
2933         int32_t res;
2934         int ret;
2935
2936         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2937         data.dptr  = (unsigned char *)pub;
2938
2939         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2940                            NULL, &res, &timeout, NULL);
2941         if (ret != 0 || res != 0) {
2942                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2943                 return -1;
2944         }
2945
2946         return 0;
2947 }
2948
2949 /*
2950   kill a tcp connection
2951  */
2952 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2953                       struct timeval timeout, 
2954                       uint32_t destnode,
2955                       struct ctdb_control_killtcp *killtcp)
2956 {
2957         TDB_DATA data;
2958         int32_t res;
2959         int ret;
2960
2961         data.dsize = sizeof(struct ctdb_control_killtcp);
2962         data.dptr  = (unsigned char *)killtcp;
2963
2964         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2965                            NULL, &res, &timeout, NULL);
2966         if (ret != 0 || res != 0) {
2967                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2968                 return -1;
2969         }
2970
2971         return 0;
2972 }
2973
2974 /*
2975   send a gratious arp
2976  */
2977 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
2978                       struct timeval timeout, 
2979                       uint32_t destnode,
2980                       ctdb_sock_addr *addr,
2981                       const char *ifname)
2982 {
2983         TDB_DATA data;
2984         int32_t res;
2985         int ret, len;
2986         struct ctdb_control_gratious_arp *gratious_arp;
2987         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2988
2989
2990         len = strlen(ifname)+1;
2991         gratious_arp = talloc_size(tmp_ctx, 
2992                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2993         CTDB_NO_MEMORY(ctdb, gratious_arp);
2994
2995         gratious_arp->addr = *addr;
2996         gratious_arp->len = len;
2997         memcpy(&gratious_arp->iface[0], ifname, len);
2998
2999
3000         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
3001         data.dptr  = (unsigned char *)gratious_arp;
3002
3003         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
3004                            NULL, &res, &timeout, NULL);
3005         if (ret != 0 || res != 0) {
3006                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
3007                 talloc_free(tmp_ctx);
3008                 return -1;
3009         }
3010
3011         talloc_free(tmp_ctx);
3012         return 0;
3013 }
3014
3015 /*
3016   get a list of all tcp tickles that a node knows about for a particular vnn
3017  */
3018 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
3019                               struct timeval timeout, uint32_t destnode, 
3020                               TALLOC_CTX *mem_ctx, 
3021                               ctdb_sock_addr *addr,
3022                               struct ctdb_control_tcp_tickle_list **list)
3023 {
3024         int ret;
3025         TDB_DATA data, outdata;
3026         int32_t status;
3027
3028         data.dptr = (uint8_t*)addr;
3029         data.dsize = sizeof(ctdb_sock_addr);
3030
3031         ret = ctdb_control(ctdb, destnode, 0, 
3032                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
3033                            mem_ctx, &outdata, &status, NULL, NULL);
3034         if (ret != 0 || status != 0) {
3035                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
3036                 return -1;
3037         }
3038
3039         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
3040
3041         return status;
3042 }
3043
3044 /*
3045   register a server id
3046  */
3047 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
3048                       struct timeval timeout, 
3049                       struct ctdb_server_id *id)
3050 {
3051         TDB_DATA data;
3052         int32_t res;
3053         int ret;
3054
3055         data.dsize = sizeof(struct ctdb_server_id);
3056         data.dptr  = (unsigned char *)id;
3057
3058         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3059                         CTDB_CONTROL_REGISTER_SERVER_ID, 
3060                         0, data, NULL,
3061                         NULL, &res, &timeout, NULL);
3062         if (ret != 0 || res != 0) {
3063                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
3064                 return -1;
3065         }
3066
3067         return 0;
3068 }
3069
3070 /*
3071   unregister a server id
3072  */
3073 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
3074                       struct timeval timeout, 
3075                       struct ctdb_server_id *id)
3076 {
3077         TDB_DATA data;
3078         int32_t res;
3079         int ret;
3080
3081         data.dsize = sizeof(struct ctdb_server_id);
3082         data.dptr  = (unsigned char *)id;
3083
3084         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3085                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
3086                         0, data, NULL,
3087                         NULL, &res, &timeout, NULL);
3088         if (ret != 0 || res != 0) {
3089                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
3090                 return -1;
3091         }
3092
3093         return 0;
3094 }
3095
3096
3097 /*
3098   check if a server id exists
3099
3100   if a server id does exist, return *status == 1, otherwise *status == 0
3101  */
3102 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
3103                       struct timeval timeout, 
3104                       uint32_t destnode,
3105                       struct ctdb_server_id *id,
3106                       uint32_t *status)
3107 {
3108         TDB_DATA data;
3109         int32_t res;
3110         int ret;
3111
3112         data.dsize = sizeof(struct ctdb_server_id);
3113         data.dptr  = (unsigned char *)id;
3114
3115         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
3116                         0, data, NULL,
3117                         NULL, &res, &timeout, NULL);
3118         if (ret != 0) {
3119                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
3120                 return -1;
3121         }
3122
3123         if (res) {
3124                 *status = 1;
3125         } else {
3126                 *status = 0;
3127         }
3128
3129         return 0;
3130 }
3131
3132 /*
3133    get the list of server ids that are registered on a node
3134 */
3135 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
3136                 TALLOC_CTX *mem_ctx,
3137                 struct timeval timeout, uint32_t destnode, 
3138                 struct ctdb_server_id_list **svid_list)
3139 {
3140         int ret;
3141         TDB_DATA outdata;
3142         int32_t res;
3143
3144         ret = ctdb_control(ctdb, destnode, 0, 
3145                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
3146                            mem_ctx, &outdata, &res, &timeout, NULL);
3147         if (ret != 0 || res != 0) {
3148                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
3149                 return -1;
3150         }
3151
3152         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
3153                     
3154         return 0;
3155 }
3156
3157 /*
3158   initialise the ctdb daemon for client applications
3159
3160   NOTE: In current code the daemon does not fork. This is for testing purposes only
3161   and to simplify the code.
3162 */
3163 struct ctdb_context *ctdb_init(struct event_context *ev)
3164 {
3165         int ret;
3166         struct ctdb_context *ctdb;
3167
3168         ctdb = talloc_zero(ev, struct ctdb_context);
3169         if (ctdb == NULL) {
3170                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
3171                 return NULL;
3172         }
3173         ctdb->ev  = ev;
3174         ctdb->idr = idr_init(ctdb);
3175         /* Wrap early to exercise code. */
3176         ctdb->lastid = INT_MAX-200;
3177         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
3178
3179         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
3180         if (ret != 0) {
3181                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
3182                 talloc_free(ctdb);
3183                 return NULL;
3184         }
3185
3186         ctdb->statistics.statistics_start_time = timeval_current();
3187
3188         return ctdb;
3189 }
3190
3191
3192 /*
3193   set some ctdb flags
3194 */
3195 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
3196 {
3197         ctdb->flags |= flags;
3198 }
3199
3200 /*
3201   setup the local socket name
3202 */
3203 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
3204 {
3205         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
3206         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
3207
3208         return 0;
3209 }
3210
3211 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
3212 {
3213         return ctdb->daemon.name;
3214 }
3215
3216 /*
3217   return the pnn of this node
3218 */
3219 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
3220 {
3221         return ctdb->pnn;
3222 }
3223
3224
3225 /*
3226   get the uptime of a remote node
3227  */
3228 struct ctdb_client_control_state *
3229 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3230 {
3231         return ctdb_control_send(ctdb, destnode, 0, 
3232                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
3233                            mem_ctx, &timeout, NULL);
3234 }
3235
3236 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
3237 {
3238         int ret;
3239         int32_t res;
3240         TDB_DATA outdata;
3241
3242         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3243         if (ret != 0 || res != 0) {
3244                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
3245                 return -1;
3246         }
3247
3248         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
3249
3250         return 0;
3251 }
3252
3253 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3254 {
3255         struct ctdb_client_control_state *state;
3256
3257         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3258         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3259 }
3260
3261 /*
3262   send a control to execute the "recovered" event script on a node
3263  */
3264 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3265 {
3266         int ret;
3267         int32_t status;
3268
3269         ret = ctdb_control(ctdb, destnode, 0, 
3270                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
3271                            NULL, NULL, &status, &timeout, NULL);
3272         if (ret != 0 || status != 0) {
3273                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3274                 return -1;
3275         }
3276
3277         return 0;
3278 }
3279
3280 /* 
3281   callback for the async helpers used when sending the same control
3282   to multiple nodes in parallell.
3283 */
3284 static void async_callback(struct ctdb_client_control_state *state)
3285 {
3286         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3287         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3288         int ret;
3289         TDB_DATA outdata;
3290         int32_t res;
3291         uint32_t destnode = state->c->hdr.destnode;
3292
3293         /* one more node has responded with recmode data */
3294         data->count--;
3295
3296         /* if we failed to push the db, then return an error and let
3297            the main loop try again.
3298         */
3299         if (state->state != CTDB_CONTROL_DONE) {
3300                 if ( !data->dont_log_errors) {
3301                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3302                 }
3303                 data->fail_count++;
3304                 if (data->fail_callback) {
3305                         data->fail_callback(ctdb, destnode, res, outdata,
3306                                         data->callback_data);
3307                 }
3308                 return;
3309         }
3310         
3311         state->async.fn = NULL;
3312
3313         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3314         if ((ret != 0) || (res != 0)) {
3315                 if ( !data->dont_log_errors) {
3316                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3317                 }
3318                 data->fail_count++;
3319                 if (data->fail_callback) {
3320                         data->fail_callback(ctdb, destnode, res, outdata,
3321                                         data->callback_data);
3322                 }
3323         }
3324         if ((ret == 0) && (data->callback != NULL)) {
3325                 data->callback(ctdb, destnode, res, outdata,
3326                                         data->callback_data);
3327         }
3328 }
3329
3330
3331 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3332 {
3333         /* set up the callback functions */
3334         state->async.fn = async_callback;
3335         state->async.private_data = data;
3336         
3337         /* one more control to wait for to complete */
3338         data->count++;
3339 }
3340
3341
3342 /* wait for up to the maximum number of seconds allowed
3343    or until all nodes we expect a response from has replied
3344 */
3345 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3346 {
3347         while (data->count > 0) {
3348                 event_loop_once(ctdb->ev);
3349         }
3350         if (data->fail_count != 0) {
3351                 if (!data->dont_log_errors) {
3352                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
3353                                  data->fail_count));
3354                 }
3355                 return -1;
3356         }
3357         return 0;
3358 }
3359
3360
3361 /* 
3362    perform a simple control on the listed nodes
3363    The control cannot return data
3364  */
3365 int ctdb_client_async_control(struct ctdb_context *ctdb,
3366                                 enum ctdb_controls opcode,
3367                                 uint32_t *nodes,
3368                                 uint64_t srvid,
3369                                 struct timeval timeout,
3370                                 bool dont_log_errors,
3371                                 TDB_DATA data,
3372                                 client_async_callback client_callback,
3373                                 client_async_callback fail_callback,
3374                                 void *callback_data)
3375 {
3376         struct client_async_data *async_data;
3377         struct ctdb_client_control_state *state;
3378         int j, num_nodes;
3379
3380         async_data = talloc_zero(ctdb, struct client_async_data);
3381         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3382         async_data->dont_log_errors = dont_log_errors;
3383         async_data->callback = client_callback;
3384         async_data->fail_callback = fail_callback;
3385         async_data->callback_data = callback_data;
3386         async_data->opcode        = opcode;
3387
3388         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3389
3390         /* loop over all nodes and send an async control to each of them */
3391         for (j=0; j<num_nodes; j++) {
3392                 uint32_t pnn = nodes[j];
3393
3394                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
3395                                           0, data, async_data, &timeout, NULL);
3396                 if (state == NULL) {
3397                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3398                         talloc_free(async_data);
3399                         return -1;
3400                 }
3401                 
3402                 ctdb_client_async_add(async_data, state);
3403         }
3404
3405         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3406                 talloc_free(async_data);
3407                 return -1;
3408         }
3409
3410         talloc_free(async_data);
3411         return 0;
3412 }
3413
3414 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3415                                 struct ctdb_vnn_map *vnn_map,
3416                                 TALLOC_CTX *mem_ctx,
3417                                 bool include_self)
3418 {
3419         int i, j, num_nodes;
3420         uint32_t *nodes;
3421
3422         for (i=num_nodes=0;i<vnn_map->size;i++) {
3423                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3424                         continue;
3425                 }
3426                 num_nodes++;
3427         } 
3428
3429         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3430         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3431
3432         for (i=j=0;i<vnn_map->size;i++) {
3433                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3434                         continue;
3435                 }
3436                 nodes[j++] = vnn_map->map[i];
3437         } 
3438
3439         return nodes;
3440 }
3441
3442 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3443                                 struct ctdb_node_map *node_map,
3444                                 TALLOC_CTX *mem_ctx,
3445                                 bool include_self)
3446 {
3447         int i, j, num_nodes;
3448         uint32_t *nodes;
3449
3450         for (i=num_nodes=0;i<node_map->num;i++) {
3451                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3452                         continue;
3453                 }
3454                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3455                         continue;
3456                 }
3457                 num_nodes++;
3458         } 
3459
3460         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3461         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3462
3463         for (i=j=0;i<node_map->num;i++) {
3464                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3465                         continue;
3466                 }
3467                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3468                         continue;
3469                 }
3470                 nodes[j++] = node_map->nodes[i].pnn;
3471         } 
3472
3473         return nodes;
3474 }
3475
3476 uint32_t *list_of_active_nodes_except_pnn(struct ctdb_context *ctdb,
3477                                 struct ctdb_node_map *node_map,
3478                                 TALLOC_CTX *mem_ctx,
3479                                 uint32_t pnn)
3480 {
3481         int i, j, num_nodes;
3482         uint32_t *nodes;
3483
3484         for (i=num_nodes=0;i<node_map->num;i++) {
3485                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3486                         continue;
3487                 }
3488                 if (node_map->nodes[i].pnn == pnn) {
3489                         continue;
3490                 }
3491                 num_nodes++;
3492         } 
3493
3494         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3495         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3496
3497         for (i=j=0;i<node_map->num;i++) {
3498                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3499                         continue;
3500                 }
3501                 if (node_map->nodes[i].pnn == pnn) {
3502                         continue;
3503                 }
3504                 nodes[j++] = node_map->nodes[i].pnn;
3505         } 
3506
3507         return nodes;
3508 }
3509
3510 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3511                                 struct ctdb_node_map *node_map,
3512                                 TALLOC_CTX *mem_ctx,
3513                                 bool include_self)
3514 {
3515         int i, j, num_nodes;
3516         uint32_t *nodes;
3517
3518         for (i=num_nodes=0;i<node_map->num;i++) {
3519                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3520                         continue;
3521                 }
3522                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3523                         continue;
3524                 }
3525                 num_nodes++;
3526         } 
3527
3528         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3529         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3530
3531         for (i=j=0;i<node_map->num;i++) {
3532                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3533                         continue;
3534                 }
3535                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3536                         continue;
3537                 }
3538                 nodes[j++] = node_map->nodes[i].pnn;
3539         } 
3540
3541         return nodes;
3542 }
3543
3544 /* 
3545   this is used to test if a pnn lock exists and if it exists will return
3546   the number of connections that pnn has reported or -1 if that recovery
3547   daemon is not running.
3548 */
3549 int
3550 ctdb_read_pnn_lock(int fd, int32_t pnn)
3551 {
3552         struct flock lock;
3553         char c;
3554
3555         lock.l_type = F_WRLCK;
3556         lock.l_whence = SEEK_SET;
3557         lock.l_start = pnn;
3558         lock.l_len = 1;
3559         lock.l_pid = 0;
3560
3561         if (fcntl(fd, F_GETLK, &lock) != 0) {
3562                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3563                 return -1;
3564         }
3565
3566         if (lock.l_type == F_UNLCK) {
3567                 return -1;
3568         }
3569
3570         if (pread(fd, &c, 1, pnn) == -1) {
3571                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3572                 return -1;
3573         }
3574
3575         return c;
3576 }
3577
3578 /*
3579   get capabilities of a remote node
3580  */
3581 struct ctdb_client_control_state *
3582 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3583 {
3584         return ctdb_control_send(ctdb, destnode, 0, 
3585                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3586                            mem_ctx, &timeout, NULL);
3587 }
3588
3589 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3590 {
3591         int ret;
3592         int32_t res;
3593         TDB_DATA outdata;
3594
3595         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3596         if ( (ret != 0) || (res != 0) ) {
3597                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3598                 return -1;
3599         }
3600
3601         if (capabilities) {
3602                 *capabilities = *((uint32_t *)outdata.dptr);
3603         }
3604
3605         return 0;
3606 }
3607
3608 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3609 {
3610         struct ctdb_client_control_state *state;
3611         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3612         int ret;
3613
3614         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3615         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3616         talloc_free(tmp_ctx);
3617         return ret;
3618 }
3619
3620 /**
3621  * check whether a transaction is active on a given db on a given node
3622  */
3623 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
3624                                      uint32_t destnode,
3625                                      uint32_t db_id)
3626 {
3627         int32_t status;
3628         int ret;
3629         TDB_DATA indata;
3630
3631         indata.dptr = (uint8_t *)&db_id;
3632         indata.dsize = sizeof(db_id);
3633
3634         ret = ctdb_control(ctdb, destnode, 0,
3635                            CTDB_CONTROL_TRANS2_ACTIVE,
3636                            0, indata, NULL, NULL, &status,
3637                            NULL, NULL);
3638
3639         if (ret != 0) {
3640                 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
3641                 return -1;
3642         }
3643
3644         return status;
3645 }
3646
3647
3648 struct ctdb_transaction_handle {
3649         struct ctdb_db_context *ctdb_db;
3650         bool in_replay;
3651         /*
3652          * we store the reads and writes done under a transaction:
3653          * - one list stores both reads and writes (m_all),
3654          * - the other just writes (m_write)
3655          */
3656         struct ctdb_marshall_buffer *m_all;
3657         struct ctdb_marshall_buffer *m_write;
3658 };
3659
3660 /* start a transaction on a database */
3661 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3662 {
3663         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3664         return 0;
3665 }
3666
3667 /* start a transaction on a database */
3668 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
3669 {
3670         struct ctdb_record_handle *rh;
3671         TDB_DATA key;
3672         TDB_DATA data;
3673         struct ctdb_ltdb_header header;
3674         TALLOC_CTX *tmp_ctx;
3675         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
3676         int ret;
3677         struct ctdb_db_context *ctdb_db = h->ctdb_db;
3678         pid_t pid;
3679         int32_t status;
3680
3681         key.dptr = discard_const(keyname);
3682         key.dsize = strlen(keyname);
3683
3684         if (!ctdb_db->persistent) {
3685                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
3686                 return -1;
3687         }
3688
3689 again:
3690         tmp_ctx = talloc_new(h);
3691
3692         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
3693         if (rh == NULL) {
3694                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
3695                 talloc_free(tmp_ctx);
3696                 return -1;
3697         }
3698
3699         status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
3700                                               CTDB_CURRENT_NODE,
3701                                               ctdb_db->db_id);
3702         if (status == 1) {
3703                 unsigned long int usec = (1000 + random()) % 100000;
3704                 DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
3705                                     "on db_id[0x%08x]. waiting for %lu "
3706                                     "microseconds\n",
3707                                     ctdb_db->db_id, usec));
3708                 talloc_free(tmp_ctx);
3709                 usleep(usec);
3710                 goto again;
3711         }
3712
3713         /*
3714          * store the pid in the database:
3715          * it is not enough that the node is dmaster...
3716          */
3717         pid = getpid();
3718         data.dptr = (unsigned char *)&pid;
3719         data.dsize = sizeof(pid_t);
3720         rh->header.rsn++;
3721         rh->header.dmaster = ctdb_db->ctdb->pnn;
3722         ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
3723         if (ret != 0) {
3724                 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
3725                                   "transaction record\n"));
3726                 talloc_free(tmp_ctx);
3727                 return -1;
3728         }
3729
3730         talloc_free(rh);
3731
3732         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3733         if (ret != 0) {
3734                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
3735                 talloc_free(tmp_ctx);
3736                 return -1;
3737         }
3738
3739         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
3740         if (ret != 0) {
3741                 DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
3742                                  "lock record inside transaction\n"));
3743                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3744                 talloc_free(tmp_ctx);
3745                 goto again;
3746         }
3747
3748         if (header.dmaster != ctdb_db->ctdb->pnn) {
3749                 DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
3750                                    "transaction lock record\n"));
3751                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3752                 talloc_free(tmp_ctx);
3753                 goto again;
3754         }
3755
3756         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
3757                 DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
3758                                     "the transaction lock record\n"));
3759                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3760                 talloc_free(tmp_ctx);
3761                 goto again;
3762         }
3763
3764         talloc_free(tmp_ctx);
3765
3766         return 0;
3767 }
3768
3769
3770 /* start a transaction on a database */
3771 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3772                                                        TALLOC_CTX *mem_ctx)
3773 {
3774         struct ctdb_transaction_handle *h;
3775         int ret;
3776
3777         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3778         if (h == NULL) {
3779                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
3780                 return NULL;
3781         }
3782
3783         h->ctdb_db = ctdb_db;
3784
3785         ret = ctdb_transaction_fetch_start(h);
3786         if (ret != 0) {
3787                 talloc_free(h);
3788                 return NULL;
3789         }
3790
3791         talloc_set_destructor(h, ctdb_transaction_destructor);
3792
3793         return h;
3794 }
3795
3796
3797
3798 /*
3799   fetch a record inside a transaction
3800  */
3801 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
3802                            TALLOC_CTX *mem_ctx, 
3803                            TDB_DATA key, TDB_DATA *data)
3804 {
3805         struct ctdb_ltdb_header header;
3806         int ret;
3807
3808         ZERO_STRUCT(header);
3809
3810         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3811         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3812                 /* record doesn't exist yet */
3813                 *data = tdb_null;
3814                 ret = 0;
3815         }
3816         
3817         if (ret != 0) {
3818                 return ret;
3819         }
3820
3821         if (!h->in_replay) {
3822                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3823                 if (h->m_all == NULL) {
3824                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3825                         return -1;
3826                 }
3827         }
3828
3829         return 0;
3830 }
3831
3832 /*
3833   stores a record inside a transaction
3834  */
3835 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
3836                            TDB_DATA key, TDB_DATA data)
3837 {
3838         TALLOC_CTX *tmp_ctx = talloc_new(h);
3839         struct ctdb_ltdb_header header;
3840         TDB_DATA olddata;
3841         int ret;
3842
3843         ZERO_STRUCT(header);
3844
3845         /* we need the header so we can update the RSN */
3846         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3847         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3848                 /* the record doesn't exist - create one with us as dmaster.
3849                    This is only safe because we are in a transaction and this
3850                    is a persistent database */
3851                 ZERO_STRUCT(header);
3852         } else if (ret != 0) {
3853                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3854                 talloc_free(tmp_ctx);
3855                 return ret;
3856         }
3857
3858         if (data.dsize == olddata.dsize &&
3859             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3860                 /* save writing the same data */
3861                 talloc_free(tmp_ctx);
3862                 return 0;
3863         }
3864
3865         header.dmaster = h->ctdb_db->ctdb->pnn;
3866         header.rsn++;
3867
3868         if (!h->in_replay) {
3869                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3870                 if (h->m_all == NULL) {
3871                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3872                         talloc_free(tmp_ctx);
3873                         return -1;
3874                 }
3875         }               
3876
3877         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3878         if (h->m_write == NULL) {
3879                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3880                 talloc_free(tmp_ctx);
3881                 return -1;
3882         }
3883         
3884         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3885
3886         talloc_free(tmp_ctx);
3887         
3888         return ret;
3889 }
3890
3891 /*
3892   replay a transaction
3893  */
3894 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3895 {
3896         int ret, i;
3897         struct ctdb_rec_data *rec = NULL;
3898
3899         h->in_replay = true;
3900         talloc_free(h->m_write);
3901         h->m_write = NULL;
3902
3903         ret = ctdb_transaction_fetch_start(h);
3904         if (ret != 0) {
3905                 return ret;
3906         }
3907
3908         for (i=0;i<h->m_all->count;i++) {
3909                 TDB_DATA key, data;
3910
3911                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3912                 if (rec == NULL) {
3913                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3914                         goto failed;
3915                 }
3916
3917                 if (rec->reqid == 0) {
3918                         /* its a store */
3919                         if (ctdb_transaction_store(h, key, data) != 0) {
3920                                 goto failed;
3921                         }
3922                 } else {
3923                         TDB_DATA data2;
3924                         TALLOC_CTX *tmp_ctx = talloc_new(h);
3925
3926                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3927                                 talloc_free(tmp_ctx);
3928                                 goto failed;
3929                         }
3930                         if (data2.dsize != data.dsize ||
3931                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3932                                 /* the record has changed on us - we have to give up */
3933                                 talloc_free(tmp_ctx);
3934                                 goto failed;
3935                         }
3936                         talloc_free(tmp_ctx);
3937                 }
3938         }
3939         
3940         return 0;
3941
3942 failed:
3943         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3944         return -1;
3945 }
3946
3947
3948 /*
3949   commit a transaction
3950  */
3951 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3952 {
3953         int ret, retries=0;
3954         int32_t status;
3955         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3956         struct timeval timeout;
3957         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3958
3959         talloc_set_destructor(h, NULL);
3960
3961         /* our commit strategy is quite complex.
3962
3963            - we first try to commit the changes to all other nodes
3964
3965            - if that works, then we commit locally and we are done
3966
3967            - if a commit on another node fails, then we need to cancel
3968              the transaction, then restart the transaction (thus
3969              opening a window of time for a pending recovery to
3970              complete), then replay the transaction, checking all the
3971              reads and writes (checking that reads give the same data,
3972              and writes succeed). Then we retry the transaction to the
3973              other nodes
3974         */
3975
3976 again:
3977         if (h->m_write == NULL) {
3978                 /* no changes were made */
3979                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3980                 talloc_free(h);
3981                 return 0;
3982         }
3983
3984         /* tell ctdbd to commit to the other nodes */
3985         timeout = timeval_current_ofs(1, 0);
3986         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3987                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
3988                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
3989                            &timeout, NULL);
3990         if (ret != 0 || status != 0) {
3991                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3992                 DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
3993                                      ", retrying after 1 second...\n",
3994                                      (retries==0)?"":"retry "));
3995                 sleep(1);
3996
3997                 if (ret != 0) {
3998                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
3999                 } else {
4000                         /* work out what error code we will give if we 
4001                            have to fail the operation */
4002                         switch ((enum ctdb_trans2_commit_error)status) {
4003                         case CTDB_TRANS2_COMMIT_SUCCESS:
4004                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
4005                         case CTDB_TRANS2_COMMIT_TIMEOUT:
4006                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
4007                                 break;
4008                         case CTDB_TRANS2_COMMIT_ALLFAIL:
4009                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
4010                                 break;
4011                         }
4012                 }
4013
4014                 if (++retries == 100) {
4015                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
4016                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
4017                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4018                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4019                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
4020                         talloc_free(h);
4021                         return -1;
4022                 }               
4023
4024                 if (ctdb_replay_transaction(h) != 0) {
4025                         DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
4026                                           "transaction on db 0x%08x, "
4027                                           "failure control =%u\n",
4028                                           h->ctdb_db->db_id,
4029                                           (unsigned)failure_control));
4030                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4031                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4032                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
4033                         talloc_free(h);
4034                         return -1;
4035                 }
4036                 goto again;
4037         } else {
4038                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
4039         }
4040
4041         /* do the real commit locally */
4042         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
4043         if (ret != 0) {
4044                 DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
4045                                   "on db id 0x%08x locally, "
4046                                   "failure_control=%u\n",
4047                                   h->ctdb_db->db_id,
4048                                   (unsigned)failure_control));
4049                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4050                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4051                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
4052                 talloc_free(h);
4053                 return ret;
4054         }
4055
4056         /* tell ctdbd that we are finished with our local commit */
4057         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4058                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
4059                      tdb_null, NULL, NULL, NULL, NULL, NULL);
4060         talloc_free(h);
4061         return 0;
4062 }
4063
4064 /*
4065   recovery daemon ping to main daemon
4066  */
4067 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
4068 {
4069         int ret;
4070         int32_t res;
4071
4072         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
4073                            ctdb, NULL, &res, NULL, NULL);
4074         if (ret != 0 || res != 0) {
4075                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
4076                 return -1;
4077         }
4078
4079         return 0;
4080 }
4081
4082 /* when forking the main daemon and the child process needs to connect back
4083  * to the daemon as a client process, this function can be used to change
4084  * the ctdb context from daemon into client mode
4085  */
4086 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
4087 {
4088         int ret;
4089         va_list ap;
4090
4091         /* Add extra information so we can identify this in the logs */
4092         va_start(ap, fmt);
4093         debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
4094         va_end(ap);
4095
4096         /* shutdown the transport */
4097         if (ctdb->methods) {
4098                 ctdb->methods->shutdown(ctdb);
4099         }
4100
4101         /* get a new event context */
4102         talloc_free(ctdb->ev);
4103         ctdb->ev = event_context_init(ctdb);
4104         tevent_loop_allow_nesting(ctdb->ev);
4105
4106         close(ctdb->daemon.sd);
4107         ctdb->daemon.sd = -1;
4108
4109         /* the client does not need to be realtime */
4110         if (ctdb->do_setsched) {
4111                 ctdb_restore_scheduler(ctdb);
4112         }
4113
4114         /* initialise ctdb */
4115         ret = ctdb_socket_connect(ctdb);
4116         if (ret != 0) {
4117                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
4118                 return -1;
4119         }
4120
4121          return 0;
4122 }
4123
4124 /*
4125   get the status of running the monitor eventscripts: NULL means never run.
4126  */
4127 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
4128                 struct timeval timeout, uint32_t destnode, 
4129                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
4130                 struct ctdb_scripts_wire **scripts)
4131 {
4132         int ret;
4133         TDB_DATA outdata, indata;
4134         int32_t res;
4135         uint32_t uinttype = type;
4136
4137         indata.dptr = (uint8_t *)&uinttype;
4138         indata.dsize = sizeof(uinttype);
4139
4140         ret = ctdb_control(ctdb, destnode, 0, 
4141                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
4142                            mem_ctx, &outdata, &res, &timeout, NULL);
4143         if (ret != 0 || res != 0) {
4144                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
4145                 return -1;
4146         }
4147
4148         if (outdata.dsize == 0) {
4149                 *scripts = NULL;
4150         } else {
4151                 *scripts = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4152                 talloc_free(outdata.dptr);
4153         }
4154                     
4155         return 0;
4156 }
4157
4158 /*
4159   tell the main daemon how long it took to lock the reclock file
4160  */
4161 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
4162 {
4163         int ret;
4164         int32_t res;
4165         TDB_DATA data;
4166
4167         data.dptr = (uint8_t *)&latency;
4168         data.dsize = sizeof(latency);
4169
4170         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
4171                            ctdb, NULL, &res, NULL, NULL);
4172         if (ret != 0 || res != 0) {
4173                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
4174                 return -1;
4175         }
4176
4177         return 0;
4178 }
4179
4180 /*
4181   get the name of the reclock file
4182  */
4183 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
4184                          uint32_t destnode, TALLOC_CTX *mem_ctx,
4185                          const char **name)
4186 {
4187         int ret;
4188         int32_t res;
4189         TDB_DATA data;
4190
4191         ret = ctdb_control(ctdb, destnode, 0, 
4192                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
4193                            mem_ctx, &data, &res, &timeout, NULL);
4194         if (ret != 0 || res != 0) {
4195                 return -1;
4196         }
4197
4198         if (data.dsize == 0) {
4199                 *name = NULL;
4200         } else {
4201                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
4202         }
4203         talloc_free(data.dptr);
4204
4205         return 0;
4206 }
4207
4208 /*
4209   set the reclock filename for a node
4210  */
4211 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
4212 {
4213         int ret;
4214         TDB_DATA data;
4215         int32_t res;
4216
4217         if (reclock == NULL) {
4218                 data.dsize = 0;
4219                 data.dptr  = NULL;
4220         } else {
4221                 data.dsize = strlen(reclock) + 1;
4222                 data.dptr  = discard_const(reclock);
4223         }
4224
4225         ret = ctdb_control(ctdb, destnode, 0, 
4226                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
4227                            NULL, NULL, &res, &timeout, NULL);
4228         if (ret != 0 || res != 0) {
4229                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
4230                 return -1;
4231         }
4232
4233         return 0;
4234 }
4235
4236 /*
4237   stop a node
4238  */
4239 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4240 {
4241         int ret;
4242         int32_t res;
4243
4244         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
4245                            ctdb, NULL, &res, &timeout, NULL);
4246         if (ret != 0 || res != 0) {
4247                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
4248                 return -1;
4249         }
4250
4251         return 0;
4252 }
4253
4254 /*
4255   continue a node
4256  */
4257 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4258 {
4259         int ret;
4260
4261         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
4262                            ctdb, NULL, NULL, &timeout, NULL);
4263         if (ret != 0) {
4264                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4265                 return -1;
4266         }
4267
4268         return 0;
4269 }
4270
4271 /*
4272   set the natgw state for a node
4273  */
4274 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4275 {
4276         int ret;
4277         TDB_DATA data;
4278         int32_t res;
4279
4280         data.dsize = sizeof(natgwstate);
4281         data.dptr  = (uint8_t *)&natgwstate;
4282
4283         ret = ctdb_control(ctdb, destnode, 0, 
4284                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
4285                            NULL, NULL, &res, &timeout, NULL);
4286         if (ret != 0 || res != 0) {
4287                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4288                 return -1;
4289         }
4290
4291         return 0;
4292 }
4293
4294 /*
4295   set the lmaster role for a node
4296  */
4297 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4298 {
4299         int ret;
4300         TDB_DATA data;
4301         int32_t res;
4302
4303         data.dsize = sizeof(lmasterrole);
4304         data.dptr  = (uint8_t *)&lmasterrole;
4305
4306         ret = ctdb_control(ctdb, destnode, 0, 
4307                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
4308                            NULL, NULL, &res, &timeout, NULL);
4309         if (ret != 0 || res != 0) {
4310                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4311                 return -1;
4312         }
4313
4314         return 0;
4315 }
4316
4317 /*
4318   set the recmaster role for a node
4319  */
4320 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4321 {
4322         int ret;
4323         TDB_DATA data;
4324         int32_t res;
4325
4326         data.dsize = sizeof(recmasterrole);
4327         data.dptr  = (uint8_t *)&recmasterrole;
4328
4329         ret = ctdb_control(ctdb, destnode, 0, 
4330                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
4331                            NULL, NULL, &res, &timeout, NULL);
4332         if (ret != 0 || res != 0) {
4333                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4334                 return -1;
4335         }
4336
4337         return 0;
4338 }
4339
4340 /* enable an eventscript
4341  */
4342 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4343 {
4344         int ret;
4345         TDB_DATA data;
4346         int32_t res;
4347
4348         data.dsize = strlen(script) + 1;
4349         data.dptr  = discard_const(script);
4350
4351         ret = ctdb_control(ctdb, destnode, 0, 
4352                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
4353                            NULL, NULL, &res, &timeout, NULL);
4354         if (ret != 0 || res != 0) {
4355                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4356                 return -1;
4357         }
4358
4359         return 0;
4360 }
4361
4362 /* disable an eventscript
4363  */
4364 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4365 {
4366         int ret;
4367         TDB_DATA data;
4368         int32_t res;
4369
4370         data.dsize = strlen(script) + 1;
4371         data.dptr  = discard_const(script);
4372
4373         ret = ctdb_control(ctdb, destnode, 0, 
4374                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
4375                            NULL, NULL, &res, &timeout, NULL);
4376         if (ret != 0 || res != 0) {
4377                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
4378                 return -1;
4379         }
4380
4381         return 0;
4382 }
4383
4384
4385 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
4386 {
4387         int ret;
4388         TDB_DATA data;
4389         int32_t res;
4390
4391         data.dsize = sizeof(*bantime);
4392         data.dptr  = (uint8_t *)bantime;
4393
4394         ret = ctdb_control(ctdb, destnode, 0, 
4395                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
4396                            NULL, NULL, &res, &timeout, NULL);
4397         if (ret != 0 || res != 0) {
4398                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4399                 return -1;
4400         }
4401
4402         return 0;
4403 }
4404
4405
4406 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4407 {
4408         int ret;
4409         TDB_DATA outdata;
4410         int32_t res;
4411         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4412
4413         ret = ctdb_control(ctdb, destnode, 0, 
4414                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4415                            tmp_ctx, &outdata, &res, &timeout, NULL);
4416         if (ret != 0 || res != 0) {
4417                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4418                 talloc_free(tmp_ctx);
4419                 return -1;
4420         }
4421
4422         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4423         talloc_free(tmp_ctx);
4424
4425         return 0;
4426 }
4427
4428
4429 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4430 {
4431         int ret;
4432         int32_t res;
4433         TDB_DATA data;
4434         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4435
4436         data.dptr = (uint8_t*)db_prio;
4437         data.dsize = sizeof(*db_prio);
4438
4439         ret = ctdb_control(ctdb, destnode, 0, 
4440                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4441                            tmp_ctx, NULL, &res, &timeout, NULL);
4442         if (ret != 0 || res != 0) {
4443                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4444                 talloc_free(tmp_ctx);
4445                 return -1;
4446         }
4447
4448         talloc_free(tmp_ctx);
4449
4450         return 0;
4451 }
4452
4453 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4454 {
4455         int ret;
4456         int32_t res;
4457         TDB_DATA data;
4458         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4459
4460         data.dptr = (uint8_t*)&db_id;
4461         data.dsize = sizeof(db_id);
4462
4463         ret = ctdb_control(ctdb, destnode, 0, 
4464                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4465                            tmp_ctx, NULL, &res, &timeout, NULL);
4466         if (ret != 0 || res < 0) {
4467                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4468                 talloc_free(tmp_ctx);
4469                 return -1;
4470         }
4471
4472         if (priority) {
4473                 *priority = res;
4474         }
4475
4476         talloc_free(tmp_ctx);
4477
4478         return 0;
4479 }
4480
4481 int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_statistics_wire **stats)
4482 {
4483         int ret;
4484         TDB_DATA outdata;
4485         int32_t res;
4486
4487         ret = ctdb_control(ctdb, destnode, 0, 
4488                            CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null, 
4489                            mem_ctx, &outdata, &res, &timeout, NULL);
4490         if (ret != 0 || res != 0 || outdata.dsize == 0) {
4491                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
4492                 return -1;
4493         }
4494
4495         *stats = (struct ctdb_statistics_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4496         talloc_free(outdata.dptr);
4497                     
4498         return 0;
4499 }
4500
4501 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)
4502 {
4503         if (h == NULL) {
4504                 return NULL;
4505         }
4506
4507         return &h->header;
4508 }
4509
4510
4511 struct ctdb_client_control_state *
4512 ctdb_ctrl_updaterecord_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4513 {
4514         struct ctdb_client_control_state *handle;
4515         struct ctdb_marshall_buffer *m;
4516         struct ctdb_rec_data *rec;
4517         TDB_DATA outdata;
4518
4519         m = talloc_zero(mem_ctx, struct ctdb_marshall_buffer);
4520         if (m == NULL) {
4521                 DEBUG(DEBUG_ERR, ("Failed to allocate marshall buffer for update record\n"));
4522                 return NULL;
4523         }
4524
4525         m->db_id = ctdb_db->db_id;
4526
4527         rec = ctdb_marshall_record(m, 0, key, header, data);
4528         if (rec == NULL) {
4529                 DEBUG(DEBUG_ERR,("Failed to marshall record for update record\n"));
4530                 talloc_free(m);
4531                 return NULL;
4532         }
4533         m = talloc_realloc_size(mem_ctx, m, rec->length + offsetof(struct ctdb_marshall_buffer, data));
4534         if (m == NULL) {
4535                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata\n"));
4536                 talloc_free(m);
4537                 return NULL;
4538         }
4539         m->count++;
4540         memcpy((uint8_t *)m + offsetof(struct ctdb_marshall_buffer, data), rec, rec->length);
4541
4542
4543         outdata.dptr = (uint8_t *)m;
4544         outdata.dsize = talloc_get_size(m);
4545
4546         handle = ctdb_control_send(ctdb, destnode, 0, 
4547                            CTDB_CONTROL_UPDATE_RECORD, 0, outdata,
4548                            mem_ctx, &timeout, NULL);
4549         talloc_free(m);
4550         return handle;
4551 }
4552
4553 int ctdb_ctrl_updaterecord_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4554 {
4555         int ret;
4556         int32_t res;
4557
4558         ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL);
4559         if ( (ret != 0) || (res != 0) ){
4560                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_update_record_recv failed\n"));
4561                 return -1;
4562         }
4563
4564         return 0;
4565 }
4566
4567 int
4568 ctdb_ctrl_updaterecord(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4569 {
4570         struct ctdb_client_control_state *state;
4571
4572         state = ctdb_ctrl_updaterecord_send(ctdb, mem_ctx, timeout, destnode, ctdb_db, key, header, data);
4573         return ctdb_ctrl_updaterecord_recv(ctdb, state);
4574 }
4575
4576
4577
4578
4579
4580
4581 /*
4582   set a database to be readonly
4583  */
4584 struct ctdb_client_control_state *
4585 ctdb_ctrl_set_db_readonly_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4586 {
4587         TDB_DATA data;
4588
4589         data.dptr = (uint8_t *)&dbid;
4590         data.dsize = sizeof(dbid);
4591
4592         return ctdb_control_send(ctdb, destnode, 0, 
4593                            CTDB_CONTROL_SET_DB_READONLY, 0, data, 
4594                            ctdb, NULL, NULL);
4595 }
4596
4597 int ctdb_ctrl_set_db_readonly_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4598 {
4599         int ret;
4600         int32_t res;
4601
4602         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4603         if (ret != 0 || res != 0) {
4604           DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_readonly_recv failed  ret:%d res:%d\n", ret, res));
4605                 return -1;
4606         }
4607
4608         return 0;
4609 }
4610
4611 int ctdb_ctrl_set_db_readonly(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4612 {
4613         struct ctdb_client_control_state *state;
4614
4615         state = ctdb_ctrl_set_db_readonly_send(ctdb, destnode, dbid);
4616         return ctdb_ctrl_set_db_readonly_recv(ctdb, state);
4617 }