a7624b7e3c86d985693960688722c7cf2014842e
[vlendec/samba-autobuild/.git] / ctdb / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "replace.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24 #include "system/locale.h"
25
26 #include <talloc.h>
27 #include <tevent.h>
28 #include <tdb.h>
29
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/time.h"
33 #include "lib/util/debug.h"
34 #include "lib/util/samba_util.h"
35
36 #include "ctdb_private.h"
37 #include "ctdb_client.h"
38
39 #include "common/reqid.h"
40 #include "common/system.h"
41 #include "common/common.h"
42 #include "common/logging.h"
43
44 /*
45   allocate a packet for use in client<->daemon communication
46  */
47 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
48                                             TALLOC_CTX *mem_ctx, 
49                                             enum ctdb_operation operation, 
50                                             size_t length, size_t slength,
51                                             const char *type)
52 {
53         int size;
54         struct ctdb_req_header *hdr;
55
56         length = MAX(length, slength);
57         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
58
59         hdr = (struct ctdb_req_header *)talloc_zero_size(mem_ctx, size);
60         if (hdr == NULL) {
61                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
62                          operation, (unsigned)length));
63                 return NULL;
64         }
65         talloc_set_name_const(hdr, type);
66         hdr->length       = length;
67         hdr->operation    = operation;
68         hdr->ctdb_magic   = CTDB_MAGIC;
69         hdr->ctdb_version = CTDB_PROTOCOL;
70         hdr->srcnode      = ctdb->pnn;
71         if (ctdb->vnn_map) {
72                 hdr->generation = ctdb->vnn_map->generation;
73         }
74
75         return hdr;
76 }
77
78 /*
79   local version of ctdb_call
80 */
81 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
82                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
83                     TDB_DATA *data, bool updatetdb)
84 {
85         struct ctdb_call_info *c;
86         struct ctdb_registered_call *fn;
87         struct ctdb_context *ctdb = ctdb_db->ctdb;
88         
89         c = talloc_zero(mem_ctx, struct ctdb_call_info);
90         CTDB_NO_MEMORY(ctdb, c);
91
92         c->key = call->key;
93         c->call_data = &call->call_data;
94         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
95         c->record_data.dsize = data->dsize;
96         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
97         c->header = header;
98
99         for (fn=ctdb_db->calls;fn;fn=fn->next) {
100                 if (fn->id == call->call_id) break;
101         }
102         if (fn == NULL) {
103                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
104                 talloc_free(c);
105                 return -1;
106         }
107
108         if (fn->fn(c) != 0) {
109                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
110                 talloc_free(c);
111                 return -1;
112         }
113
114         /* we need to force the record to be written out if this was a remote access */
115         if (c->new_data == NULL) {
116                 c->new_data = &c->record_data;
117         }
118
119         if (c->new_data && updatetdb) {
120                 /* XXX check that we always have the lock here? */
121                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
122                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
123                         talloc_free(c);
124                         return -1;
125                 }
126         }
127
128         if (c->reply_data) {
129                 call->reply_data = *c->reply_data;
130
131                 talloc_steal(call, call->reply_data.dptr);
132                 talloc_set_name_const(call->reply_data.dptr, __location__);
133         } else {
134                 call->reply_data.dptr = NULL;
135                 call->reply_data.dsize = 0;
136         }
137         call->status = c->status;
138
139         talloc_free(c);
140
141         return 0;
142 }
143
144
145 /*
146   queue a packet for sending from client to daemon
147 */
148 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
149 {
150         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
151 }
152
153
154 /*
155   called when a CTDB_REPLY_CALL packet comes in in the client
156
157   This packet comes in response to a CTDB_REQ_CALL request packet. It
158   contains any reply data from the call
159 */
160 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
161 {
162         struct ctdb_reply_call_old *c = (struct ctdb_reply_call_old *)hdr;
163         struct ctdb_client_call_state *state;
164
165         state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_client_call_state);
166         if (state == NULL) {
167                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
168                 return;
169         }
170
171         if (hdr->reqid != state->reqid) {
172                 /* we found a record  but it was the wrong one */
173                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
174                 return;
175         }
176
177         state->call->reply_data.dptr = c->data;
178         state->call->reply_data.dsize = c->datalen;
179         state->call->status = c->status;
180
181         talloc_steal(state, c);
182
183         state->state = CTDB_CALL_DONE;
184
185         if (state->async.fn) {
186                 state->async.fn(state);
187         }
188 }
189
190 void ctdb_request_message(struct ctdb_context *ctdb,
191                           struct ctdb_req_header *hdr)
192 {
193         struct ctdb_req_message_old *c = (struct ctdb_req_message_old *)hdr;
194         TDB_DATA data;
195
196         data.dsize = c->datalen;
197         data.dptr = talloc_memdup(c, &c->data[0], c->datalen);
198         if (data.dptr == NULL) {
199                 DEBUG(DEBUG_ERR, (__location__ " Memory allocation failure\n"));
200                 return;
201         }
202
203         srvid_dispatch(ctdb->srv, c->srvid, CTDB_SRVID_ALL, data);
204 }
205
206 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
207
208 /*
209   this is called in the client, when data comes in from the daemon
210  */
211 void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
212 {
213         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
214         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
215         TALLOC_CTX *tmp_ctx;
216
217         /* place the packet as a child of a tmp_ctx. We then use
218            talloc_free() below to free it. If any of the calls want
219            to keep it, then they will steal it somewhere else, and the
220            talloc_free() will be a no-op */
221         tmp_ctx = talloc_new(ctdb);
222         talloc_steal(tmp_ctx, hdr);
223
224         if (cnt == 0) {
225                 DEBUG(DEBUG_CRIT,("Daemon has exited - shutting down client\n"));
226                 exit(1);
227         }
228
229         if (cnt < sizeof(*hdr)) {
230                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
231                 goto done;
232         }
233         if (cnt != hdr->length) {
234                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
235                                (unsigned)hdr->length, (unsigned)cnt);
236                 goto done;
237         }
238
239         if (hdr->ctdb_magic != CTDB_MAGIC) {
240                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
241                 goto done;
242         }
243
244         if (hdr->ctdb_version != CTDB_PROTOCOL) {
245                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
246                 goto done;
247         }
248
249         switch (hdr->operation) {
250         case CTDB_REPLY_CALL:
251                 ctdb_client_reply_call(ctdb, hdr);
252                 break;
253
254         case CTDB_REQ_MESSAGE:
255                 ctdb_request_message(ctdb, hdr);
256                 break;
257
258         case CTDB_REPLY_CONTROL:
259                 ctdb_client_reply_control(ctdb, hdr);
260                 break;
261
262         default:
263                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
264         }
265
266 done:
267         talloc_free(tmp_ctx);
268 }
269
270 /*
271   connect to a unix domain socket
272 */
273 int ctdb_socket_connect(struct ctdb_context *ctdb)
274 {
275         struct sockaddr_un addr;
276         int ret;
277
278         memset(&addr, 0, sizeof(addr));
279         addr.sun_family = AF_UNIX;
280         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
281
282         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
283         if (ctdb->daemon.sd == -1) {
284                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
285                 return -1;
286         }
287
288         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
289                 DEBUG(DEBUG_ERR,
290                       (__location__
291                        "Failed to connect client socket to daemon (%s)\n",
292                        strerror(errno)));
293                 close(ctdb->daemon.sd);
294                 ctdb->daemon.sd = -1;
295                 return -1;
296         }
297
298         ret = set_blocking(ctdb->daemon.sd, false);
299         if (ret != 0) {
300                 DEBUG(DEBUG_ERR,
301                       (__location__
302                        " failed to set socket non-blocking (%s)\n",
303                        strerror(errno)));
304                 close(ctdb->daemon.sd);
305                 ctdb->daemon.sd = -1;
306                 return -1;
307         }
308
309         set_close_on_exec(ctdb->daemon.sd);
310
311         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
312                                               CTDB_DS_ALIGNMENT, 
313                                               ctdb_client_read_cb, ctdb, "to-ctdbd");
314         return 0;
315 }
316
317
318 struct ctdb_record_handle {
319         struct ctdb_db_context *ctdb_db;
320         TDB_DATA key;
321         TDB_DATA *data;
322         struct ctdb_ltdb_header header;
323 };
324
325
326 /*
327   make a recv call to the local ctdb daemon - called from client context
328
329   This is called when the program wants to wait for a ctdb_call to complete and get the 
330   results. This call will block unless the call has already completed.
331 */
332 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
333 {
334         if (state == NULL) {
335                 return -1;
336         }
337
338         while (state->state < CTDB_CALL_DONE) {
339                 tevent_loop_once(state->ctdb_db->ctdb->ev);
340         }
341         if (state->state != CTDB_CALL_DONE) {
342                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
343                 talloc_free(state);
344                 return -1;
345         }
346
347         if (state->call->reply_data.dsize) {
348                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
349                                                       state->call->reply_data.dptr,
350                                                       state->call->reply_data.dsize);
351                 call->reply_data.dsize = state->call->reply_data.dsize;
352         } else {
353                 call->reply_data.dptr = NULL;
354                 call->reply_data.dsize = 0;
355         }
356         call->status = state->call->status;
357         talloc_free(state);
358
359         return call->status;
360 }
361
362
363
364
365 /*
366   destroy a ctdb_call in client
367 */
368 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
369 {
370         reqid_remove(state->ctdb_db->ctdb->idr, state->reqid);
371         return 0;
372 }
373
374 /*
375   construct an event driven local ctdb_call
376
377   this is used so that locally processed ctdb_call requests are processed
378   in an event driven manner
379 */
380 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
381                                                                   struct ctdb_call *call,
382                                                                   struct ctdb_ltdb_header *header,
383                                                                   TDB_DATA *data)
384 {
385         struct ctdb_client_call_state *state;
386         struct ctdb_context *ctdb = ctdb_db->ctdb;
387         int ret;
388
389         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
390         CTDB_NO_MEMORY_NULL(ctdb, state);
391         state->call = talloc_zero(state, struct ctdb_call);
392         CTDB_NO_MEMORY_NULL(ctdb, state->call);
393
394         talloc_steal(state, data->dptr);
395
396         state->state   = CTDB_CALL_DONE;
397         *(state->call) = *call;
398         state->ctdb_db = ctdb_db;
399
400         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
401         if (ret != 0) {
402                 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
403         }
404
405         return state;
406 }
407
408 /*
409   make a ctdb call to the local daemon - async send. Called from client context.
410
411   This constructs a ctdb_call request and queues it for processing. 
412   This call never blocks.
413 */
414 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
415                                               struct ctdb_call *call)
416 {
417         struct ctdb_client_call_state *state;
418         struct ctdb_context *ctdb = ctdb_db->ctdb;
419         struct ctdb_ltdb_header header;
420         TDB_DATA data;
421         int ret;
422         size_t len;
423         struct ctdb_req_call_old *c;
424
425         /* if the domain socket is not yet open, open it */
426         if (ctdb->daemon.sd==-1) {
427                 ctdb_socket_connect(ctdb);
428         }
429
430         ret = ctdb_ltdb_lock(ctdb_db, call->key);
431         if (ret != 0) {
432                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
433                 return NULL;
434         }
435
436         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
437
438         if ((call->flags & CTDB_IMMEDIATE_MIGRATION) && (header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
439                 ret = -1;
440         }
441
442         if (ret == 0 && header.dmaster == ctdb->pnn) {
443                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
444                 talloc_free(data.dptr);
445                 ctdb_ltdb_unlock(ctdb_db, call->key);
446                 return state;
447         }
448
449         ctdb_ltdb_unlock(ctdb_db, call->key);
450         talloc_free(data.dptr);
451
452         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
453         if (state == NULL) {
454                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
455                 return NULL;
456         }
457         state->call = talloc_zero(state, struct ctdb_call);
458         if (state->call == NULL) {
459                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
460                 return NULL;
461         }
462
463         len = offsetof(struct ctdb_req_call_old, data) + call->key.dsize + call->call_data.dsize;
464         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call_old);
465         if (c == NULL) {
466                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
467                 return NULL;
468         }
469
470         state->reqid     = reqid_new(ctdb->idr, state);
471         state->ctdb_db = ctdb_db;
472         talloc_set_destructor(state, ctdb_client_call_destructor);
473
474         c->hdr.reqid     = state->reqid;
475         c->flags         = call->flags;
476         c->db_id         = ctdb_db->db_id;
477         c->callid        = call->call_id;
478         c->hopcount      = 0;
479         c->keylen        = call->key.dsize;
480         c->calldatalen   = call->call_data.dsize;
481         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
482         memcpy(&c->data[call->key.dsize], 
483                call->call_data.dptr, call->call_data.dsize);
484         *(state->call)              = *call;
485         state->call->call_data.dptr = &c->data[call->key.dsize];
486         state->call->key.dptr       = &c->data[0];
487
488         state->state  = CTDB_CALL_WAIT;
489
490
491         ctdb_client_queue_pkt(ctdb, &c->hdr);
492
493         return state;
494 }
495
496
497 /*
498   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
499 */
500 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
501 {
502         struct ctdb_client_call_state *state;
503
504         state = ctdb_call_send(ctdb_db, call);
505         return ctdb_call_recv(state, call);
506 }
507
508
509 /*
510   tell the daemon what messaging srvid we will use, and register the message
511   handler function in the client
512 */
513 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
514                                     srvid_handler_fn handler,
515                                     void *private_data)
516 {
517         int res;
518         int32_t status;
519
520         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid,
521                            CTDB_CONTROL_REGISTER_SRVID, 0,
522                            tdb_null, NULL, NULL, &status, NULL, NULL);
523         if (res != 0 || status != 0) {
524                 DEBUG(DEBUG_ERR,
525                       ("Failed to register srvid %llu\n",
526                        (unsigned long long)srvid));
527                 return -1;
528         }
529
530         /* also need to register the handler with our own ctdb structure */
531         return srvid_register(ctdb->srv, ctdb, srvid, handler, private_data);
532 }
533
534 /*
535   tell the daemon we no longer want a srvid
536 */
537 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb,
538                                        uint64_t srvid, void *private_data)
539 {
540         int res;
541         int32_t status;
542
543         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid,
544                            CTDB_CONTROL_DEREGISTER_SRVID, 0,
545                            tdb_null, NULL, NULL, &status, NULL, NULL);
546         if (res != 0 || status != 0) {
547                 DEBUG(DEBUG_ERR,
548                       ("Failed to deregister srvid %llu\n",
549                        (unsigned long long)srvid));
550                 return -1;
551         }
552
553         /* also need to register the handler with our own ctdb structure */
554         srvid_deregister(ctdb->srv, srvid, private_data);
555         return 0;
556 }
557
558 /*
559   send a message - from client context
560  */
561 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
562                       uint64_t srvid, TDB_DATA data)
563 {
564         struct ctdb_req_message_old *r;
565         int len, res;
566
567         len = offsetof(struct ctdb_req_message_old, data) + data.dsize;
568         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
569                                len, struct ctdb_req_message_old);
570         CTDB_NO_MEMORY(ctdb, r);
571
572         r->hdr.destnode  = pnn;
573         r->srvid         = srvid;
574         r->datalen       = data.dsize;
575         memcpy(&r->data[0], data.dptr, data.dsize);
576         
577         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
578         talloc_free(r);
579         return res;
580 }
581
582
583 /*
584   cancel a ctdb_fetch_lock operation, releasing the lock
585  */
586 static int fetch_lock_destructor(struct ctdb_record_handle *h)
587 {
588         ctdb_ltdb_unlock(h->ctdb_db, h->key);
589         return 0;
590 }
591
592 /*
593   force the migration of a record to this node
594  */
595 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
596 {
597         struct ctdb_call call;
598         ZERO_STRUCT(call);
599         call.call_id = CTDB_NULL_FUNC;
600         call.key = key;
601         call.flags = CTDB_IMMEDIATE_MIGRATION;
602         return ctdb_call(ctdb_db, &call);
603 }
604
605 /*
606   try to fetch a readonly copy of a record
607  */
608 static int
609 ctdb_client_fetch_readonly(struct ctdb_db_context *ctdb_db, TDB_DATA key, TALLOC_CTX *mem_ctx, struct ctdb_ltdb_header **hdr, TDB_DATA *data)
610 {
611         int ret;
612
613         struct ctdb_call call;
614         ZERO_STRUCT(call);
615
616         call.call_id = CTDB_FETCH_WITH_HEADER_FUNC;
617         call.call_data.dptr = NULL;
618         call.call_data.dsize = 0;
619         call.key = key;
620         call.flags = CTDB_WANT_READONLY;
621         ret = ctdb_call(ctdb_db, &call);
622
623         if (ret != 0) {
624                 return -1;
625         }
626         if (call.reply_data.dsize < sizeof(struct ctdb_ltdb_header)) {
627                 return -1;
628         }
629
630         *hdr = talloc_memdup(mem_ctx, &call.reply_data.dptr[0], sizeof(struct ctdb_ltdb_header));
631         if (*hdr == NULL) {
632                 talloc_free(call.reply_data.dptr);
633                 return -1;
634         }
635
636         data->dsize = call.reply_data.dsize - sizeof(struct ctdb_ltdb_header);
637         data->dptr  = talloc_memdup(mem_ctx, &call.reply_data.dptr[sizeof(struct ctdb_ltdb_header)], data->dsize);
638         if (data->dptr == NULL) {
639                 talloc_free(call.reply_data.dptr);
640                 talloc_free(hdr);
641                 return -1;
642         }
643
644         return 0;
645 }
646
647 /*
648   get a lock on a record, and return the records data. Blocks until it gets the lock
649  */
650 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
651                                            TDB_DATA key, TDB_DATA *data)
652 {
653         int ret;
654         struct ctdb_record_handle *h;
655
656         /*
657           procedure is as follows:
658
659           1) get the chain lock. 
660           2) check if we are dmaster
661           3) if we are the dmaster then return handle 
662           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
663              reply from ctdbd
664           5) when we get the reply, goto (1)
665          */
666
667         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
668         if (h == NULL) {
669                 return NULL;
670         }
671
672         h->ctdb_db = ctdb_db;
673         h->key     = key;
674         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
675         if (h->key.dptr == NULL) {
676                 talloc_free(h);
677                 return NULL;
678         }
679         h->data    = data;
680
681         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
682                  (const char *)key.dptr));
683
684 again:
685         /* step 1 - get the chain lock */
686         ret = ctdb_ltdb_lock(ctdb_db, key);
687         if (ret != 0) {
688                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
689                 talloc_free(h);
690                 return NULL;
691         }
692
693         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
694
695         talloc_set_destructor(h, fetch_lock_destructor);
696
697         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
698
699         /* when torturing, ensure we test the remote path */
700         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
701             random() % 5 == 0) {
702                 h->header.dmaster = (uint32_t)-1;
703         }
704
705
706         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
707
708         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
709                 ctdb_ltdb_unlock(ctdb_db, key);
710                 ret = ctdb_client_force_migration(ctdb_db, key);
711                 if (ret != 0) {
712                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
713                         talloc_free(h);
714                         return NULL;
715                 }
716                 goto again;
717         }
718
719         /* if this is a request for read/write and we have delegations
720            we have to revoke all delegations first
721         */
722         if ((h->header.dmaster == ctdb_db->ctdb->pnn) &&
723             (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
724                 ctdb_ltdb_unlock(ctdb_db, key);
725                 ret = ctdb_client_force_migration(ctdb_db, key);
726                 if (ret != 0) {
727                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
728                         talloc_free(h);
729                         return NULL;
730                 }
731                 goto again;
732         }
733
734         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
735         return h;
736 }
737
738 /*
739   get a readonly lock on a record, and return the records data. Blocks until it gets the lock
740  */
741 struct ctdb_record_handle *
742 ctdb_fetch_readonly_lock(
743         struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
744         TDB_DATA key, TDB_DATA *data,
745         int read_only)
746 {
747         int ret;
748         struct ctdb_record_handle *h;
749         struct ctdb_ltdb_header *roheader = NULL;
750
751         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
752         if (h == NULL) {
753                 return NULL;
754         }
755
756         h->ctdb_db = ctdb_db;
757         h->key     = key;
758         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
759         if (h->key.dptr == NULL) {
760                 talloc_free(h);
761                 return NULL;
762         }
763         h->data    = data;
764
765         data->dptr = NULL;
766         data->dsize = 0;
767
768
769 again:
770         talloc_free(roheader);
771         roheader = NULL;
772
773         talloc_free(data->dptr);
774         data->dptr = NULL;
775         data->dsize = 0;
776
777         /* Lock the record/chain */
778         ret = ctdb_ltdb_lock(ctdb_db, key);
779         if (ret != 0) {
780                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
781                 talloc_free(h);
782                 return NULL;
783         }
784
785         talloc_set_destructor(h, fetch_lock_destructor);
786
787         /* Check if record exists yet in the TDB */
788         ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
789         if (ret != 0) {
790                 ctdb_ltdb_unlock(ctdb_db, key);
791                 ret = ctdb_client_force_migration(ctdb_db, key);
792                 if (ret != 0) {
793                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
794                         talloc_free(h);
795                         return NULL;
796                 }
797                 goto again;
798         }
799
800         /* if this is a request for read/write and we have delegations
801            we have to revoke all delegations first
802         */
803         if ((read_only == 0) 
804         &&  (h->header.dmaster == ctdb_db->ctdb->pnn)
805         &&  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
806                 ctdb_ltdb_unlock(ctdb_db, key);
807                 ret = ctdb_client_force_migration(ctdb_db, key);
808                 if (ret != 0) {
809                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
810                         talloc_free(h);
811                         return NULL;
812                 }
813                 goto again;
814         }
815
816         /* if we are dmaster, just return the handle */
817         if (h->header.dmaster == ctdb_db->ctdb->pnn) {
818                 return h;
819         }
820
821         if (read_only != 0) {
822                 TDB_DATA rodata = {NULL, 0};
823
824                 if ((h->header.flags & CTDB_REC_RO_HAVE_READONLY)
825                 ||  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
826                         return h;
827                 }
828
829                 ctdb_ltdb_unlock(ctdb_db, key);
830                 ret = ctdb_client_fetch_readonly(ctdb_db, key, h, &roheader, &rodata);
831                 if (ret != 0) {
832                         DEBUG(DEBUG_ERR,("ctdb_fetch_readonly_lock:  failed. force migration and try again\n"));
833                         ret = ctdb_client_force_migration(ctdb_db, key);
834                         if (ret != 0) {
835                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
836                                 talloc_free(h);
837                                 return NULL;
838                         }
839
840                         goto again;
841                 }
842
843                 if (!(roheader->flags&CTDB_REC_RO_HAVE_READONLY)) {
844                         ret = ctdb_client_force_migration(ctdb_db, key);
845                         if (ret != 0) {
846                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
847                                 talloc_free(h);
848                                 return NULL;
849                         }
850
851                         goto again;
852                 }
853
854                 ret = ctdb_ltdb_lock(ctdb_db, key);
855                 if (ret != 0) {
856                         DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
857                         talloc_free(h);
858                         return NULL;
859                 }
860
861                 ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
862                 if (ret != 0) {
863                         ctdb_ltdb_unlock(ctdb_db, key);
864
865                         ret = ctdb_client_force_migration(ctdb_db, key);
866                         if (ret != 0) {
867                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
868                                 talloc_free(h);
869                                 return NULL;
870                         }
871
872                         goto again;
873                 }
874
875                 return h;
876         }
877
878         /* we are not dmaster and this was not a request for a readonly lock
879          * so unlock the record, migrate it and try again
880          */
881         ctdb_ltdb_unlock(ctdb_db, key);
882         ret = ctdb_client_force_migration(ctdb_db, key);
883         if (ret != 0) {
884                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
885                 talloc_free(h);
886                 return NULL;
887         }
888         goto again;
889 }
890
891 /*
892   store some data to the record that was locked with ctdb_fetch_lock()
893 */
894 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
895 {
896         if (! ctdb_db_volatile(h->ctdb_db)) {
897                 DEBUG(DEBUG_ERR,
898                       ("ctdb_record_store prohibited for non-volatile dbs\n"));
899                 return -1;
900         }
901
902         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
903 }
904
905 /*
906   non-locking fetch of a record
907  */
908 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
909                TDB_DATA key, TDB_DATA *data)
910 {
911         struct ctdb_call call;
912         int ret;
913
914         call.call_id = CTDB_FETCH_FUNC;
915         call.call_data.dptr = NULL;
916         call.call_data.dsize = 0;
917         call.key = key;
918
919         ret = ctdb_call(ctdb_db, &call);
920
921         if (ret == 0) {
922                 *data = call.reply_data;
923                 talloc_steal(mem_ctx, data->dptr);
924         }
925
926         return ret;
927 }
928
929
930
931 /*
932    called when a control completes or timesout to invoke the callback
933    function the user provided
934 */
935 static void invoke_control_callback(struct tevent_context *ev,
936                                     struct tevent_timer *te,
937                                     struct timeval t, void *private_data)
938 {
939         struct ctdb_client_control_state *state;
940         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
941         int ret;
942
943         state = talloc_get_type(private_data, struct ctdb_client_control_state);
944         talloc_steal(tmp_ctx, state);
945
946         ret = ctdb_control_recv(state->ctdb, state, state,
947                         NULL, 
948                         NULL, 
949                         NULL);
950         if (ret != 0) {
951                 DEBUG(DEBUG_DEBUG,("ctdb_control_recv() failed, ignoring return code %d\n", ret));
952         }
953
954         talloc_free(tmp_ctx);
955 }
956
957 /*
958   called when a CTDB_REPLY_CONTROL packet comes in in the client
959
960   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
961   contains any reply data from the control
962 */
963 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
964                                       struct ctdb_req_header *hdr)
965 {
966         struct ctdb_reply_control_old *c = (struct ctdb_reply_control_old *)hdr;
967         struct ctdb_client_control_state *state;
968
969         state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_client_control_state);
970         if (state == NULL) {
971                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
972                 return;
973         }
974
975         if (hdr->reqid != state->reqid) {
976                 /* we found a record  but it was the wrong one */
977                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
978                 return;
979         }
980
981         state->outdata.dptr = c->data;
982         state->outdata.dsize = c->datalen;
983         state->status = c->status;
984         if (c->errorlen) {
985                 state->errormsg = talloc_strndup(state, 
986                                                  (char *)&c->data[c->datalen], 
987                                                  c->errorlen);
988         }
989
990         /* state->outdata now uses resources from c so we don't want c
991            to just dissappear from under us while state is still alive
992         */
993         talloc_steal(state, c);
994
995         state->state = CTDB_CONTROL_DONE;
996
997         /* if we had a callback registered for this control, pull the response
998            and call the callback.
999         */
1000         if (state->async.fn) {
1001                 tevent_add_timer(ctdb->ev, state, timeval_zero(),
1002                                  invoke_control_callback, state);
1003         }
1004 }
1005
1006
1007 /*
1008   destroy a ctdb_control in client
1009 */
1010 static int ctdb_client_control_destructor(struct ctdb_client_control_state *state)
1011 {
1012         reqid_remove(state->ctdb->idr, state->reqid);
1013         return 0;
1014 }
1015
1016
1017 /* time out handler for ctdb_control */
1018 static void control_timeout_func(struct tevent_context *ev,
1019                                  struct tevent_timer *te,
1020                                  struct timeval t, void *private_data)
1021 {
1022         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
1023
1024         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
1025                          "dstnode:%u\n", state->reqid, state->c->opcode,
1026                          state->c->hdr.destnode));
1027
1028         state->state = CTDB_CONTROL_TIMEOUT;
1029
1030         /* if we had a callback registered for this control, pull the response
1031            and call the callback.
1032         */
1033         if (state->async.fn) {
1034                 tevent_add_timer(state->ctdb->ev, state, timeval_zero(),
1035                                  invoke_control_callback, state);
1036         }
1037 }
1038
1039 /* async version of send control request */
1040 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
1041                 uint32_t destnode, uint64_t srvid, 
1042                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
1043                 TALLOC_CTX *mem_ctx,
1044                 struct timeval *timeout,
1045                 char **errormsg)
1046 {
1047         struct ctdb_client_control_state *state;
1048         size_t len;
1049         struct ctdb_req_control_old *c;
1050         int ret;
1051
1052         if (errormsg) {
1053                 *errormsg = NULL;
1054         }
1055
1056         /* if the domain socket is not yet open, open it */
1057         if (ctdb->daemon.sd==-1) {
1058                 ctdb_socket_connect(ctdb);
1059         }
1060
1061         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
1062         CTDB_NO_MEMORY_NULL(ctdb, state);
1063
1064         state->ctdb       = ctdb;
1065         state->reqid      = reqid_new(ctdb->idr, state);
1066         state->state      = CTDB_CONTROL_WAIT;
1067         state->errormsg   = NULL;
1068
1069         talloc_set_destructor(state, ctdb_client_control_destructor);
1070
1071         len = offsetof(struct ctdb_req_control_old, data) + data.dsize;
1072         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
1073                                len, struct ctdb_req_control_old);
1074         state->c            = c;        
1075         CTDB_NO_MEMORY_NULL(ctdb, c);
1076         c->hdr.reqid        = state->reqid;
1077         c->hdr.destnode     = destnode;
1078         c->opcode           = opcode;
1079         c->client_id        = 0;
1080         c->flags            = flags;
1081         c->srvid            = srvid;
1082         c->datalen          = data.dsize;
1083         if (data.dsize) {
1084                 memcpy(&c->data[0], data.dptr, data.dsize);
1085         }
1086
1087         /* timeout */
1088         if (timeout && !timeval_is_zero(timeout)) {
1089                 tevent_add_timer(ctdb->ev, state, *timeout,
1090                                  control_timeout_func, state);
1091         }
1092
1093         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
1094         if (ret != 0) {
1095                 talloc_free(state);
1096                 return NULL;
1097         }
1098
1099         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1100                 talloc_free(state);
1101                 return NULL;
1102         }
1103
1104         return state;
1105 }
1106
1107
1108 /* async version of receive control reply */
1109 int ctdb_control_recv(struct ctdb_context *ctdb, 
1110                 struct ctdb_client_control_state *state, 
1111                 TALLOC_CTX *mem_ctx,
1112                 TDB_DATA *outdata, int32_t *status, char **errormsg)
1113 {
1114         TALLOC_CTX *tmp_ctx;
1115
1116         if (status != NULL) {
1117                 *status = -1;
1118         }
1119         if (errormsg != NULL) {
1120                 *errormsg = NULL;
1121         }
1122
1123         if (state == NULL) {
1124                 return -1;
1125         }
1126
1127         /* prevent double free of state */
1128         tmp_ctx = talloc_new(ctdb);
1129         talloc_steal(tmp_ctx, state);
1130
1131         /* loop one event at a time until we either timeout or the control
1132            completes.
1133         */
1134         while (state->state == CTDB_CONTROL_WAIT) {
1135                 tevent_loop_once(ctdb->ev);
1136         }
1137
1138         if (state->state != CTDB_CONTROL_DONE) {
1139                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
1140                 if (state->async.fn) {
1141                         state->async.fn(state);
1142                 }
1143                 talloc_free(tmp_ctx);
1144                 return -1;
1145         }
1146
1147         if (state->errormsg) {
1148                 int s = (state->status == 0 ? -1 : state->status);
1149                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
1150                 if (errormsg) {
1151                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
1152                 }
1153                 if (state->async.fn) {
1154                         state->async.fn(state);
1155                 }
1156                 talloc_free(tmp_ctx);
1157                 return s;
1158         }
1159
1160         if (outdata) {
1161                 *outdata = state->outdata;
1162                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
1163         }
1164
1165         if (status) {
1166                 *status = state->status;
1167         }
1168
1169         if (state->async.fn) {
1170                 state->async.fn(state);
1171         }
1172
1173         talloc_free(tmp_ctx);
1174         return 0;
1175 }
1176
1177
1178
1179 /*
1180   send a ctdb control message
1181   timeout specifies how long we should wait for a reply.
1182   if timeout is NULL we wait indefinitely
1183  */
1184 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
1185                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
1186                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
1187                  struct timeval *timeout,
1188                  char **errormsg)
1189 {
1190         struct ctdb_client_control_state *state;
1191
1192         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
1193                         flags, data, mem_ctx,
1194                         timeout, errormsg);
1195
1196         /* FIXME: Error conditions in ctdb_control_send return NULL without
1197          * setting errormsg.  So, there is no way to distinguish between sucess
1198          * and failure when CTDB_CTRL_FLAG_NOREPLY is set */
1199         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1200                 if (status != NULL) {
1201                         *status = 0;
1202                 }
1203                 return 0;
1204         }
1205
1206         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
1207                         errormsg);
1208 }
1209
1210
1211
1212
1213 /*
1214   a process exists call. Returns 0 if process exists, -1 otherwise
1215  */
1216 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
1217 {
1218         int ret;
1219         TDB_DATA data;
1220         int32_t status;
1221
1222         data.dptr = (uint8_t*)&pid;
1223         data.dsize = sizeof(pid);
1224
1225         ret = ctdb_control(ctdb, destnode, 0, 
1226                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
1227                            NULL, NULL, &status, NULL, NULL);
1228         if (ret != 0) {
1229                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
1230                 return -1;
1231         }
1232
1233         return status;
1234 }
1235
1236 /*
1237   get remote statistics
1238  */
1239 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
1240 {
1241         int ret;
1242         TDB_DATA data;
1243         int32_t res;
1244
1245         ret = ctdb_control(ctdb, destnode, 0, 
1246                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
1247                            ctdb, &data, &res, NULL, NULL);
1248         if (ret != 0 || res != 0) {
1249                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
1250                 return -1;
1251         }
1252
1253         if (data.dsize != sizeof(struct ctdb_statistics)) {
1254                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1255                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1256                       return -1;
1257         }
1258
1259         *status = *(struct ctdb_statistics *)data.dptr;
1260         talloc_free(data.dptr);
1261                         
1262         return 0;
1263 }
1264
1265 /*
1266  * get db statistics
1267  */
1268 int ctdb_ctrl_dbstatistics(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1269                            TALLOC_CTX *mem_ctx, struct ctdb_db_statistics_old **dbstat)
1270 {
1271         int ret;
1272         TDB_DATA indata, outdata;
1273         int32_t res;
1274         struct ctdb_db_statistics_old *wire, *s;
1275         char *ptr;
1276         int i;
1277
1278         indata.dptr = (uint8_t *)&dbid;
1279         indata.dsize = sizeof(dbid);
1280
1281         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DB_STATISTICS,
1282                            0, indata, ctdb, &outdata, &res, NULL, NULL);
1283         if (ret != 0 || res != 0) {
1284                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for dbstatistics failed\n"));
1285                 return -1;
1286         }
1287
1288         if (outdata.dsize < offsetof(struct ctdb_db_statistics_old, hot_keys_wire)) {
1289                 DEBUG(DEBUG_ERR,(__location__ " Wrong dbstatistics size %zi - expected >= %lu\n",
1290                                  outdata.dsize,
1291                                  (long unsigned int)sizeof(struct ctdb_statistics)));
1292                 return -1;
1293         }
1294
1295         s = talloc_zero(mem_ctx, struct ctdb_db_statistics_old);
1296         if (s == NULL) {
1297                 talloc_free(outdata.dptr);
1298                 CTDB_NO_MEMORY(ctdb, s);
1299         }
1300
1301         wire = (struct ctdb_db_statistics_old *)outdata.dptr;
1302         memcpy(s, wire, offsetof(struct ctdb_db_statistics_old, hot_keys_wire));
1303         ptr = &wire->hot_keys_wire[0];
1304         for (i=0; i<wire->num_hot_keys; i++) {
1305                 s->hot_keys[i].key.dptr = talloc_size(mem_ctx, s->hot_keys[i].key.dsize);
1306                 if (s->hot_keys[i].key.dptr == NULL) {
1307                         talloc_free(outdata.dptr);
1308                         CTDB_NO_MEMORY(ctdb, s->hot_keys[i].key.dptr);
1309                 }
1310
1311                 memcpy(s->hot_keys[i].key.dptr, ptr, s->hot_keys[i].key.dsize);
1312                 ptr += wire->hot_keys[i].key.dsize;
1313         }
1314
1315         talloc_free(outdata.dptr);
1316         *dbstat = s;
1317         return 0;
1318 }
1319
1320 /*
1321   shutdown a remote ctdb node
1322  */
1323 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1324 {
1325         struct ctdb_client_control_state *state;
1326
1327         state = ctdb_control_send(ctdb, destnode, 0, 
1328                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1329                            NULL, &timeout, NULL);
1330         if (state == NULL) {
1331                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1332                 return -1;
1333         }
1334
1335         return 0;
1336 }
1337
1338 /*
1339   get vnn map from a remote node
1340  */
1341 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1342 {
1343         int ret;
1344         TDB_DATA outdata;
1345         int32_t res;
1346         struct ctdb_vnn_map_wire *map;
1347
1348         ret = ctdb_control(ctdb, destnode, 0, 
1349                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1350                            mem_ctx, &outdata, &res, &timeout, NULL);
1351         if (ret != 0 || res != 0) {
1352                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1353                 return -1;
1354         }
1355         
1356         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1357         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1358             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1359                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1360                 return -1;
1361         }
1362
1363         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1364         CTDB_NO_MEMORY(ctdb, *vnnmap);
1365         (*vnnmap)->generation = map->generation;
1366         (*vnnmap)->size       = map->size;
1367         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1368
1369         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1370         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1371         talloc_free(outdata.dptr);
1372                     
1373         return 0;
1374 }
1375
1376
1377 /*
1378   get the recovery mode of a remote node
1379  */
1380 struct ctdb_client_control_state *
1381 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1382 {
1383         return ctdb_control_send(ctdb, destnode, 0, 
1384                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1385                            mem_ctx, &timeout, NULL);
1386 }
1387
1388 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1389 {
1390         int ret;
1391         int32_t res;
1392
1393         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1394         if (ret != 0) {
1395                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1396                 return -1;
1397         }
1398
1399         if (recmode) {
1400                 *recmode = (uint32_t)res;
1401         }
1402
1403         return 0;
1404 }
1405
1406 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1407 {
1408         struct ctdb_client_control_state *state;
1409
1410         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1411         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1412 }
1413
1414
1415
1416
1417 /*
1418   set the recovery mode of a remote node
1419  */
1420 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1421 {
1422         int ret;
1423         TDB_DATA data;
1424         int32_t res;
1425
1426         data.dsize = sizeof(uint32_t);
1427         data.dptr = (unsigned char *)&recmode;
1428
1429         ret = ctdb_control(ctdb, destnode, 0, 
1430                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1431                            NULL, NULL, &res, &timeout, NULL);
1432         if (ret != 0 || res != 0) {
1433                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1434                 return -1;
1435         }
1436
1437         return 0;
1438 }
1439
1440
1441
1442 /*
1443   get the recovery master of a remote node
1444  */
1445 struct ctdb_client_control_state *
1446 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1447                         struct timeval timeout, uint32_t destnode)
1448 {
1449         return ctdb_control_send(ctdb, destnode, 0, 
1450                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1451                            mem_ctx, &timeout, NULL);
1452 }
1453
1454 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1455 {
1456         int ret;
1457         int32_t res;
1458
1459         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1460         if (ret != 0) {
1461                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1462                 return -1;
1463         }
1464
1465         if (recmaster) {
1466                 *recmaster = (uint32_t)res;
1467         }
1468
1469         return 0;
1470 }
1471
1472 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1473 {
1474         struct ctdb_client_control_state *state;
1475
1476         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1477         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1478 }
1479
1480
1481 /*
1482   set the recovery master of a remote node
1483  */
1484 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1485 {
1486         int ret;
1487         TDB_DATA data;
1488         int32_t res;
1489
1490         ZERO_STRUCT(data);
1491         data.dsize = sizeof(uint32_t);
1492         data.dptr = (unsigned char *)&recmaster;
1493
1494         ret = ctdb_control(ctdb, destnode, 0, 
1495                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1496                            NULL, NULL, &res, &timeout, NULL);
1497         if (ret != 0 || res != 0) {
1498                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1499                 return -1;
1500         }
1501
1502         return 0;
1503 }
1504
1505
1506 /*
1507   get a list of databases off a remote node
1508  */
1509 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1510                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map_old **dbmap)
1511 {
1512         int ret;
1513         TDB_DATA outdata;
1514         int32_t res;
1515
1516         ret = ctdb_control(ctdb, destnode, 0, 
1517                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1518                            mem_ctx, &outdata, &res, &timeout, NULL);
1519         if (ret != 0 || res != 0) {
1520                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1521                 return -1;
1522         }
1523
1524         *dbmap = (struct ctdb_dbid_map_old *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1525         talloc_free(outdata.dptr);
1526                     
1527         return 0;
1528 }
1529
1530 /*
1531   get a list of nodes (vnn and flags ) from a remote node
1532  */
1533 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1534                 struct timeval timeout, uint32_t destnode, 
1535                 TALLOC_CTX *mem_ctx, struct ctdb_node_map_old **nodemap)
1536 {
1537         int ret;
1538         TDB_DATA outdata;
1539         int32_t res;
1540
1541         ret = ctdb_control(ctdb, destnode, 0,
1542                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null,
1543                            mem_ctx, &outdata, &res, &timeout, NULL);
1544         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1545                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1546                 return -1;
1547         }
1548
1549         *nodemap = (struct ctdb_node_map_old *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1550         talloc_free(outdata.dptr);
1551         return 0;
1552 }
1553
1554 /*
1555   load nodes file on a remote node and return as a node map
1556  */
1557 int ctdb_ctrl_getnodesfile(struct ctdb_context *ctdb,
1558                            struct timeval timeout, uint32_t destnode,
1559                            TALLOC_CTX *mem_ctx, struct ctdb_node_map_old **nodemap)
1560 {
1561         int ret;
1562         TDB_DATA outdata;
1563         int32_t res;
1564
1565         ret = ctdb_control(ctdb, destnode, 0,
1566                            CTDB_CONTROL_GET_NODES_FILE, 0, tdb_null,
1567                            mem_ctx, &outdata, &res, &timeout, NULL);
1568         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1569                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1570                 return -1;
1571         }
1572
1573         *nodemap = (struct ctdb_node_map_old *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1574         talloc_free(outdata.dptr);
1575
1576         return 0;
1577 }
1578
1579 /*
1580   drop the transport, reload the nodes file and restart the transport
1581  */
1582 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1583                     struct timeval timeout, uint32_t destnode)
1584 {
1585         int ret;
1586         int32_t res;
1587
1588         ret = ctdb_control(ctdb, destnode, 0, 
1589                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1590                            NULL, NULL, &res, &timeout, NULL);
1591         if (ret != 0 || res != 0) {
1592                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1593                 return -1;
1594         }
1595
1596         return 0;
1597 }
1598
1599
1600 /*
1601   set vnn map on a node
1602  */
1603 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1604                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1605 {
1606         int ret;
1607         TDB_DATA data;
1608         int32_t res;
1609         struct ctdb_vnn_map_wire *map;
1610         size_t len;
1611
1612         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1613         map = talloc_size(mem_ctx, len);
1614         CTDB_NO_MEMORY(ctdb, map);
1615
1616         map->generation = vnnmap->generation;
1617         map->size = vnnmap->size;
1618         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1619         
1620         data.dsize = len;
1621         data.dptr  = (uint8_t *)map;
1622
1623         ret = ctdb_control(ctdb, destnode, 0, 
1624                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1625                            NULL, NULL, &res, &timeout, NULL);
1626         if (ret != 0 || res != 0) {
1627                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1628                 return -1;
1629         }
1630
1631         talloc_free(map);
1632
1633         return 0;
1634 }
1635
1636
1637 /*
1638   async send for pull database
1639  */
1640 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1641         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1642         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1643 {
1644         TDB_DATA indata;
1645         struct ctdb_pulldb *pull;
1646         struct ctdb_client_control_state *state;
1647
1648         pull = talloc(mem_ctx, struct ctdb_pulldb);
1649         CTDB_NO_MEMORY_NULL(ctdb, pull);
1650
1651         pull->db_id   = dbid;
1652         pull->lmaster = lmaster;
1653
1654         indata.dsize = sizeof(struct ctdb_pulldb);
1655         indata.dptr  = (unsigned char *)pull;
1656
1657         state = ctdb_control_send(ctdb, destnode, 0, 
1658                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1659                                   mem_ctx, &timeout, NULL);
1660         talloc_free(pull);
1661
1662         return state;
1663 }
1664
1665 /*
1666   async recv for pull database
1667  */
1668 int ctdb_ctrl_pulldb_recv(
1669         struct ctdb_context *ctdb, 
1670         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1671         TDB_DATA *outdata)
1672 {
1673         int ret;
1674         int32_t res;
1675
1676         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1677         if ( (ret != 0) || (res != 0) ){
1678                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1679                 return -1;
1680         }
1681
1682         return 0;
1683 }
1684
1685 /*
1686   pull all keys and records for a specific database on a node
1687  */
1688 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1689                 uint32_t dbid, uint32_t lmaster, 
1690                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1691                 TDB_DATA *outdata)
1692 {
1693         struct ctdb_client_control_state *state;
1694
1695         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1696                                       timeout);
1697         
1698         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1699 }
1700
1701
1702 /*
1703   change dmaster for all keys in the database to the new value
1704  */
1705 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1706                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1707 {
1708         int ret;
1709         TDB_DATA indata;
1710         int32_t res;
1711
1712         indata.dsize = 2*sizeof(uint32_t);
1713         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1714
1715         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1716         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1717
1718         ret = ctdb_control(ctdb, destnode, 0, 
1719                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1720                            NULL, NULL, &res, &timeout, NULL);
1721         if (ret != 0 || res != 0) {
1722                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1723                 return -1;
1724         }
1725
1726         return 0;
1727 }
1728
1729 /*
1730   ping a node, return number of clients connected
1731  */
1732 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1733 {
1734         int ret;
1735         int32_t res;
1736
1737         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1738                            tdb_null, NULL, NULL, &res, NULL, NULL);
1739         if (ret != 0) {
1740                 return -1;
1741         }
1742         return res;
1743 }
1744
1745 int ctdb_ctrl_get_runstate(struct ctdb_context *ctdb, 
1746                            struct timeval timeout, 
1747                            uint32_t destnode,
1748                            uint32_t *runstate)
1749 {
1750         TDB_DATA outdata;
1751         int32_t res;
1752         int ret;
1753
1754         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_RUNSTATE, 0,
1755                            tdb_null, ctdb, &outdata, &res, &timeout, NULL);
1756         if (ret != 0 || res != 0) {
1757                 DEBUG(DEBUG_ERR,("ctdb_control for get_runstate failed\n"));
1758                 return ret != 0 ? ret : res;
1759         }
1760
1761         if (outdata.dsize != sizeof(uint32_t)) {
1762                 DEBUG(DEBUG_ERR,("Invalid return data in get_runstate\n"));
1763                 talloc_free(outdata.dptr);
1764                 return -1;
1765         }
1766
1767         if (runstate != NULL) {
1768                 *runstate = *(uint32_t *)outdata.dptr;
1769         }
1770         talloc_free(outdata.dptr);
1771
1772         return 0;
1773 }
1774
1775 /*
1776   find the real path to a ltdb 
1777  */
1778 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1779                    const char **path)
1780 {
1781         int ret;
1782         int32_t res;
1783         TDB_DATA data;
1784
1785         data.dptr = (uint8_t *)&dbid;
1786         data.dsize = sizeof(dbid);
1787
1788         ret = ctdb_control(ctdb, destnode, 0, 
1789                            CTDB_CONTROL_GETDBPATH, 0, data, 
1790                            mem_ctx, &data, &res, &timeout, NULL);
1791         if (ret != 0 || res != 0) {
1792                 return -1;
1793         }
1794
1795         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1796         if ((*path) == NULL) {
1797                 return -1;
1798         }
1799
1800         talloc_free(data.dptr);
1801
1802         return 0;
1803 }
1804
1805 /*
1806   find the name of a db 
1807  */
1808 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1809                    const char **name)
1810 {
1811         int ret;
1812         int32_t res;
1813         TDB_DATA data;
1814
1815         data.dptr = (uint8_t *)&dbid;
1816         data.dsize = sizeof(dbid);
1817
1818         ret = ctdb_control(ctdb, destnode, 0, 
1819                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1820                            mem_ctx, &data, &res, &timeout, NULL);
1821         if (ret != 0 || res != 0) {
1822                 return -1;
1823         }
1824
1825         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1826         if ((*name) == NULL) {
1827                 return -1;
1828         }
1829
1830         talloc_free(data.dptr);
1831
1832         return 0;
1833 }
1834
1835 /*
1836   get the health status of a db
1837  */
1838 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1839                           struct timeval timeout,
1840                           uint32_t destnode,
1841                           uint32_t dbid, TALLOC_CTX *mem_ctx,
1842                           const char **reason)
1843 {
1844         int ret;
1845         int32_t res;
1846         TDB_DATA data;
1847
1848         data.dptr = (uint8_t *)&dbid;
1849         data.dsize = sizeof(dbid);
1850
1851         ret = ctdb_control(ctdb, destnode, 0,
1852                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1853                            mem_ctx, &data, &res, &timeout, NULL);
1854         if (ret != 0 || res != 0) {
1855                 return -1;
1856         }
1857
1858         if (data.dsize == 0) {
1859                 (*reason) = NULL;
1860                 return 0;
1861         }
1862
1863         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1864         if ((*reason) == NULL) {
1865                 return -1;
1866         }
1867
1868         talloc_free(data.dptr);
1869
1870         return 0;
1871 }
1872
1873 /*
1874  * get db sequence number
1875  */
1876 int ctdb_ctrl_getdbseqnum(struct ctdb_context *ctdb, struct timeval timeout,
1877                           uint32_t destnode, uint32_t dbid, uint64_t *seqnum)
1878 {
1879         int ret;
1880         int32_t res;
1881         TDB_DATA data, outdata;
1882         uint8_t buf[sizeof(uint64_t)] = { 0 };
1883
1884         *(uint32_t *)buf = dbid;
1885         data.dptr = buf;
1886         data.dsize = sizeof(uint64_t);
1887
1888         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DB_SEQNUM,
1889                            0, data, ctdb, &outdata, &res, &timeout, NULL);
1890         if (ret != 0 || res != 0) {
1891                 DEBUG(DEBUG_ERR,("ctdb_control for getdbesqnum failed\n"));
1892                 return -1;
1893         }
1894
1895         if (outdata.dsize != sizeof(uint64_t)) {
1896                 DEBUG(DEBUG_ERR,("Invalid return data in get_dbseqnum\n"));
1897                 talloc_free(outdata.dptr);
1898                 return -1;
1899         }
1900
1901         if (seqnum != NULL) {
1902                 *seqnum = *(uint64_t *)outdata.dptr;
1903         }
1904         talloc_free(outdata.dptr);
1905
1906         return 0;
1907 }
1908
1909 /*
1910   create a database
1911  */
1912 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout,
1913                        uint32_t destnode, TALLOC_CTX *mem_ctx,
1914                        const char *name, uint8_t db_flags, uint32_t *db_id)
1915 {
1916         int ret;
1917         int32_t res;
1918         TDB_DATA data;
1919         uint32_t opcode;
1920
1921         data.dptr = discard_const(name);
1922         data.dsize = strlen(name)+1;
1923
1924         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
1925                 opcode = CTDB_CONTROL_DB_ATTACH_PERSISTENT;
1926         } else if (db_flags & CTDB_DB_FLAGS_REPLICATED) {
1927                 opcode = CTDB_CONTROL_DB_ATTACH_REPLICATED;
1928         } else {
1929                 opcode = CTDB_CONTROL_DB_ATTACH;
1930         }
1931
1932         ret = ctdb_control(ctdb, destnode, 0, opcode, 0, data,
1933                            mem_ctx, &data, &res, &timeout, NULL);
1934
1935         if (ret != 0 || res != 0) {
1936                 return -1;
1937         }
1938
1939         if (data.dsize != sizeof(uint32_t)) {
1940                 TALLOC_FREE(data.dptr);
1941                 return -1;
1942         }
1943         if (db_id != NULL) {
1944                 *db_id = *(uint32_t *)data.dptr;
1945         }
1946         talloc_free(data.dptr);
1947
1948         return 0;
1949 }
1950
1951 /*
1952   get debug level on a node
1953  */
1954 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1955 {
1956         int ret;
1957         int32_t res;
1958         TDB_DATA data;
1959
1960         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1961                            ctdb, &data, &res, NULL, NULL);
1962         if (ret != 0 || res != 0) {
1963                 return -1;
1964         }
1965         if (data.dsize != sizeof(int32_t)) {
1966                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1967                          (unsigned)data.dsize));
1968                 return -1;
1969         }
1970         *level = *(int32_t *)data.dptr;
1971         talloc_free(data.dptr);
1972         return 0;
1973 }
1974
1975 /*
1976   set debug level on a node
1977  */
1978 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1979 {
1980         int ret;
1981         int32_t res;
1982         TDB_DATA data;
1983
1984         data.dptr = (uint8_t *)&level;
1985         data.dsize = sizeof(level);
1986
1987         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1988                            NULL, NULL, &res, NULL, NULL);
1989         if (ret != 0 || res != 0) {
1990                 return -1;
1991         }
1992         return 0;
1993 }
1994
1995
1996 /*
1997   get a list of connected nodes
1998  */
1999 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
2000                                 struct timeval timeout,
2001                                 TALLOC_CTX *mem_ctx,
2002                                 uint32_t *num_nodes)
2003 {
2004         struct ctdb_node_map_old *map=NULL;
2005         int ret, i;
2006         uint32_t *nodes;
2007
2008         *num_nodes = 0;
2009
2010         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
2011         if (ret != 0) {
2012                 return NULL;
2013         }
2014
2015         nodes = talloc_array(mem_ctx, uint32_t, map->num);
2016         if (nodes == NULL) {
2017                 return NULL;
2018         }
2019
2020         for (i=0;i<map->num;i++) {
2021                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2022                         nodes[*num_nodes] = map->nodes[i].pnn;
2023                         (*num_nodes)++;
2024                 }
2025         }
2026
2027         return nodes;
2028 }
2029
2030
2031 /*
2032   reset remote status
2033  */
2034 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
2035 {
2036         int ret;
2037         int32_t res;
2038
2039         ret = ctdb_control(ctdb, destnode, 0, 
2040                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
2041                            NULL, NULL, &res, NULL, NULL);
2042         if (ret != 0 || res != 0) {
2043                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
2044                 return -1;
2045         }
2046         return 0;
2047 }
2048
2049 /*
2050  * Get db open flags
2051  */
2052 int ctdb_ctrl_db_open_flags(struct ctdb_context *ctdb, uint32_t db_id,
2053                             int *tdb_flags)
2054 {
2055         TDB_DATA indata, outdata;
2056         int ret;
2057         int32_t res;
2058
2059         indata.dptr = (uint8_t *)&db_id;
2060         indata.dsize = sizeof(db_id);
2061
2062         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0,
2063                            CTDB_CONTROL_DB_OPEN_FLAGS, 0, indata,
2064                            ctdb, &outdata, &res, NULL, NULL);
2065         if (ret != 0 || res != 0) {
2066                 D_ERR("ctdb control for db open flags failed\n");
2067                 return  -1;
2068         }
2069
2070         if (outdata.dsize != sizeof(int32_t)) {
2071                 D_ERR(__location__ " expected %zi bytes, received %zi bytes\n",
2072                       sizeof(int32_t), outdata.dsize);
2073                 talloc_free(outdata.dptr);
2074                 return -1;
2075         }
2076
2077         *tdb_flags = *(int32_t *)outdata.dptr;
2078         talloc_free(outdata.dptr);
2079         return 0;
2080 }
2081
2082 /*
2083   attach to a specific database - client call
2084 */
2085 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
2086                                     struct timeval timeout,
2087                                     const char *name,
2088                                     uint8_t db_flags)
2089 {
2090         struct ctdb_db_context *ctdb_db;
2091         int ret;
2092         int tdb_flags;
2093
2094         ctdb_db = ctdb_db_handle(ctdb, name);
2095         if (ctdb_db) {
2096                 return ctdb_db;
2097         }
2098
2099         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
2100         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
2101
2102         ctdb_db->ctdb = ctdb;
2103         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
2104         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
2105
2106         /* tell ctdb daemon to attach */
2107         ret = ctdb_ctrl_createdb(ctdb, timeout, CTDB_CURRENT_NODE,
2108                                  ctdb_db, name, db_flags, &ctdb_db->db_id);
2109         if (ret != 0) {
2110                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
2111                 talloc_free(ctdb_db);
2112                 return NULL;
2113         }
2114
2115         ret = ctdb_ctrl_getdbpath(ctdb, timeout, CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
2116         if (ret != 0) {
2117                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
2118                 talloc_free(ctdb_db);
2119                 return NULL;
2120         }
2121
2122         ret = ctdb_ctrl_db_open_flags(ctdb, ctdb_db->db_id, &tdb_flags);
2123         if (ret != 0) {
2124                 D_ERR("Failed to get tdb_flags for database '%s'\n", name);
2125                 talloc_free(ctdb_db);
2126                 return NULL;
2127         }
2128
2129         ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path, 0, tdb_flags,
2130                                       O_RDWR, 0);
2131         if (ctdb_db->ltdb == NULL) {
2132                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
2133                 talloc_free(ctdb_db);
2134                 return NULL;
2135         }
2136
2137         ctdb_db->db_flags = db_flags;
2138
2139         DLIST_ADD(ctdb->db_list, ctdb_db);
2140
2141         /* add well known functions */
2142         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
2143         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
2144         ctdb_set_call(ctdb_db, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
2145
2146         return ctdb_db;
2147 }
2148
2149 /*
2150  * detach from a specific database - client call
2151  */
2152 int ctdb_detach(struct ctdb_context *ctdb, uint32_t db_id)
2153 {
2154         int ret;
2155         int32_t status;
2156         TDB_DATA data;
2157
2158         data.dsize = sizeof(db_id);
2159         data.dptr = (uint8_t *)&db_id;
2160
2161         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_DB_DETACH,
2162                            0, data, NULL, NULL, &status, NULL, NULL);
2163         if (ret != 0 || status != 0) {
2164                 return -1;
2165         }
2166         return 0;
2167 }
2168
2169 /*
2170   setup a call for a database
2171  */
2172 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
2173 {
2174         struct ctdb_registered_call *call;
2175
2176         /* register locally */
2177         call = talloc(ctdb_db, struct ctdb_registered_call);
2178         call->fn = fn;
2179         call->id = id;
2180
2181         DLIST_ADD(ctdb_db->calls, call);
2182         return 0;
2183 }
2184
2185
2186 struct traverse_state {
2187         bool done;
2188         uint32_t count;
2189         ctdb_traverse_func fn;
2190         void *private_data;
2191         bool listemptyrecords;
2192 };
2193
2194 /*
2195   called on each key during a ctdb_traverse
2196  */
2197 static void traverse_handler(uint64_t srvid, TDB_DATA data, void *p)
2198 {
2199         struct traverse_state *state = (struct traverse_state *)p;
2200         struct ctdb_rec_data_old *d = (struct ctdb_rec_data_old *)data.dptr;
2201         TDB_DATA key;
2202
2203         if (data.dsize < sizeof(uint32_t) || d->length != data.dsize) {
2204                 DEBUG(DEBUG_ERR, ("Bad data size %u in traverse_handler\n",
2205                                   (unsigned)data.dsize));
2206                 state->done = true;
2207                 return;
2208         }
2209
2210         key.dsize = d->keylen;
2211         key.dptr  = &d->data[0];
2212         data.dsize = d->datalen;
2213         data.dptr = &d->data[d->keylen];
2214
2215         if (key.dsize == 0 && data.dsize == 0) {
2216                 /* end of traverse */
2217                 state->done = true;
2218                 return;
2219         }
2220
2221         if (!state->listemptyrecords &&
2222             data.dsize == sizeof(struct ctdb_ltdb_header))
2223         {
2224                 /* empty records are deleted records in ctdb */
2225                 return;
2226         }
2227
2228         if (state->fn(key, data, state->private_data) != 0) {
2229                 state->done = true;
2230         }
2231
2232         state->count++;
2233 }
2234
2235 /**
2236  * start a cluster wide traverse, calling the supplied fn on each record
2237  * return the number of records traversed, or -1 on error
2238  *
2239  * Extendet variant with a flag to signal whether empty records should
2240  * be listed.
2241  */
2242 static int ctdb_traverse_ext(struct ctdb_db_context *ctdb_db,
2243                              ctdb_traverse_func fn,
2244                              bool withemptyrecords,
2245                              void *private_data)
2246 {
2247         TDB_DATA data;
2248         struct ctdb_traverse_start_ext t;
2249         int32_t status;
2250         int ret;
2251         uint64_t srvid = (getpid() | 0xFLL<<60);
2252         struct traverse_state state;
2253
2254         state.done = false;
2255         state.count = 0;
2256         state.private_data = private_data;
2257         state.fn = fn;
2258         state.listemptyrecords = withemptyrecords;
2259
2260         ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
2261         if (ret != 0) {
2262                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
2263                 return -1;
2264         }
2265
2266         t.db_id = ctdb_db->db_id;
2267         t.srvid = srvid;
2268         t.reqid = 0;
2269         t.withemptyrecords = withemptyrecords;
2270
2271         data.dptr = (uint8_t *)&t;
2272         data.dsize = sizeof(t);
2273
2274         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START_EXT, 0,
2275                            data, NULL, NULL, &status, NULL, NULL);
2276         if (ret != 0 || status != 0) {
2277                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
2278                 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2279                 return -1;
2280         }
2281
2282         while (!state.done) {
2283                 tevent_loop_once(ctdb_db->ctdb->ev);
2284         }
2285
2286         ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2287         if (ret != 0) {
2288                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
2289                 return -1;
2290         }
2291
2292         return state.count;
2293 }
2294
2295 /**
2296  * start a cluster wide traverse, calling the supplied fn on each record
2297  * return the number of records traversed, or -1 on error
2298  *
2299  * Standard version which does not list the empty records:
2300  * These are considered deleted.
2301  */
2302 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
2303 {
2304         return ctdb_traverse_ext(ctdb_db, fn, false, private_data);
2305 }
2306
2307 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
2308 /*
2309   called on each key during a catdb
2310  */
2311 int ctdb_dumpdb_record(TDB_DATA key, TDB_DATA data, void *p)
2312 {
2313         int i;
2314         struct ctdb_dump_db_context *c = (struct ctdb_dump_db_context *)p;
2315         FILE *f = c->f;
2316         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
2317
2318         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
2319         for (i=0;i<key.dsize;i++) {
2320                 if (ISASCII(key.dptr[i])) {
2321                         fprintf(f, "%c", key.dptr[i]);
2322                 } else {
2323                         fprintf(f, "\\%02X", key.dptr[i]);
2324                 }
2325         }
2326         fprintf(f, "\"\n");
2327
2328         fprintf(f, "dmaster: %u\n", h->dmaster);
2329         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
2330
2331         if (c->printlmaster && c->ctdb->vnn_map != NULL) {
2332                 fprintf(f, "lmaster: %u\n", ctdb_lmaster(c->ctdb, &key));
2333         }
2334
2335         if (c->printhash) {
2336                 fprintf(f, "hash: 0x%08x\n", ctdb_hash(&key));
2337         }
2338
2339         if (c->printrecordflags) {
2340                 fprintf(f, "flags: 0x%08x", h->flags);
2341                 if (h->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) printf(" MIGRATED_WITH_DATA");
2342                 if (h->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) printf(" VACUUM_MIGRATED");
2343                 if (h->flags & CTDB_REC_FLAG_AUTOMATIC) printf(" AUTOMATIC");
2344                 if (h->flags & CTDB_REC_RO_HAVE_DELEGATIONS) printf(" RO_HAVE_DELEGATIONS");
2345                 if (h->flags & CTDB_REC_RO_HAVE_READONLY) printf(" RO_HAVE_READONLY");
2346                 if (h->flags & CTDB_REC_RO_REVOKING_READONLY) printf(" RO_REVOKING_READONLY");
2347                 if (h->flags & CTDB_REC_RO_REVOKE_COMPLETE) printf(" RO_REVOKE_COMPLETE");
2348                 fprintf(f, "\n");
2349         }
2350
2351         if (c->printdatasize) {
2352                 fprintf(f, "data size: %u\n", (unsigned)data.dsize);
2353         } else {
2354                 fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
2355                 for (i=sizeof(*h);i<data.dsize;i++) {
2356                         if (ISASCII(data.dptr[i])) {
2357                                 fprintf(f, "%c", data.dptr[i]);
2358                         } else {
2359                                 fprintf(f, "\\%02X", data.dptr[i]);
2360                         }
2361                 }
2362                 fprintf(f, "\"\n");
2363         }
2364
2365         fprintf(f, "\n");
2366
2367         return 0;
2368 }
2369
2370 /*
2371   convenience function to list all keys to stdout
2372  */
2373 int ctdb_dump_db(struct ctdb_db_context *ctdb_db,
2374                  struct ctdb_dump_db_context *ctx)
2375 {
2376         return ctdb_traverse_ext(ctdb_db, ctdb_dumpdb_record,
2377                                  ctx->printemptyrecords, ctx);
2378 }
2379
2380 /*
2381   get the pid of a ctdb daemon
2382  */
2383 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
2384 {
2385         int ret;
2386         int32_t res;
2387
2388         ret = ctdb_control(ctdb, destnode, 0, 
2389                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
2390                            NULL, NULL, &res, &timeout, NULL);
2391         if (ret != 0) {
2392                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
2393                 return -1;
2394         }
2395
2396         *pid = res;
2397
2398         return 0;
2399 }
2400
2401 /* Freeze all databases */
2402 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout,
2403                      uint32_t destnode)
2404 {
2405         int ret;
2406         int32_t res;
2407
2408         ret = ctdb_control(ctdb, destnode, 0,
2409                            CTDB_CONTROL_FREEZE, 0, tdb_null,
2410                            NULL, NULL, &res, &timeout, NULL);
2411         if (ret != 0 || res != 0) {
2412                 DEBUG(DEBUG_ERR, ("ctdb_ctrl_freeze_priority failed\n"));
2413                 return -1;
2414         }
2415
2416         return 0;
2417 }
2418
2419 /*
2420   get pnn of a node, or -1
2421  */
2422 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2423 {
2424         int ret;
2425         int32_t res;
2426
2427         ret = ctdb_control(ctdb, destnode, 0, 
2428                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2429                            NULL, NULL, &res, &timeout, NULL);
2430         if (ret != 0) {
2431                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2432                 return -1;
2433         }
2434
2435         return res;
2436 }
2437
2438
2439 /*
2440   sent to a node to make it take over an ip address
2441 */
2442 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout,
2443                           uint32_t destnode, struct ctdb_public_ip *ip)
2444 {
2445         TDB_DATA data;
2446         int ret;
2447         int32_t res;
2448
2449         data.dsize = sizeof(*ip);
2450         data.dptr  = (uint8_t *)ip;
2451
2452         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0,
2453                            data, NULL, NULL, &res, &timeout, NULL);
2454         if (ret != 0 || res != 0) {
2455                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2456                 return -1;
2457         }
2458
2459         return 0;
2460 }
2461
2462
2463 /*
2464   sent to a node to make it release an ip address
2465 */
2466 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout,
2467                          uint32_t destnode, struct ctdb_public_ip *ip)
2468 {
2469         TDB_DATA data;
2470         int ret;
2471         int32_t res;
2472
2473         data.dsize = sizeof(*ip);
2474         data.dptr  = (uint8_t *)ip;
2475
2476         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0,
2477                            data, NULL, NULL, &res, &timeout, NULL);
2478         if (ret != 0 || res != 0) {
2479                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2480                 return -1;
2481         }
2482
2483         return 0;
2484 }
2485
2486
2487 /*
2488   get a tunable
2489  */
2490 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2491                           struct timeval timeout, 
2492                           uint32_t destnode,
2493                           const char *name, uint32_t *value)
2494 {
2495         struct ctdb_control_get_tunable *t;
2496         TDB_DATA data, outdata;
2497         int32_t res;
2498         int ret;
2499
2500         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2501         data.dptr  = talloc_size(ctdb, data.dsize);
2502         CTDB_NO_MEMORY(ctdb, data.dptr);
2503
2504         t = (struct ctdb_control_get_tunable *)data.dptr;
2505         t->length = strlen(name)+1;
2506         memcpy(t->name, name, t->length);
2507
2508         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2509                            &outdata, &res, &timeout, NULL);
2510         talloc_free(data.dptr);
2511         if (ret != 0 || res != 0) {
2512                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2513                 return ret != 0 ? ret : res;
2514         }
2515
2516         if (outdata.dsize != sizeof(uint32_t)) {
2517                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2518                 talloc_free(outdata.dptr);
2519                 return -1;
2520         }
2521         
2522         *value = *(uint32_t *)outdata.dptr;
2523         talloc_free(outdata.dptr);
2524
2525         return 0;
2526 }
2527
2528 /*
2529   set a tunable
2530  */
2531 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2532                           struct timeval timeout, 
2533                           uint32_t destnode,
2534                           const char *name, uint32_t value)
2535 {
2536         struct ctdb_tunable_old *t;
2537         TDB_DATA data;
2538         int32_t res;
2539         int ret;
2540
2541         data.dsize = offsetof(struct ctdb_tunable_old, name) + strlen(name) + 1;
2542         data.dptr  = talloc_size(ctdb, data.dsize);
2543         CTDB_NO_MEMORY(ctdb, data.dptr);
2544
2545         t = (struct ctdb_tunable_old *)data.dptr;
2546         t->length = strlen(name)+1;
2547         memcpy(t->name, name, t->length);
2548         t->value = value;
2549
2550         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2551                            NULL, &res, &timeout, NULL);
2552         talloc_free(data.dptr);
2553         if ((ret != 0) || (res == -1)) {
2554                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2555                 return -1;
2556         }
2557
2558         return res;
2559 }
2560
2561 /*
2562   list tunables
2563  */
2564 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2565                             struct timeval timeout, 
2566                             uint32_t destnode,
2567                             TALLOC_CTX *mem_ctx,
2568                             const char ***list, uint32_t *count)
2569 {
2570         TDB_DATA outdata;
2571         int32_t res;
2572         int ret;
2573         struct ctdb_control_list_tunable *t;
2574         char *p, *s, *ptr;
2575
2576         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2577                            mem_ctx, &outdata, &res, &timeout, NULL);
2578         if (ret != 0 || res != 0) {
2579                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2580                 return -1;
2581         }
2582
2583         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2584         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2585             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2586                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2587                 talloc_free(outdata.dptr);
2588                 return -1;              
2589         }
2590         
2591         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2592         CTDB_NO_MEMORY(ctdb, p);
2593
2594         talloc_free(outdata.dptr);
2595         
2596         (*list) = NULL;
2597         (*count) = 0;
2598
2599         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2600                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2601                 CTDB_NO_MEMORY(ctdb, *list);
2602                 (*list)[*count] = talloc_strdup(*list, s);
2603                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2604                 (*count)++;
2605         }
2606
2607         talloc_free(p);
2608
2609         return 0;
2610 }
2611
2612
2613 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2614                                    struct timeval timeout, uint32_t destnode,
2615                                    TALLOC_CTX *mem_ctx,
2616                                    uint32_t flags,
2617                                    struct ctdb_public_ip_list_old **ips)
2618 {
2619         int ret;
2620         TDB_DATA outdata;
2621         int32_t res;
2622
2623         ret = ctdb_control(ctdb, destnode, 0,
2624                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2625                            mem_ctx, &outdata, &res, &timeout, NULL);
2626         if (ret != 0 || res != 0) {
2627                 DEBUG(DEBUG_ERR,(__location__
2628                                  " ctdb_control for getpublicips failed ret:%d res:%d\n",
2629                                  ret, res));
2630                 return -1;
2631         }
2632
2633         *ips = (struct ctdb_public_ip_list_old *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2634         talloc_free(outdata.dptr);
2635
2636         return 0;
2637 }
2638
2639 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2640                              struct timeval timeout, uint32_t destnode,
2641                              TALLOC_CTX *mem_ctx,
2642                              struct ctdb_public_ip_list_old **ips)
2643 {
2644         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2645                                               destnode, mem_ctx,
2646                                               0, ips);
2647 }
2648
2649 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2650                                  struct timeval timeout, uint32_t destnode,
2651                                  TALLOC_CTX *mem_ctx,
2652                                  const ctdb_sock_addr *addr,
2653                                  struct ctdb_public_ip_info_old **_info)
2654 {
2655         int ret;
2656         TDB_DATA indata;
2657         TDB_DATA outdata;
2658         int32_t res;
2659         struct ctdb_public_ip_info_old *info;
2660         uint32_t len;
2661         uint32_t i;
2662
2663         indata.dptr = discard_const_p(uint8_t, addr);
2664         indata.dsize = sizeof(*addr);
2665
2666         ret = ctdb_control(ctdb, destnode, 0,
2667                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2668                            mem_ctx, &outdata, &res, &timeout, NULL);
2669         if (ret != 0 || res != 0) {
2670                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2671                                 "failed ret:%d res:%d\n",
2672                                 ret, res));
2673                 return -1;
2674         }
2675
2676         len = offsetof(struct ctdb_public_ip_info_old, ifaces);
2677         if (len > outdata.dsize) {
2678                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2679                                 "returned invalid data with size %u > %u\n",
2680                                 (unsigned int)outdata.dsize,
2681                                 (unsigned int)len));
2682                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2683                 return -1;
2684         }
2685
2686         info = (struct ctdb_public_ip_info_old *)outdata.dptr;
2687         len += info->num*sizeof(struct ctdb_iface);
2688
2689         if (len > outdata.dsize) {
2690                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2691                                 "returned invalid data with size %u > %u\n",
2692                                 (unsigned int)outdata.dsize,
2693                                 (unsigned int)len));
2694                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2695                 return -1;
2696         }
2697
2698         /* make sure we null terminate the returned strings */
2699         for (i=0; i < info->num; i++) {
2700                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2701         }
2702
2703         *_info = (struct ctdb_public_ip_info_old *)talloc_memdup(mem_ctx,
2704                                                                 outdata.dptr,
2705                                                                 outdata.dsize);
2706         talloc_free(outdata.dptr);
2707         if (*_info == NULL) {
2708                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2709                                 "talloc_memdup size %u failed\n",
2710                                 (unsigned int)outdata.dsize));
2711                 return -1;
2712         }
2713
2714         return 0;
2715 }
2716
2717 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2718                          struct timeval timeout, uint32_t destnode,
2719                          TALLOC_CTX *mem_ctx,
2720                          struct ctdb_iface_list_old **_ifaces)
2721 {
2722         int ret;
2723         TDB_DATA outdata;
2724         int32_t res;
2725         struct ctdb_iface_list_old *ifaces;
2726         uint32_t len;
2727         uint32_t i;
2728
2729         ret = ctdb_control(ctdb, destnode, 0,
2730                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2731                            mem_ctx, &outdata, &res, &timeout, NULL);
2732         if (ret != 0 || res != 0) {
2733                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2734                                 "failed ret:%d res:%d\n",
2735                                 ret, res));
2736                 return -1;
2737         }
2738
2739         len = offsetof(struct ctdb_iface_list_old, ifaces);
2740         if (len > outdata.dsize) {
2741                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2742                                 "returned invalid data with size %u > %u\n",
2743                                 (unsigned int)outdata.dsize,
2744                                 (unsigned int)len));
2745                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2746                 return -1;
2747         }
2748
2749         ifaces = (struct ctdb_iface_list_old *)outdata.dptr;
2750         len += ifaces->num*sizeof(struct ctdb_iface);
2751
2752         if (len > outdata.dsize) {
2753                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2754                                 "returned invalid data with size %u > %u\n",
2755                                 (unsigned int)outdata.dsize,
2756                                 (unsigned int)len));
2757                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2758                 return -1;
2759         }
2760
2761         /* make sure we null terminate the returned strings */
2762         for (i=0; i < ifaces->num; i++) {
2763                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2764         }
2765
2766         *_ifaces = (struct ctdb_iface_list_old *)talloc_memdup(mem_ctx,
2767                                                                   outdata.dptr,
2768                                                                   outdata.dsize);
2769         talloc_free(outdata.dptr);
2770         if (*_ifaces == NULL) {
2771                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2772                                 "talloc_memdup size %u failed\n",
2773                                 (unsigned int)outdata.dsize));
2774                 return -1;
2775         }
2776
2777         return 0;
2778 }
2779
2780 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2781                              struct timeval timeout, uint32_t destnode,
2782                              TALLOC_CTX *mem_ctx,
2783                              const struct ctdb_iface *info)
2784 {
2785         int ret;
2786         TDB_DATA indata;
2787         int32_t res;
2788
2789         indata.dptr = discard_const_p(uint8_t, info);
2790         indata.dsize = sizeof(*info);
2791
2792         ret = ctdb_control(ctdb, destnode, 0,
2793                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2794                            mem_ctx, NULL, &res, &timeout, NULL);
2795         if (ret != 0 || res != 0) {
2796                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2797                                 "failed ret:%d res:%d\n",
2798                                 ret, res));
2799                 return -1;
2800         }
2801
2802         return 0;
2803 }
2804
2805 /*
2806   set/clear the permanent disabled bit on a remote node
2807  */
2808 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2809                        uint32_t set, uint32_t clear)
2810 {
2811         int ret;
2812         TDB_DATA data;
2813         struct ctdb_node_map_old *nodemap=NULL;
2814         struct ctdb_node_flag_change c;
2815         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2816         uint32_t recmaster;
2817         uint32_t *nodes;
2818
2819
2820         /* find the recovery master */
2821         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2822         if (ret != 0) {
2823                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2824                 talloc_free(tmp_ctx);
2825                 return ret;
2826         }
2827
2828
2829         /* read the node flags from the recmaster */
2830         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2831         if (ret != 0) {
2832                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2833                 talloc_free(tmp_ctx);
2834                 return -1;
2835         }
2836         if (destnode >= nodemap->num) {
2837                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2838                 talloc_free(tmp_ctx);
2839                 return -1;
2840         }
2841
2842         c.pnn       = destnode;
2843         c.old_flags = nodemap->nodes[destnode].flags;
2844         c.new_flags = c.old_flags;
2845         c.new_flags |= set;
2846         c.new_flags &= ~clear;
2847
2848         data.dsize = sizeof(c);
2849         data.dptr = (unsigned char *)&c;
2850
2851         /* send the flags update to all connected nodes */
2852         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2853
2854         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2855                                         nodes, 0,
2856                                         timeout, false, data,
2857                                         NULL, NULL,
2858                                         NULL) != 0) {
2859                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2860
2861                 talloc_free(tmp_ctx);
2862                 return -1;
2863         }
2864
2865         talloc_free(tmp_ctx);
2866         return 0;
2867 }
2868
2869
2870 /*
2871   get all tunables
2872  */
2873 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb,
2874                                struct timeval timeout,
2875                                uint32_t destnode,
2876                                struct ctdb_tunable_list *tunables)
2877 {
2878         TDB_DATA outdata;
2879         int ret;
2880         int32_t res;
2881
2882         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2883                            &outdata, &res, &timeout, NULL);
2884         if (ret != 0 || res != 0) {
2885                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2886                 return -1;
2887         }
2888
2889         if (outdata.dsize != sizeof(*tunables)) {
2890                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2891                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2892                 return -1;
2893         }
2894
2895         *tunables = *(struct ctdb_tunable_list *)outdata.dptr;
2896         talloc_free(outdata.dptr);
2897         return 0;
2898 }
2899
2900 /*
2901   add a public address to a node
2902  */
2903 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb,
2904                             struct timeval timeout, uint32_t destnode,
2905                             struct ctdb_addr_info_old *pub)
2906 {
2907         TDB_DATA data;
2908         int32_t res;
2909         int ret;
2910
2911         data.dsize = offsetof(struct ctdb_addr_info_old, iface) + pub->len;
2912         data.dptr  = (unsigned char *)pub;
2913
2914         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2915                            NULL, &res, &timeout, NULL);
2916         if (ret != 0 || res != 0) {
2917                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2918                 return -1;
2919         }
2920
2921         return 0;
2922 }
2923
2924 /*
2925   delete a public address from a node
2926  */
2927 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb,
2928                             struct timeval timeout, uint32_t destnode,
2929                             struct ctdb_addr_info_old *pub)
2930 {
2931         TDB_DATA data;
2932         int32_t res;
2933         int ret;
2934
2935         data.dsize = offsetof(struct ctdb_addr_info_old, iface) + pub->len;
2936         data.dptr  = (unsigned char *)pub;
2937
2938         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2939                            NULL, &res, &timeout, NULL);
2940         if (ret != 0 || res != 0) {
2941                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2942                 return -1;
2943         }
2944
2945         return 0;
2946 }
2947
2948 /*
2949   send a gratious arp
2950  */
2951 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb,
2952                            struct timeval timeout, uint32_t destnode,
2953                            ctdb_sock_addr *addr, const char *ifname)
2954 {
2955         TDB_DATA data;
2956         int32_t res;
2957         int ret, len;
2958         struct ctdb_addr_info_old *gratious_arp;
2959         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2960
2961
2962         len = strlen(ifname)+1;
2963         gratious_arp = talloc_size(tmp_ctx,
2964                 offsetof(struct ctdb_addr_info_old, iface) + len);
2965         CTDB_NO_MEMORY(ctdb, gratious_arp);
2966
2967         gratious_arp->addr = *addr;
2968         gratious_arp->len = len;
2969         memcpy(&gratious_arp->iface[0], ifname, len);
2970
2971
2972         data.dsize = offsetof(struct ctdb_addr_info_old, iface) + len;
2973         data.dptr  = (unsigned char *)gratious_arp;
2974
2975         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATUITOUS_ARP, 0, data, NULL,
2976                            NULL, &res, &timeout, NULL);
2977         if (ret != 0 || res != 0) {
2978                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
2979                 talloc_free(tmp_ctx);
2980                 return -1;
2981         }
2982
2983         talloc_free(tmp_ctx);
2984         return 0;
2985 }
2986
2987 /*
2988   get a list of all tcp tickles that a node knows about for a particular vnn
2989  */
2990 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
2991                               struct timeval timeout, uint32_t destnode, 
2992                               TALLOC_CTX *mem_ctx, 
2993                               ctdb_sock_addr *addr,
2994                               struct ctdb_tickle_list_old **list)
2995 {
2996         int ret;
2997         TDB_DATA data, outdata;
2998         int32_t status;
2999
3000         data.dptr = (uint8_t*)addr;
3001         data.dsize = sizeof(ctdb_sock_addr);
3002
3003         ret = ctdb_control(ctdb, destnode, 0, 
3004                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
3005                            mem_ctx, &outdata, &status, NULL, NULL);
3006         if (ret != 0 || status != 0) {
3007                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
3008                 return -1;
3009         }
3010
3011         *list = (struct ctdb_tickle_list_old *)outdata.dptr;
3012
3013         return status;
3014 }
3015
3016 /*
3017   initialise the ctdb daemon for client applications
3018
3019   NOTE: In current code the daemon does not fork. This is for testing purposes only
3020   and to simplify the code.
3021 */
3022 struct ctdb_context *ctdb_init(struct tevent_context *ev)
3023 {
3024         int ret;
3025         struct ctdb_context *ctdb;
3026
3027         ctdb = talloc_zero(ev, struct ctdb_context);
3028         if (ctdb == NULL) {
3029                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
3030                 return NULL;
3031         }
3032         ctdb->ev  = ev;
3033         /* Wrap early to exercise code. */
3034         ret = reqid_init(ctdb, INT_MAX-200, &ctdb->idr);
3035         if (ret != 0) {
3036                 DEBUG(DEBUG_ERR, ("reqid_init failed (%s)\n", strerror(ret)));
3037                 talloc_free(ctdb);
3038                 return NULL;
3039         }
3040
3041         ret = srvid_init(ctdb, &ctdb->srv);
3042         if (ret != 0) {
3043                 DEBUG(DEBUG_ERR, ("srvid_init failed (%s)\n", strerror(ret)));
3044                 talloc_free(ctdb);
3045                 return NULL;
3046         }
3047
3048         ret = ctdb_set_socketname(ctdb, CTDB_SOCKET);
3049         if (ret != 0) {
3050                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
3051                 talloc_free(ctdb);
3052                 return NULL;
3053         }
3054
3055         ctdb->statistics.statistics_start_time = timeval_current();
3056
3057         return ctdb;
3058 }
3059
3060
3061 /*
3062   set some ctdb flags
3063 */
3064 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
3065 {
3066         ctdb->flags |= flags;
3067 }
3068
3069 /*
3070   setup the local socket name
3071 */
3072 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
3073 {
3074         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
3075         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
3076
3077         return 0;
3078 }
3079
3080 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
3081 {
3082         return ctdb->daemon.name;
3083 }
3084
3085 /*
3086   return the pnn of this node
3087 */
3088 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
3089 {
3090         return ctdb->pnn;
3091 }
3092
3093
3094 /*
3095   get the uptime of a remote node
3096  */
3097 struct ctdb_client_control_state *
3098 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3099 {
3100         return ctdb_control_send(ctdb, destnode, 0, 
3101                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
3102                            mem_ctx, &timeout, NULL);
3103 }
3104
3105 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
3106 {
3107         int ret;
3108         int32_t res;
3109         TDB_DATA outdata;
3110
3111         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3112         if (ret != 0 || res != 0) {
3113                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
3114                 return -1;
3115         }
3116
3117         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
3118
3119         return 0;
3120 }
3121
3122 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3123 {
3124         struct ctdb_client_control_state *state;
3125
3126         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3127         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3128 }
3129
3130 /*
3131   send a control to execute the "recovered" event script on a node
3132  */
3133 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3134 {
3135         int ret;
3136         int32_t status;
3137
3138         ret = ctdb_control(ctdb, destnode, 0, 
3139                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
3140                            NULL, NULL, &status, &timeout, NULL);
3141         if (ret != 0 || status != 0) {
3142                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3143                 return -1;
3144         }
3145
3146         return 0;
3147 }
3148
3149 /* 
3150   callback for the async helpers used when sending the same control
3151   to multiple nodes in parallell.
3152 */
3153 static void async_callback(struct ctdb_client_control_state *state)
3154 {
3155         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3156         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3157         int ret;
3158         TDB_DATA outdata;
3159         int32_t res = -1;
3160         uint32_t destnode = state->c->hdr.destnode;
3161
3162         outdata.dsize = 0;
3163         outdata.dptr = NULL;
3164
3165         /* one more node has responded with recmode data */
3166         data->count--;
3167
3168         /* if we failed to push the db, then return an error and let
3169            the main loop try again.
3170         */
3171         if (state->state != CTDB_CONTROL_DONE) {
3172                 if ( !data->dont_log_errors) {
3173                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3174                 }
3175                 data->fail_count++;
3176                 if (state->state == CTDB_CONTROL_TIMEOUT) {
3177                         res = -ETIME;
3178                 } else {
3179                         res = -1;
3180                 }
3181                 if (data->fail_callback) {
3182                         data->fail_callback(ctdb, destnode, res, outdata,
3183                                         data->callback_data);
3184                 }
3185                 return;
3186         }
3187         
3188         state->async.fn = NULL;
3189
3190         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3191         if ((ret != 0) || (res != 0)) {
3192                 if ( !data->dont_log_errors) {
3193                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3194                 }
3195                 data->fail_count++;
3196                 if (data->fail_callback) {
3197                         data->fail_callback(ctdb, destnode, res, outdata,
3198                                         data->callback_data);
3199                 }
3200         }
3201         if ((ret == 0) && (data->callback != NULL)) {
3202                 data->callback(ctdb, destnode, res, outdata,
3203                                         data->callback_data);
3204         }
3205 }
3206
3207
3208 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3209 {
3210         /* set up the callback functions */
3211         state->async.fn = async_callback;
3212         state->async.private_data = data;
3213         
3214         /* one more control to wait for to complete */
3215         data->count++;
3216 }
3217
3218
3219 /* wait for up to the maximum number of seconds allowed
3220    or until all nodes we expect a response from has replied
3221 */
3222 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3223 {
3224         while (data->count > 0) {
3225                 tevent_loop_once(ctdb->ev);
3226         }
3227         if (data->fail_count != 0) {
3228                 if (!data->dont_log_errors) {
3229                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
3230                                  data->fail_count));
3231                 }
3232                 return -1;
3233         }
3234         return 0;
3235 }
3236
3237
3238 /* 
3239    perform a simple control on the listed nodes
3240    The control cannot return data
3241  */
3242 int ctdb_client_async_control(struct ctdb_context *ctdb,
3243                                 enum ctdb_controls opcode,
3244                                 uint32_t *nodes,
3245                                 uint64_t srvid,
3246                                 struct timeval timeout,
3247                                 bool dont_log_errors,
3248                                 TDB_DATA data,
3249                                 client_async_callback client_callback,
3250                                 client_async_callback fail_callback,
3251                                 void *callback_data)
3252 {
3253         struct client_async_data *async_data;
3254         struct ctdb_client_control_state *state;
3255         int j, num_nodes;
3256
3257         async_data = talloc_zero(ctdb, struct client_async_data);
3258         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3259         async_data->dont_log_errors = dont_log_errors;
3260         async_data->callback = client_callback;
3261         async_data->fail_callback = fail_callback;
3262         async_data->callback_data = callback_data;
3263         async_data->opcode        = opcode;
3264
3265         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3266
3267         /* loop over all nodes and send an async control to each of them */
3268         for (j=0; j<num_nodes; j++) {
3269                 uint32_t pnn = nodes[j];
3270
3271                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
3272                                           0, data, async_data, &timeout, NULL);
3273                 if (state == NULL) {
3274                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3275                         talloc_free(async_data);
3276                         return -1;
3277                 }
3278                 
3279                 ctdb_client_async_add(async_data, state);
3280         }
3281
3282         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3283                 talloc_free(async_data);
3284                 return -1;
3285         }
3286
3287         talloc_free(async_data);
3288         return 0;
3289 }
3290
3291 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3292                                 struct ctdb_vnn_map *vnn_map,
3293                                 TALLOC_CTX *mem_ctx,
3294                                 bool include_self)
3295 {
3296         int i, j, num_nodes;
3297         uint32_t *nodes;
3298
3299         for (i=num_nodes=0;i<vnn_map->size;i++) {
3300                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3301                         continue;
3302                 }
3303                 num_nodes++;
3304         } 
3305
3306         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3307         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3308
3309         for (i=j=0;i<vnn_map->size;i++) {
3310                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3311                         continue;
3312                 }
3313                 nodes[j++] = vnn_map->map[i];
3314         } 
3315
3316         return nodes;
3317 }
3318
3319 /* Get list of nodes not including those with flags specified by mask.
3320  * If exclude_pnn is not -1 then exclude that pnn from the list.
3321  */
3322 uint32_t *list_of_nodes(struct ctdb_context *ctdb,
3323                         struct ctdb_node_map_old *node_map,
3324                         TALLOC_CTX *mem_ctx,
3325                         uint32_t mask,
3326                         int exclude_pnn)
3327 {
3328         int i, j, num_nodes;
3329         uint32_t *nodes;
3330
3331         for (i=num_nodes=0;i<node_map->num;i++) {
3332                 if (node_map->nodes[i].flags & mask) {
3333                         continue;
3334                 }
3335                 if (node_map->nodes[i].pnn == exclude_pnn) {
3336                         continue;
3337                 }
3338                 num_nodes++;
3339         } 
3340
3341         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3342         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3343
3344         for (i=j=0;i<node_map->num;i++) {
3345                 if (node_map->nodes[i].flags & mask) {
3346                         continue;
3347                 }
3348                 if (node_map->nodes[i].pnn == exclude_pnn) {
3349                         continue;
3350                 }
3351                 nodes[j++] = node_map->nodes[i].pnn;
3352         } 
3353
3354         return nodes;
3355 }
3356
3357 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3358                                 struct ctdb_node_map_old *node_map,
3359                                 TALLOC_CTX *mem_ctx,
3360                                 bool include_self)
3361 {
3362         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_INACTIVE,
3363                              include_self ? -1 : ctdb->pnn);
3364 }
3365
3366 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3367                                 struct ctdb_node_map_old *node_map,
3368                                 TALLOC_CTX *mem_ctx,
3369                                 bool include_self)
3370 {
3371         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_DISCONNECTED,
3372                              include_self ? -1 : ctdb->pnn);
3373 }
3374
3375 /* 
3376   this is used to test if a pnn lock exists and if it exists will return
3377   the number of connections that pnn has reported or -1 if that recovery
3378   daemon is not running.
3379 */
3380 int
3381 ctdb_read_pnn_lock(int fd, int32_t pnn)
3382 {
3383         struct flock lock;
3384         char c;
3385
3386         lock.l_type = F_WRLCK;
3387         lock.l_whence = SEEK_SET;
3388         lock.l_start = pnn;
3389         lock.l_len = 1;
3390         lock.l_pid = 0;
3391
3392         if (fcntl(fd, F_GETLK, &lock) != 0) {
3393                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3394                 return -1;
3395         }
3396
3397         if (lock.l_type == F_UNLCK) {
3398                 return -1;
3399         }
3400
3401         if (pread(fd, &c, 1, pnn) == -1) {
3402                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3403                 return -1;
3404         }
3405
3406         return c;
3407 }
3408
3409 /*
3410   get capabilities of a remote node
3411  */
3412 struct ctdb_client_control_state *
3413 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3414 {
3415         return ctdb_control_send(ctdb, destnode, 0, 
3416                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3417                            mem_ctx, &timeout, NULL);
3418 }
3419
3420 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3421 {
3422         int ret;
3423         int32_t res;
3424         TDB_DATA outdata;
3425
3426         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3427         if ( (ret != 0) || (res != 0) ) {
3428                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3429                 return -1;
3430         }
3431
3432         if (capabilities) {
3433                 *capabilities = *((uint32_t *)outdata.dptr);
3434         }
3435
3436         return 0;
3437 }
3438
3439 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3440 {
3441         struct ctdb_client_control_state *state;
3442         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3443         int ret;
3444
3445         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3446         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3447         talloc_free(tmp_ctx);
3448         return ret;
3449 }
3450
3451 static void get_capabilities_callback(struct ctdb_context *ctdb,
3452                                       uint32_t node_pnn, int32_t res,
3453                                       TDB_DATA outdata, void *callback_data)
3454 {
3455         struct ctdb_node_capabilities *caps =
3456                 talloc_get_type(callback_data,
3457                                 struct ctdb_node_capabilities);
3458
3459         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
3460                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
3461                 return;
3462         }
3463
3464         if (node_pnn >= talloc_array_length(caps)) {
3465                 DEBUG(DEBUG_ERR,
3466                       (__location__ " unexpected PNN %u\n", node_pnn));
3467                 return;
3468         }
3469
3470         caps[node_pnn].retrieved = true;
3471         caps[node_pnn].capabilities = *((uint32_t *)outdata.dptr);
3472 }
3473
3474 struct ctdb_node_capabilities *
3475 ctdb_get_capabilities(struct ctdb_context *ctdb,
3476                       TALLOC_CTX *mem_ctx,
3477                       struct timeval timeout,
3478                       struct ctdb_node_map_old *nodemap)
3479 {
3480         uint32_t *nodes;
3481         uint32_t i, res;
3482         struct ctdb_node_capabilities *ret;
3483
3484         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
3485
3486         ret = talloc_array(mem_ctx, struct ctdb_node_capabilities,
3487                            nodemap->num);
3488         CTDB_NO_MEMORY_NULL(ctdb, ret);
3489         /* Prepopulate the expected PNNs */
3490         for (i = 0; i < talloc_array_length(ret); i++) {
3491                 ret[i].retrieved = false;
3492         }
3493
3494         res = ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
3495                                         nodes, 0, timeout,
3496                                         false, tdb_null,
3497                                         get_capabilities_callback, NULL,
3498                                         ret);
3499         if (res != 0) {
3500                 DEBUG(DEBUG_ERR,
3501                       (__location__ " Failed to read node capabilities.\n"));
3502                 TALLOC_FREE(ret);
3503         }
3504
3505         return ret;
3506 }
3507
3508 uint32_t *
3509 ctdb_get_node_capabilities(struct ctdb_node_capabilities *caps,
3510                            uint32_t pnn)
3511 {
3512         if (pnn < talloc_array_length(caps) && caps[pnn].retrieved) {
3513                 return &caps[pnn].capabilities;
3514         }
3515
3516         return NULL;
3517 }
3518
3519 bool ctdb_node_has_capabilities(struct ctdb_node_capabilities *caps,
3520                                 uint32_t pnn,
3521                                 uint32_t capabilities_required)
3522 {
3523         uint32_t *capp = ctdb_get_node_capabilities(caps, pnn);
3524         return (capp != NULL) &&
3525                 ((*capp & capabilities_required) == capabilities_required);
3526 }
3527
3528
3529 static struct ctdb_server_id server_id_fetch(struct ctdb_context *ctdb, uint32_t reqid)
3530 {
3531         struct ctdb_server_id id;
3532
3533         id.pid = getpid();
3534         id.task_id = reqid;
3535         id.vnn = ctdb_get_pnn(ctdb);
3536         id.unique_id = id.vnn;
3537         id.unique_id = (id.unique_id << 32) | reqid;
3538
3539         return id;
3540 }
3541
3542 /* This is basically a copy from Samba's server_id.*.  However, a
3543  * dependency chain stops us from using Samba's version, so use a
3544  * renamed copy until a better solution is found. */
3545 static bool ctdb_server_id_equal(struct ctdb_server_id *id1, struct ctdb_server_id *id2)
3546 {
3547         if (id1->pid != id2->pid) {
3548                 return false;
3549         }
3550
3551         if (id1->task_id != id2->task_id) {
3552                 return false;
3553         }
3554
3555         if (id1->vnn != id2->vnn) {
3556                 return false;
3557         }
3558
3559         if (id1->unique_id != id2->unique_id) {
3560                 return false;
3561         }
3562
3563         return true;
3564 }
3565
3566 static bool server_id_exists(struct ctdb_context *ctdb,
3567                              struct ctdb_server_id *id)
3568 {
3569         int ret;
3570
3571         ret = ctdb_ctrl_process_exists(ctdb, id->vnn, id->pid);
3572         if (ret == 0) {
3573                 return true;
3574         }
3575
3576         return false;
3577 }
3578
3579 static bool g_lock_parse(TALLOC_CTX *mem_ctx, TDB_DATA data,
3580                          struct ctdb_g_lock_list **locks)
3581 {
3582         struct ctdb_g_lock_list *recs;
3583
3584         recs = talloc_zero(mem_ctx, struct ctdb_g_lock_list);
3585         if (recs == NULL) {
3586                 return false;
3587         }
3588
3589         if (data.dsize == 0) {
3590                 goto done;
3591         }
3592
3593         if (data.dsize % sizeof(struct ctdb_g_lock) != 0) {
3594                 DEBUG(DEBUG_ERR, (__location__ "invalid data size %lu in g_lock record\n",
3595                                   (unsigned long)data.dsize));
3596                 talloc_free(recs);
3597                 return false;
3598         }
3599
3600         recs->num = data.dsize / sizeof(struct ctdb_g_lock);
3601         recs->lock = talloc_memdup(mem_ctx, data.dptr, data.dsize);
3602         if (recs->lock == NULL) {
3603                 talloc_free(recs);
3604                 return false;
3605         }
3606
3607 done:
3608         if (locks != NULL) {
3609                 *locks = recs;
3610         }
3611
3612         return true;
3613 }
3614
3615
3616 static bool g_lock_lock(TALLOC_CTX *mem_ctx,
3617                         struct ctdb_db_context *ctdb_db,
3618                         const char *keyname, uint32_t reqid)
3619 {
3620         TDB_DATA key, data;
3621         struct ctdb_record_handle *h;
3622         struct ctdb_g_lock_list *locks;
3623         struct ctdb_server_id id;
3624         struct timeval t_start;
3625         int i;
3626
3627         key.dptr = (uint8_t *)discard_const(keyname);
3628         key.dsize = strlen(keyname) + 1;
3629
3630         t_start = timeval_current();
3631
3632 again:
3633         /* Keep trying for an hour. */
3634         if (timeval_elapsed(&t_start) > 3600) {
3635                 return false;
3636         }
3637
3638         h = ctdb_fetch_lock(ctdb_db, mem_ctx, key, &data);
3639         if (h == NULL) {
3640                 return false;
3641         }
3642
3643         if (!g_lock_parse(h, data, &locks)) {
3644                 DEBUG(DEBUG_ERR, ("g_lock: error parsing locks\n"));
3645                 talloc_free(data.dptr);
3646                 talloc_free(h);
3647                 return false;
3648         }
3649
3650         talloc_free(data.dptr);
3651
3652         id = server_id_fetch(ctdb_db->ctdb, reqid);
3653
3654         i = 0;
3655         while (i < locks->num) {
3656                 if (ctdb_server_id_equal(&locks->lock[i].sid, &id)) {
3657                         /* Internal error */
3658                         talloc_free(h);
3659                         return false;
3660                 }
3661
3662                 if (!server_id_exists(ctdb_db->ctdb, &locks->lock[i].sid)) {
3663                         if (i < locks->num-1) {
3664                                 locks->lock[i] = locks->lock[locks->num-1];
3665                         }
3666                         locks->num--;
3667                         continue;
3668                 }
3669
3670                 /* This entry is locked. */
3671                 DEBUG(DEBUG_INFO, ("g_lock: lock already granted for "
3672                                    "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
3673                                    (unsigned long long)id.pid,
3674                                    id.task_id, id.vnn,
3675                                    (unsigned long long)id.unique_id));
3676                 talloc_free(h);
3677                 goto again;
3678         }
3679
3680         locks->lock = talloc_realloc(locks, locks->lock, struct ctdb_g_lock,
3681                                      locks->num+1);
3682         if (locks->lock == NULL) {
3683                 talloc_free(h);
3684                 return false;
3685         }
3686
3687         locks->lock[locks->num].type = CTDB_G_LOCK_WRITE;
3688         locks->lock[locks->num].sid = id;
3689         locks->num++;
3690
3691         data.dptr = (uint8_t *)locks->lock;
3692         data.dsize = locks->num * sizeof(struct ctdb_g_lock);
3693
3694         if (ctdb_record_store(h, data) != 0) {
3695                 DEBUG(DEBUG_ERR, ("g_lock: failed to write transaction lock for "
3696                                   "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
3697                                   (unsigned long long)id.pid,
3698                                   id.task_id, id.vnn,
3699                                   (unsigned long long)id.unique_id));
3700                 talloc_free(h);
3701                 return false;
3702         }
3703
3704         DEBUG(DEBUG_INFO, ("g_lock: lock granted for "
3705                            "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
3706                            (unsigned long long)id.pid,
3707                            id.task_id, id.vnn,
3708                            (unsigned long long)id.unique_id));
3709
3710         talloc_free(h);
3711         return true;
3712 }
3713
3714 static bool g_lock_unlock(TALLOC_CTX *mem_ctx,
3715                           struct ctdb_db_context *ctdb_db,
3716                           const char *keyname, uint32_t reqid)
3717 {
3718         TDB_DATA key, data;
3719         struct ctdb_record_handle *h;
3720         struct ctdb_g_lock_list *locks;
3721         struct ctdb_server_id id;
3722         int i;
3723         bool found = false;
3724
3725         key.dptr = (uint8_t *)discard_const(keyname);
3726         key.dsize = strlen(keyname) + 1;
3727         h = ctdb_fetch_lock(ctdb_db, mem_ctx, key, &data);
3728         if (h == NULL) {
3729                 return false;
3730         }
3731
3732         if (!g_lock_parse(h, data, &locks)) {
3733                 DEBUG(DEBUG_ERR, ("g_lock: error parsing locks\n"));
3734                 talloc_free(data.dptr);
3735                 talloc_free(h);
3736                 return false;
3737         }
3738
3739         talloc_free(data.dptr);
3740
3741         id = server_id_fetch(ctdb_db->ctdb, reqid);
3742
3743         for (i=0; i<locks->num; i++) {
3744                 if (ctdb_server_id_equal(&locks->lock[i].sid, &id)) {
3745                         if (i < locks->num-1) {
3746                                 locks->lock[i] = locks->lock[locks->num-1];
3747                         }
3748                         locks->num--;
3749                         found = true;
3750                         break;
3751                 }
3752         }
3753
3754         if (!found) {
3755                 DEBUG(DEBUG_ERR, ("g_lock: lock not found\n"));
3756                 talloc_free(h);
3757                 return false;
3758         }
3759
3760         data.dptr = (uint8_t *)locks->lock;
3761         data.dsize = locks->num * sizeof(struct ctdb_g_lock);
3762
3763         if (ctdb_record_store(h, data) != 0) {
3764                 talloc_free(h);
3765                 return false;
3766         }
3767
3768         talloc_free(h);
3769         return true;
3770 }
3771
3772
3773 struct ctdb_transaction_handle {
3774         struct ctdb_db_context *ctdb_db;
3775         struct ctdb_db_context *g_lock_db;
3776         char *lock_name;
3777         uint32_t reqid;
3778         /*
3779          * we store reads and writes done under a transaction:
3780          * - one list stores both reads and writes (m_all)
3781          * - the other just writes (m_write)
3782          */
3783         struct ctdb_marshall_buffer *m_all;
3784         struct ctdb_marshall_buffer *m_write;
3785 };
3786
3787 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3788 {
3789         g_lock_unlock(h, h->g_lock_db, h->lock_name, h->reqid);
3790         reqid_remove(h->ctdb_db->ctdb->idr, h->reqid);
3791         return 0;
3792 }
3793
3794
3795 /**
3796  * start a transaction on a database
3797  */
3798 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3799                                                        TALLOC_CTX *mem_ctx)
3800 {
3801         struct ctdb_transaction_handle *h;
3802
3803         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3804         if (h == NULL) {
3805                 DEBUG(DEBUG_ERR, (__location__ " memory allocation error\n"));
3806                 return NULL;
3807         }
3808
3809         h->ctdb_db = ctdb_db;
3810         h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x",
3811                                        (unsigned int)ctdb_db->db_id);
3812         if (h->lock_name == NULL) {
3813                 DEBUG(DEBUG_ERR, (__location__ " talloc asprintf failed\n"));
3814                 talloc_free(h);
3815                 return NULL;
3816         }
3817
3818         h->g_lock_db = ctdb_attach(h->ctdb_db->ctdb, timeval_current_ofs(3,0),
3819                                    "g_lock.tdb", 0);
3820         if (!h->g_lock_db) {
3821                 DEBUG(DEBUG_ERR, (__location__ " unable to attach to g_lock.tdb\n"));
3822                 talloc_free(h);
3823                 return NULL;
3824         }
3825
3826         h->reqid = reqid_new(h->ctdb_db->ctdb->idr, h);
3827
3828         if (!g_lock_lock(h, h->g_lock_db, h->lock_name, h->reqid)) {
3829                 DEBUG(DEBUG_ERR, (__location__ " Error locking g_lock.tdb\n"));
3830                 talloc_free(h);
3831                 return NULL;
3832         }
3833
3834         talloc_set_destructor(h, ctdb_transaction_destructor);
3835         return h;
3836 }
3837
3838 /**
3839  * fetch a record inside a transaction
3840  */
3841 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h,
3842                            TALLOC_CTX *mem_ctx,
3843                            TDB_DATA key, TDB_DATA *data)
3844 {
3845         struct ctdb_ltdb_header header;
3846         int ret;
3847
3848         ZERO_STRUCT(header);
3849
3850         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3851         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3852                 /* record doesn't exist yet */
3853                 *data = tdb_null;
3854                 ret = 0;
3855         }
3856
3857         if (ret != 0) {
3858                 return ret;
3859         }
3860
3861         h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3862         if (h->m_all == NULL) {
3863                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3864                 return -1;
3865         }
3866
3867         return 0;
3868 }
3869
3870 /**
3871  * stores a record inside a transaction
3872  */
3873 int ctdb_transaction_store(struct ctdb_transaction_handle *h,
3874                            TDB_DATA key, TDB_DATA data)
3875 {
3876         TALLOC_CTX *tmp_ctx = talloc_new(h);
3877         struct ctdb_ltdb_header header;
3878         TDB_DATA olddata;
3879         int ret;
3880
3881         /* we need the header so we can update the RSN */
3882         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3883         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3884                 /* the record doesn't exist - create one with us as dmaster.
3885                    This is only safe because we are in a transaction and this
3886                    is a persistent database */
3887                 ZERO_STRUCT(header);
3888         } else if (ret != 0) {
3889                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3890                 talloc_free(tmp_ctx);
3891                 return ret;
3892         }
3893
3894         if (data.dsize == olddata.dsize &&
3895             memcmp(data.dptr, olddata.dptr, data.dsize) == 0 &&
3896             header.rsn != 0) {
3897                 /* save writing the same data */
3898                 talloc_free(tmp_ctx);
3899                 return 0;
3900         }
3901
3902         header.dmaster = h->ctdb_db->ctdb->pnn;
3903         header.rsn++;
3904
3905         h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3906         if (h->m_all == NULL) {
3907                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3908                 talloc_free(tmp_ctx);
3909                 return -1;
3910         }
3911
3912         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3913         if (h->m_write == NULL) {
3914                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3915                 talloc_free(tmp_ctx);
3916                 return -1;
3917         }
3918
3919         talloc_free(tmp_ctx);
3920         return 0;
3921 }
3922
3923 static int ctdb_fetch_db_seqnum(struct ctdb_db_context *ctdb_db, uint64_t *seqnum)
3924 {
3925         const char *keyname = CTDB_DB_SEQNUM_KEY;
3926         TDB_DATA key, data;
3927         struct ctdb_ltdb_header header;
3928         int ret;
3929
3930         key.dptr = (uint8_t *)discard_const(keyname);
3931         key.dsize = strlen(keyname) + 1;
3932
3933         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, ctdb_db, &data);
3934         if (ret != 0) {
3935                 *seqnum = 0;
3936                 return 0;
3937         }
3938
3939         if (data.dsize == 0) {
3940                 *seqnum = 0;
3941                 return 0;
3942         }
3943
3944         if (data.dsize != sizeof(*seqnum)) {
3945                 DEBUG(DEBUG_ERR, (__location__ " Invalid data received len=%zi\n",
3946                                   data.dsize));
3947                 talloc_free(data.dptr);
3948                 return -1;
3949         }
3950
3951         *seqnum = *(uint64_t *)data.dptr;
3952         talloc_free(data.dptr);
3953
3954         return 0;
3955 }
3956
3957
3958 static int ctdb_store_db_seqnum(struct ctdb_transaction_handle *h,
3959                                 uint64_t seqnum)
3960 {
3961         const char *keyname = CTDB_DB_SEQNUM_KEY;
3962         TDB_DATA key, data;
3963
3964         key.dptr = (uint8_t *)discard_const(keyname);
3965         key.dsize = strlen(keyname) + 1;
3966
3967         data.dptr = (uint8_t *)&seqnum;
3968         data.dsize = sizeof(seqnum);
3969
3970         return ctdb_transaction_store(h, key, data);
3971 }
3972
3973
3974 /**
3975  * commit a transaction
3976  */
3977 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3978 {
3979         int ret;
3980         uint64_t old_seqnum, new_seqnum;
3981         int32_t status;
3982         struct timeval timeout;
3983
3984         if (h->m_write == NULL) {
3985                 /* no changes were made */
3986                 talloc_free(h);
3987                 return 0;
3988         }
3989
3990         ret = ctdb_fetch_db_seqnum(h->ctdb_db, &old_seqnum);
3991         if (ret != 0) {
3992                 DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n"));
3993                 ret = -1;
3994                 goto done;
3995         }
3996
3997         new_seqnum = old_seqnum + 1;
3998         ret = ctdb_store_db_seqnum(h, new_seqnum);
3999         if (ret != 0) {
4000                 DEBUG(DEBUG_ERR, (__location__ " failed to store db sequence number\n"));
4001                 ret = -1;
4002                 goto done;
4003         }
4004
4005 again:
4006         timeout = timeval_current_ofs(30,0);
4007         ret = ctdb_control(h->ctdb_db->ctdb, CTDB_CURRENT_NODE,
4008                            h->ctdb_db->db_id,
4009                            CTDB_CONTROL_TRANS3_COMMIT, 0,
4010                            ctdb_marshall_finish(h->m_write), NULL, NULL,
4011                            &status, &timeout, NULL);
4012         if (ret != 0 || status != 0) {
4013                 /*
4014                  * TRANS3_COMMIT control will only fail if recovery has been
4015                  * triggered.  Check if the database has been updated or not.
4016                  */
4017                 ret = ctdb_fetch_db_seqnum(h->ctdb_db, &new_seqnum);
4018                 if (ret != 0) {
4019                         DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n"));
4020                         goto done;
4021                 }
4022
4023                 if (new_seqnum == old_seqnum) {
4024                         /* Database not yet updated, try again */
4025                         goto again;
4026                 }
4027
4028                 if (new_seqnum != (old_seqnum + 1)) {
4029                         DEBUG(DEBUG_ERR, (__location__ " new seqnum [%llu] != old seqnum [%llu] + 1\n",
4030                                           (long long unsigned)new_seqnum,
4031                                           (long long unsigned)old_seqnum));
4032                         ret = -1;
4033                         goto done;
4034                 }
4035         }
4036
4037         ret = 0;
4038
4039 done:
4040         talloc_free(h);
4041         return ret;
4042 }
4043
4044 /**
4045  * cancel a transaction
4046  */
4047 int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
4048 {
4049         talloc_free(h);
4050         return 0;
4051 }
4052
4053
4054 /*
4055   recovery daemon ping to main daemon
4056  */
4057 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
4058 {
4059         int ret;
4060         int32_t res;
4061
4062         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
4063                            ctdb, NULL, &res, NULL, NULL);
4064         if (ret != 0 || res != 0) {
4065                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
4066                 return -1;
4067         }
4068
4069         return 0;
4070 }
4071
4072 /*
4073   tell the main daemon how long it took to lock the reclock file
4074  */
4075 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
4076 {
4077         int ret;
4078         int32_t res;
4079         TDB_DATA data;
4080
4081         data.dptr = (uint8_t *)&latency;
4082         data.dsize = sizeof(latency);
4083
4084         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
4085                            ctdb, NULL, &res, NULL, NULL);
4086         if (ret != 0 || res != 0) {
4087                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
4088                 return -1;
4089         }
4090
4091         return 0;
4092 }
4093
4094 /*
4095   get the name of the reclock file
4096  */
4097 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
4098                          uint32_t destnode, TALLOC_CTX *mem_ctx,
4099                          const char **name)
4100 {
4101         int ret;
4102         int32_t res;
4103         TDB_DATA data;
4104
4105         ret = ctdb_control(ctdb, destnode, 0, 
4106                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
4107                            mem_ctx, &data, &res, &timeout, NULL);
4108         if (ret != 0 || res != 0) {
4109                 return -1;
4110         }
4111
4112         if (data.dsize == 0) {
4113                 *name = NULL;
4114         } else {
4115                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
4116         }
4117         talloc_free(data.dptr);
4118
4119         return 0;
4120 }
4121
4122 /*
4123   stop a node
4124  */
4125 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4126 {
4127         int ret;
4128         int32_t res;
4129
4130         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
4131                            ctdb, NULL, &res, &timeout, NULL);
4132         if (ret != 0 || res != 0) {
4133                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
4134                 return -1;
4135         }
4136
4137         return 0;
4138 }
4139
4140 /*
4141   continue a node
4142  */
4143 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4144 {
4145         int ret;
4146
4147         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
4148                            ctdb, NULL, NULL, &timeout, NULL);
4149         if (ret != 0) {
4150                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4151                 return -1;
4152         }
4153
4154         return 0;
4155 }
4156
4157 /*
4158   set the lmaster role for a node
4159  */
4160 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4161 {
4162         int ret;
4163         TDB_DATA data;
4164         int32_t res;
4165
4166         data.dsize = sizeof(lmasterrole);
4167         data.dptr  = (uint8_t *)&lmasterrole;
4168
4169         ret = ctdb_control(ctdb, destnode, 0, 
4170                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
4171                            NULL, NULL, &res, &timeout, NULL);
4172         if (ret != 0 || res != 0) {
4173                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4174                 return -1;
4175         }
4176
4177         return 0;
4178 }
4179
4180 /*
4181   set the recmaster role for a node
4182  */
4183 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4184 {
4185         int ret;
4186         TDB_DATA data;
4187         int32_t res;
4188
4189         data.dsize = sizeof(recmasterrole);
4190         data.dptr  = (uint8_t *)&recmasterrole;
4191
4192         ret = ctdb_control(ctdb, destnode, 0, 
4193                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
4194                            NULL, NULL, &res, &timeout, NULL);
4195         if (ret != 0 || res != 0) {
4196                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4197                 return -1;
4198         }
4199
4200         return 0;
4201 }
4202
4203 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout,
4204                       uint32_t destnode, struct ctdb_ban_state *bantime)
4205 {
4206         int ret;
4207         TDB_DATA data;
4208         int32_t res;
4209
4210         data.dsize = sizeof(*bantime);
4211         data.dptr  = (uint8_t *)bantime;
4212
4213         ret = ctdb_control(ctdb, destnode, 0, 
4214                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
4215                            NULL, NULL, &res, &timeout, NULL);
4216         if (ret != 0 || res != 0) {
4217                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4218                 return -1;
4219         }
4220
4221         return 0;
4222 }
4223
4224
4225 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout,
4226                       uint32_t destnode, TALLOC_CTX *mem_ctx,
4227                       struct ctdb_ban_state **bantime)
4228 {
4229         int ret;
4230         TDB_DATA outdata;
4231         int32_t res;
4232         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4233
4234         ret = ctdb_control(ctdb, destnode, 0, 
4235                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4236                            tmp_ctx, &outdata, &res, &timeout, NULL);
4237         if (ret != 0 || res != 0) {
4238                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4239                 talloc_free(tmp_ctx);
4240                 return -1;
4241         }
4242
4243         *bantime = (struct ctdb_ban_state *)talloc_steal(mem_ctx, outdata.dptr);
4244         talloc_free(tmp_ctx);
4245
4246         return 0;
4247 }
4248
4249 int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb,
4250                              struct timeval timeout, uint32_t destnode,
4251                              TALLOC_CTX *mem_ctx,
4252                              struct ctdb_statistics_list_old **stats)
4253 {
4254         int ret;
4255         TDB_DATA outdata;
4256         int32_t res;
4257
4258         ret = ctdb_control(ctdb, destnode, 0, 
4259                            CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null, 
4260                            mem_ctx, &outdata, &res, &timeout, NULL);
4261         if (ret != 0 || res != 0 || outdata.dsize == 0) {
4262                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
4263                 return -1;
4264         }
4265
4266         *stats = (struct ctdb_statistics_list_old *)talloc_memdup(mem_ctx,
4267                                                                   outdata.dptr,
4268                                                                   outdata.dsize);
4269         talloc_free(outdata.dptr);
4270
4271         return 0;
4272 }
4273
4274 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)
4275 {
4276         if (h == NULL) {
4277                 return NULL;
4278         }
4279
4280         return &h->header;
4281 }
4282
4283
4284 struct ctdb_client_control_state *
4285 ctdb_ctrl_updaterecord_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4286 {
4287         struct ctdb_client_control_state *handle;
4288         struct ctdb_marshall_buffer *m;
4289         struct ctdb_rec_data_old *rec;
4290         TDB_DATA outdata;
4291
4292         m = talloc_zero(mem_ctx, struct ctdb_marshall_buffer);
4293         if (m == NULL) {
4294                 DEBUG(DEBUG_ERR, ("Failed to allocate marshall buffer for update record\n"));
4295                 return NULL;
4296         }
4297
4298         m->db_id = ctdb_db->db_id;
4299
4300         rec = ctdb_marshall_record(m, 0, key, header, data);
4301         if (rec == NULL) {
4302                 DEBUG(DEBUG_ERR,("Failed to marshall record for update record\n"));
4303                 talloc_free(m);
4304                 return NULL;
4305         }
4306         m = talloc_realloc_size(mem_ctx, m, rec->length + offsetof(struct ctdb_marshall_buffer, data));
4307         if (m == NULL) {
4308                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata\n"));
4309                 talloc_free(m);
4310                 return NULL;
4311         }
4312         m->count++;
4313         memcpy((uint8_t *)m + offsetof(struct ctdb_marshall_buffer, data), rec, rec->length);
4314
4315
4316         outdata.dptr = (uint8_t *)m;
4317         outdata.dsize = talloc_get_size(m);
4318
4319         handle = ctdb_control_send(ctdb, destnode, 0, 
4320                            CTDB_CONTROL_UPDATE_RECORD, 0, outdata,
4321                            mem_ctx, &timeout, NULL);
4322         talloc_free(m);
4323         return handle;
4324 }
4325
4326 int ctdb_ctrl_updaterecord_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4327 {
4328         int ret;
4329         int32_t res;
4330
4331         ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL);
4332         if ( (ret != 0) || (res != 0) ){
4333                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_update_record_recv failed\n"));
4334                 return -1;
4335         }
4336
4337         return 0;
4338 }
4339
4340 int
4341 ctdb_ctrl_updaterecord(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4342 {
4343         struct ctdb_client_control_state *state;
4344
4345         state = ctdb_ctrl_updaterecord_send(ctdb, mem_ctx, timeout, destnode, ctdb_db, key, header, data);
4346         return ctdb_ctrl_updaterecord_recv(ctdb, state);
4347 }
4348
4349
4350
4351
4352
4353
4354 /*
4355   set a database to be readonly
4356  */
4357 struct ctdb_client_control_state *
4358 ctdb_ctrl_set_db_readonly_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4359 {
4360         TDB_DATA data;
4361
4362         data.dptr = (uint8_t *)&dbid;
4363         data.dsize = sizeof(dbid);
4364
4365         return ctdb_control_send(ctdb, destnode, 0, 
4366                            CTDB_CONTROL_SET_DB_READONLY, 0, data, 
4367                            ctdb, NULL, NULL);
4368 }
4369
4370 int ctdb_ctrl_set_db_readonly_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4371 {
4372         int ret;
4373         int32_t res;
4374
4375         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4376         if (ret != 0 || res != 0) {
4377           DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_readonly_recv failed  ret:%d res:%d\n", ret, res));
4378                 return -1;
4379         }
4380
4381         return 0;
4382 }
4383
4384 int ctdb_ctrl_set_db_readonly(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4385 {
4386         struct ctdb_client_control_state *state;
4387
4388         state = ctdb_ctrl_set_db_readonly_send(ctdb, destnode, dbid);
4389         return ctdb_ctrl_set_db_readonly_recv(ctdb, state);
4390 }
4391
4392 /*
4393   set a database to be sticky
4394  */
4395 struct ctdb_client_control_state *
4396 ctdb_ctrl_set_db_sticky_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4397 {
4398         TDB_DATA data;
4399
4400         data.dptr = (uint8_t *)&dbid;
4401         data.dsize = sizeof(dbid);
4402
4403         return ctdb_control_send(ctdb, destnode, 0, 
4404                            CTDB_CONTROL_SET_DB_STICKY, 0, data, 
4405                            ctdb, NULL, NULL);
4406 }
4407
4408 int ctdb_ctrl_set_db_sticky_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4409 {
4410         int ret;
4411         int32_t res;
4412
4413         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4414         if (ret != 0 || res != 0) {
4415                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_sticky_recv failed  ret:%d res:%d\n", ret, res));
4416                 return -1;
4417         }
4418
4419         return 0;
4420 }
4421
4422 int ctdb_ctrl_set_db_sticky(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4423 {
4424         struct ctdb_client_control_state *state;
4425
4426         state = ctdb_ctrl_set_db_sticky_send(ctdb, destnode, dbid);
4427         return ctdb_ctrl_set_db_sticky_recv(ctdb, state);
4428 }