ctdb: pass TDB_DISALLOW_NESTING to all tdb_open/tdb_wrap_open calls
[sahlberg/ctdb.git] / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/events/events.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
29 #include <stdlib.h>
30 #include "../include/ctdb_private.h"
31 #include "lib/util/dlinklist.h"
32
33 pid_t ctdbd_pid;
34
35 /*
36   allocate a packet for use in client<->daemon communication
37  */
38 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
39                                             TALLOC_CTX *mem_ctx, 
40                                             enum ctdb_operation operation, 
41                                             size_t length, size_t slength,
42                                             const char *type)
43 {
44         int size;
45         struct ctdb_req_header *hdr;
46
47         length = MAX(length, slength);
48         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
49
50         hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
51         if (hdr == NULL) {
52                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
53                          operation, (unsigned)length));
54                 return NULL;
55         }
56         talloc_set_name_const(hdr, type);
57         memset(hdr, 0, slength);
58         hdr->length       = length;
59         hdr->operation    = operation;
60         hdr->ctdb_magic   = CTDB_MAGIC;
61         hdr->ctdb_version = CTDB_VERSION;
62         hdr->srcnode      = ctdb->pnn;
63         if (ctdb->vnn_map) {
64                 hdr->generation = ctdb->vnn_map->generation;
65         }
66
67         return hdr;
68 }
69
70 /*
71   local version of ctdb_call
72 */
73 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
74                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
75                     TDB_DATA *data, uint32_t caller)
76 {
77         struct ctdb_call_info *c;
78         struct ctdb_registered_call *fn;
79         struct ctdb_context *ctdb = ctdb_db->ctdb;
80         
81         c = talloc(ctdb, struct ctdb_call_info);
82         CTDB_NO_MEMORY(ctdb, c);
83
84         c->key = call->key;
85         c->call_data = &call->call_data;
86         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
87         c->record_data.dsize = data->dsize;
88         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
89         c->new_data = NULL;
90         c->reply_data = NULL;
91         c->status = 0;
92
93         for (fn=ctdb_db->calls;fn;fn=fn->next) {
94                 if (fn->id == call->call_id) break;
95         }
96         if (fn == NULL) {
97                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
98                 talloc_free(c);
99                 return -1;
100         }
101
102         if (fn->fn(c) != 0) {
103                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
104                 talloc_free(c);
105                 return -1;
106         }
107
108         if (header->laccessor != caller) {
109                 header->lacount = 0;
110         }
111         header->laccessor = caller;
112         header->lacount++;
113
114         /* we need to force the record to be written out if this was a remote access,
115            so that the lacount is updated */
116         if (c->new_data == NULL && header->laccessor != ctdb->pnn) {
117                 c->new_data = &c->record_data;
118         }
119
120         if (c->new_data) {
121                 /* XXX check that we always have the lock here? */
122                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
123                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
124                         talloc_free(c);
125                         return -1;
126                 }
127         }
128
129         if (c->reply_data) {
130                 call->reply_data = *c->reply_data;
131
132                 talloc_steal(call, call->reply_data.dptr);
133                 talloc_set_name_const(call->reply_data.dptr, __location__);
134         } else {
135                 call->reply_data.dptr = NULL;
136                 call->reply_data.dsize = 0;
137         }
138         call->status = c->status;
139
140         talloc_free(c);
141
142         return 0;
143 }
144
145
146 /*
147   queue a packet for sending from client to daemon
148 */
149 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
150 {
151         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
152 }
153
154
155 /*
156   called when a CTDB_REPLY_CALL packet comes in in the client
157
158   This packet comes in response to a CTDB_REQ_CALL request packet. It
159   contains any reply data from the call
160 */
161 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
162 {
163         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
164         struct ctdb_client_call_state *state;
165
166         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
167         if (state == NULL) {
168                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
169                 return;
170         }
171
172         if (hdr->reqid != state->reqid) {
173                 /* we found a record  but it was the wrong one */
174                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
175                 return;
176         }
177
178         state->call->reply_data.dptr = c->data;
179         state->call->reply_data.dsize = c->datalen;
180         state->call->status = c->status;
181
182         talloc_steal(state, c);
183
184         state->state = CTDB_CALL_DONE;
185
186         if (state->async.fn) {
187                 state->async.fn(state);
188         }
189 }
190
191 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
192
193 /*
194   this is called in the client, when data comes in from the daemon
195  */
196 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
197 {
198         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
199         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
200         TALLOC_CTX *tmp_ctx;
201
202         /* place the packet as a child of a tmp_ctx. We then use
203            talloc_free() below to free it. If any of the calls want
204            to keep it, then they will steal it somewhere else, and the
205            talloc_free() will be a no-op */
206         tmp_ctx = talloc_new(ctdb);
207         talloc_steal(tmp_ctx, hdr);
208
209         if (cnt == 0) {
210                 DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n"));
211                 exit(0);
212         }
213
214         if (cnt < sizeof(*hdr)) {
215                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
216                 goto done;
217         }
218         if (cnt != hdr->length) {
219                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
220                                (unsigned)hdr->length, (unsigned)cnt);
221                 goto done;
222         }
223
224         if (hdr->ctdb_magic != CTDB_MAGIC) {
225                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
226                 goto done;
227         }
228
229         if (hdr->ctdb_version != CTDB_VERSION) {
230                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
231                 goto done;
232         }
233
234         switch (hdr->operation) {
235         case CTDB_REPLY_CALL:
236                 ctdb_client_reply_call(ctdb, hdr);
237                 break;
238
239         case CTDB_REQ_MESSAGE:
240                 ctdb_request_message(ctdb, hdr);
241                 break;
242
243         case CTDB_REPLY_CONTROL:
244                 ctdb_client_reply_control(ctdb, hdr);
245                 break;
246
247         default:
248                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
249         }
250
251 done:
252         talloc_free(tmp_ctx);
253 }
254
255 /*
256   connect to a unix domain socket
257 */
258 int ctdb_socket_connect(struct ctdb_context *ctdb)
259 {
260         struct sockaddr_un addr;
261
262         memset(&addr, 0, sizeof(addr));
263         addr.sun_family = AF_UNIX;
264         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
265
266         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
267         if (ctdb->daemon.sd == -1) {
268                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
269                 return -1;
270         }
271
272         set_nonblocking(ctdb->daemon.sd);
273         set_close_on_exec(ctdb->daemon.sd);
274         
275         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
276                 close(ctdb->daemon.sd);
277                 ctdb->daemon.sd = -1;
278                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
279                 return -1;
280         }
281
282         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
283                                               CTDB_DS_ALIGNMENT, 
284                                               ctdb_client_read_cb, ctdb);
285         return 0;
286 }
287
288
289 struct ctdb_record_handle {
290         struct ctdb_db_context *ctdb_db;
291         TDB_DATA key;
292         TDB_DATA *data;
293         struct ctdb_ltdb_header header;
294 };
295
296
297 /*
298   make a recv call to the local ctdb daemon - called from client context
299
300   This is called when the program wants to wait for a ctdb_call to complete and get the 
301   results. This call will block unless the call has already completed.
302 */
303 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
304 {
305         if (state == NULL) {
306                 return -1;
307         }
308
309         while (state->state < CTDB_CALL_DONE) {
310                 event_loop_once(state->ctdb_db->ctdb->ev);
311         }
312         if (state->state != CTDB_CALL_DONE) {
313                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
314                 talloc_free(state);
315                 return -1;
316         }
317
318         if (state->call->reply_data.dsize) {
319                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
320                                                       state->call->reply_data.dptr,
321                                                       state->call->reply_data.dsize);
322                 call->reply_data.dsize = state->call->reply_data.dsize;
323         } else {
324                 call->reply_data.dptr = NULL;
325                 call->reply_data.dsize = 0;
326         }
327         call->status = state->call->status;
328         talloc_free(state);
329
330         return 0;
331 }
332
333
334
335
336 /*
337   destroy a ctdb_call in client
338 */
339 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
340 {
341         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
342         return 0;
343 }
344
345 /*
346   construct an event driven local ctdb_call
347
348   this is used so that locally processed ctdb_call requests are processed
349   in an event driven manner
350 */
351 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
352                                                                   struct ctdb_call *call,
353                                                                   struct ctdb_ltdb_header *header,
354                                                                   TDB_DATA *data)
355 {
356         struct ctdb_client_call_state *state;
357         struct ctdb_context *ctdb = ctdb_db->ctdb;
358         int ret;
359
360         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
361         CTDB_NO_MEMORY_NULL(ctdb, state);
362         state->call = talloc_zero(state, struct ctdb_call);
363         CTDB_NO_MEMORY_NULL(ctdb, state->call);
364
365         talloc_steal(state, data->dptr);
366
367         state->state   = CTDB_CALL_DONE;
368         *(state->call) = *call;
369         state->ctdb_db = ctdb_db;
370
371         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, ctdb->pnn);
372
373         return state;
374 }
375
376 /*
377   make a ctdb call to the local daemon - async send. Called from client context.
378
379   This constructs a ctdb_call request and queues it for processing. 
380   This call never blocks.
381 */
382 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
383                                               struct ctdb_call *call)
384 {
385         struct ctdb_client_call_state *state;
386         struct ctdb_context *ctdb = ctdb_db->ctdb;
387         struct ctdb_ltdb_header header;
388         TDB_DATA data;
389         int ret;
390         size_t len;
391         struct ctdb_req_call *c;
392
393         /* if the domain socket is not yet open, open it */
394         if (ctdb->daemon.sd==-1) {
395                 ctdb_socket_connect(ctdb);
396         }
397
398         ret = ctdb_ltdb_lock(ctdb_db, call->key);
399         if (ret != 0) {
400                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
401                 return NULL;
402         }
403
404         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
405
406         if (ret == 0 && header.dmaster == ctdb->pnn) {
407                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
408                 talloc_free(data.dptr);
409                 ctdb_ltdb_unlock(ctdb_db, call->key);
410                 return state;
411         }
412
413         ctdb_ltdb_unlock(ctdb_db, call->key);
414         talloc_free(data.dptr);
415
416         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
417         if (state == NULL) {
418                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
419                 return NULL;
420         }
421         state->call = talloc_zero(state, struct ctdb_call);
422         if (state->call == NULL) {
423                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
424                 return NULL;
425         }
426
427         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
428         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
429         if (c == NULL) {
430                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
431                 return NULL;
432         }
433
434         state->reqid     = ctdb_reqid_new(ctdb, state);
435         state->ctdb_db = ctdb_db;
436         talloc_set_destructor(state, ctdb_client_call_destructor);
437
438         c->hdr.reqid     = state->reqid;
439         c->flags         = call->flags;
440         c->db_id         = ctdb_db->db_id;
441         c->callid        = call->call_id;
442         c->hopcount      = 0;
443         c->keylen        = call->key.dsize;
444         c->calldatalen   = call->call_data.dsize;
445         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
446         memcpy(&c->data[call->key.dsize], 
447                call->call_data.dptr, call->call_data.dsize);
448         *(state->call)              = *call;
449         state->call->call_data.dptr = &c->data[call->key.dsize];
450         state->call->key.dptr       = &c->data[0];
451
452         state->state  = CTDB_CALL_WAIT;
453
454
455         ctdb_client_queue_pkt(ctdb, &c->hdr);
456
457         return state;
458 }
459
460
461 /*
462   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
463 */
464 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
465 {
466         struct ctdb_client_call_state *state;
467
468         state = ctdb_call_send(ctdb_db, call);
469         return ctdb_call_recv(state, call);
470 }
471
472
473 /*
474   tell the daemon what messaging srvid we will use, and register the message
475   handler function in the client
476 */
477 int ctdb_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
478                              ctdb_message_fn_t handler,
479                              void *private_data)
480                                     
481 {
482         int res;
483         int32_t status;
484         
485         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
486                            tdb_null, NULL, NULL, &status, NULL, NULL);
487         if (res != 0 || status != 0) {
488                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
489                 return -1;
490         }
491
492         /* also need to register the handler with our own ctdb structure */
493         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
494 }
495
496 /*
497   tell the daemon we no longer want a srvid
498 */
499 int ctdb_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
500 {
501         int res;
502         int32_t status;
503         
504         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
505                            tdb_null, NULL, NULL, &status, NULL, NULL);
506         if (res != 0 || status != 0) {
507                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
508                 return -1;
509         }
510
511         /* also need to register the handler with our own ctdb structure */
512         ctdb_deregister_message_handler(ctdb, srvid, private_data);
513         return 0;
514 }
515
516
517 /*
518   send a message - from client context
519  */
520 int ctdb_send_message(struct ctdb_context *ctdb, uint32_t pnn,
521                       uint64_t srvid, TDB_DATA data)
522 {
523         struct ctdb_req_message *r;
524         int len, res;
525
526         len = offsetof(struct ctdb_req_message, data) + data.dsize;
527         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
528                                len, struct ctdb_req_message);
529         CTDB_NO_MEMORY(ctdb, r);
530
531         r->hdr.destnode  = pnn;
532         r->srvid         = srvid;
533         r->datalen       = data.dsize;
534         memcpy(&r->data[0], data.dptr, data.dsize);
535         
536         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
537         if (res != 0) {
538                 return res;
539         }
540
541         talloc_free(r);
542         return 0;
543 }
544
545
546 /*
547   cancel a ctdb_fetch_lock operation, releasing the lock
548  */
549 static int fetch_lock_destructor(struct ctdb_record_handle *h)
550 {
551         ctdb_ltdb_unlock(h->ctdb_db, h->key);
552         return 0;
553 }
554
555 /*
556   force the migration of a record to this node
557  */
558 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
559 {
560         struct ctdb_call call;
561         ZERO_STRUCT(call);
562         call.call_id = CTDB_NULL_FUNC;
563         call.key = key;
564         call.flags = CTDB_IMMEDIATE_MIGRATION;
565         return ctdb_call(ctdb_db, &call);
566 }
567
568 /*
569   get a lock on a record, and return the records data. Blocks until it gets the lock
570  */
571 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
572                                            TDB_DATA key, TDB_DATA *data)
573 {
574         int ret;
575         struct ctdb_record_handle *h;
576
577         /*
578           procedure is as follows:
579
580           1) get the chain lock. 
581           2) check if we are dmaster
582           3) if we are the dmaster then return handle 
583           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
584              reply from ctdbd
585           5) when we get the reply, goto (1)
586          */
587
588         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
589         if (h == NULL) {
590                 return NULL;
591         }
592
593         h->ctdb_db = ctdb_db;
594         h->key     = key;
595         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
596         if (h->key.dptr == NULL) {
597                 talloc_free(h);
598                 return NULL;
599         }
600         h->data    = data;
601
602         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
603                  (const char *)key.dptr));
604
605 again:
606         /* step 1 - get the chain lock */
607         ret = ctdb_ltdb_lock(ctdb_db, key);
608         if (ret != 0) {
609                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
610                 talloc_free(h);
611                 return NULL;
612         }
613
614         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
615
616         talloc_set_destructor(h, fetch_lock_destructor);
617
618         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
619
620         /* when torturing, ensure we test the remote path */
621         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
622             random() % 5 == 0) {
623                 h->header.dmaster = (uint32_t)-1;
624         }
625
626
627         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
628
629         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
630                 ctdb_ltdb_unlock(ctdb_db, key);
631                 ret = ctdb_client_force_migration(ctdb_db, key);
632                 if (ret != 0) {
633                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
634                         talloc_free(h);
635                         return NULL;
636                 }
637                 goto again;
638         }
639
640         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
641         return h;
642 }
643
644 /*
645   store some data to the record that was locked with ctdb_fetch_lock()
646 */
647 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
648 {
649         if (h->ctdb_db->persistent) {
650                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
651                 return -1;
652         }
653
654         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
655 }
656
657 /*
658   non-locking fetch of a record
659  */
660 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
661                TDB_DATA key, TDB_DATA *data)
662 {
663         struct ctdb_call call;
664         int ret;
665
666         call.call_id = CTDB_FETCH_FUNC;
667         call.call_data.dptr = NULL;
668         call.call_data.dsize = 0;
669
670         ret = ctdb_call(ctdb_db, &call);
671
672         if (ret == 0) {
673                 *data = call.reply_data;
674                 talloc_steal(mem_ctx, data->dptr);
675         }
676
677         return ret;
678 }
679
680
681
682 /*
683    called when a control completes or timesout to invoke the callback
684    function the user provided
685 */
686 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
687         struct timeval t, void *private_data)
688 {
689         struct ctdb_client_control_state *state;
690         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
691         int ret;
692
693         state = talloc_get_type(private_data, struct ctdb_client_control_state);
694         talloc_steal(tmp_ctx, state);
695
696         ret = ctdb_control_recv(state->ctdb, state, state,
697                         NULL, 
698                         NULL, 
699                         NULL);
700
701         talloc_free(tmp_ctx);
702 }
703
704 /*
705   called when a CTDB_REPLY_CONTROL packet comes in in the client
706
707   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
708   contains any reply data from the control
709 */
710 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
711                                       struct ctdb_req_header *hdr)
712 {
713         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
714         struct ctdb_client_control_state *state;
715
716         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
717         if (state == NULL) {
718                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
719                 return;
720         }
721
722         if (hdr->reqid != state->reqid) {
723                 /* we found a record  but it was the wrong one */
724                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
725                 return;
726         }
727
728         state->outdata.dptr = c->data;
729         state->outdata.dsize = c->datalen;
730         state->status = c->status;
731         if (c->errorlen) {
732                 state->errormsg = talloc_strndup(state, 
733                                                  (char *)&c->data[c->datalen], 
734                                                  c->errorlen);
735         }
736
737         /* state->outdata now uses resources from c so we dont want c
738            to just dissappear from under us while state is still alive
739         */
740         talloc_steal(state, c);
741
742         state->state = CTDB_CONTROL_DONE;
743
744         /* if we had a callback registered for this control, pull the response
745            and call the callback.
746         */
747         if (state->async.fn) {
748                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
749         }
750 }
751
752
753 /*
754   destroy a ctdb_control in client
755 */
756 static int ctdb_control_destructor(struct ctdb_client_control_state *state)     
757 {
758         ctdb_reqid_remove(state->ctdb, state->reqid);
759         return 0;
760 }
761
762
763 /* time out handler for ctdb_control */
764 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
765         struct timeval t, void *private_data)
766 {
767         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
768
769         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
770                          "dstnode:%u\n", state->reqid, state->c->opcode,
771                          state->c->hdr.destnode));
772
773         state->state = CTDB_CONTROL_TIMEOUT;
774
775         /* if we had a callback registered for this control, pull the response
776            and call the callback.
777         */
778         if (state->async.fn) {
779                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
780         }
781 }
782
783 /* async version of send control request */
784 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
785                 uint32_t destnode, uint64_t srvid, 
786                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
787                 TALLOC_CTX *mem_ctx,
788                 struct timeval *timeout,
789                 char **errormsg)
790 {
791         struct ctdb_client_control_state *state;
792         size_t len;
793         struct ctdb_req_control *c;
794         int ret;
795
796         if (errormsg) {
797                 *errormsg = NULL;
798         }
799
800         /* if the domain socket is not yet open, open it */
801         if (ctdb->daemon.sd==-1) {
802                 ctdb_socket_connect(ctdb);
803         }
804
805         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
806         CTDB_NO_MEMORY_NULL(ctdb, state);
807
808         state->ctdb       = ctdb;
809         state->reqid      = ctdb_reqid_new(ctdb, state);
810         state->state      = CTDB_CONTROL_WAIT;
811         state->errormsg   = NULL;
812
813         talloc_set_destructor(state, ctdb_control_destructor);
814
815         len = offsetof(struct ctdb_req_control, data) + data.dsize;
816         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
817                                len, struct ctdb_req_control);
818         state->c            = c;        
819         CTDB_NO_MEMORY_NULL(ctdb, c);
820         c->hdr.reqid        = state->reqid;
821         c->hdr.destnode     = destnode;
822         c->opcode           = opcode;
823         c->client_id        = 0;
824         c->flags            = flags;
825         c->srvid            = srvid;
826         c->datalen          = data.dsize;
827         if (data.dsize) {
828                 memcpy(&c->data[0], data.dptr, data.dsize);
829         }
830
831         /* timeout */
832         if (timeout && !timeval_is_zero(timeout)) {
833                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
834         }
835
836         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
837         if (ret != 0) {
838                 talloc_free(state);
839                 return NULL;
840         }
841
842         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
843                 talloc_free(state);
844                 return NULL;
845         }
846
847         return state;
848 }
849
850
851 /* async version of receive control reply */
852 int ctdb_control_recv(struct ctdb_context *ctdb, 
853                 struct ctdb_client_control_state *state, 
854                 TALLOC_CTX *mem_ctx,
855                 TDB_DATA *outdata, int32_t *status, char **errormsg)
856 {
857         TALLOC_CTX *tmp_ctx;
858
859         if (status != NULL) {
860                 *status = -1;
861         }
862         if (errormsg != NULL) {
863                 *errormsg = NULL;
864         }
865
866         if (state == NULL) {
867                 return -1;
868         }
869
870         /* prevent double free of state */
871         tmp_ctx = talloc_new(ctdb);
872         talloc_steal(tmp_ctx, state);
873
874         /* loop one event at a time until we either timeout or the control
875            completes.
876         */
877         while (state->state == CTDB_CONTROL_WAIT) {
878                 event_loop_once(ctdb->ev);
879         }
880
881         if (state->state != CTDB_CONTROL_DONE) {
882                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
883                 if (state->async.fn) {
884                         state->async.fn(state);
885                 }
886                 talloc_free(tmp_ctx);
887                 return -1;
888         }
889
890         if (state->errormsg) {
891                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
892                 if (errormsg) {
893                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
894                 }
895                 if (state->async.fn) {
896                         state->async.fn(state);
897                 }
898                 talloc_free(tmp_ctx);
899                 return -1;
900         }
901
902         if (outdata) {
903                 *outdata = state->outdata;
904                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
905         }
906
907         if (status) {
908                 *status = state->status;
909         }
910
911         if (state->async.fn) {
912                 state->async.fn(state);
913         }
914
915         talloc_free(tmp_ctx);
916         return 0;
917 }
918
919
920
921 /*
922   send a ctdb control message
923   timeout specifies how long we should wait for a reply.
924   if timeout is NULL we wait indefinitely
925  */
926 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
927                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
928                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
929                  struct timeval *timeout,
930                  char **errormsg)
931 {
932         struct ctdb_client_control_state *state;
933
934         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
935                         flags, data, mem_ctx,
936                         timeout, errormsg);
937         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
938                         errormsg);
939 }
940
941
942
943
944 /*
945   a process exists call. Returns 0 if process exists, -1 otherwise
946  */
947 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
948 {
949         int ret;
950         TDB_DATA data;
951         int32_t status;
952
953         data.dptr = (uint8_t*)&pid;
954         data.dsize = sizeof(pid);
955
956         ret = ctdb_control(ctdb, destnode, 0, 
957                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
958                            NULL, NULL, &status, NULL, NULL);
959         if (ret != 0) {
960                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
961                 return -1;
962         }
963
964         return status;
965 }
966
967 /*
968   get remote statistics
969  */
970 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
971 {
972         int ret;
973         TDB_DATA data;
974         int32_t res;
975
976         ret = ctdb_control(ctdb, destnode, 0, 
977                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
978                            ctdb, &data, &res, NULL, NULL);
979         if (ret != 0 || res != 0) {
980                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
981                 return -1;
982         }
983
984         if (data.dsize != sizeof(struct ctdb_statistics)) {
985                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
986                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
987                       return -1;
988         }
989
990         *status = *(struct ctdb_statistics *)data.dptr;
991         talloc_free(data.dptr);
992                         
993         return 0;
994 }
995
996 /*
997   shutdown a remote ctdb node
998  */
999 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1000 {
1001         struct ctdb_client_control_state *state;
1002
1003         state = ctdb_control_send(ctdb, destnode, 0, 
1004                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1005                            NULL, &timeout, NULL);
1006         if (state == NULL) {
1007                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1008                 return -1;
1009         }
1010
1011         return 0;
1012 }
1013
1014 /*
1015   get vnn map from a remote node
1016  */
1017 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1018 {
1019         int ret;
1020         TDB_DATA outdata;
1021         int32_t res;
1022         struct ctdb_vnn_map_wire *map;
1023
1024         ret = ctdb_control(ctdb, destnode, 0, 
1025                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1026                            mem_ctx, &outdata, &res, &timeout, NULL);
1027         if (ret != 0 || res != 0) {
1028                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1029                 return -1;
1030         }
1031         
1032         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1033         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1034             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1035                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1036                 return -1;
1037         }
1038
1039         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1040         CTDB_NO_MEMORY(ctdb, *vnnmap);
1041         (*vnnmap)->generation = map->generation;
1042         (*vnnmap)->size       = map->size;
1043         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1044
1045         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1046         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1047         talloc_free(outdata.dptr);
1048                     
1049         return 0;
1050 }
1051
1052
1053 /*
1054   get the recovery mode of a remote node
1055  */
1056 struct ctdb_client_control_state *
1057 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1058 {
1059         return ctdb_control_send(ctdb, destnode, 0, 
1060                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1061                            mem_ctx, &timeout, NULL);
1062 }
1063
1064 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1065 {
1066         int ret;
1067         int32_t res;
1068
1069         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1070         if (ret != 0) {
1071                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1072                 return -1;
1073         }
1074
1075         if (recmode) {
1076                 *recmode = (uint32_t)res;
1077         }
1078
1079         return 0;
1080 }
1081
1082 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1083 {
1084         struct ctdb_client_control_state *state;
1085
1086         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1087         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1088 }
1089
1090
1091
1092
1093 /*
1094   set the recovery mode of a remote node
1095  */
1096 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1097 {
1098         int ret;
1099         TDB_DATA data;
1100         int32_t res;
1101
1102         data.dsize = sizeof(uint32_t);
1103         data.dptr = (unsigned char *)&recmode;
1104
1105         ret = ctdb_control(ctdb, destnode, 0, 
1106                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1107                            NULL, NULL, &res, &timeout, NULL);
1108         if (ret != 0 || res != 0) {
1109                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1110                 return -1;
1111         }
1112
1113         return 0;
1114 }
1115
1116
1117
1118 /*
1119   get the recovery master of a remote node
1120  */
1121 struct ctdb_client_control_state *
1122 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1123                         struct timeval timeout, uint32_t destnode)
1124 {
1125         return ctdb_control_send(ctdb, destnode, 0, 
1126                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1127                            mem_ctx, &timeout, NULL);
1128 }
1129
1130 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1131 {
1132         int ret;
1133         int32_t res;
1134
1135         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1136         if (ret != 0) {
1137                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1138                 return -1;
1139         }
1140
1141         if (recmaster) {
1142                 *recmaster = (uint32_t)res;
1143         }
1144
1145         return 0;
1146 }
1147
1148 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1149 {
1150         struct ctdb_client_control_state *state;
1151
1152         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1153         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1154 }
1155
1156
1157 /*
1158   set the recovery master of a remote node
1159  */
1160 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1161 {
1162         int ret;
1163         TDB_DATA data;
1164         int32_t res;
1165
1166         ZERO_STRUCT(data);
1167         data.dsize = sizeof(uint32_t);
1168         data.dptr = (unsigned char *)&recmaster;
1169
1170         ret = ctdb_control(ctdb, destnode, 0, 
1171                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1172                            NULL, NULL, &res, &timeout, NULL);
1173         if (ret != 0 || res != 0) {
1174                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1175                 return -1;
1176         }
1177
1178         return 0;
1179 }
1180
1181
1182 /*
1183   get a list of databases off a remote node
1184  */
1185 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1186                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1187 {
1188         int ret;
1189         TDB_DATA outdata;
1190         int32_t res;
1191
1192         ret = ctdb_control(ctdb, destnode, 0, 
1193                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1194                            mem_ctx, &outdata, &res, &timeout, NULL);
1195         if (ret != 0 || res != 0) {
1196                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1197                 return -1;
1198         }
1199
1200         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1201         talloc_free(outdata.dptr);
1202                     
1203         return 0;
1204 }
1205
1206 /*
1207   get a list of nodes (vnn and flags ) from a remote node
1208  */
1209 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1210                 struct timeval timeout, uint32_t destnode, 
1211                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1212 {
1213         int ret;
1214         TDB_DATA outdata;
1215         int32_t res;
1216
1217         ret = ctdb_control(ctdb, destnode, 0, 
1218                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1219                            mem_ctx, &outdata, &res, &timeout, NULL);
1220         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1221                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1222                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1223         }
1224         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1225                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1226                 return -1;
1227         }
1228
1229         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1230         talloc_free(outdata.dptr);
1231                     
1232         return 0;
1233 }
1234
1235 /*
1236   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1237  */
1238 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1239                 struct timeval timeout, uint32_t destnode, 
1240                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1241 {
1242         int ret, i, len;
1243         TDB_DATA outdata;
1244         struct ctdb_node_mapv4 *nodemapv4;
1245         int32_t res;
1246
1247         ret = ctdb_control(ctdb, destnode, 0, 
1248                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1249                            mem_ctx, &outdata, &res, &timeout, NULL);
1250         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1251                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1252                 return -1;
1253         }
1254
1255         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1256
1257         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1258         (*nodemap) = talloc_zero_size(mem_ctx, len);
1259         CTDB_NO_MEMORY(ctdb, (*nodemap));
1260
1261         (*nodemap)->num = nodemapv4->num;
1262         for (i=0; i<nodemapv4->num; i++) {
1263                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1264                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1265                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1266                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1267         }
1268                 
1269         talloc_free(outdata.dptr);
1270                     
1271         return 0;
1272 }
1273
1274 /*
1275   drop the transport, reload the nodes file and restart the transport
1276  */
1277 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1278                     struct timeval timeout, uint32_t destnode)
1279 {
1280         int ret;
1281         int32_t res;
1282
1283         ret = ctdb_control(ctdb, destnode, 0, 
1284                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1285                            NULL, NULL, &res, &timeout, NULL);
1286         if (ret != 0 || res != 0) {
1287                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1288                 return -1;
1289         }
1290
1291         return 0;
1292 }
1293
1294
1295 /*
1296   set vnn map on a node
1297  */
1298 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1299                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1300 {
1301         int ret;
1302         TDB_DATA data;
1303         int32_t res;
1304         struct ctdb_vnn_map_wire *map;
1305         size_t len;
1306
1307         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1308         map = talloc_size(mem_ctx, len);
1309         CTDB_NO_MEMORY(ctdb, map);
1310
1311         map->generation = vnnmap->generation;
1312         map->size = vnnmap->size;
1313         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1314         
1315         data.dsize = len;
1316         data.dptr  = (uint8_t *)map;
1317
1318         ret = ctdb_control(ctdb, destnode, 0, 
1319                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1320                            NULL, NULL, &res, &timeout, NULL);
1321         if (ret != 0 || res != 0) {
1322                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1323                 return -1;
1324         }
1325
1326         talloc_free(map);
1327
1328         return 0;
1329 }
1330
1331
1332 /*
1333   async send for pull database
1334  */
1335 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1336         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1337         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1338 {
1339         TDB_DATA indata;
1340         struct ctdb_control_pulldb *pull;
1341         struct ctdb_client_control_state *state;
1342
1343         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1344         CTDB_NO_MEMORY_NULL(ctdb, pull);
1345
1346         pull->db_id   = dbid;
1347         pull->lmaster = lmaster;
1348
1349         indata.dsize = sizeof(struct ctdb_control_pulldb);
1350         indata.dptr  = (unsigned char *)pull;
1351
1352         state = ctdb_control_send(ctdb, destnode, 0, 
1353                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1354                                   mem_ctx, &timeout, NULL);
1355         talloc_free(pull);
1356
1357         return state;
1358 }
1359
1360 /*
1361   async recv for pull database
1362  */
1363 int ctdb_ctrl_pulldb_recv(
1364         struct ctdb_context *ctdb, 
1365         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1366         TDB_DATA *outdata)
1367 {
1368         int ret;
1369         int32_t res;
1370
1371         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1372         if ( (ret != 0) || (res != 0) ){
1373                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1374                 return -1;
1375         }
1376
1377         return 0;
1378 }
1379
1380 /*
1381   pull all keys and records for a specific database on a node
1382  */
1383 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1384                 uint32_t dbid, uint32_t lmaster, 
1385                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1386                 TDB_DATA *outdata)
1387 {
1388         struct ctdb_client_control_state *state;
1389
1390         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1391                                       timeout);
1392         
1393         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1394 }
1395
1396
1397 /*
1398   change dmaster for all keys in the database to the new value
1399  */
1400 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1401                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1402 {
1403         int ret;
1404         TDB_DATA indata;
1405         int32_t res;
1406
1407         indata.dsize = 2*sizeof(uint32_t);
1408         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1409
1410         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1411         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1412
1413         ret = ctdb_control(ctdb, destnode, 0, 
1414                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1415                            NULL, NULL, &res, &timeout, NULL);
1416         if (ret != 0 || res != 0) {
1417                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1418                 return -1;
1419         }
1420
1421         return 0;
1422 }
1423
1424 /*
1425   ping a node, return number of clients connected
1426  */
1427 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1428 {
1429         int ret;
1430         int32_t res;
1431
1432         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1433                            tdb_null, NULL, NULL, &res, NULL, NULL);
1434         if (ret != 0) {
1435                 return -1;
1436         }
1437         return res;
1438 }
1439
1440 /*
1441   find the real path to a ltdb 
1442  */
1443 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1444                    const char **path)
1445 {
1446         int ret;
1447         int32_t res;
1448         TDB_DATA data;
1449
1450         data.dptr = (uint8_t *)&dbid;
1451         data.dsize = sizeof(dbid);
1452
1453         ret = ctdb_control(ctdb, destnode, 0, 
1454                            CTDB_CONTROL_GETDBPATH, 0, data, 
1455                            mem_ctx, &data, &res, &timeout, NULL);
1456         if (ret != 0 || res != 0) {
1457                 return -1;
1458         }
1459
1460         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1461         if ((*path) == NULL) {
1462                 return -1;
1463         }
1464
1465         talloc_free(data.dptr);
1466
1467         return 0;
1468 }
1469
1470 /*
1471   find the name of a db 
1472  */
1473 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1474                    const char **name)
1475 {
1476         int ret;
1477         int32_t res;
1478         TDB_DATA data;
1479
1480         data.dptr = (uint8_t *)&dbid;
1481         data.dsize = sizeof(dbid);
1482
1483         ret = ctdb_control(ctdb, destnode, 0, 
1484                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1485                            mem_ctx, &data, &res, &timeout, NULL);
1486         if (ret != 0 || res != 0) {
1487                 return -1;
1488         }
1489
1490         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1491         if ((*name) == NULL) {
1492                 return -1;
1493         }
1494
1495         talloc_free(data.dptr);
1496
1497         return 0;
1498 }
1499
1500 /*
1501   create a database
1502  */
1503 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1504                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1505 {
1506         int ret;
1507         int32_t res;
1508         TDB_DATA data;
1509
1510         data.dptr = discard_const(name);
1511         data.dsize = strlen(name)+1;
1512
1513         ret = ctdb_control(ctdb, destnode, 0, 
1514                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1515                            0, data, 
1516                            mem_ctx, &data, &res, &timeout, NULL);
1517
1518         if (ret != 0 || res != 0) {
1519                 return -1;
1520         }
1521
1522         return 0;
1523 }
1524
1525 /*
1526   get debug level on a node
1527  */
1528 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1529 {
1530         int ret;
1531         int32_t res;
1532         TDB_DATA data;
1533
1534         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1535                            ctdb, &data, &res, NULL, NULL);
1536         if (ret != 0 || res != 0) {
1537                 return -1;
1538         }
1539         if (data.dsize != sizeof(int32_t)) {
1540                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1541                          (unsigned)data.dsize));
1542                 return -1;
1543         }
1544         *level = *(int32_t *)data.dptr;
1545         talloc_free(data.dptr);
1546         return 0;
1547 }
1548
1549 /*
1550   set debug level on a node
1551  */
1552 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1553 {
1554         int ret;
1555         int32_t res;
1556         TDB_DATA data;
1557
1558         data.dptr = (uint8_t *)&level;
1559         data.dsize = sizeof(level);
1560
1561         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1562                            NULL, NULL, &res, NULL, NULL);
1563         if (ret != 0 || res != 0) {
1564                 return -1;
1565         }
1566         return 0;
1567 }
1568
1569
1570 /*
1571   get a list of connected nodes
1572  */
1573 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1574                                 struct timeval timeout,
1575                                 TALLOC_CTX *mem_ctx,
1576                                 uint32_t *num_nodes)
1577 {
1578         struct ctdb_node_map *map=NULL;
1579         int ret, i;
1580         uint32_t *nodes;
1581
1582         *num_nodes = 0;
1583
1584         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1585         if (ret != 0) {
1586                 return NULL;
1587         }
1588
1589         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1590         if (nodes == NULL) {
1591                 return NULL;
1592         }
1593
1594         for (i=0;i<map->num;i++) {
1595                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1596                         nodes[*num_nodes] = map->nodes[i].pnn;
1597                         (*num_nodes)++;
1598                 }
1599         }
1600
1601         return nodes;
1602 }
1603
1604
1605 /*
1606   reset remote status
1607  */
1608 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1609 {
1610         int ret;
1611         int32_t res;
1612
1613         ret = ctdb_control(ctdb, destnode, 0, 
1614                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1615                            NULL, NULL, &res, NULL, NULL);
1616         if (ret != 0 || res != 0) {
1617                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1618                 return -1;
1619         }
1620         return 0;
1621 }
1622
1623 /*
1624   this is the dummy null procedure that all databases support
1625 */
1626 static int ctdb_null_func(struct ctdb_call_info *call)
1627 {
1628         return 0;
1629 }
1630
1631 /*
1632   this is a plain fetch procedure that all databases support
1633 */
1634 static int ctdb_fetch_func(struct ctdb_call_info *call)
1635 {
1636         call->reply_data = &call->record_data;
1637         return 0;
1638 }
1639
1640 /*
1641   attach to a specific database - client call
1642 */
1643 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, bool persistent, uint32_t tdb_flags)
1644 {
1645         struct ctdb_db_context *ctdb_db;
1646         TDB_DATA data;
1647         int ret;
1648         int32_t res;
1649
1650         ctdb_db = ctdb_db_handle(ctdb, name);
1651         if (ctdb_db) {
1652                 return ctdb_db;
1653         }
1654
1655         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1656         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1657
1658         ctdb_db->ctdb = ctdb;
1659         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1660         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1661
1662         data.dptr = discard_const(name);
1663         data.dsize = strlen(name)+1;
1664
1665         /* tell ctdb daemon to attach */
1666         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1667                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1668                            0, data, ctdb_db, &data, &res, NULL, NULL);
1669         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1670                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1671                 talloc_free(ctdb_db);
1672                 return NULL;
1673         }
1674         
1675         ctdb_db->db_id = *(uint32_t *)data.dptr;
1676         talloc_free(data.dptr);
1677
1678         ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1679         if (ret != 0) {
1680                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1681                 talloc_free(ctdb_db);
1682                 return NULL;
1683         }
1684
1685         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1686         if (!ctdb->do_setsched) {
1687                 tdb_flags |= TDB_NOMMAP;
1688         }
1689         tdb_flags |= TDB_DISALLOW_NESTING;
1690
1691         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1692         if (ctdb_db->ltdb == NULL) {
1693                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1694                 talloc_free(ctdb_db);
1695                 return NULL;
1696         }
1697
1698         ctdb_db->persistent = persistent;
1699
1700         DLIST_ADD(ctdb->db_list, ctdb_db);
1701
1702         /* add well known functions */
1703         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1704         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1705
1706         return ctdb_db;
1707 }
1708
1709
1710 /*
1711   setup a call for a database
1712  */
1713 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1714 {
1715         struct ctdb_registered_call *call;
1716
1717 #if 0
1718         TDB_DATA data;
1719         int32_t status;
1720         struct ctdb_control_set_call c;
1721         int ret;
1722
1723         /* this is no longer valid with the separate daemon architecture */
1724         c.db_id = ctdb_db->db_id;
1725         c.fn    = fn;
1726         c.id    = id;
1727
1728         data.dptr = (uint8_t *)&c;
1729         data.dsize = sizeof(c);
1730
1731         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1732                            data, NULL, NULL, &status, NULL, NULL);
1733         if (ret != 0 || status != 0) {
1734                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
1735                 return -1;
1736         }
1737 #endif
1738
1739         /* also register locally */
1740         call = talloc(ctdb_db, struct ctdb_registered_call);
1741         call->fn = fn;
1742         call->id = id;
1743
1744         DLIST_ADD(ctdb_db->calls, call);        
1745         return 0;
1746 }
1747
1748
1749 struct traverse_state {
1750         bool done;
1751         uint32_t count;
1752         ctdb_traverse_func fn;
1753         void *private_data;
1754 };
1755
1756 /*
1757   called on each key during a ctdb_traverse
1758  */
1759 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1760 {
1761         struct traverse_state *state = (struct traverse_state *)p;
1762         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
1763         TDB_DATA key;
1764
1765         if (data.dsize < sizeof(uint32_t) ||
1766             d->length != data.dsize) {
1767                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
1768                 state->done = True;
1769                 return;
1770         }
1771
1772         key.dsize = d->keylen;
1773         key.dptr  = &d->data[0];
1774         data.dsize = d->datalen;
1775         data.dptr = &d->data[d->keylen];
1776
1777         if (key.dsize == 0 && data.dsize == 0) {
1778                 /* end of traverse */
1779                 state->done = True;
1780                 return;
1781         }
1782
1783         if (data.dsize == sizeof(struct ctdb_ltdb_header)) {
1784                 /* empty records are deleted records in ctdb */
1785                 return;
1786         }
1787
1788         if (state->fn(ctdb, key, data, state->private_data) != 0) {
1789                 state->done = True;
1790         }
1791
1792         state->count++;
1793 }
1794
1795
1796 /*
1797   start a cluster wide traverse, calling the supplied fn on each record
1798   return the number of records traversed, or -1 on error
1799  */
1800 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
1801 {
1802         TDB_DATA data;
1803         struct ctdb_traverse_start t;
1804         int32_t status;
1805         int ret;
1806         uint64_t srvid = (getpid() | 0xFLL<<60);
1807         struct traverse_state state;
1808
1809         state.done = False;
1810         state.count = 0;
1811         state.private_data = private_data;
1812         state.fn = fn;
1813
1814         ret = ctdb_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
1815         if (ret != 0) {
1816                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
1817                 return -1;
1818         }
1819
1820         t.db_id = ctdb_db->db_id;
1821         t.srvid = srvid;
1822         t.reqid = 0;
1823
1824         data.dptr = (uint8_t *)&t;
1825         data.dsize = sizeof(t);
1826
1827         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
1828                            data, NULL, NULL, &status, NULL, NULL);
1829         if (ret != 0 || status != 0) {
1830                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
1831                 ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1832                 return -1;
1833         }
1834
1835         while (!state.done) {
1836                 event_loop_once(ctdb_db->ctdb->ev);
1837         }
1838
1839         ret = ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1840         if (ret != 0) {
1841                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
1842                 return -1;
1843         }
1844
1845         return state.count;
1846 }
1847
1848 #define ISASCII(x) ((x>31)&&(x<128))
1849 /*
1850   called on each key during a catdb
1851  */
1852 static int dumpdb_fn(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
1853 {
1854         int i;
1855         FILE *f = (FILE *)p;
1856         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
1857
1858         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
1859         for (i=0;i<key.dsize;i++) {
1860                 if (ISASCII(key.dptr[i])) {
1861                         fprintf(f, "%c", key.dptr[i]);
1862                 } else {
1863                         fprintf(f, "\\%02X", key.dptr[i]);
1864                 }
1865         }
1866         fprintf(f, "\"\n");
1867
1868         fprintf(f, "dmaster: %u\n", h->dmaster);
1869         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
1870
1871         fprintf(f, "data(%u) = \"", (unsigned)data.dsize);
1872         for (i=sizeof(*h);i<data.dsize;i++) {
1873                 if (ISASCII(data.dptr[i])) {
1874                         fprintf(f, "%c", data.dptr[i]);
1875                 } else {
1876                         fprintf(f, "\\%02X", data.dptr[i]);
1877                 }
1878         }
1879         fprintf(f, "\"\n");
1880
1881         fprintf(f, "\n");
1882
1883         return 0;
1884 }
1885
1886 /*
1887   convenience function to list all keys to stdout
1888  */
1889 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
1890 {
1891         return ctdb_traverse(ctdb_db, dumpdb_fn, f);
1892 }
1893
1894 /*
1895   get the pid of a ctdb daemon
1896  */
1897 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
1898 {
1899         int ret;
1900         int32_t res;
1901
1902         ret = ctdb_control(ctdb, destnode, 0, 
1903                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
1904                            NULL, NULL, &res, &timeout, NULL);
1905         if (ret != 0) {
1906                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
1907                 return -1;
1908         }
1909
1910         *pid = res;
1911
1912         return 0;
1913 }
1914
1915
1916 /*
1917   async freeze send control
1918  */
1919 struct ctdb_client_control_state *
1920 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
1921 {
1922         return ctdb_control_send(ctdb, destnode, priority, 
1923                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
1924                            mem_ctx, &timeout, NULL);
1925 }
1926
1927 /* 
1928    async freeze recv control
1929 */
1930 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
1931 {
1932         int ret;
1933         int32_t res;
1934
1935         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1936         if ( (ret != 0) || (res != 0) ){
1937                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
1938                 return -1;
1939         }
1940
1941         return 0;
1942 }
1943
1944 /*
1945   freeze databases of a certain priority
1946  */
1947 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
1948 {
1949         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1950         struct ctdb_client_control_state *state;
1951         int ret;
1952
1953         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
1954         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
1955         talloc_free(tmp_ctx);
1956
1957         return ret;
1958 }
1959
1960 /* Freeze all databases */
1961 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1962 {
1963         int i;
1964
1965         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
1966                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
1967                         return -1;
1968                 }
1969         }
1970         return 0;
1971 }
1972
1973 /*
1974   thaw databases of a certain priority
1975  */
1976 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
1977 {
1978         int ret;
1979         int32_t res;
1980
1981         ret = ctdb_control(ctdb, destnode, priority, 
1982                            CTDB_CONTROL_THAW, 0, tdb_null, 
1983                            NULL, NULL, &res, &timeout, NULL);
1984         if (ret != 0 || res != 0) {
1985                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
1986                 return -1;
1987         }
1988
1989         return 0;
1990 }
1991
1992 /* thaw all databases */
1993 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1994 {
1995         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
1996 }
1997
1998 /*
1999   get pnn of a node, or -1
2000  */
2001 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2002 {
2003         int ret;
2004         int32_t res;
2005
2006         ret = ctdb_control(ctdb, destnode, 0, 
2007                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2008                            NULL, NULL, &res, &timeout, NULL);
2009         if (ret != 0) {
2010                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2011                 return -1;
2012         }
2013
2014         return res;
2015 }
2016
2017 /*
2018   get the monitoring mode of a remote node
2019  */
2020 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2021 {
2022         int ret;
2023         int32_t res;
2024
2025         ret = ctdb_control(ctdb, destnode, 0, 
2026                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2027                            NULL, NULL, &res, &timeout, NULL);
2028         if (ret != 0) {
2029                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2030                 return -1;
2031         }
2032
2033         *monmode = res;
2034
2035         return 0;
2036 }
2037
2038
2039 /*
2040  set the monitoring mode of a remote node to active
2041  */
2042 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2043 {
2044         int ret;
2045         
2046
2047         ret = ctdb_control(ctdb, destnode, 0, 
2048                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2049                            NULL, NULL,NULL, &timeout, NULL);
2050         if (ret != 0) {
2051                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2052                 return -1;
2053         }
2054
2055         
2056
2057         return 0;
2058 }
2059
2060 /*
2061   set the monitoring mode of a remote node to disable
2062  */
2063 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2064 {
2065         int ret;
2066         
2067
2068         ret = ctdb_control(ctdb, destnode, 0, 
2069                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2070                            NULL, NULL, NULL, &timeout, NULL);
2071         if (ret != 0) {
2072                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2073                 return -1;
2074         }
2075
2076         
2077
2078         return 0;
2079 }
2080
2081
2082
2083 /* 
2084   sent to a node to make it take over an ip address
2085 */
2086 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2087                           uint32_t destnode, struct ctdb_public_ip *ip)
2088 {
2089         TDB_DATA data;
2090         struct ctdb_public_ipv4 ipv4;
2091         int ret;
2092         int32_t res;
2093
2094         if (ip->addr.sa.sa_family == AF_INET) {
2095                 ipv4.pnn = ip->pnn;
2096                 ipv4.sin = ip->addr.ip;
2097
2098                 data.dsize = sizeof(ipv4);
2099                 data.dptr  = (uint8_t *)&ipv4;
2100
2101                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2102                            NULL, &res, &timeout, NULL);
2103         } else {
2104                 data.dsize = sizeof(*ip);
2105                 data.dptr  = (uint8_t *)ip;
2106
2107                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2108                            NULL, &res, &timeout, NULL);
2109         }
2110
2111         if (ret != 0 || res != 0) {
2112                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2113                 return -1;
2114         }
2115
2116         return 0;       
2117 }
2118
2119
2120 /* 
2121   sent to a node to make it release an ip address
2122 */
2123 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2124                          uint32_t destnode, struct ctdb_public_ip *ip)
2125 {
2126         TDB_DATA data;
2127         struct ctdb_public_ipv4 ipv4;
2128         int ret;
2129         int32_t res;
2130
2131         if (ip->addr.sa.sa_family == AF_INET) {
2132                 ipv4.pnn = ip->pnn;
2133                 ipv4.sin = ip->addr.ip;
2134
2135                 data.dsize = sizeof(ipv4);
2136                 data.dptr  = (uint8_t *)&ipv4;
2137
2138                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2139                                    NULL, &res, &timeout, NULL);
2140         } else {
2141                 data.dsize = sizeof(*ip);
2142                 data.dptr  = (uint8_t *)ip;
2143
2144                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2145                                    NULL, &res, &timeout, NULL);
2146         }
2147
2148         if (ret != 0 || res != 0) {
2149                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2150                 return -1;
2151         }
2152
2153         return 0;       
2154 }
2155
2156
2157 /*
2158   get a tunable
2159  */
2160 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2161                           struct timeval timeout, 
2162                           uint32_t destnode,
2163                           const char *name, uint32_t *value)
2164 {
2165         struct ctdb_control_get_tunable *t;
2166         TDB_DATA data, outdata;
2167         int32_t res;
2168         int ret;
2169
2170         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2171         data.dptr  = talloc_size(ctdb, data.dsize);
2172         CTDB_NO_MEMORY(ctdb, data.dptr);
2173
2174         t = (struct ctdb_control_get_tunable *)data.dptr;
2175         t->length = strlen(name)+1;
2176         memcpy(t->name, name, t->length);
2177
2178         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2179                            &outdata, &res, &timeout, NULL);
2180         talloc_free(data.dptr);
2181         if (ret != 0 || res != 0) {
2182                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2183                 return -1;
2184         }
2185
2186         if (outdata.dsize != sizeof(uint32_t)) {
2187                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2188                 talloc_free(outdata.dptr);
2189                 return -1;
2190         }
2191         
2192         *value = *(uint32_t *)outdata.dptr;
2193         talloc_free(outdata.dptr);
2194
2195         return 0;
2196 }
2197
2198 /*
2199   set a tunable
2200  */
2201 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2202                           struct timeval timeout, 
2203                           uint32_t destnode,
2204                           const char *name, uint32_t value)
2205 {
2206         struct ctdb_control_set_tunable *t;
2207         TDB_DATA data;
2208         int32_t res;
2209         int ret;
2210
2211         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2212         data.dptr  = talloc_size(ctdb, data.dsize);
2213         CTDB_NO_MEMORY(ctdb, data.dptr);
2214
2215         t = (struct ctdb_control_set_tunable *)data.dptr;
2216         t->length = strlen(name)+1;
2217         memcpy(t->name, name, t->length);
2218         t->value = value;
2219
2220         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2221                            NULL, &res, &timeout, NULL);
2222         talloc_free(data.dptr);
2223         if (ret != 0 || res != 0) {
2224                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2225                 return -1;
2226         }
2227
2228         return 0;
2229 }
2230
2231 /*
2232   list tunables
2233  */
2234 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2235                             struct timeval timeout, 
2236                             uint32_t destnode,
2237                             TALLOC_CTX *mem_ctx,
2238                             const char ***list, uint32_t *count)
2239 {
2240         TDB_DATA outdata;
2241         int32_t res;
2242         int ret;
2243         struct ctdb_control_list_tunable *t;
2244         char *p, *s, *ptr;
2245
2246         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2247                            mem_ctx, &outdata, &res, &timeout, NULL);
2248         if (ret != 0 || res != 0) {
2249                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2250                 return -1;
2251         }
2252
2253         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2254         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2255             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2256                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2257                 talloc_free(outdata.dptr);
2258                 return -1;              
2259         }
2260         
2261         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2262         CTDB_NO_MEMORY(ctdb, p);
2263
2264         talloc_free(outdata.dptr);
2265         
2266         (*list) = NULL;
2267         (*count) = 0;
2268
2269         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2270                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2271                 CTDB_NO_MEMORY(ctdb, *list);
2272                 (*list)[*count] = talloc_strdup(*list, s);
2273                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2274                 (*count)++;
2275         }
2276
2277         talloc_free(p);
2278
2279         return 0;
2280 }
2281
2282
2283 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb, 
2284                         struct timeval timeout, uint32_t destnode, 
2285                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2286 {
2287         int ret;
2288         TDB_DATA outdata;
2289         int32_t res;
2290
2291         ret = ctdb_control(ctdb, destnode, 0, 
2292                            CTDB_CONTROL_GET_PUBLIC_IPS, 0, tdb_null, 
2293                            mem_ctx, &outdata, &res, &timeout, NULL);
2294         if (ret == 0 && res == -1) {
2295                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2296                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2297         }
2298         if (ret != 0 || res != 0) {
2299           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2300                 return -1;
2301         }
2302
2303         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2304         talloc_free(outdata.dptr);
2305                     
2306         return 0;
2307 }
2308
2309 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2310                         struct timeval timeout, uint32_t destnode, 
2311                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2312 {
2313         int ret, i, len;
2314         TDB_DATA outdata;
2315         int32_t res;
2316         struct ctdb_all_public_ipsv4 *ipsv4;
2317
2318         ret = ctdb_control(ctdb, destnode, 0, 
2319                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2320                            mem_ctx, &outdata, &res, &timeout, NULL);
2321         if (ret != 0 || res != 0) {
2322                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2323                 return -1;
2324         }
2325
2326         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2327         len = offsetof(struct ctdb_all_public_ips, ips) +
2328                 ipsv4->num*sizeof(struct ctdb_public_ip);
2329         *ips = talloc_zero_size(mem_ctx, len);
2330         CTDB_NO_MEMORY(ctdb, *ips);
2331         (*ips)->num = ipsv4->num;
2332         for (i=0; i<ipsv4->num; i++) {
2333                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2334                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2335         }
2336
2337         talloc_free(outdata.dptr);
2338                     
2339         return 0;
2340 }
2341
2342 /*
2343   set/clear the permanent disabled bit on a remote node
2344  */
2345 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2346                        uint32_t set, uint32_t clear)
2347 {
2348         int ret;
2349         TDB_DATA data;
2350         struct ctdb_node_map *nodemap=NULL;
2351         struct ctdb_node_flag_change c;
2352         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2353         uint32_t recmaster;
2354         uint32_t *nodes;
2355
2356
2357         /* find the recovery master */
2358         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2359         if (ret != 0) {
2360                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2361                 talloc_free(tmp_ctx);
2362                 return ret;
2363         }
2364
2365
2366         /* read the node flags from the recmaster */
2367         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2368         if (ret != 0) {
2369                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2370                 talloc_free(tmp_ctx);
2371                 return -1;
2372         }
2373         if (destnode >= nodemap->num) {
2374                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2375                 talloc_free(tmp_ctx);
2376                 return -1;
2377         }
2378
2379         c.pnn       = destnode;
2380         c.old_flags = nodemap->nodes[destnode].flags;
2381         c.new_flags = c.old_flags;
2382         c.new_flags |= set;
2383         c.new_flags &= ~clear;
2384
2385         data.dsize = sizeof(c);
2386         data.dptr = (unsigned char *)&c;
2387
2388         /* send the flags update to all connected nodes */
2389         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2390
2391         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2392                                         nodes, 0,
2393                                         timeout, false, data,
2394                                         NULL, NULL,
2395                                         NULL) != 0) {
2396                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2397
2398                 talloc_free(tmp_ctx);
2399                 return -1;
2400         }
2401
2402         talloc_free(tmp_ctx);
2403         return 0;
2404 }
2405
2406
2407 /*
2408   get all tunables
2409  */
2410 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
2411                                struct timeval timeout, 
2412                                uint32_t destnode,
2413                                struct ctdb_tunable *tunables)
2414 {
2415         TDB_DATA outdata;
2416         int ret;
2417         int32_t res;
2418
2419         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2420                            &outdata, &res, &timeout, NULL);
2421         if (ret != 0 || res != 0) {
2422                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2423                 return -1;
2424         }
2425
2426         if (outdata.dsize != sizeof(*tunables)) {
2427                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2428                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2429                 return -1;              
2430         }
2431
2432         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2433         talloc_free(outdata.dptr);
2434         return 0;
2435 }
2436
2437 /*
2438   add a public address to a node
2439  */
2440 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
2441                       struct timeval timeout, 
2442                       uint32_t destnode,
2443                       struct ctdb_control_ip_iface *pub)
2444 {
2445         TDB_DATA data;
2446         int32_t res;
2447         int ret;
2448
2449         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2450         data.dptr  = (unsigned char *)pub;
2451
2452         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2453                            NULL, &res, &timeout, NULL);
2454         if (ret != 0 || res != 0) {
2455                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2456                 return -1;
2457         }
2458
2459         return 0;
2460 }
2461
2462 /*
2463   delete a public address from a node
2464  */
2465 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
2466                       struct timeval timeout, 
2467                       uint32_t destnode,
2468                       struct ctdb_control_ip_iface *pub)
2469 {
2470         TDB_DATA data;
2471         int32_t res;
2472         int ret;
2473
2474         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2475         data.dptr  = (unsigned char *)pub;
2476
2477         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2478                            NULL, &res, &timeout, NULL);
2479         if (ret != 0 || res != 0) {
2480                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2481                 return -1;
2482         }
2483
2484         return 0;
2485 }
2486
2487 /*
2488   kill a tcp connection
2489  */
2490 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2491                       struct timeval timeout, 
2492                       uint32_t destnode,
2493                       struct ctdb_control_killtcp *killtcp)
2494 {
2495         TDB_DATA data;
2496         int32_t res;
2497         int ret;
2498
2499         data.dsize = sizeof(struct ctdb_control_killtcp);
2500         data.dptr  = (unsigned char *)killtcp;
2501
2502         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2503                            NULL, &res, &timeout, NULL);
2504         if (ret != 0 || res != 0) {
2505                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2506                 return -1;
2507         }
2508
2509         return 0;
2510 }
2511
2512 /*
2513   send a gratious arp
2514  */
2515 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
2516                       struct timeval timeout, 
2517                       uint32_t destnode,
2518                       ctdb_sock_addr *addr,
2519                       const char *ifname)
2520 {
2521         TDB_DATA data;
2522         int32_t res;
2523         int ret, len;
2524         struct ctdb_control_gratious_arp *gratious_arp;
2525         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2526
2527
2528         len = strlen(ifname)+1;
2529         gratious_arp = talloc_size(tmp_ctx, 
2530                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2531         CTDB_NO_MEMORY(ctdb, gratious_arp);
2532
2533         gratious_arp->addr = *addr;
2534         gratious_arp->len = len;
2535         memcpy(&gratious_arp->iface[0], ifname, len);
2536
2537
2538         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
2539         data.dptr  = (unsigned char *)gratious_arp;
2540
2541         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
2542                            NULL, &res, &timeout, NULL);
2543         if (ret != 0 || res != 0) {
2544                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
2545                 talloc_free(tmp_ctx);
2546                 return -1;
2547         }
2548
2549         talloc_free(tmp_ctx);
2550         return 0;
2551 }
2552
2553 /*
2554   get a list of all tcp tickles that a node knows about for a particular vnn
2555  */
2556 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
2557                               struct timeval timeout, uint32_t destnode, 
2558                               TALLOC_CTX *mem_ctx, 
2559                               ctdb_sock_addr *addr,
2560                               struct ctdb_control_tcp_tickle_list **list)
2561 {
2562         int ret;
2563         TDB_DATA data, outdata;
2564         int32_t status;
2565
2566         data.dptr = (uint8_t*)addr;
2567         data.dsize = sizeof(ctdb_sock_addr);
2568
2569         ret = ctdb_control(ctdb, destnode, 0, 
2570                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
2571                            mem_ctx, &outdata, &status, NULL, NULL);
2572         if (ret != 0 || status != 0) {
2573                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
2574                 return -1;
2575         }
2576
2577         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
2578
2579         return status;
2580 }
2581
2582 /*
2583   register a server id
2584  */
2585 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
2586                       struct timeval timeout, 
2587                       struct ctdb_server_id *id)
2588 {
2589         TDB_DATA data;
2590         int32_t res;
2591         int ret;
2592
2593         data.dsize = sizeof(struct ctdb_server_id);
2594         data.dptr  = (unsigned char *)id;
2595
2596         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2597                         CTDB_CONTROL_REGISTER_SERVER_ID, 
2598                         0, data, NULL,
2599                         NULL, &res, &timeout, NULL);
2600         if (ret != 0 || res != 0) {
2601                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
2602                 return -1;
2603         }
2604
2605         return 0;
2606 }
2607
2608 /*
2609   unregister a server id
2610  */
2611 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
2612                       struct timeval timeout, 
2613                       struct ctdb_server_id *id)
2614 {
2615         TDB_DATA data;
2616         int32_t res;
2617         int ret;
2618
2619         data.dsize = sizeof(struct ctdb_server_id);
2620         data.dptr  = (unsigned char *)id;
2621
2622         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2623                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
2624                         0, data, NULL,
2625                         NULL, &res, &timeout, NULL);
2626         if (ret != 0 || res != 0) {
2627                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
2628                 return -1;
2629         }
2630
2631         return 0;
2632 }
2633
2634
2635 /*
2636   check if a server id exists
2637
2638   if a server id does exist, return *status == 1, otherwise *status == 0
2639  */
2640 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
2641                       struct timeval timeout, 
2642                       uint32_t destnode,
2643                       struct ctdb_server_id *id,
2644                       uint32_t *status)
2645 {
2646         TDB_DATA data;
2647         int32_t res;
2648         int ret;
2649
2650         data.dsize = sizeof(struct ctdb_server_id);
2651         data.dptr  = (unsigned char *)id;
2652
2653         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
2654                         0, data, NULL,
2655                         NULL, &res, &timeout, NULL);
2656         if (ret != 0) {
2657                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
2658                 return -1;
2659         }
2660
2661         if (res) {
2662                 *status = 1;
2663         } else {
2664                 *status = 0;
2665         }
2666
2667         return 0;
2668 }
2669
2670 /*
2671    get the list of server ids that are registered on a node
2672 */
2673 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
2674                 TALLOC_CTX *mem_ctx,
2675                 struct timeval timeout, uint32_t destnode, 
2676                 struct ctdb_server_id_list **svid_list)
2677 {
2678         int ret;
2679         TDB_DATA outdata;
2680         int32_t res;
2681
2682         ret = ctdb_control(ctdb, destnode, 0, 
2683                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
2684                            mem_ctx, &outdata, &res, &timeout, NULL);
2685         if (ret != 0 || res != 0) {
2686                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
2687                 return -1;
2688         }
2689
2690         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
2691                     
2692         return 0;
2693 }
2694
2695 /*
2696   initialise the ctdb daemon for client applications
2697
2698   NOTE: In current code the daemon does not fork. This is for testing purposes only
2699   and to simplify the code.
2700 */
2701 struct ctdb_context *ctdb_init(struct event_context *ev)
2702 {
2703         int ret;
2704         struct ctdb_context *ctdb;
2705
2706         ctdb = talloc_zero(ev, struct ctdb_context);
2707         if (ctdb == NULL) {
2708                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
2709                 return NULL;
2710         }
2711         ctdb->ev  = ev;
2712         ctdb->idr = idr_init(ctdb);
2713         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
2714
2715         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
2716         if (ret != 0) {
2717                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
2718                 talloc_free(ctdb);
2719                 return NULL;
2720         }
2721
2722         return ctdb;
2723 }
2724
2725
2726 /*
2727   set some ctdb flags
2728 */
2729 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
2730 {
2731         ctdb->flags |= flags;
2732 }
2733
2734 /*
2735   setup the local socket name
2736 */
2737 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
2738 {
2739         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
2740         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
2741
2742         return 0;
2743 }
2744
2745 /*
2746   return the pnn of this node
2747 */
2748 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
2749 {
2750         return ctdb->pnn;
2751 }
2752
2753
2754 /*
2755   get the uptime of a remote node
2756  */
2757 struct ctdb_client_control_state *
2758 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2759 {
2760         return ctdb_control_send(ctdb, destnode, 0, 
2761                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
2762                            mem_ctx, &timeout, NULL);
2763 }
2764
2765 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
2766 {
2767         int ret;
2768         int32_t res;
2769         TDB_DATA outdata;
2770
2771         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
2772         if (ret != 0 || res != 0) {
2773                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
2774                 return -1;
2775         }
2776
2777         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
2778
2779         return 0;
2780 }
2781
2782 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
2783 {
2784         struct ctdb_client_control_state *state;
2785
2786         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
2787         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
2788 }
2789
2790 /*
2791   send a control to execute the "recovered" event script on a node
2792  */
2793 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2794 {
2795         int ret;
2796         int32_t status;
2797
2798         ret = ctdb_control(ctdb, destnode, 0, 
2799                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
2800                            NULL, NULL, &status, &timeout, NULL);
2801         if (ret != 0 || status != 0) {
2802                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
2803                 return -1;
2804         }
2805
2806         return 0;
2807 }
2808
2809 /* 
2810   callback for the async helpers used when sending the same control
2811   to multiple nodes in parallell.
2812 */
2813 static void async_callback(struct ctdb_client_control_state *state)
2814 {
2815         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
2816         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
2817         int ret;
2818         TDB_DATA outdata;
2819         int32_t res;
2820         uint32_t destnode = state->c->hdr.destnode;
2821
2822         /* one more node has responded with recmode data */
2823         data->count--;
2824
2825         /* if we failed to push the db, then return an error and let
2826            the main loop try again.
2827         */
2828         if (state->state != CTDB_CONTROL_DONE) {
2829                 if ( !data->dont_log_errors) {
2830                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
2831                 }
2832                 data->fail_count++;
2833                 if (data->fail_callback) {
2834                         data->fail_callback(ctdb, destnode, res, outdata,
2835                                         data->callback_data);
2836                 }
2837                 return;
2838         }
2839         
2840         state->async.fn = NULL;
2841
2842         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
2843         if ((ret != 0) || (res != 0)) {
2844                 if ( !data->dont_log_errors) {
2845                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
2846                 }
2847                 data->fail_count++;
2848                 if (data->fail_callback) {
2849                         data->fail_callback(ctdb, destnode, res, outdata,
2850                                         data->callback_data);
2851                 }
2852         }
2853         if ((ret == 0) && (data->callback != NULL)) {
2854                 data->callback(ctdb, destnode, res, outdata,
2855                                         data->callback_data);
2856         }
2857 }
2858
2859
2860 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
2861 {
2862         /* set up the callback functions */
2863         state->async.fn = async_callback;
2864         state->async.private_data = data;
2865         
2866         /* one more control to wait for to complete */
2867         data->count++;
2868 }
2869
2870
2871 /* wait for up to the maximum number of seconds allowed
2872    or until all nodes we expect a response from has replied
2873 */
2874 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
2875 {
2876         while (data->count > 0) {
2877                 event_loop_once(ctdb->ev);
2878         }
2879         if (data->fail_count != 0) {
2880                 if (!data->dont_log_errors) {
2881                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
2882                                  data->fail_count));
2883                 }
2884                 return -1;
2885         }
2886         return 0;
2887 }
2888
2889
2890 /* 
2891    perform a simple control on the listed nodes
2892    The control cannot return data
2893  */
2894 int ctdb_client_async_control(struct ctdb_context *ctdb,
2895                                 enum ctdb_controls opcode,
2896                                 uint32_t *nodes,
2897                                 uint64_t srvid,
2898                                 struct timeval timeout,
2899                                 bool dont_log_errors,
2900                                 TDB_DATA data,
2901                                 client_async_callback client_callback,
2902                                 client_async_callback fail_callback,
2903                                 void *callback_data)
2904 {
2905         struct client_async_data *async_data;
2906         struct ctdb_client_control_state *state;
2907         int j, num_nodes;
2908
2909         async_data = talloc_zero(ctdb, struct client_async_data);
2910         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
2911         async_data->dont_log_errors = dont_log_errors;
2912         async_data->callback = client_callback;
2913         async_data->fail_callback = fail_callback;
2914         async_data->callback_data = callback_data;
2915         async_data->opcode        = opcode;
2916
2917         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
2918
2919         /* loop over all nodes and send an async control to each of them */
2920         for (j=0; j<num_nodes; j++) {
2921                 uint32_t pnn = nodes[j];
2922
2923                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
2924                                           0, data, async_data, &timeout, NULL);
2925                 if (state == NULL) {
2926                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
2927                         talloc_free(async_data);
2928                         return -1;
2929                 }
2930                 
2931                 ctdb_client_async_add(async_data, state);
2932         }
2933
2934         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
2935                 talloc_free(async_data);
2936                 return -1;
2937         }
2938
2939         talloc_free(async_data);
2940         return 0;
2941 }
2942
2943 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
2944                                 struct ctdb_vnn_map *vnn_map,
2945                                 TALLOC_CTX *mem_ctx,
2946                                 bool include_self)
2947 {
2948         int i, j, num_nodes;
2949         uint32_t *nodes;
2950
2951         for (i=num_nodes=0;i<vnn_map->size;i++) {
2952                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2953                         continue;
2954                 }
2955                 num_nodes++;
2956         } 
2957
2958         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2959         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2960
2961         for (i=j=0;i<vnn_map->size;i++) {
2962                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2963                         continue;
2964                 }
2965                 nodes[j++] = vnn_map->map[i];
2966         } 
2967
2968         return nodes;
2969 }
2970
2971 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
2972                                 struct ctdb_node_map *node_map,
2973                                 TALLOC_CTX *mem_ctx,
2974                                 bool include_self)
2975 {
2976         int i, j, num_nodes;
2977         uint32_t *nodes;
2978
2979         for (i=num_nodes=0;i<node_map->num;i++) {
2980                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2981                         continue;
2982                 }
2983                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2984                         continue;
2985                 }
2986                 num_nodes++;
2987         } 
2988
2989         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2990         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2991
2992         for (i=j=0;i<node_map->num;i++) {
2993                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2994                         continue;
2995                 }
2996                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2997                         continue;
2998                 }
2999                 nodes[j++] = node_map->nodes[i].pnn;
3000         } 
3001
3002         return nodes;
3003 }
3004
3005 uint32_t *list_of_active_nodes_except_pnn(struct ctdb_context *ctdb,
3006                                 struct ctdb_node_map *node_map,
3007                                 TALLOC_CTX *mem_ctx,
3008                                 uint32_t pnn)
3009 {
3010         int i, j, num_nodes;
3011         uint32_t *nodes;
3012
3013         for (i=num_nodes=0;i<node_map->num;i++) {
3014                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3015                         continue;
3016                 }
3017                 if (node_map->nodes[i].pnn == pnn) {
3018                         continue;
3019                 }
3020                 num_nodes++;
3021         } 
3022
3023         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3024         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3025
3026         for (i=j=0;i<node_map->num;i++) {
3027                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3028                         continue;
3029                 }
3030                 if (node_map->nodes[i].pnn == pnn) {
3031                         continue;
3032                 }
3033                 nodes[j++] = node_map->nodes[i].pnn;
3034         } 
3035
3036         return nodes;
3037 }
3038
3039 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3040                                 struct ctdb_node_map *node_map,
3041                                 TALLOC_CTX *mem_ctx,
3042                                 bool include_self)
3043 {
3044         int i, j, num_nodes;
3045         uint32_t *nodes;
3046
3047         for (i=num_nodes=0;i<node_map->num;i++) {
3048                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3049                         continue;
3050                 }
3051                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3052                         continue;
3053                 }
3054                 num_nodes++;
3055         } 
3056
3057         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3058         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3059
3060         for (i=j=0;i<node_map->num;i++) {
3061                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3062                         continue;
3063                 }
3064                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3065                         continue;
3066                 }
3067                 nodes[j++] = node_map->nodes[i].pnn;
3068         } 
3069
3070         return nodes;
3071 }
3072
3073 /* 
3074   this is used to test if a pnn lock exists and if it exists will return
3075   the number of connections that pnn has reported or -1 if that recovery
3076   daemon is not running.
3077 */
3078 int
3079 ctdb_read_pnn_lock(int fd, int32_t pnn)
3080 {
3081         struct flock lock;
3082         char c;
3083
3084         lock.l_type = F_WRLCK;
3085         lock.l_whence = SEEK_SET;
3086         lock.l_start = pnn;
3087         lock.l_len = 1;
3088         lock.l_pid = 0;
3089
3090         if (fcntl(fd, F_GETLK, &lock) != 0) {
3091                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3092                 return -1;
3093         }
3094
3095         if (lock.l_type == F_UNLCK) {
3096                 return -1;
3097         }
3098
3099         if (pread(fd, &c, 1, pnn) == -1) {
3100                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3101                 return -1;
3102         }
3103
3104         return c;
3105 }
3106
3107 /*
3108   get capabilities of a remote node
3109  */
3110 struct ctdb_client_control_state *
3111 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3112 {
3113         return ctdb_control_send(ctdb, destnode, 0, 
3114                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3115                            mem_ctx, &timeout, NULL);
3116 }
3117
3118 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3119 {
3120         int ret;
3121         int32_t res;
3122         TDB_DATA outdata;
3123
3124         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3125         if ( (ret != 0) || (res != 0) ) {
3126                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3127                 return -1;
3128         }
3129
3130         if (capabilities) {
3131                 *capabilities = *((uint32_t *)outdata.dptr);
3132         }
3133
3134         return 0;
3135 }
3136
3137 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3138 {
3139         struct ctdb_client_control_state *state;
3140         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3141         int ret;
3142
3143         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3144         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3145         talloc_free(tmp_ctx);
3146         return ret;
3147 }
3148
3149 /**
3150  * check whether a transaction is active on a given db on a given node
3151  */
3152 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
3153                                      uint32_t destnode,
3154                                      uint32_t db_id)
3155 {
3156         int32_t status;
3157         int ret;
3158         TDB_DATA indata;
3159
3160         indata.dptr = (uint8_t *)&db_id;
3161         indata.dsize = sizeof(db_id);
3162
3163         ret = ctdb_control(ctdb, destnode, 0,
3164                            CTDB_CONTROL_TRANS2_ACTIVE,
3165                            0, indata, NULL, NULL, &status,
3166                            NULL, NULL);
3167
3168         if (ret != 0) {
3169                 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
3170                 return -1;
3171         }
3172
3173         return status;
3174 }
3175
3176
3177 struct ctdb_transaction_handle {
3178         struct ctdb_db_context *ctdb_db;
3179         bool in_replay;
3180         /*
3181          * we store the reads and writes done under a transaction:
3182          * - one list stores both reads and writes (m_all),
3183          * - the other just writes (m_write)
3184          */
3185         struct ctdb_marshall_buffer *m_all;
3186         struct ctdb_marshall_buffer *m_write;
3187 };
3188
3189 /* start a transaction on a database */
3190 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3191 {
3192         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3193         return 0;
3194 }
3195
3196 /* start a transaction on a database */
3197 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
3198 {
3199         struct ctdb_record_handle *rh;
3200         TDB_DATA key;
3201         TDB_DATA data;
3202         struct ctdb_ltdb_header header;
3203         TALLOC_CTX *tmp_ctx;
3204         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
3205         int ret;
3206         struct ctdb_db_context *ctdb_db = h->ctdb_db;
3207         pid_t pid;
3208         int32_t status;
3209
3210         key.dptr = discard_const(keyname);
3211         key.dsize = strlen(keyname);
3212
3213         if (!ctdb_db->persistent) {
3214                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
3215                 return -1;
3216         }
3217
3218 again:
3219         tmp_ctx = talloc_new(h);
3220
3221         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
3222         if (rh == NULL) {
3223                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
3224                 talloc_free(tmp_ctx);
3225                 return -1;
3226         }
3227
3228         status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
3229                                               CTDB_CURRENT_NODE,
3230                                               ctdb_db->db_id);
3231         if (status == 1) {
3232                 unsigned long int usec = (1000 + random()) % 100000;
3233                 DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
3234                                     "on db_id[0x%08x]. waiting for %lu "
3235                                     "microseconds\n",
3236                                     ctdb_db->db_id, usec));
3237                 talloc_free(tmp_ctx);
3238                 usleep(usec);
3239                 goto again;
3240         }
3241
3242         /*
3243          * store the pid in the database:
3244          * it is not enough that the node is dmaster...
3245          */
3246         pid = getpid();
3247         data.dptr = (unsigned char *)&pid;
3248         data.dsize = sizeof(pid_t);
3249         rh->header.rsn++;
3250         rh->header.dmaster = ctdb_db->ctdb->pnn;
3251         ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
3252         if (ret != 0) {
3253                 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
3254                                   "transaction record\n"));
3255                 talloc_free(tmp_ctx);
3256                 return -1;
3257         }
3258
3259         talloc_free(rh);
3260
3261         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3262         if (ret != 0) {
3263                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
3264                 talloc_free(tmp_ctx);
3265                 return -1;
3266         }
3267
3268         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
3269         if (ret != 0) {
3270                 DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
3271                                  "lock record inside transaction\n"));
3272                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3273                 talloc_free(tmp_ctx);
3274                 goto again;
3275         }
3276
3277         if (header.dmaster != ctdb_db->ctdb->pnn) {
3278                 DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
3279                                    "transaction lock record\n"));
3280                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3281                 talloc_free(tmp_ctx);
3282                 goto again;
3283         }
3284
3285         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
3286                 DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
3287                                     "the transaction lock record\n"));
3288                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3289                 talloc_free(tmp_ctx);
3290                 goto again;
3291         }
3292
3293         talloc_free(tmp_ctx);
3294
3295         return 0;
3296 }
3297
3298
3299 /* start a transaction on a database */
3300 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3301                                                        TALLOC_CTX *mem_ctx)
3302 {
3303         struct ctdb_transaction_handle *h;
3304         int ret;
3305
3306         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3307         if (h == NULL) {
3308                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
3309                 return NULL;
3310         }
3311
3312         h->ctdb_db = ctdb_db;
3313
3314         ret = ctdb_transaction_fetch_start(h);
3315         if (ret != 0) {
3316                 talloc_free(h);
3317                 return NULL;
3318         }
3319
3320         talloc_set_destructor(h, ctdb_transaction_destructor);
3321
3322         return h;
3323 }
3324
3325
3326
3327 /*
3328   fetch a record inside a transaction
3329  */
3330 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
3331                            TALLOC_CTX *mem_ctx, 
3332                            TDB_DATA key, TDB_DATA *data)
3333 {
3334         struct ctdb_ltdb_header header;
3335         int ret;
3336
3337         ZERO_STRUCT(header);
3338
3339         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3340         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3341                 /* record doesn't exist yet */
3342                 *data = tdb_null;
3343                 ret = 0;
3344         }
3345         
3346         if (ret != 0) {
3347                 return ret;
3348         }
3349
3350         if (!h->in_replay) {
3351                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3352                 if (h->m_all == NULL) {
3353                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3354                         return -1;
3355                 }
3356         }
3357
3358         return 0;
3359 }
3360
3361 /*
3362   stores a record inside a transaction
3363  */
3364 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
3365                            TDB_DATA key, TDB_DATA data)
3366 {
3367         TALLOC_CTX *tmp_ctx = talloc_new(h);
3368         struct ctdb_ltdb_header header;
3369         TDB_DATA olddata;
3370         int ret;
3371
3372         ZERO_STRUCT(header);
3373
3374         /* we need the header so we can update the RSN */
3375         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3376         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3377                 /* the record doesn't exist - create one with us as dmaster.
3378                    This is only safe because we are in a transaction and this
3379                    is a persistent database */
3380                 ZERO_STRUCT(header);
3381         } else if (ret != 0) {
3382                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3383                 talloc_free(tmp_ctx);
3384                 return ret;
3385         }
3386
3387         if (data.dsize == olddata.dsize &&
3388             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3389                 /* save writing the same data */
3390                 talloc_free(tmp_ctx);
3391                 return 0;
3392         }
3393
3394         header.dmaster = h->ctdb_db->ctdb->pnn;
3395         header.rsn++;
3396
3397         if (!h->in_replay) {
3398                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3399                 if (h->m_all == NULL) {
3400                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3401                         talloc_free(tmp_ctx);
3402                         return -1;
3403                 }
3404         }               
3405
3406         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3407         if (h->m_write == NULL) {
3408                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3409                 talloc_free(tmp_ctx);
3410                 return -1;
3411         }
3412         
3413         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3414
3415         talloc_free(tmp_ctx);
3416         
3417         return ret;
3418 }
3419
3420 /*
3421   replay a transaction
3422  */
3423 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3424 {
3425         int ret, i;
3426         struct ctdb_rec_data *rec = NULL;
3427
3428         h->in_replay = true;
3429         talloc_free(h->m_write);
3430         h->m_write = NULL;
3431
3432         ret = ctdb_transaction_fetch_start(h);
3433         if (ret != 0) {
3434                 return ret;
3435         }
3436
3437         for (i=0;i<h->m_all->count;i++) {
3438                 TDB_DATA key, data;
3439
3440                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3441                 if (rec == NULL) {
3442                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3443                         goto failed;
3444                 }
3445
3446                 if (rec->reqid == 0) {
3447                         /* its a store */
3448                         if (ctdb_transaction_store(h, key, data) != 0) {
3449                                 goto failed;
3450                         }
3451                 } else {
3452                         TDB_DATA data2;
3453                         TALLOC_CTX *tmp_ctx = talloc_new(h);
3454
3455                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3456                                 talloc_free(tmp_ctx);
3457                                 goto failed;
3458                         }
3459                         if (data2.dsize != data.dsize ||
3460                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3461                                 /* the record has changed on us - we have to give up */
3462                                 talloc_free(tmp_ctx);
3463                                 goto failed;
3464                         }
3465                         talloc_free(tmp_ctx);
3466                 }
3467         }
3468         
3469         return 0;
3470
3471 failed:
3472         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3473         return -1;
3474 }
3475
3476
3477 /*
3478   commit a transaction
3479  */
3480 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3481 {
3482         int ret, retries=0;
3483         int32_t status;
3484         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3485         struct timeval timeout;
3486         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3487
3488         talloc_set_destructor(h, NULL);
3489
3490         /* our commit strategy is quite complex.
3491
3492            - we first try to commit the changes to all other nodes
3493
3494            - if that works, then we commit locally and we are done
3495
3496            - if a commit on another node fails, then we need to cancel
3497              the transaction, then restart the transaction (thus
3498              opening a window of time for a pending recovery to
3499              complete), then replay the transaction, checking all the
3500              reads and writes (checking that reads give the same data,
3501              and writes succeed). Then we retry the transaction to the
3502              other nodes
3503         */
3504
3505 again:
3506         if (h->m_write == NULL) {
3507                 /* no changes were made */
3508                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3509                 talloc_free(h);
3510                 return 0;
3511         }
3512
3513         /* tell ctdbd to commit to the other nodes */
3514         timeout = timeval_current_ofs(1, 0);
3515         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3516                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
3517                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
3518                            &timeout, NULL);
3519         if (ret != 0 || status != 0) {
3520                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3521                 DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
3522                                      ", retrying after 1 second...\n",
3523                                      (retries==0)?"":"retry "));
3524                 sleep(1);
3525
3526                 if (ret != 0) {
3527                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
3528                 } else {
3529                         /* work out what error code we will give if we 
3530                            have to fail the operation */
3531                         switch ((enum ctdb_trans2_commit_error)status) {
3532                         case CTDB_TRANS2_COMMIT_SUCCESS:
3533                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
3534                         case CTDB_TRANS2_COMMIT_TIMEOUT:
3535                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3536                                 break;
3537                         case CTDB_TRANS2_COMMIT_ALLFAIL:
3538                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
3539                                 break;
3540                         }
3541                 }
3542
3543                 if (++retries == 100) {
3544                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
3545                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
3546                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3547                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3548                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3549                         talloc_free(h);
3550                         return -1;
3551                 }               
3552
3553                 if (ctdb_replay_transaction(h) != 0) {
3554                         DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
3555                                           "transaction on db 0x%08x, "
3556                                           "failure control =%u\n",
3557                                           h->ctdb_db->db_id,
3558                                           (unsigned)failure_control));
3559                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3560                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3561                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3562                         talloc_free(h);
3563                         return -1;
3564                 }
3565                 goto again;
3566         } else {
3567                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3568         }
3569
3570         /* do the real commit locally */
3571         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
3572         if (ret != 0) {
3573                 DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
3574                                   "on db id 0x%08x locally, "
3575                                   "failure_control=%u\n",
3576                                   h->ctdb_db->db_id,
3577                                   (unsigned)failure_control));
3578                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3579                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3580                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
3581                 talloc_free(h);
3582                 return ret;
3583         }
3584
3585         /* tell ctdbd that we are finished with our local commit */
3586         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3587                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
3588                      tdb_null, NULL, NULL, NULL, NULL, NULL);
3589         talloc_free(h);
3590         return 0;
3591 }
3592
3593 /*
3594   recovery daemon ping to main daemon
3595  */
3596 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
3597 {
3598         int ret;
3599         int32_t res;
3600
3601         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
3602                            ctdb, NULL, &res, NULL, NULL);
3603         if (ret != 0 || res != 0) {
3604                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
3605                 return -1;
3606         }
3607
3608         return 0;
3609 }
3610
3611 /* when forking the main daemon and the child process needs to connect back
3612  * to the daemon as a client process, this function can be used to change
3613  * the ctdb context from daemon into client mode
3614  */
3615 int switch_from_server_to_client(struct ctdb_context *ctdb)
3616 {
3617         int ret;
3618
3619         /* shutdown the transport */
3620         if (ctdb->methods) {
3621                 ctdb->methods->shutdown(ctdb);
3622         }
3623
3624         /* get a new event context */
3625         talloc_free(ctdb->ev);
3626         ctdb->ev = event_context_init(ctdb);
3627
3628         close(ctdb->daemon.sd);
3629         ctdb->daemon.sd = -1;
3630
3631         /* the client does not need to be realtime */
3632         if (ctdb->do_setsched) {
3633                 ctdb_restore_scheduler(ctdb);
3634         }
3635
3636         /* initialise ctdb */
3637         ret = ctdb_socket_connect(ctdb);
3638         if (ret != 0) {
3639                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
3640                 return -1;
3641         }
3642
3643          return 0;
3644 }
3645
3646 /*
3647   get the status of running the monitor eventscripts: NULL means never run.
3648  */
3649 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
3650                 struct timeval timeout, uint32_t destnode, 
3651                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
3652                 struct ctdb_scripts_wire **script_status)
3653 {
3654         int ret;
3655         TDB_DATA outdata, indata;
3656         int32_t res;
3657         uint32_t uinttype = type;
3658
3659         indata.dptr = (uint8_t *)&uinttype;
3660         indata.dsize = sizeof(uinttype);
3661
3662         ret = ctdb_control(ctdb, destnode, 0, 
3663                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
3664                            mem_ctx, &outdata, &res, &timeout, NULL);
3665         if (ret != 0 || res != 0) {
3666                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
3667                 return -1;
3668         }
3669
3670         if (outdata.dsize == 0) {
3671                 *script_status = NULL;
3672         } else {
3673                 *script_status = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
3674                 talloc_free(outdata.dptr);
3675         }
3676                     
3677         return 0;
3678 }
3679
3680 /*
3681   tell the main daemon how long it took to lock the reclock file
3682  */
3683 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
3684 {
3685         int ret;
3686         int32_t res;
3687         TDB_DATA data;
3688
3689         data.dptr = (uint8_t *)&latency;
3690         data.dsize = sizeof(latency);
3691
3692         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
3693                            ctdb, NULL, &res, NULL, NULL);
3694         if (ret != 0 || res != 0) {
3695                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
3696                 return -1;
3697         }
3698
3699         return 0;
3700 }
3701
3702 /*
3703   get the name of the reclock file
3704  */
3705 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
3706                          uint32_t destnode, TALLOC_CTX *mem_ctx,
3707                          const char **name)
3708 {
3709         int ret;
3710         int32_t res;
3711         TDB_DATA data;
3712
3713         ret = ctdb_control(ctdb, destnode, 0, 
3714                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
3715                            mem_ctx, &data, &res, &timeout, NULL);
3716         if (ret != 0 || res != 0) {
3717                 return -1;
3718         }
3719
3720         if (data.dsize == 0) {
3721                 *name = NULL;
3722         } else {
3723                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
3724         }
3725         talloc_free(data.dptr);
3726
3727         return 0;
3728 }
3729
3730 /*
3731   set the reclock filename for a node
3732  */
3733 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
3734 {
3735         int ret;
3736         TDB_DATA data;
3737         int32_t res;
3738
3739         if (reclock == NULL) {
3740                 data.dsize = 0;
3741                 data.dptr  = NULL;
3742         } else {
3743                 data.dsize = strlen(reclock) + 1;
3744                 data.dptr  = discard_const(reclock);
3745         }
3746
3747         ret = ctdb_control(ctdb, destnode, 0, 
3748                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
3749                            NULL, NULL, &res, &timeout, NULL);
3750         if (ret != 0 || res != 0) {
3751                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
3752                 return -1;
3753         }
3754
3755         return 0;
3756 }
3757
3758 /*
3759   stop a node
3760  */
3761 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3762 {
3763         int ret;
3764         int32_t res;
3765
3766         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
3767                            ctdb, NULL, &res, &timeout, NULL);
3768         if (ret != 0 || res != 0) {
3769                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
3770                 return -1;
3771         }
3772
3773         return 0;
3774 }
3775
3776 /*
3777   continue a node
3778  */
3779 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3780 {
3781         int ret;
3782
3783         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
3784                            ctdb, NULL, NULL, &timeout, NULL);
3785         if (ret != 0) {
3786                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
3787                 return -1;
3788         }
3789
3790         return 0;
3791 }
3792
3793 /*
3794   set the natgw state for a node
3795  */
3796 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
3797 {
3798         int ret;
3799         TDB_DATA data;
3800         int32_t res;
3801
3802         data.dsize = sizeof(natgwstate);
3803         data.dptr  = (uint8_t *)&natgwstate;
3804
3805         ret = ctdb_control(ctdb, destnode, 0, 
3806                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
3807                            NULL, NULL, &res, &timeout, NULL);
3808         if (ret != 0 || res != 0) {
3809                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
3810                 return -1;
3811         }
3812
3813         return 0;
3814 }
3815
3816 /*
3817   set the lmaster role for a node
3818  */
3819 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
3820 {
3821         int ret;
3822         TDB_DATA data;
3823         int32_t res;
3824
3825         data.dsize = sizeof(lmasterrole);
3826         data.dptr  = (uint8_t *)&lmasterrole;
3827
3828         ret = ctdb_control(ctdb, destnode, 0, 
3829                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
3830                            NULL, NULL, &res, &timeout, NULL);
3831         if (ret != 0 || res != 0) {
3832                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
3833                 return -1;
3834         }
3835
3836         return 0;
3837 }
3838
3839 /*
3840   set the recmaster role for a node
3841  */
3842 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
3843 {
3844         int ret;
3845         TDB_DATA data;
3846         int32_t res;
3847
3848         data.dsize = sizeof(recmasterrole);
3849         data.dptr  = (uint8_t *)&recmasterrole;
3850
3851         ret = ctdb_control(ctdb, destnode, 0, 
3852                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
3853                            NULL, NULL, &res, &timeout, NULL);
3854         if (ret != 0 || res != 0) {
3855                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
3856                 return -1;
3857         }
3858
3859         return 0;
3860 }
3861
3862 /* enable an eventscript
3863  */
3864 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
3865 {
3866         int ret;
3867         TDB_DATA data;
3868         int32_t res;
3869
3870         data.dsize = strlen(script) + 1;
3871         data.dptr  = discard_const(script);
3872
3873         ret = ctdb_control(ctdb, destnode, 0, 
3874                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
3875                            NULL, NULL, &res, &timeout, NULL);
3876         if (ret != 0 || res != 0) {
3877                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
3878                 return -1;
3879         }
3880
3881         return 0;
3882 }
3883
3884 /* disable an eventscript
3885  */
3886 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
3887 {
3888         int ret;
3889         TDB_DATA data;
3890         int32_t res;
3891
3892         data.dsize = strlen(script) + 1;
3893         data.dptr  = discard_const(script);
3894
3895         ret = ctdb_control(ctdb, destnode, 0, 
3896                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
3897                            NULL, NULL, &res, &timeout, NULL);
3898         if (ret != 0 || res != 0) {
3899                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
3900                 return -1;
3901         }
3902
3903         return 0;
3904 }
3905
3906
3907 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
3908 {
3909         int ret;
3910         TDB_DATA data;
3911         int32_t res;
3912
3913         data.dsize = sizeof(*bantime);
3914         data.dptr  = (uint8_t *)bantime;
3915
3916         ret = ctdb_control(ctdb, destnode, 0, 
3917                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
3918                            NULL, NULL, &res, &timeout, NULL);
3919         if (ret != 0 || res != 0) {
3920                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
3921                 return -1;
3922         }
3923
3924         return 0;
3925 }
3926
3927
3928 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
3929 {
3930         int ret;
3931         TDB_DATA outdata;
3932         int32_t res;
3933         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3934
3935         ret = ctdb_control(ctdb, destnode, 0, 
3936                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
3937                            tmp_ctx, &outdata, &res, &timeout, NULL);
3938         if (ret != 0 || res != 0) {
3939                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
3940                 talloc_free(tmp_ctx);
3941                 return -1;
3942         }
3943
3944         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
3945         talloc_free(tmp_ctx);
3946
3947         return 0;
3948 }
3949
3950
3951 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
3952 {
3953         int ret;
3954         int32_t res;
3955         TDB_DATA data;
3956         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3957
3958         data.dptr = (uint8_t*)db_prio;
3959         data.dsize = sizeof(*db_prio);
3960
3961         ret = ctdb_control(ctdb, destnode, 0, 
3962                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
3963                            tmp_ctx, NULL, &res, &timeout, NULL);
3964         if (ret != 0 || res != 0) {
3965                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
3966                 talloc_free(tmp_ctx);
3967                 return -1;
3968         }
3969
3970         talloc_free(tmp_ctx);
3971
3972         return 0;
3973 }
3974
3975 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
3976 {
3977         int ret;
3978         int32_t res;
3979         TDB_DATA data;
3980         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3981
3982         data.dptr = (uint8_t*)&db_id;
3983         data.dsize = sizeof(db_id);
3984
3985         ret = ctdb_control(ctdb, destnode, 0, 
3986                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
3987                            tmp_ctx, NULL, &res, &timeout, NULL);
3988         if (ret != 0 || res < 0) {
3989                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
3990                 talloc_free(tmp_ctx);
3991                 return -1;
3992         }
3993
3994         if (priority) {
3995                 *priority = res;
3996         }
3997
3998         talloc_free(tmp_ctx);
3999
4000         return 0;
4001 }