Add a variable for start/current time to ctdb statistics
[sahlberg/ctdb.git] / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/events/events.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
29 #include <stdlib.h>
30 #include "../include/ctdb_private.h"
31 #include "lib/util/dlinklist.h"
32
33 pid_t ctdbd_pid;
34
35 /*
36   allocate a packet for use in client<->daemon communication
37  */
38 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
39                                             TALLOC_CTX *mem_ctx, 
40                                             enum ctdb_operation operation, 
41                                             size_t length, size_t slength,
42                                             const char *type)
43 {
44         int size;
45         struct ctdb_req_header *hdr;
46
47         length = MAX(length, slength);
48         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
49
50         hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
51         if (hdr == NULL) {
52                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
53                          operation, (unsigned)length));
54                 return NULL;
55         }
56         talloc_set_name_const(hdr, type);
57         memset(hdr, 0, slength);
58         hdr->length       = length;
59         hdr->operation    = operation;
60         hdr->ctdb_magic   = CTDB_MAGIC;
61         hdr->ctdb_version = CTDB_VERSION;
62         hdr->srcnode      = ctdb->pnn;
63         if (ctdb->vnn_map) {
64                 hdr->generation = ctdb->vnn_map->generation;
65         }
66
67         return hdr;
68 }
69
70 /*
71   local version of ctdb_call
72 */
73 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
74                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
75                     TDB_DATA *data, uint32_t caller)
76 {
77         struct ctdb_call_info *c;
78         struct ctdb_registered_call *fn;
79         struct ctdb_context *ctdb = ctdb_db->ctdb;
80         
81         c = talloc(ctdb, struct ctdb_call_info);
82         CTDB_NO_MEMORY(ctdb, c);
83
84         c->key = call->key;
85         c->call_data = &call->call_data;
86         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
87         c->record_data.dsize = data->dsize;
88         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
89         c->new_data = NULL;
90         c->reply_data = NULL;
91         c->status = 0;
92
93         for (fn=ctdb_db->calls;fn;fn=fn->next) {
94                 if (fn->id == call->call_id) break;
95         }
96         if (fn == NULL) {
97                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
98                 talloc_free(c);
99                 return -1;
100         }
101
102         if (fn->fn(c) != 0) {
103                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
104                 talloc_free(c);
105                 return -1;
106         }
107
108         if (header->laccessor != caller) {
109                 header->lacount = 0;
110         }
111         header->laccessor = caller;
112         header->lacount++;
113
114         /* we need to force the record to be written out if this was a remote access,
115            so that the lacount is updated */
116         if (c->new_data == NULL && header->laccessor != ctdb->pnn) {
117                 c->new_data = &c->record_data;
118         }
119
120         if (c->new_data) {
121                 /* XXX check that we always have the lock here? */
122                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
123                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
124                         talloc_free(c);
125                         return -1;
126                 }
127         }
128
129         if (c->reply_data) {
130                 call->reply_data = *c->reply_data;
131
132                 talloc_steal(call, call->reply_data.dptr);
133                 talloc_set_name_const(call->reply_data.dptr, __location__);
134         } else {
135                 call->reply_data.dptr = NULL;
136                 call->reply_data.dsize = 0;
137         }
138         call->status = c->status;
139
140         talloc_free(c);
141
142         return 0;
143 }
144
145
146 /*
147   queue a packet for sending from client to daemon
148 */
149 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
150 {
151         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
152 }
153
154
155 /*
156   called when a CTDB_REPLY_CALL packet comes in in the client
157
158   This packet comes in response to a CTDB_REQ_CALL request packet. It
159   contains any reply data from the call
160 */
161 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
162 {
163         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
164         struct ctdb_client_call_state *state;
165
166         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
167         if (state == NULL) {
168                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
169                 return;
170         }
171
172         if (hdr->reqid != state->reqid) {
173                 /* we found a record  but it was the wrong one */
174                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
175                 return;
176         }
177
178         state->call->reply_data.dptr = c->data;
179         state->call->reply_data.dsize = c->datalen;
180         state->call->status = c->status;
181
182         talloc_steal(state, c);
183
184         state->state = CTDB_CALL_DONE;
185
186         if (state->async.fn) {
187                 state->async.fn(state);
188         }
189 }
190
191 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
192
193 /*
194   this is called in the client, when data comes in from the daemon
195  */
196 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
197 {
198         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
199         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
200         TALLOC_CTX *tmp_ctx;
201
202         /* place the packet as a child of a tmp_ctx. We then use
203            talloc_free() below to free it. If any of the calls want
204            to keep it, then they will steal it somewhere else, and the
205            talloc_free() will be a no-op */
206         tmp_ctx = talloc_new(ctdb);
207         talloc_steal(tmp_ctx, hdr);
208
209         if (cnt == 0) {
210                 DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n"));
211                 exit(0);
212         }
213
214         if (cnt < sizeof(*hdr)) {
215                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
216                 goto done;
217         }
218         if (cnt != hdr->length) {
219                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
220                                (unsigned)hdr->length, (unsigned)cnt);
221                 goto done;
222         }
223
224         if (hdr->ctdb_magic != CTDB_MAGIC) {
225                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
226                 goto done;
227         }
228
229         if (hdr->ctdb_version != CTDB_VERSION) {
230                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
231                 goto done;
232         }
233
234         switch (hdr->operation) {
235         case CTDB_REPLY_CALL:
236                 ctdb_client_reply_call(ctdb, hdr);
237                 break;
238
239         case CTDB_REQ_MESSAGE:
240                 ctdb_request_message(ctdb, hdr);
241                 break;
242
243         case CTDB_REPLY_CONTROL:
244                 ctdb_client_reply_control(ctdb, hdr);
245                 break;
246
247         default:
248                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
249         }
250
251 done:
252         talloc_free(tmp_ctx);
253 }
254
255 /*
256   connect to a unix domain socket
257 */
258 int ctdb_socket_connect(struct ctdb_context *ctdb)
259 {
260         struct sockaddr_un addr;
261
262         memset(&addr, 0, sizeof(addr));
263         addr.sun_family = AF_UNIX;
264         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
265
266         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
267         if (ctdb->daemon.sd == -1) {
268                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
269                 return -1;
270         }
271
272         set_nonblocking(ctdb->daemon.sd);
273         set_close_on_exec(ctdb->daemon.sd);
274         
275         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
276                 close(ctdb->daemon.sd);
277                 ctdb->daemon.sd = -1;
278                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
279                 return -1;
280         }
281
282         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
283                                               CTDB_DS_ALIGNMENT, 
284                                               ctdb_client_read_cb, ctdb);
285         return 0;
286 }
287
288
289 struct ctdb_record_handle {
290         struct ctdb_db_context *ctdb_db;
291         TDB_DATA key;
292         TDB_DATA *data;
293         struct ctdb_ltdb_header header;
294 };
295
296
297 /*
298   make a recv call to the local ctdb daemon - called from client context
299
300   This is called when the program wants to wait for a ctdb_call to complete and get the 
301   results. This call will block unless the call has already completed.
302 */
303 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
304 {
305         if (state == NULL) {
306                 return -1;
307         }
308
309         while (state->state < CTDB_CALL_DONE) {
310                 event_loop_once(state->ctdb_db->ctdb->ev);
311         }
312         if (state->state != CTDB_CALL_DONE) {
313                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
314                 talloc_free(state);
315                 return -1;
316         }
317
318         if (state->call->reply_data.dsize) {
319                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
320                                                       state->call->reply_data.dptr,
321                                                       state->call->reply_data.dsize);
322                 call->reply_data.dsize = state->call->reply_data.dsize;
323         } else {
324                 call->reply_data.dptr = NULL;
325                 call->reply_data.dsize = 0;
326         }
327         call->status = state->call->status;
328         talloc_free(state);
329
330         return 0;
331 }
332
333
334
335
336 /*
337   destroy a ctdb_call in client
338 */
339 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
340 {
341         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
342         return 0;
343 }
344
345 /*
346   construct an event driven local ctdb_call
347
348   this is used so that locally processed ctdb_call requests are processed
349   in an event driven manner
350 */
351 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
352                                                                   struct ctdb_call *call,
353                                                                   struct ctdb_ltdb_header *header,
354                                                                   TDB_DATA *data)
355 {
356         struct ctdb_client_call_state *state;
357         struct ctdb_context *ctdb = ctdb_db->ctdb;
358         int ret;
359
360         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
361         CTDB_NO_MEMORY_NULL(ctdb, state);
362         state->call = talloc_zero(state, struct ctdb_call);
363         CTDB_NO_MEMORY_NULL(ctdb, state->call);
364
365         talloc_steal(state, data->dptr);
366
367         state->state   = CTDB_CALL_DONE;
368         *(state->call) = *call;
369         state->ctdb_db = ctdb_db;
370
371         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, ctdb->pnn);
372
373         return state;
374 }
375
376 /*
377   make a ctdb call to the local daemon - async send. Called from client context.
378
379   This constructs a ctdb_call request and queues it for processing. 
380   This call never blocks.
381 */
382 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
383                                               struct ctdb_call *call)
384 {
385         struct ctdb_client_call_state *state;
386         struct ctdb_context *ctdb = ctdb_db->ctdb;
387         struct ctdb_ltdb_header header;
388         TDB_DATA data;
389         int ret;
390         size_t len;
391         struct ctdb_req_call *c;
392
393         /* if the domain socket is not yet open, open it */
394         if (ctdb->daemon.sd==-1) {
395                 ctdb_socket_connect(ctdb);
396         }
397
398         ret = ctdb_ltdb_lock(ctdb_db, call->key);
399         if (ret != 0) {
400                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
401                 return NULL;
402         }
403
404         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
405
406         if (ret == 0 && header.dmaster == ctdb->pnn) {
407                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
408                 talloc_free(data.dptr);
409                 ctdb_ltdb_unlock(ctdb_db, call->key);
410                 return state;
411         }
412
413         ctdb_ltdb_unlock(ctdb_db, call->key);
414         talloc_free(data.dptr);
415
416         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
417         if (state == NULL) {
418                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
419                 return NULL;
420         }
421         state->call = talloc_zero(state, struct ctdb_call);
422         if (state->call == NULL) {
423                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
424                 return NULL;
425         }
426
427         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
428         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
429         if (c == NULL) {
430                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
431                 return NULL;
432         }
433
434         state->reqid     = ctdb_reqid_new(ctdb, state);
435         state->ctdb_db = ctdb_db;
436         talloc_set_destructor(state, ctdb_client_call_destructor);
437
438         c->hdr.reqid     = state->reqid;
439         c->flags         = call->flags;
440         c->db_id         = ctdb_db->db_id;
441         c->callid        = call->call_id;
442         c->hopcount      = 0;
443         c->keylen        = call->key.dsize;
444         c->calldatalen   = call->call_data.dsize;
445         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
446         memcpy(&c->data[call->key.dsize], 
447                call->call_data.dptr, call->call_data.dsize);
448         *(state->call)              = *call;
449         state->call->call_data.dptr = &c->data[call->key.dsize];
450         state->call->key.dptr       = &c->data[0];
451
452         state->state  = CTDB_CALL_WAIT;
453
454
455         ctdb_client_queue_pkt(ctdb, &c->hdr);
456
457         return state;
458 }
459
460
461 /*
462   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
463 */
464 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
465 {
466         struct ctdb_client_call_state *state;
467
468         state = ctdb_call_send(ctdb_db, call);
469         return ctdb_call_recv(state, call);
470 }
471
472
473 /*
474   tell the daemon what messaging srvid we will use, and register the message
475   handler function in the client
476 */
477 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
478                              ctdb_msg_fn_t handler,
479                              void *private_data)
480                                     
481 {
482         int res;
483         int32_t status;
484         
485         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
486                            tdb_null, NULL, NULL, &status, NULL, NULL);
487         if (res != 0 || status != 0) {
488                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
489                 return -1;
490         }
491
492         /* also need to register the handler with our own ctdb structure */
493         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
494 }
495
496 /*
497   tell the daemon we no longer want a srvid
498 */
499 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
500 {
501         int res;
502         int32_t status;
503         
504         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
505                            tdb_null, NULL, NULL, &status, NULL, NULL);
506         if (res != 0 || status != 0) {
507                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
508                 return -1;
509         }
510
511         /* also need to register the handler with our own ctdb structure */
512         ctdb_deregister_message_handler(ctdb, srvid, private_data);
513         return 0;
514 }
515
516
517 /*
518   send a message - from client context
519  */
520 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
521                       uint64_t srvid, TDB_DATA data)
522 {
523         struct ctdb_req_message *r;
524         int len, res;
525
526         len = offsetof(struct ctdb_req_message, data) + data.dsize;
527         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
528                                len, struct ctdb_req_message);
529         CTDB_NO_MEMORY(ctdb, r);
530
531         r->hdr.destnode  = pnn;
532         r->srvid         = srvid;
533         r->datalen       = data.dsize;
534         memcpy(&r->data[0], data.dptr, data.dsize);
535         
536         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
537         if (res != 0) {
538                 return res;
539         }
540
541         talloc_free(r);
542         return 0;
543 }
544
545
546 /*
547   cancel a ctdb_fetch_lock operation, releasing the lock
548  */
549 static int fetch_lock_destructor(struct ctdb_record_handle *h)
550 {
551         ctdb_ltdb_unlock(h->ctdb_db, h->key);
552         return 0;
553 }
554
555 /*
556   force the migration of a record to this node
557  */
558 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
559 {
560         struct ctdb_call call;
561         ZERO_STRUCT(call);
562         call.call_id = CTDB_NULL_FUNC;
563         call.key = key;
564         call.flags = CTDB_IMMEDIATE_MIGRATION;
565         return ctdb_call(ctdb_db, &call);
566 }
567
568 /*
569   get a lock on a record, and return the records data. Blocks until it gets the lock
570  */
571 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
572                                            TDB_DATA key, TDB_DATA *data)
573 {
574         int ret;
575         struct ctdb_record_handle *h;
576
577         /*
578           procedure is as follows:
579
580           1) get the chain lock. 
581           2) check if we are dmaster
582           3) if we are the dmaster then return handle 
583           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
584              reply from ctdbd
585           5) when we get the reply, goto (1)
586          */
587
588         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
589         if (h == NULL) {
590                 return NULL;
591         }
592
593         h->ctdb_db = ctdb_db;
594         h->key     = key;
595         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
596         if (h->key.dptr == NULL) {
597                 talloc_free(h);
598                 return NULL;
599         }
600         h->data    = data;
601
602         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
603                  (const char *)key.dptr));
604
605 again:
606         /* step 1 - get the chain lock */
607         ret = ctdb_ltdb_lock(ctdb_db, key);
608         if (ret != 0) {
609                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
610                 talloc_free(h);
611                 return NULL;
612         }
613
614         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
615
616         talloc_set_destructor(h, fetch_lock_destructor);
617
618         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
619
620         /* when torturing, ensure we test the remote path */
621         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
622             random() % 5 == 0) {
623                 h->header.dmaster = (uint32_t)-1;
624         }
625
626
627         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
628
629         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
630                 ctdb_ltdb_unlock(ctdb_db, key);
631                 ret = ctdb_client_force_migration(ctdb_db, key);
632                 if (ret != 0) {
633                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
634                         talloc_free(h);
635                         return NULL;
636                 }
637                 goto again;
638         }
639
640         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
641         return h;
642 }
643
644 /*
645   store some data to the record that was locked with ctdb_fetch_lock()
646 */
647 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
648 {
649         if (h->ctdb_db->persistent) {
650                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
651                 return -1;
652         }
653
654         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
655 }
656
657 /*
658   non-locking fetch of a record
659  */
660 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
661                TDB_DATA key, TDB_DATA *data)
662 {
663         struct ctdb_call call;
664         int ret;
665
666         call.call_id = CTDB_FETCH_FUNC;
667         call.call_data.dptr = NULL;
668         call.call_data.dsize = 0;
669
670         ret = ctdb_call(ctdb_db, &call);
671
672         if (ret == 0) {
673                 *data = call.reply_data;
674                 talloc_steal(mem_ctx, data->dptr);
675         }
676
677         return ret;
678 }
679
680
681
682 /*
683    called when a control completes or timesout to invoke the callback
684    function the user provided
685 */
686 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
687         struct timeval t, void *private_data)
688 {
689         struct ctdb_client_control_state *state;
690         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
691         int ret;
692
693         state = talloc_get_type(private_data, struct ctdb_client_control_state);
694         talloc_steal(tmp_ctx, state);
695
696         ret = ctdb_control_recv(state->ctdb, state, state,
697                         NULL, 
698                         NULL, 
699                         NULL);
700
701         talloc_free(tmp_ctx);
702 }
703
704 /*
705   called when a CTDB_REPLY_CONTROL packet comes in in the client
706
707   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
708   contains any reply data from the control
709 */
710 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
711                                       struct ctdb_req_header *hdr)
712 {
713         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
714         struct ctdb_client_control_state *state;
715
716         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
717         if (state == NULL) {
718                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
719                 return;
720         }
721
722         if (hdr->reqid != state->reqid) {
723                 /* we found a record  but it was the wrong one */
724                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
725                 return;
726         }
727
728         state->outdata.dptr = c->data;
729         state->outdata.dsize = c->datalen;
730         state->status = c->status;
731         if (c->errorlen) {
732                 state->errormsg = talloc_strndup(state, 
733                                                  (char *)&c->data[c->datalen], 
734                                                  c->errorlen);
735         }
736
737         /* state->outdata now uses resources from c so we dont want c
738            to just dissappear from under us while state is still alive
739         */
740         talloc_steal(state, c);
741
742         state->state = CTDB_CONTROL_DONE;
743
744         /* if we had a callback registered for this control, pull the response
745            and call the callback.
746         */
747         if (state->async.fn) {
748                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
749         }
750 }
751
752
753 /*
754   destroy a ctdb_control in client
755 */
756 static int ctdb_control_destructor(struct ctdb_client_control_state *state)     
757 {
758         ctdb_reqid_remove(state->ctdb, state->reqid);
759         return 0;
760 }
761
762
763 /* time out handler for ctdb_control */
764 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
765         struct timeval t, void *private_data)
766 {
767         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
768
769         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
770                          "dstnode:%u\n", state->reqid, state->c->opcode,
771                          state->c->hdr.destnode));
772
773         state->state = CTDB_CONTROL_TIMEOUT;
774
775         /* if we had a callback registered for this control, pull the response
776            and call the callback.
777         */
778         if (state->async.fn) {
779                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
780         }
781 }
782
783 /* async version of send control request */
784 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
785                 uint32_t destnode, uint64_t srvid, 
786                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
787                 TALLOC_CTX *mem_ctx,
788                 struct timeval *timeout,
789                 char **errormsg)
790 {
791         struct ctdb_client_control_state *state;
792         size_t len;
793         struct ctdb_req_control *c;
794         int ret;
795
796         if (errormsg) {
797                 *errormsg = NULL;
798         }
799
800         /* if the domain socket is not yet open, open it */
801         if (ctdb->daemon.sd==-1) {
802                 ctdb_socket_connect(ctdb);
803         }
804
805         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
806         CTDB_NO_MEMORY_NULL(ctdb, state);
807
808         state->ctdb       = ctdb;
809         state->reqid      = ctdb_reqid_new(ctdb, state);
810         state->state      = CTDB_CONTROL_WAIT;
811         state->errormsg   = NULL;
812
813         talloc_set_destructor(state, ctdb_control_destructor);
814
815         len = offsetof(struct ctdb_req_control, data) + data.dsize;
816         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
817                                len, struct ctdb_req_control);
818         state->c            = c;        
819         CTDB_NO_MEMORY_NULL(ctdb, c);
820         c->hdr.reqid        = state->reqid;
821         c->hdr.destnode     = destnode;
822         c->opcode           = opcode;
823         c->client_id        = 0;
824         c->flags            = flags;
825         c->srvid            = srvid;
826         c->datalen          = data.dsize;
827         if (data.dsize) {
828                 memcpy(&c->data[0], data.dptr, data.dsize);
829         }
830
831         /* timeout */
832         if (timeout && !timeval_is_zero(timeout)) {
833                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
834         }
835
836         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
837         if (ret != 0) {
838                 talloc_free(state);
839                 return NULL;
840         }
841
842         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
843                 talloc_free(state);
844                 return NULL;
845         }
846
847         return state;
848 }
849
850
851 /* async version of receive control reply */
852 int ctdb_control_recv(struct ctdb_context *ctdb, 
853                 struct ctdb_client_control_state *state, 
854                 TALLOC_CTX *mem_ctx,
855                 TDB_DATA *outdata, int32_t *status, char **errormsg)
856 {
857         TALLOC_CTX *tmp_ctx;
858
859         if (status != NULL) {
860                 *status = -1;
861         }
862         if (errormsg != NULL) {
863                 *errormsg = NULL;
864         }
865
866         if (state == NULL) {
867                 return -1;
868         }
869
870         /* prevent double free of state */
871         tmp_ctx = talloc_new(ctdb);
872         talloc_steal(tmp_ctx, state);
873
874         /* loop one event at a time until we either timeout or the control
875            completes.
876         */
877         while (state->state == CTDB_CONTROL_WAIT) {
878                 event_loop_once(ctdb->ev);
879         }
880
881         if (state->state != CTDB_CONTROL_DONE) {
882                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
883                 if (state->async.fn) {
884                         state->async.fn(state);
885                 }
886                 talloc_free(tmp_ctx);
887                 return -1;
888         }
889
890         if (state->errormsg) {
891                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
892                 if (errormsg) {
893                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
894                 }
895                 if (state->async.fn) {
896                         state->async.fn(state);
897                 }
898                 talloc_free(tmp_ctx);
899                 return -1;
900         }
901
902         if (outdata) {
903                 *outdata = state->outdata;
904                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
905         }
906
907         if (status) {
908                 *status = state->status;
909         }
910
911         if (state->async.fn) {
912                 state->async.fn(state);
913         }
914
915         talloc_free(tmp_ctx);
916         return 0;
917 }
918
919
920
921 /*
922   send a ctdb control message
923   timeout specifies how long we should wait for a reply.
924   if timeout is NULL we wait indefinitely
925  */
926 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
927                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
928                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
929                  struct timeval *timeout,
930                  char **errormsg)
931 {
932         struct ctdb_client_control_state *state;
933
934         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
935                         flags, data, mem_ctx,
936                         timeout, errormsg);
937         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
938                         errormsg);
939 }
940
941
942
943
944 /*
945   a process exists call. Returns 0 if process exists, -1 otherwise
946  */
947 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
948 {
949         int ret;
950         TDB_DATA data;
951         int32_t status;
952
953         data.dptr = (uint8_t*)&pid;
954         data.dsize = sizeof(pid);
955
956         ret = ctdb_control(ctdb, destnode, 0, 
957                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
958                            NULL, NULL, &status, NULL, NULL);
959         if (ret != 0) {
960                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
961                 return -1;
962         }
963
964         return status;
965 }
966
967 /*
968   get remote statistics
969  */
970 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
971 {
972         int ret;
973         TDB_DATA data;
974         int32_t res;
975
976         ret = ctdb_control(ctdb, destnode, 0, 
977                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
978                            ctdb, &data, &res, NULL, NULL);
979         if (ret != 0 || res != 0) {
980                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
981                 return -1;
982         }
983
984         if (data.dsize != sizeof(struct ctdb_statistics)) {
985                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
986                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
987                       return -1;
988         }
989
990         *status = *(struct ctdb_statistics *)data.dptr;
991         talloc_free(data.dptr);
992                         
993         return 0;
994 }
995
996 /*
997   shutdown a remote ctdb node
998  */
999 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1000 {
1001         struct ctdb_client_control_state *state;
1002
1003         state = ctdb_control_send(ctdb, destnode, 0, 
1004                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1005                            NULL, &timeout, NULL);
1006         if (state == NULL) {
1007                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1008                 return -1;
1009         }
1010
1011         return 0;
1012 }
1013
1014 /*
1015   get vnn map from a remote node
1016  */
1017 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1018 {
1019         int ret;
1020         TDB_DATA outdata;
1021         int32_t res;
1022         struct ctdb_vnn_map_wire *map;
1023
1024         ret = ctdb_control(ctdb, destnode, 0, 
1025                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1026                            mem_ctx, &outdata, &res, &timeout, NULL);
1027         if (ret != 0 || res != 0) {
1028                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1029                 return -1;
1030         }
1031         
1032         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1033         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1034             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1035                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1036                 return -1;
1037         }
1038
1039         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1040         CTDB_NO_MEMORY(ctdb, *vnnmap);
1041         (*vnnmap)->generation = map->generation;
1042         (*vnnmap)->size       = map->size;
1043         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1044
1045         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1046         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1047         talloc_free(outdata.dptr);
1048                     
1049         return 0;
1050 }
1051
1052
1053 /*
1054   get the recovery mode of a remote node
1055  */
1056 struct ctdb_client_control_state *
1057 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1058 {
1059         return ctdb_control_send(ctdb, destnode, 0, 
1060                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1061                            mem_ctx, &timeout, NULL);
1062 }
1063
1064 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1065 {
1066         int ret;
1067         int32_t res;
1068
1069         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1070         if (ret != 0) {
1071                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1072                 return -1;
1073         }
1074
1075         if (recmode) {
1076                 *recmode = (uint32_t)res;
1077         }
1078
1079         return 0;
1080 }
1081
1082 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1083 {
1084         struct ctdb_client_control_state *state;
1085
1086         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1087         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1088 }
1089
1090
1091
1092
1093 /*
1094   set the recovery mode of a remote node
1095  */
1096 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1097 {
1098         int ret;
1099         TDB_DATA data;
1100         int32_t res;
1101
1102         data.dsize = sizeof(uint32_t);
1103         data.dptr = (unsigned char *)&recmode;
1104
1105         ret = ctdb_control(ctdb, destnode, 0, 
1106                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1107                            NULL, NULL, &res, &timeout, NULL);
1108         if (ret != 0 || res != 0) {
1109                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1110                 return -1;
1111         }
1112
1113         return 0;
1114 }
1115
1116
1117
1118 /*
1119   get the recovery master of a remote node
1120  */
1121 struct ctdb_client_control_state *
1122 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1123                         struct timeval timeout, uint32_t destnode)
1124 {
1125         return ctdb_control_send(ctdb, destnode, 0, 
1126                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1127                            mem_ctx, &timeout, NULL);
1128 }
1129
1130 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1131 {
1132         int ret;
1133         int32_t res;
1134
1135         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1136         if (ret != 0) {
1137                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1138                 return -1;
1139         }
1140
1141         if (recmaster) {
1142                 *recmaster = (uint32_t)res;
1143         }
1144
1145         return 0;
1146 }
1147
1148 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1149 {
1150         struct ctdb_client_control_state *state;
1151
1152         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1153         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1154 }
1155
1156
1157 /*
1158   set the recovery master of a remote node
1159  */
1160 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1161 {
1162         int ret;
1163         TDB_DATA data;
1164         int32_t res;
1165
1166         ZERO_STRUCT(data);
1167         data.dsize = sizeof(uint32_t);
1168         data.dptr = (unsigned char *)&recmaster;
1169
1170         ret = ctdb_control(ctdb, destnode, 0, 
1171                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1172                            NULL, NULL, &res, &timeout, NULL);
1173         if (ret != 0 || res != 0) {
1174                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1175                 return -1;
1176         }
1177
1178         return 0;
1179 }
1180
1181
1182 /*
1183   get a list of databases off a remote node
1184  */
1185 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1186                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1187 {
1188         int ret;
1189         TDB_DATA outdata;
1190         int32_t res;
1191
1192         ret = ctdb_control(ctdb, destnode, 0, 
1193                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1194                            mem_ctx, &outdata, &res, &timeout, NULL);
1195         if (ret != 0 || res != 0) {
1196                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1197                 return -1;
1198         }
1199
1200         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1201         talloc_free(outdata.dptr);
1202                     
1203         return 0;
1204 }
1205
1206 /*
1207   get a list of nodes (vnn and flags ) from a remote node
1208  */
1209 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1210                 struct timeval timeout, uint32_t destnode, 
1211                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1212 {
1213         int ret;
1214         TDB_DATA outdata;
1215         int32_t res;
1216
1217         ret = ctdb_control(ctdb, destnode, 0, 
1218                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1219                            mem_ctx, &outdata, &res, &timeout, NULL);
1220         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1221                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1222                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1223         }
1224         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1225                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1226                 return -1;
1227         }
1228
1229         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1230         talloc_free(outdata.dptr);
1231                     
1232         return 0;
1233 }
1234
1235 /*
1236   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1237  */
1238 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1239                 struct timeval timeout, uint32_t destnode, 
1240                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1241 {
1242         int ret, i, len;
1243         TDB_DATA outdata;
1244         struct ctdb_node_mapv4 *nodemapv4;
1245         int32_t res;
1246
1247         ret = ctdb_control(ctdb, destnode, 0, 
1248                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1249                            mem_ctx, &outdata, &res, &timeout, NULL);
1250         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1251                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1252                 return -1;
1253         }
1254
1255         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1256
1257         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1258         (*nodemap) = talloc_zero_size(mem_ctx, len);
1259         CTDB_NO_MEMORY(ctdb, (*nodemap));
1260
1261         (*nodemap)->num = nodemapv4->num;
1262         for (i=0; i<nodemapv4->num; i++) {
1263                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1264                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1265                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1266                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1267         }
1268                 
1269         talloc_free(outdata.dptr);
1270                     
1271         return 0;
1272 }
1273
1274 /*
1275   drop the transport, reload the nodes file and restart the transport
1276  */
1277 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1278                     struct timeval timeout, uint32_t destnode)
1279 {
1280         int ret;
1281         int32_t res;
1282
1283         ret = ctdb_control(ctdb, destnode, 0, 
1284                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1285                            NULL, NULL, &res, &timeout, NULL);
1286         if (ret != 0 || res != 0) {
1287                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1288                 return -1;
1289         }
1290
1291         return 0;
1292 }
1293
1294
1295 /*
1296   set vnn map on a node
1297  */
1298 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1299                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1300 {
1301         int ret;
1302         TDB_DATA data;
1303         int32_t res;
1304         struct ctdb_vnn_map_wire *map;
1305         size_t len;
1306
1307         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1308         map = talloc_size(mem_ctx, len);
1309         CTDB_NO_MEMORY(ctdb, map);
1310
1311         map->generation = vnnmap->generation;
1312         map->size = vnnmap->size;
1313         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1314         
1315         data.dsize = len;
1316         data.dptr  = (uint8_t *)map;
1317
1318         ret = ctdb_control(ctdb, destnode, 0, 
1319                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1320                            NULL, NULL, &res, &timeout, NULL);
1321         if (ret != 0 || res != 0) {
1322                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1323                 return -1;
1324         }
1325
1326         talloc_free(map);
1327
1328         return 0;
1329 }
1330
1331
1332 /*
1333   async send for pull database
1334  */
1335 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1336         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1337         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1338 {
1339         TDB_DATA indata;
1340         struct ctdb_control_pulldb *pull;
1341         struct ctdb_client_control_state *state;
1342
1343         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1344         CTDB_NO_MEMORY_NULL(ctdb, pull);
1345
1346         pull->db_id   = dbid;
1347         pull->lmaster = lmaster;
1348
1349         indata.dsize = sizeof(struct ctdb_control_pulldb);
1350         indata.dptr  = (unsigned char *)pull;
1351
1352         state = ctdb_control_send(ctdb, destnode, 0, 
1353                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1354                                   mem_ctx, &timeout, NULL);
1355         talloc_free(pull);
1356
1357         return state;
1358 }
1359
1360 /*
1361   async recv for pull database
1362  */
1363 int ctdb_ctrl_pulldb_recv(
1364         struct ctdb_context *ctdb, 
1365         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1366         TDB_DATA *outdata)
1367 {
1368         int ret;
1369         int32_t res;
1370
1371         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1372         if ( (ret != 0) || (res != 0) ){
1373                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1374                 return -1;
1375         }
1376
1377         return 0;
1378 }
1379
1380 /*
1381   pull all keys and records for a specific database on a node
1382  */
1383 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1384                 uint32_t dbid, uint32_t lmaster, 
1385                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1386                 TDB_DATA *outdata)
1387 {
1388         struct ctdb_client_control_state *state;
1389
1390         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1391                                       timeout);
1392         
1393         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1394 }
1395
1396
1397 /*
1398   change dmaster for all keys in the database to the new value
1399  */
1400 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1401                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1402 {
1403         int ret;
1404         TDB_DATA indata;
1405         int32_t res;
1406
1407         indata.dsize = 2*sizeof(uint32_t);
1408         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1409
1410         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1411         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1412
1413         ret = ctdb_control(ctdb, destnode, 0, 
1414                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1415                            NULL, NULL, &res, &timeout, NULL);
1416         if (ret != 0 || res != 0) {
1417                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1418                 return -1;
1419         }
1420
1421         return 0;
1422 }
1423
1424 /*
1425   ping a node, return number of clients connected
1426  */
1427 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1428 {
1429         int ret;
1430         int32_t res;
1431
1432         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1433                            tdb_null, NULL, NULL, &res, NULL, NULL);
1434         if (ret != 0) {
1435                 return -1;
1436         }
1437         return res;
1438 }
1439
1440 /*
1441   find the real path to a ltdb 
1442  */
1443 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1444                    const char **path)
1445 {
1446         int ret;
1447         int32_t res;
1448         TDB_DATA data;
1449
1450         data.dptr = (uint8_t *)&dbid;
1451         data.dsize = sizeof(dbid);
1452
1453         ret = ctdb_control(ctdb, destnode, 0, 
1454                            CTDB_CONTROL_GETDBPATH, 0, data, 
1455                            mem_ctx, &data, &res, &timeout, NULL);
1456         if (ret != 0 || res != 0) {
1457                 return -1;
1458         }
1459
1460         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1461         if ((*path) == NULL) {
1462                 return -1;
1463         }
1464
1465         talloc_free(data.dptr);
1466
1467         return 0;
1468 }
1469
1470 /*
1471   find the name of a db 
1472  */
1473 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1474                    const char **name)
1475 {
1476         int ret;
1477         int32_t res;
1478         TDB_DATA data;
1479
1480         data.dptr = (uint8_t *)&dbid;
1481         data.dsize = sizeof(dbid);
1482
1483         ret = ctdb_control(ctdb, destnode, 0, 
1484                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1485                            mem_ctx, &data, &res, &timeout, NULL);
1486         if (ret != 0 || res != 0) {
1487                 return -1;
1488         }
1489
1490         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1491         if ((*name) == NULL) {
1492                 return -1;
1493         }
1494
1495         talloc_free(data.dptr);
1496
1497         return 0;
1498 }
1499
1500 /*
1501   get the health status of a db
1502  */
1503 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1504                           struct timeval timeout,
1505                           uint32_t destnode,
1506                           uint32_t dbid, TALLOC_CTX *mem_ctx,
1507                           const char **reason)
1508 {
1509         int ret;
1510         int32_t res;
1511         TDB_DATA data;
1512
1513         data.dptr = (uint8_t *)&dbid;
1514         data.dsize = sizeof(dbid);
1515
1516         ret = ctdb_control(ctdb, destnode, 0,
1517                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1518                            mem_ctx, &data, &res, &timeout, NULL);
1519         if (ret != 0 || res != 0) {
1520                 return -1;
1521         }
1522
1523         if (data.dsize == 0) {
1524                 (*reason) = NULL;
1525                 return 0;
1526         }
1527
1528         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1529         if ((*reason) == NULL) {
1530                 return -1;
1531         }
1532
1533         talloc_free(data.dptr);
1534
1535         return 0;
1536 }
1537
1538 /*
1539   create a database
1540  */
1541 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1542                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1543 {
1544         int ret;
1545         int32_t res;
1546         TDB_DATA data;
1547
1548         data.dptr = discard_const(name);
1549         data.dsize = strlen(name)+1;
1550
1551         ret = ctdb_control(ctdb, destnode, 0, 
1552                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1553                            0, data, 
1554                            mem_ctx, &data, &res, &timeout, NULL);
1555
1556         if (ret != 0 || res != 0) {
1557                 return -1;
1558         }
1559
1560         return 0;
1561 }
1562
1563 /*
1564   get debug level on a node
1565  */
1566 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1567 {
1568         int ret;
1569         int32_t res;
1570         TDB_DATA data;
1571
1572         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1573                            ctdb, &data, &res, NULL, NULL);
1574         if (ret != 0 || res != 0) {
1575                 return -1;
1576         }
1577         if (data.dsize != sizeof(int32_t)) {
1578                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1579                          (unsigned)data.dsize));
1580                 return -1;
1581         }
1582         *level = *(int32_t *)data.dptr;
1583         talloc_free(data.dptr);
1584         return 0;
1585 }
1586
1587 /*
1588   set debug level on a node
1589  */
1590 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1591 {
1592         int ret;
1593         int32_t res;
1594         TDB_DATA data;
1595
1596         data.dptr = (uint8_t *)&level;
1597         data.dsize = sizeof(level);
1598
1599         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1600                            NULL, NULL, &res, NULL, NULL);
1601         if (ret != 0 || res != 0) {
1602                 return -1;
1603         }
1604         return 0;
1605 }
1606
1607
1608 /*
1609   get a list of connected nodes
1610  */
1611 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1612                                 struct timeval timeout,
1613                                 TALLOC_CTX *mem_ctx,
1614                                 uint32_t *num_nodes)
1615 {
1616         struct ctdb_node_map *map=NULL;
1617         int ret, i;
1618         uint32_t *nodes;
1619
1620         *num_nodes = 0;
1621
1622         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1623         if (ret != 0) {
1624                 return NULL;
1625         }
1626
1627         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1628         if (nodes == NULL) {
1629                 return NULL;
1630         }
1631
1632         for (i=0;i<map->num;i++) {
1633                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1634                         nodes[*num_nodes] = map->nodes[i].pnn;
1635                         (*num_nodes)++;
1636                 }
1637         }
1638
1639         return nodes;
1640 }
1641
1642
1643 /*
1644   reset remote status
1645  */
1646 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1647 {
1648         int ret;
1649         int32_t res;
1650
1651         ret = ctdb_control(ctdb, destnode, 0, 
1652                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1653                            NULL, NULL, &res, NULL, NULL);
1654         if (ret != 0 || res != 0) {
1655                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1656                 return -1;
1657         }
1658         return 0;
1659 }
1660
1661 /*
1662   this is the dummy null procedure that all databases support
1663 */
1664 static int ctdb_null_func(struct ctdb_call_info *call)
1665 {
1666         return 0;
1667 }
1668
1669 /*
1670   this is a plain fetch procedure that all databases support
1671 */
1672 static int ctdb_fetch_func(struct ctdb_call_info *call)
1673 {
1674         call->reply_data = &call->record_data;
1675         return 0;
1676 }
1677
1678 /*
1679   attach to a specific database - client call
1680 */
1681 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, bool persistent, uint32_t tdb_flags)
1682 {
1683         struct ctdb_db_context *ctdb_db;
1684         TDB_DATA data;
1685         int ret;
1686         int32_t res;
1687
1688         ctdb_db = ctdb_db_handle(ctdb, name);
1689         if (ctdb_db) {
1690                 return ctdb_db;
1691         }
1692
1693         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1694         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1695
1696         ctdb_db->ctdb = ctdb;
1697         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1698         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1699
1700         data.dptr = discard_const(name);
1701         data.dsize = strlen(name)+1;
1702
1703         /* tell ctdb daemon to attach */
1704         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1705                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1706                            0, data, ctdb_db, &data, &res, NULL, NULL);
1707         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1708                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1709                 talloc_free(ctdb_db);
1710                 return NULL;
1711         }
1712         
1713         ctdb_db->db_id = *(uint32_t *)data.dptr;
1714         talloc_free(data.dptr);
1715
1716         ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1717         if (ret != 0) {
1718                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1719                 talloc_free(ctdb_db);
1720                 return NULL;
1721         }
1722
1723         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1724         if (ctdb->valgrinding) {
1725                 tdb_flags |= TDB_NOMMAP;
1726         }
1727         tdb_flags |= TDB_DISALLOW_NESTING;
1728
1729         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1730         if (ctdb_db->ltdb == NULL) {
1731                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1732                 talloc_free(ctdb_db);
1733                 return NULL;
1734         }
1735
1736         ctdb_db->persistent = persistent;
1737
1738         DLIST_ADD(ctdb->db_list, ctdb_db);
1739
1740         /* add well known functions */
1741         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1742         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1743
1744         return ctdb_db;
1745 }
1746
1747
1748 /*
1749   setup a call for a database
1750  */
1751 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1752 {
1753         struct ctdb_registered_call *call;
1754
1755 #if 0
1756         TDB_DATA data;
1757         int32_t status;
1758         struct ctdb_control_set_call c;
1759         int ret;
1760
1761         /* this is no longer valid with the separate daemon architecture */
1762         c.db_id = ctdb_db->db_id;
1763         c.fn    = fn;
1764         c.id    = id;
1765
1766         data.dptr = (uint8_t *)&c;
1767         data.dsize = sizeof(c);
1768
1769         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1770                            data, NULL, NULL, &status, NULL, NULL);
1771         if (ret != 0 || status != 0) {
1772                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
1773                 return -1;
1774         }
1775 #endif
1776
1777         /* also register locally */
1778         call = talloc(ctdb_db, struct ctdb_registered_call);
1779         call->fn = fn;
1780         call->id = id;
1781
1782         DLIST_ADD(ctdb_db->calls, call);        
1783         return 0;
1784 }
1785
1786
1787 struct traverse_state {
1788         bool done;
1789         uint32_t count;
1790         ctdb_traverse_func fn;
1791         void *private_data;
1792 };
1793
1794 /*
1795   called on each key during a ctdb_traverse
1796  */
1797 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1798 {
1799         struct traverse_state *state = (struct traverse_state *)p;
1800         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
1801         TDB_DATA key;
1802
1803         if (data.dsize < sizeof(uint32_t) ||
1804             d->length != data.dsize) {
1805                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
1806                 state->done = True;
1807                 return;
1808         }
1809
1810         key.dsize = d->keylen;
1811         key.dptr  = &d->data[0];
1812         data.dsize = d->datalen;
1813         data.dptr = &d->data[d->keylen];
1814
1815         if (key.dsize == 0 && data.dsize == 0) {
1816                 /* end of traverse */
1817                 state->done = True;
1818                 return;
1819         }
1820
1821         if (data.dsize == sizeof(struct ctdb_ltdb_header)) {
1822                 /* empty records are deleted records in ctdb */
1823                 return;
1824         }
1825
1826         if (state->fn(ctdb, key, data, state->private_data) != 0) {
1827                 state->done = True;
1828         }
1829
1830         state->count++;
1831 }
1832
1833
1834 /*
1835   start a cluster wide traverse, calling the supplied fn on each record
1836   return the number of records traversed, or -1 on error
1837  */
1838 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
1839 {
1840         TDB_DATA data;
1841         struct ctdb_traverse_start t;
1842         int32_t status;
1843         int ret;
1844         uint64_t srvid = (getpid() | 0xFLL<<60);
1845         struct traverse_state state;
1846
1847         state.done = False;
1848         state.count = 0;
1849         state.private_data = private_data;
1850         state.fn = fn;
1851
1852         ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
1853         if (ret != 0) {
1854                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
1855                 return -1;
1856         }
1857
1858         t.db_id = ctdb_db->db_id;
1859         t.srvid = srvid;
1860         t.reqid = 0;
1861
1862         data.dptr = (uint8_t *)&t;
1863         data.dsize = sizeof(t);
1864
1865         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
1866                            data, NULL, NULL, &status, NULL, NULL);
1867         if (ret != 0 || status != 0) {
1868                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
1869                 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1870                 return -1;
1871         }
1872
1873         while (!state.done) {
1874                 event_loop_once(ctdb_db->ctdb->ev);
1875         }
1876
1877         ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1878         if (ret != 0) {
1879                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
1880                 return -1;
1881         }
1882
1883         return state.count;
1884 }
1885
1886 #define ISASCII(x) ((x>31)&&(x<128))
1887 /*
1888   called on each key during a catdb
1889  */
1890 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
1891 {
1892         int i;
1893         FILE *f = (FILE *)p;
1894         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
1895
1896         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
1897         for (i=0;i<key.dsize;i++) {
1898                 if (ISASCII(key.dptr[i])) {
1899                         fprintf(f, "%c", key.dptr[i]);
1900                 } else {
1901                         fprintf(f, "\\%02X", key.dptr[i]);
1902                 }
1903         }
1904         fprintf(f, "\"\n");
1905
1906         fprintf(f, "dmaster: %u\n", h->dmaster);
1907         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
1908
1909         fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
1910         for (i=sizeof(*h);i<data.dsize;i++) {
1911                 if (ISASCII(data.dptr[i])) {
1912                         fprintf(f, "%c", data.dptr[i]);
1913                 } else {
1914                         fprintf(f, "\\%02X", data.dptr[i]);
1915                 }
1916         }
1917         fprintf(f, "\"\n");
1918
1919         fprintf(f, "\n");
1920
1921         return 0;
1922 }
1923
1924 /*
1925   convenience function to list all keys to stdout
1926  */
1927 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
1928 {
1929         return ctdb_traverse(ctdb_db, ctdb_dumpdb_record, f);
1930 }
1931
1932 /*
1933   get the pid of a ctdb daemon
1934  */
1935 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
1936 {
1937         int ret;
1938         int32_t res;
1939
1940         ret = ctdb_control(ctdb, destnode, 0, 
1941                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
1942                            NULL, NULL, &res, &timeout, NULL);
1943         if (ret != 0) {
1944                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
1945                 return -1;
1946         }
1947
1948         *pid = res;
1949
1950         return 0;
1951 }
1952
1953
1954 /*
1955   async freeze send control
1956  */
1957 struct ctdb_client_control_state *
1958 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
1959 {
1960         return ctdb_control_send(ctdb, destnode, priority, 
1961                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
1962                            mem_ctx, &timeout, NULL);
1963 }
1964
1965 /* 
1966    async freeze recv control
1967 */
1968 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
1969 {
1970         int ret;
1971         int32_t res;
1972
1973         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1974         if ( (ret != 0) || (res != 0) ){
1975                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
1976                 return -1;
1977         }
1978
1979         return 0;
1980 }
1981
1982 /*
1983   freeze databases of a certain priority
1984  */
1985 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
1986 {
1987         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1988         struct ctdb_client_control_state *state;
1989         int ret;
1990
1991         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
1992         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
1993         talloc_free(tmp_ctx);
1994
1995         return ret;
1996 }
1997
1998 /* Freeze all databases */
1999 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2000 {
2001         int i;
2002
2003         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2004                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2005                         return -1;
2006                 }
2007         }
2008         return 0;
2009 }
2010
2011 /*
2012   thaw databases of a certain priority
2013  */
2014 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2015 {
2016         int ret;
2017         int32_t res;
2018
2019         ret = ctdb_control(ctdb, destnode, priority, 
2020                            CTDB_CONTROL_THAW, 0, tdb_null, 
2021                            NULL, NULL, &res, &timeout, NULL);
2022         if (ret != 0 || res != 0) {
2023                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2024                 return -1;
2025         }
2026
2027         return 0;
2028 }
2029
2030 /* thaw all databases */
2031 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2032 {
2033         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2034 }
2035
2036 /*
2037   get pnn of a node, or -1
2038  */
2039 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2040 {
2041         int ret;
2042         int32_t res;
2043
2044         ret = ctdb_control(ctdb, destnode, 0, 
2045                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2046                            NULL, NULL, &res, &timeout, NULL);
2047         if (ret != 0) {
2048                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2049                 return -1;
2050         }
2051
2052         return res;
2053 }
2054
2055 /*
2056   get the monitoring mode of a remote node
2057  */
2058 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2059 {
2060         int ret;
2061         int32_t res;
2062
2063         ret = ctdb_control(ctdb, destnode, 0, 
2064                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2065                            NULL, NULL, &res, &timeout, NULL);
2066         if (ret != 0) {
2067                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2068                 return -1;
2069         }
2070
2071         *monmode = res;
2072
2073         return 0;
2074 }
2075
2076
2077 /*
2078  set the monitoring mode of a remote node to active
2079  */
2080 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2081 {
2082         int ret;
2083         
2084
2085         ret = ctdb_control(ctdb, destnode, 0, 
2086                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2087                            NULL, NULL,NULL, &timeout, NULL);
2088         if (ret != 0) {
2089                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2090                 return -1;
2091         }
2092
2093         
2094
2095         return 0;
2096 }
2097
2098 /*
2099   set the monitoring mode of a remote node to disable
2100  */
2101 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2102 {
2103         int ret;
2104         
2105
2106         ret = ctdb_control(ctdb, destnode, 0, 
2107                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2108                            NULL, NULL, NULL, &timeout, NULL);
2109         if (ret != 0) {
2110                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2111                 return -1;
2112         }
2113
2114         
2115
2116         return 0;
2117 }
2118
2119
2120
2121 /* 
2122   sent to a node to make it take over an ip address
2123 */
2124 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2125                           uint32_t destnode, struct ctdb_public_ip *ip)
2126 {
2127         TDB_DATA data;
2128         struct ctdb_public_ipv4 ipv4;
2129         int ret;
2130         int32_t res;
2131
2132         if (ip->addr.sa.sa_family == AF_INET) {
2133                 ipv4.pnn = ip->pnn;
2134                 ipv4.sin = ip->addr.ip;
2135
2136                 data.dsize = sizeof(ipv4);
2137                 data.dptr  = (uint8_t *)&ipv4;
2138
2139                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2140                            NULL, &res, &timeout, NULL);
2141         } else {
2142                 data.dsize = sizeof(*ip);
2143                 data.dptr  = (uint8_t *)ip;
2144
2145                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2146                            NULL, &res, &timeout, NULL);
2147         }
2148
2149         if (ret != 0 || res != 0) {
2150                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2151                 return -1;
2152         }
2153
2154         return 0;       
2155 }
2156
2157
2158 /* 
2159   sent to a node to make it release an ip address
2160 */
2161 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2162                          uint32_t destnode, struct ctdb_public_ip *ip)
2163 {
2164         TDB_DATA data;
2165         struct ctdb_public_ipv4 ipv4;
2166         int ret;
2167         int32_t res;
2168
2169         if (ip->addr.sa.sa_family == AF_INET) {
2170                 ipv4.pnn = ip->pnn;
2171                 ipv4.sin = ip->addr.ip;
2172
2173                 data.dsize = sizeof(ipv4);
2174                 data.dptr  = (uint8_t *)&ipv4;
2175
2176                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2177                                    NULL, &res, &timeout, NULL);
2178         } else {
2179                 data.dsize = sizeof(*ip);
2180                 data.dptr  = (uint8_t *)ip;
2181
2182                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2183                                    NULL, &res, &timeout, NULL);
2184         }
2185
2186         if (ret != 0 || res != 0) {
2187                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2188                 return -1;
2189         }
2190
2191         return 0;       
2192 }
2193
2194
2195 /*
2196   get a tunable
2197  */
2198 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2199                           struct timeval timeout, 
2200                           uint32_t destnode,
2201                           const char *name, uint32_t *value)
2202 {
2203         struct ctdb_control_get_tunable *t;
2204         TDB_DATA data, outdata;
2205         int32_t res;
2206         int ret;
2207
2208         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2209         data.dptr  = talloc_size(ctdb, data.dsize);
2210         CTDB_NO_MEMORY(ctdb, data.dptr);
2211
2212         t = (struct ctdb_control_get_tunable *)data.dptr;
2213         t->length = strlen(name)+1;
2214         memcpy(t->name, name, t->length);
2215
2216         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2217                            &outdata, &res, &timeout, NULL);
2218         talloc_free(data.dptr);
2219         if (ret != 0 || res != 0) {
2220                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2221                 return -1;
2222         }
2223
2224         if (outdata.dsize != sizeof(uint32_t)) {
2225                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2226                 talloc_free(outdata.dptr);
2227                 return -1;
2228         }
2229         
2230         *value = *(uint32_t *)outdata.dptr;
2231         talloc_free(outdata.dptr);
2232
2233         return 0;
2234 }
2235
2236 /*
2237   set a tunable
2238  */
2239 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2240                           struct timeval timeout, 
2241                           uint32_t destnode,
2242                           const char *name, uint32_t value)
2243 {
2244         struct ctdb_control_set_tunable *t;
2245         TDB_DATA data;
2246         int32_t res;
2247         int ret;
2248
2249         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2250         data.dptr  = talloc_size(ctdb, data.dsize);
2251         CTDB_NO_MEMORY(ctdb, data.dptr);
2252
2253         t = (struct ctdb_control_set_tunable *)data.dptr;
2254         t->length = strlen(name)+1;
2255         memcpy(t->name, name, t->length);
2256         t->value = value;
2257
2258         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2259                            NULL, &res, &timeout, NULL);
2260         talloc_free(data.dptr);
2261         if (ret != 0 || res != 0) {
2262                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2263                 return -1;
2264         }
2265
2266         return 0;
2267 }
2268
2269 /*
2270   list tunables
2271  */
2272 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2273                             struct timeval timeout, 
2274                             uint32_t destnode,
2275                             TALLOC_CTX *mem_ctx,
2276                             const char ***list, uint32_t *count)
2277 {
2278         TDB_DATA outdata;
2279         int32_t res;
2280         int ret;
2281         struct ctdb_control_list_tunable *t;
2282         char *p, *s, *ptr;
2283
2284         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2285                            mem_ctx, &outdata, &res, &timeout, NULL);
2286         if (ret != 0 || res != 0) {
2287                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2288                 return -1;
2289         }
2290
2291         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2292         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2293             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2294                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2295                 talloc_free(outdata.dptr);
2296                 return -1;              
2297         }
2298         
2299         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2300         CTDB_NO_MEMORY(ctdb, p);
2301
2302         talloc_free(outdata.dptr);
2303         
2304         (*list) = NULL;
2305         (*count) = 0;
2306
2307         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2308                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2309                 CTDB_NO_MEMORY(ctdb, *list);
2310                 (*list)[*count] = talloc_strdup(*list, s);
2311                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2312                 (*count)++;
2313         }
2314
2315         talloc_free(p);
2316
2317         return 0;
2318 }
2319
2320
2321 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2322                                    struct timeval timeout, uint32_t destnode,
2323                                    TALLOC_CTX *mem_ctx,
2324                                    uint32_t flags,
2325                                    struct ctdb_all_public_ips **ips)
2326 {
2327         int ret;
2328         TDB_DATA outdata;
2329         int32_t res;
2330
2331         ret = ctdb_control(ctdb, destnode, 0, 
2332                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2333                            mem_ctx, &outdata, &res, &timeout, NULL);
2334         if (ret == 0 && res == -1) {
2335                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2336                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2337         }
2338         if (ret != 0 || res != 0) {
2339           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2340                 return -1;
2341         }
2342
2343         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2344         talloc_free(outdata.dptr);
2345                     
2346         return 0;
2347 }
2348
2349 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2350                              struct timeval timeout, uint32_t destnode,
2351                              TALLOC_CTX *mem_ctx,
2352                              struct ctdb_all_public_ips **ips)
2353 {
2354         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2355                                               destnode, mem_ctx,
2356                                               0, ips);
2357 }
2358
2359 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2360                         struct timeval timeout, uint32_t destnode, 
2361                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2362 {
2363         int ret, i, len;
2364         TDB_DATA outdata;
2365         int32_t res;
2366         struct ctdb_all_public_ipsv4 *ipsv4;
2367
2368         ret = ctdb_control(ctdb, destnode, 0, 
2369                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2370                            mem_ctx, &outdata, &res, &timeout, NULL);
2371         if (ret != 0 || res != 0) {
2372                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2373                 return -1;
2374         }
2375
2376         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2377         len = offsetof(struct ctdb_all_public_ips, ips) +
2378                 ipsv4->num*sizeof(struct ctdb_public_ip);
2379         *ips = talloc_zero_size(mem_ctx, len);
2380         CTDB_NO_MEMORY(ctdb, *ips);
2381         (*ips)->num = ipsv4->num;
2382         for (i=0; i<ipsv4->num; i++) {
2383                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2384                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2385         }
2386
2387         talloc_free(outdata.dptr);
2388                     
2389         return 0;
2390 }
2391
2392 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2393                                  struct timeval timeout, uint32_t destnode,
2394                                  TALLOC_CTX *mem_ctx,
2395                                  const ctdb_sock_addr *addr,
2396                                  struct ctdb_control_public_ip_info **_info)
2397 {
2398         int ret;
2399         TDB_DATA indata;
2400         TDB_DATA outdata;
2401         int32_t res;
2402         struct ctdb_control_public_ip_info *info;
2403         uint32_t len;
2404         uint32_t i;
2405
2406         indata.dptr = discard_const_p(uint8_t, addr);
2407         indata.dsize = sizeof(*addr);
2408
2409         ret = ctdb_control(ctdb, destnode, 0,
2410                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2411                            mem_ctx, &outdata, &res, &timeout, NULL);
2412         if (ret != 0 || res != 0) {
2413                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2414                                 "failed ret:%d res:%d\n",
2415                                 ret, res));
2416                 return -1;
2417         }
2418
2419         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2420         if (len > outdata.dsize) {
2421                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2422                                 "returned invalid data with size %u > %u\n",
2423                                 (unsigned int)outdata.dsize,
2424                                 (unsigned int)len));
2425                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2426                 return -1;
2427         }
2428
2429         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2430         len += info->num*sizeof(struct ctdb_control_iface_info);
2431
2432         if (len > outdata.dsize) {
2433                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2434                                 "returned invalid data with size %u > %u\n",
2435                                 (unsigned int)outdata.dsize,
2436                                 (unsigned int)len));
2437                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2438                 return -1;
2439         }
2440
2441         /* make sure we null terminate the returned strings */
2442         for (i=0; i < info->num; i++) {
2443                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2444         }
2445
2446         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2447                                                                 outdata.dptr,
2448                                                                 outdata.dsize);
2449         talloc_free(outdata.dptr);
2450         if (*_info == NULL) {
2451                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2452                                 "talloc_memdup size %u failed\n",
2453                                 (unsigned int)outdata.dsize));
2454                 return -1;
2455         }
2456
2457         return 0;
2458 }
2459
2460 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2461                          struct timeval timeout, uint32_t destnode,
2462                          TALLOC_CTX *mem_ctx,
2463                          struct ctdb_control_get_ifaces **_ifaces)
2464 {
2465         int ret;
2466         TDB_DATA outdata;
2467         int32_t res;
2468         struct ctdb_control_get_ifaces *ifaces;
2469         uint32_t len;
2470         uint32_t i;
2471
2472         ret = ctdb_control(ctdb, destnode, 0,
2473                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2474                            mem_ctx, &outdata, &res, &timeout, NULL);
2475         if (ret != 0 || res != 0) {
2476                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2477                                 "failed ret:%d res:%d\n",
2478                                 ret, res));
2479                 return -1;
2480         }
2481
2482         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2483         if (len > outdata.dsize) {
2484                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2485                                 "returned invalid data with size %u > %u\n",
2486                                 (unsigned int)outdata.dsize,
2487                                 (unsigned int)len));
2488                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2489                 return -1;
2490         }
2491
2492         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2493         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2494
2495         if (len > outdata.dsize) {
2496                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2497                                 "returned invalid data with size %u > %u\n",
2498                                 (unsigned int)outdata.dsize,
2499                                 (unsigned int)len));
2500                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2501                 return -1;
2502         }
2503
2504         /* make sure we null terminate the returned strings */
2505         for (i=0; i < ifaces->num; i++) {
2506                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2507         }
2508
2509         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2510                                                                   outdata.dptr,
2511                                                                   outdata.dsize);
2512         talloc_free(outdata.dptr);
2513         if (*_ifaces == NULL) {
2514                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2515                                 "talloc_memdup size %u failed\n",
2516                                 (unsigned int)outdata.dsize));
2517                 return -1;
2518         }
2519
2520         return 0;
2521 }
2522
2523 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2524                              struct timeval timeout, uint32_t destnode,
2525                              TALLOC_CTX *mem_ctx,
2526                              const struct ctdb_control_iface_info *info)
2527 {
2528         int ret;
2529         TDB_DATA indata;
2530         int32_t res;
2531
2532         indata.dptr = discard_const_p(uint8_t, info);
2533         indata.dsize = sizeof(*info);
2534
2535         ret = ctdb_control(ctdb, destnode, 0,
2536                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2537                            mem_ctx, NULL, &res, &timeout, NULL);
2538         if (ret != 0 || res != 0) {
2539                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2540                                 "failed ret:%d res:%d\n",
2541                                 ret, res));
2542                 return -1;
2543         }
2544
2545         return 0;
2546 }
2547
2548 /*
2549   set/clear the permanent disabled bit on a remote node
2550  */
2551 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2552                        uint32_t set, uint32_t clear)
2553 {
2554         int ret;
2555         TDB_DATA data;
2556         struct ctdb_node_map *nodemap=NULL;
2557         struct ctdb_node_flag_change c;
2558         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2559         uint32_t recmaster;
2560         uint32_t *nodes;
2561
2562
2563         /* find the recovery master */
2564         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2565         if (ret != 0) {
2566                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2567                 talloc_free(tmp_ctx);
2568                 return ret;
2569         }
2570
2571
2572         /* read the node flags from the recmaster */
2573         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2574         if (ret != 0) {
2575                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2576                 talloc_free(tmp_ctx);
2577                 return -1;
2578         }
2579         if (destnode >= nodemap->num) {
2580                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2581                 talloc_free(tmp_ctx);
2582                 return -1;
2583         }
2584
2585         c.pnn       = destnode;
2586         c.old_flags = nodemap->nodes[destnode].flags;
2587         c.new_flags = c.old_flags;
2588         c.new_flags |= set;
2589         c.new_flags &= ~clear;
2590
2591         data.dsize = sizeof(c);
2592         data.dptr = (unsigned char *)&c;
2593
2594         /* send the flags update to all connected nodes */
2595         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2596
2597         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2598                                         nodes, 0,
2599                                         timeout, false, data,
2600                                         NULL, NULL,
2601                                         NULL) != 0) {
2602                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2603
2604                 talloc_free(tmp_ctx);
2605                 return -1;
2606         }
2607
2608         talloc_free(tmp_ctx);
2609         return 0;
2610 }
2611
2612
2613 /*
2614   get all tunables
2615  */
2616 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
2617                                struct timeval timeout, 
2618                                uint32_t destnode,
2619                                struct ctdb_tunable *tunables)
2620 {
2621         TDB_DATA outdata;
2622         int ret;
2623         int32_t res;
2624
2625         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2626                            &outdata, &res, &timeout, NULL);
2627         if (ret != 0 || res != 0) {
2628                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2629                 return -1;
2630         }
2631
2632         if (outdata.dsize != sizeof(*tunables)) {
2633                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2634                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2635                 return -1;              
2636         }
2637
2638         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2639         talloc_free(outdata.dptr);
2640         return 0;
2641 }
2642
2643 /*
2644   add a public address to a node
2645  */
2646 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
2647                       struct timeval timeout, 
2648                       uint32_t destnode,
2649                       struct ctdb_control_ip_iface *pub)
2650 {
2651         TDB_DATA data;
2652         int32_t res;
2653         int ret;
2654
2655         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2656         data.dptr  = (unsigned char *)pub;
2657
2658         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2659                            NULL, &res, &timeout, NULL);
2660         if (ret != 0 || res != 0) {
2661                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2662                 return -1;
2663         }
2664
2665         return 0;
2666 }
2667
2668 /*
2669   delete a public address from a node
2670  */
2671 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
2672                       struct timeval timeout, 
2673                       uint32_t destnode,
2674                       struct ctdb_control_ip_iface *pub)
2675 {
2676         TDB_DATA data;
2677         int32_t res;
2678         int ret;
2679
2680         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2681         data.dptr  = (unsigned char *)pub;
2682
2683         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2684                            NULL, &res, &timeout, NULL);
2685         if (ret != 0 || res != 0) {
2686                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2687                 return -1;
2688         }
2689
2690         return 0;
2691 }
2692
2693 /*
2694   kill a tcp connection
2695  */
2696 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2697                       struct timeval timeout, 
2698                       uint32_t destnode,
2699                       struct ctdb_control_killtcp *killtcp)
2700 {
2701         TDB_DATA data;
2702         int32_t res;
2703         int ret;
2704
2705         data.dsize = sizeof(struct ctdb_control_killtcp);
2706         data.dptr  = (unsigned char *)killtcp;
2707
2708         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2709                            NULL, &res, &timeout, NULL);
2710         if (ret != 0 || res != 0) {
2711                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2712                 return -1;
2713         }
2714
2715         return 0;
2716 }
2717
2718 /*
2719   send a gratious arp
2720  */
2721 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
2722                       struct timeval timeout, 
2723                       uint32_t destnode,
2724                       ctdb_sock_addr *addr,
2725                       const char *ifname)
2726 {
2727         TDB_DATA data;
2728         int32_t res;
2729         int ret, len;
2730         struct ctdb_control_gratious_arp *gratious_arp;
2731         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2732
2733
2734         len = strlen(ifname)+1;
2735         gratious_arp = talloc_size(tmp_ctx, 
2736                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2737         CTDB_NO_MEMORY(ctdb, gratious_arp);
2738
2739         gratious_arp->addr = *addr;
2740         gratious_arp->len = len;
2741         memcpy(&gratious_arp->iface[0], ifname, len);
2742
2743
2744         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
2745         data.dptr  = (unsigned char *)gratious_arp;
2746
2747         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
2748                            NULL, &res, &timeout, NULL);
2749         if (ret != 0 || res != 0) {
2750                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
2751                 talloc_free(tmp_ctx);
2752                 return -1;
2753         }
2754
2755         talloc_free(tmp_ctx);
2756         return 0;
2757 }
2758
2759 /*
2760   get a list of all tcp tickles that a node knows about for a particular vnn
2761  */
2762 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
2763                               struct timeval timeout, uint32_t destnode, 
2764                               TALLOC_CTX *mem_ctx, 
2765                               ctdb_sock_addr *addr,
2766                               struct ctdb_control_tcp_tickle_list **list)
2767 {
2768         int ret;
2769         TDB_DATA data, outdata;
2770         int32_t status;
2771
2772         data.dptr = (uint8_t*)addr;
2773         data.dsize = sizeof(ctdb_sock_addr);
2774
2775         ret = ctdb_control(ctdb, destnode, 0, 
2776                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
2777                            mem_ctx, &outdata, &status, NULL, NULL);
2778         if (ret != 0 || status != 0) {
2779                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
2780                 return -1;
2781         }
2782
2783         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
2784
2785         return status;
2786 }
2787
2788 /*
2789   register a server id
2790  */
2791 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
2792                       struct timeval timeout, 
2793                       struct ctdb_server_id *id)
2794 {
2795         TDB_DATA data;
2796         int32_t res;
2797         int ret;
2798
2799         data.dsize = sizeof(struct ctdb_server_id);
2800         data.dptr  = (unsigned char *)id;
2801
2802         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2803                         CTDB_CONTROL_REGISTER_SERVER_ID, 
2804                         0, data, NULL,
2805                         NULL, &res, &timeout, NULL);
2806         if (ret != 0 || res != 0) {
2807                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
2808                 return -1;
2809         }
2810
2811         return 0;
2812 }
2813
2814 /*
2815   unregister a server id
2816  */
2817 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
2818                       struct timeval timeout, 
2819                       struct ctdb_server_id *id)
2820 {
2821         TDB_DATA data;
2822         int32_t res;
2823         int ret;
2824
2825         data.dsize = sizeof(struct ctdb_server_id);
2826         data.dptr  = (unsigned char *)id;
2827
2828         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2829                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
2830                         0, data, NULL,
2831                         NULL, &res, &timeout, NULL);
2832         if (ret != 0 || res != 0) {
2833                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
2834                 return -1;
2835         }
2836
2837         return 0;
2838 }
2839
2840
2841 /*
2842   check if a server id exists
2843
2844   if a server id does exist, return *status == 1, otherwise *status == 0
2845  */
2846 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
2847                       struct timeval timeout, 
2848                       uint32_t destnode,
2849                       struct ctdb_server_id *id,
2850                       uint32_t *status)
2851 {
2852         TDB_DATA data;
2853         int32_t res;
2854         int ret;
2855
2856         data.dsize = sizeof(struct ctdb_server_id);
2857         data.dptr  = (unsigned char *)id;
2858
2859         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
2860                         0, data, NULL,
2861                         NULL, &res, &timeout, NULL);
2862         if (ret != 0) {
2863                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
2864                 return -1;
2865         }
2866
2867         if (res) {
2868                 *status = 1;
2869         } else {
2870                 *status = 0;
2871         }
2872
2873         return 0;
2874 }
2875
2876 /*
2877    get the list of server ids that are registered on a node
2878 */
2879 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
2880                 TALLOC_CTX *mem_ctx,
2881                 struct timeval timeout, uint32_t destnode, 
2882                 struct ctdb_server_id_list **svid_list)
2883 {
2884         int ret;
2885         TDB_DATA outdata;
2886         int32_t res;
2887
2888         ret = ctdb_control(ctdb, destnode, 0, 
2889                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
2890                            mem_ctx, &outdata, &res, &timeout, NULL);
2891         if (ret != 0 || res != 0) {
2892                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
2893                 return -1;
2894         }
2895
2896         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
2897                     
2898         return 0;
2899 }
2900
2901 /*
2902   initialise the ctdb daemon for client applications
2903
2904   NOTE: In current code the daemon does not fork. This is for testing purposes only
2905   and to simplify the code.
2906 */
2907 struct ctdb_context *ctdb_init(struct event_context *ev)
2908 {
2909         int ret;
2910         struct ctdb_context *ctdb;
2911
2912         ctdb = talloc_zero(ev, struct ctdb_context);
2913         if (ctdb == NULL) {
2914                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
2915                 return NULL;
2916         }
2917         ctdb->ev  = ev;
2918         ctdb->idr = idr_init(ctdb);
2919         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
2920
2921         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
2922         if (ret != 0) {
2923                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
2924                 talloc_free(ctdb);
2925                 return NULL;
2926         }
2927
2928         ctdb->statistics.statistics_start_time = timeval_current();
2929
2930         return ctdb;
2931 }
2932
2933
2934 /*
2935   set some ctdb flags
2936 */
2937 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
2938 {
2939         ctdb->flags |= flags;
2940 }
2941
2942 /*
2943   setup the local socket name
2944 */
2945 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
2946 {
2947         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
2948         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
2949
2950         return 0;
2951 }
2952
2953 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
2954 {
2955         return ctdb->daemon.name;
2956 }
2957
2958 /*
2959   return the pnn of this node
2960 */
2961 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
2962 {
2963         return ctdb->pnn;
2964 }
2965
2966
2967 /*
2968   get the uptime of a remote node
2969  */
2970 struct ctdb_client_control_state *
2971 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2972 {
2973         return ctdb_control_send(ctdb, destnode, 0, 
2974                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
2975                            mem_ctx, &timeout, NULL);
2976 }
2977
2978 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
2979 {
2980         int ret;
2981         int32_t res;
2982         TDB_DATA outdata;
2983
2984         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
2985         if (ret != 0 || res != 0) {
2986                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
2987                 return -1;
2988         }
2989
2990         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
2991
2992         return 0;
2993 }
2994
2995 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
2996 {
2997         struct ctdb_client_control_state *state;
2998
2999         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3000         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3001 }
3002
3003 /*
3004   send a control to execute the "recovered" event script on a node
3005  */
3006 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3007 {
3008         int ret;
3009         int32_t status;
3010
3011         ret = ctdb_control(ctdb, destnode, 0, 
3012                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
3013                            NULL, NULL, &status, &timeout, NULL);
3014         if (ret != 0 || status != 0) {
3015                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3016                 return -1;
3017         }
3018
3019         return 0;
3020 }
3021
3022 /* 
3023   callback for the async helpers used when sending the same control
3024   to multiple nodes in parallell.
3025 */
3026 static void async_callback(struct ctdb_client_control_state *state)
3027 {
3028         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3029         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3030         int ret;
3031         TDB_DATA outdata;
3032         int32_t res;
3033         uint32_t destnode = state->c->hdr.destnode;
3034
3035         /* one more node has responded with recmode data */
3036         data->count--;
3037
3038         /* if we failed to push the db, then return an error and let
3039            the main loop try again.
3040         */
3041         if (state->state != CTDB_CONTROL_DONE) {
3042                 if ( !data->dont_log_errors) {
3043                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3044                 }
3045                 data->fail_count++;
3046                 if (data->fail_callback) {
3047                         data->fail_callback(ctdb, destnode, res, outdata,
3048                                         data->callback_data);
3049                 }
3050                 return;
3051         }
3052         
3053         state->async.fn = NULL;
3054
3055         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3056         if ((ret != 0) || (res != 0)) {
3057                 if ( !data->dont_log_errors) {
3058                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3059                 }
3060                 data->fail_count++;
3061                 if (data->fail_callback) {
3062                         data->fail_callback(ctdb, destnode, res, outdata,
3063                                         data->callback_data);
3064                 }
3065         }
3066         if ((ret == 0) && (data->callback != NULL)) {
3067                 data->callback(ctdb, destnode, res, outdata,
3068                                         data->callback_data);
3069         }
3070 }
3071
3072
3073 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3074 {
3075         /* set up the callback functions */
3076         state->async.fn = async_callback;
3077         state->async.private_data = data;
3078         
3079         /* one more control to wait for to complete */
3080         data->count++;
3081 }
3082
3083
3084 /* wait for up to the maximum number of seconds allowed
3085    or until all nodes we expect a response from has replied
3086 */
3087 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3088 {
3089         while (data->count > 0) {
3090                 event_loop_once(ctdb->ev);
3091         }
3092         if (data->fail_count != 0) {
3093                 if (!data->dont_log_errors) {
3094                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
3095                                  data->fail_count));
3096                 }
3097                 return -1;
3098         }
3099         return 0;
3100 }
3101
3102
3103 /* 
3104    perform a simple control on the listed nodes
3105    The control cannot return data
3106  */
3107 int ctdb_client_async_control(struct ctdb_context *ctdb,
3108                                 enum ctdb_controls opcode,
3109                                 uint32_t *nodes,
3110                                 uint64_t srvid,
3111                                 struct timeval timeout,
3112                                 bool dont_log_errors,
3113                                 TDB_DATA data,
3114                                 client_async_callback client_callback,
3115                                 client_async_callback fail_callback,
3116                                 void *callback_data)
3117 {
3118         struct client_async_data *async_data;
3119         struct ctdb_client_control_state *state;
3120         int j, num_nodes;
3121
3122         async_data = talloc_zero(ctdb, struct client_async_data);
3123         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3124         async_data->dont_log_errors = dont_log_errors;
3125         async_data->callback = client_callback;
3126         async_data->fail_callback = fail_callback;
3127         async_data->callback_data = callback_data;
3128         async_data->opcode        = opcode;
3129
3130         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3131
3132         /* loop over all nodes and send an async control to each of them */
3133         for (j=0; j<num_nodes; j++) {
3134                 uint32_t pnn = nodes[j];
3135
3136                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
3137                                           0, data, async_data, &timeout, NULL);
3138                 if (state == NULL) {
3139                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3140                         talloc_free(async_data);
3141                         return -1;
3142                 }
3143                 
3144                 ctdb_client_async_add(async_data, state);
3145         }
3146
3147         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3148                 talloc_free(async_data);
3149                 return -1;
3150         }
3151
3152         talloc_free(async_data);
3153         return 0;
3154 }
3155
3156 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3157                                 struct ctdb_vnn_map *vnn_map,
3158                                 TALLOC_CTX *mem_ctx,
3159                                 bool include_self)
3160 {
3161         int i, j, num_nodes;
3162         uint32_t *nodes;
3163
3164         for (i=num_nodes=0;i<vnn_map->size;i++) {
3165                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3166                         continue;
3167                 }
3168                 num_nodes++;
3169         } 
3170
3171         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3172         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3173
3174         for (i=j=0;i<vnn_map->size;i++) {
3175                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3176                         continue;
3177                 }
3178                 nodes[j++] = vnn_map->map[i];
3179         } 
3180
3181         return nodes;
3182 }
3183
3184 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3185                                 struct ctdb_node_map *node_map,
3186                                 TALLOC_CTX *mem_ctx,
3187                                 bool include_self)
3188 {
3189         int i, j, num_nodes;
3190         uint32_t *nodes;
3191
3192         for (i=num_nodes=0;i<node_map->num;i++) {
3193                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3194                         continue;
3195                 }
3196                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3197                         continue;
3198                 }
3199                 num_nodes++;
3200         } 
3201
3202         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3203         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3204
3205         for (i=j=0;i<node_map->num;i++) {
3206                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3207                         continue;
3208                 }
3209                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3210                         continue;
3211                 }
3212                 nodes[j++] = node_map->nodes[i].pnn;
3213         } 
3214
3215         return nodes;
3216 }
3217
3218 uint32_t *list_of_active_nodes_except_pnn(struct ctdb_context *ctdb,
3219                                 struct ctdb_node_map *node_map,
3220                                 TALLOC_CTX *mem_ctx,
3221                                 uint32_t pnn)
3222 {
3223         int i, j, num_nodes;
3224         uint32_t *nodes;
3225
3226         for (i=num_nodes=0;i<node_map->num;i++) {
3227                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3228                         continue;
3229                 }
3230                 if (node_map->nodes[i].pnn == pnn) {
3231                         continue;
3232                 }
3233                 num_nodes++;
3234         } 
3235
3236         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3237         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3238
3239         for (i=j=0;i<node_map->num;i++) {
3240                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3241                         continue;
3242                 }
3243                 if (node_map->nodes[i].pnn == pnn) {
3244                         continue;
3245                 }
3246                 nodes[j++] = node_map->nodes[i].pnn;
3247         } 
3248
3249         return nodes;
3250 }
3251
3252 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3253                                 struct ctdb_node_map *node_map,
3254                                 TALLOC_CTX *mem_ctx,
3255                                 bool include_self)
3256 {
3257         int i, j, num_nodes;
3258         uint32_t *nodes;
3259
3260         for (i=num_nodes=0;i<node_map->num;i++) {
3261                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3262                         continue;
3263                 }
3264                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3265                         continue;
3266                 }
3267                 num_nodes++;
3268         } 
3269
3270         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3271         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3272
3273         for (i=j=0;i<node_map->num;i++) {
3274                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3275                         continue;
3276                 }
3277                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3278                         continue;
3279                 }
3280                 nodes[j++] = node_map->nodes[i].pnn;
3281         } 
3282
3283         return nodes;
3284 }
3285
3286 /* 
3287   this is used to test if a pnn lock exists and if it exists will return
3288   the number of connections that pnn has reported or -1 if that recovery
3289   daemon is not running.
3290 */
3291 int
3292 ctdb_read_pnn_lock(int fd, int32_t pnn)
3293 {
3294         struct flock lock;
3295         char c;
3296
3297         lock.l_type = F_WRLCK;
3298         lock.l_whence = SEEK_SET;
3299         lock.l_start = pnn;
3300         lock.l_len = 1;
3301         lock.l_pid = 0;
3302
3303         if (fcntl(fd, F_GETLK, &lock) != 0) {
3304                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3305                 return -1;
3306         }
3307
3308         if (lock.l_type == F_UNLCK) {
3309                 return -1;
3310         }
3311
3312         if (pread(fd, &c, 1, pnn) == -1) {
3313                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3314                 return -1;
3315         }
3316
3317         return c;
3318 }
3319
3320 /*
3321   get capabilities of a remote node
3322  */
3323 struct ctdb_client_control_state *
3324 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3325 {
3326         return ctdb_control_send(ctdb, destnode, 0, 
3327                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3328                            mem_ctx, &timeout, NULL);
3329 }
3330
3331 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3332 {
3333         int ret;
3334         int32_t res;
3335         TDB_DATA outdata;
3336
3337         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3338         if ( (ret != 0) || (res != 0) ) {
3339                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3340                 return -1;
3341         }
3342
3343         if (capabilities) {
3344                 *capabilities = *((uint32_t *)outdata.dptr);
3345         }
3346
3347         return 0;
3348 }
3349
3350 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3351 {
3352         struct ctdb_client_control_state *state;
3353         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3354         int ret;
3355
3356         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3357         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3358         talloc_free(tmp_ctx);
3359         return ret;
3360 }
3361
3362 /**
3363  * check whether a transaction is active on a given db on a given node
3364  */
3365 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
3366                                      uint32_t destnode,
3367                                      uint32_t db_id)
3368 {
3369         int32_t status;
3370         int ret;
3371         TDB_DATA indata;
3372
3373         indata.dptr = (uint8_t *)&db_id;
3374         indata.dsize = sizeof(db_id);
3375
3376         ret = ctdb_control(ctdb, destnode, 0,
3377                            CTDB_CONTROL_TRANS2_ACTIVE,
3378                            0, indata, NULL, NULL, &status,
3379                            NULL, NULL);
3380
3381         if (ret != 0) {
3382                 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
3383                 return -1;
3384         }
3385
3386         return status;
3387 }
3388
3389
3390 struct ctdb_transaction_handle {
3391         struct ctdb_db_context *ctdb_db;
3392         bool in_replay;
3393         /*
3394          * we store the reads and writes done under a transaction:
3395          * - one list stores both reads and writes (m_all),
3396          * - the other just writes (m_write)
3397          */
3398         struct ctdb_marshall_buffer *m_all;
3399         struct ctdb_marshall_buffer *m_write;
3400 };
3401
3402 /* start a transaction on a database */
3403 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3404 {
3405         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3406         return 0;
3407 }
3408
3409 /* start a transaction on a database */
3410 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
3411 {
3412         struct ctdb_record_handle *rh;
3413         TDB_DATA key;
3414         TDB_DATA data;
3415         struct ctdb_ltdb_header header;
3416         TALLOC_CTX *tmp_ctx;
3417         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
3418         int ret;
3419         struct ctdb_db_context *ctdb_db = h->ctdb_db;
3420         pid_t pid;
3421         int32_t status;
3422
3423         key.dptr = discard_const(keyname);
3424         key.dsize = strlen(keyname);
3425
3426         if (!ctdb_db->persistent) {
3427                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
3428                 return -1;
3429         }
3430
3431 again:
3432         tmp_ctx = talloc_new(h);
3433
3434         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
3435         if (rh == NULL) {
3436                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
3437                 talloc_free(tmp_ctx);
3438                 return -1;
3439         }
3440
3441         status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
3442                                               CTDB_CURRENT_NODE,
3443                                               ctdb_db->db_id);
3444         if (status == 1) {
3445                 unsigned long int usec = (1000 + random()) % 100000;
3446                 DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
3447                                     "on db_id[0x%08x]. waiting for %lu "
3448                                     "microseconds\n",
3449                                     ctdb_db->db_id, usec));
3450                 talloc_free(tmp_ctx);
3451                 usleep(usec);
3452                 goto again;
3453         }
3454
3455         /*
3456          * store the pid in the database:
3457          * it is not enough that the node is dmaster...
3458          */
3459         pid = getpid();
3460         data.dptr = (unsigned char *)&pid;
3461         data.dsize = sizeof(pid_t);
3462         rh->header.rsn++;
3463         rh->header.dmaster = ctdb_db->ctdb->pnn;
3464         ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
3465         if (ret != 0) {
3466                 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
3467                                   "transaction record\n"));
3468                 talloc_free(tmp_ctx);
3469                 return -1;
3470         }
3471
3472         talloc_free(rh);
3473
3474         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3475         if (ret != 0) {
3476                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
3477                 talloc_free(tmp_ctx);
3478                 return -1;
3479         }
3480
3481         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
3482         if (ret != 0) {
3483                 DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
3484                                  "lock record inside transaction\n"));
3485                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3486                 talloc_free(tmp_ctx);
3487                 goto again;
3488         }
3489
3490         if (header.dmaster != ctdb_db->ctdb->pnn) {
3491                 DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
3492                                    "transaction lock record\n"));
3493                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3494                 talloc_free(tmp_ctx);
3495                 goto again;
3496         }
3497
3498         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
3499                 DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
3500                                     "the transaction lock record\n"));
3501                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3502                 talloc_free(tmp_ctx);
3503                 goto again;
3504         }
3505
3506         talloc_free(tmp_ctx);
3507
3508         return 0;
3509 }
3510
3511
3512 /* start a transaction on a database */
3513 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3514                                                        TALLOC_CTX *mem_ctx)
3515 {
3516         struct ctdb_transaction_handle *h;
3517         int ret;
3518
3519         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3520         if (h == NULL) {
3521                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
3522                 return NULL;
3523         }
3524
3525         h->ctdb_db = ctdb_db;
3526
3527         ret = ctdb_transaction_fetch_start(h);
3528         if (ret != 0) {
3529                 talloc_free(h);
3530                 return NULL;
3531         }
3532
3533         talloc_set_destructor(h, ctdb_transaction_destructor);
3534
3535         return h;
3536 }
3537
3538
3539
3540 /*
3541   fetch a record inside a transaction
3542  */
3543 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
3544                            TALLOC_CTX *mem_ctx, 
3545                            TDB_DATA key, TDB_DATA *data)
3546 {
3547         struct ctdb_ltdb_header header;
3548         int ret;
3549
3550         ZERO_STRUCT(header);
3551
3552         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3553         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3554                 /* record doesn't exist yet */
3555                 *data = tdb_null;
3556                 ret = 0;
3557         }
3558         
3559         if (ret != 0) {
3560                 return ret;
3561         }
3562
3563         if (!h->in_replay) {
3564                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3565                 if (h->m_all == NULL) {
3566                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3567                         return -1;
3568                 }
3569         }
3570
3571         return 0;
3572 }
3573
3574 /*
3575   stores a record inside a transaction
3576  */
3577 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
3578                            TDB_DATA key, TDB_DATA data)
3579 {
3580         TALLOC_CTX *tmp_ctx = talloc_new(h);
3581         struct ctdb_ltdb_header header;
3582         TDB_DATA olddata;
3583         int ret;
3584
3585         ZERO_STRUCT(header);
3586
3587         /* we need the header so we can update the RSN */
3588         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3589         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3590                 /* the record doesn't exist - create one with us as dmaster.
3591                    This is only safe because we are in a transaction and this
3592                    is a persistent database */
3593                 ZERO_STRUCT(header);
3594         } else if (ret != 0) {
3595                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3596                 talloc_free(tmp_ctx);
3597                 return ret;
3598         }
3599
3600         if (data.dsize == olddata.dsize &&
3601             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3602                 /* save writing the same data */
3603                 talloc_free(tmp_ctx);
3604                 return 0;
3605         }
3606
3607         header.dmaster = h->ctdb_db->ctdb->pnn;
3608         header.rsn++;
3609
3610         if (!h->in_replay) {
3611                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3612                 if (h->m_all == NULL) {
3613                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3614                         talloc_free(tmp_ctx);
3615                         return -1;
3616                 }
3617         }               
3618
3619         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3620         if (h->m_write == NULL) {
3621                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3622                 talloc_free(tmp_ctx);
3623                 return -1;
3624         }
3625         
3626         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3627
3628         talloc_free(tmp_ctx);
3629         
3630         return ret;
3631 }
3632
3633 /*
3634   replay a transaction
3635  */
3636 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3637 {
3638         int ret, i;
3639         struct ctdb_rec_data *rec = NULL;
3640
3641         h->in_replay = true;
3642         talloc_free(h->m_write);
3643         h->m_write = NULL;
3644
3645         ret = ctdb_transaction_fetch_start(h);
3646         if (ret != 0) {
3647                 return ret;
3648         }
3649
3650         for (i=0;i<h->m_all->count;i++) {
3651                 TDB_DATA key, data;
3652
3653                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3654                 if (rec == NULL) {
3655                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3656                         goto failed;
3657                 }
3658
3659                 if (rec->reqid == 0) {
3660                         /* its a store */
3661                         if (ctdb_transaction_store(h, key, data) != 0) {
3662                                 goto failed;
3663                         }
3664                 } else {
3665                         TDB_DATA data2;
3666                         TALLOC_CTX *tmp_ctx = talloc_new(h);
3667
3668                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3669                                 talloc_free(tmp_ctx);
3670                                 goto failed;
3671                         }
3672                         if (data2.dsize != data.dsize ||
3673                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3674                                 /* the record has changed on us - we have to give up */
3675                                 talloc_free(tmp_ctx);
3676                                 goto failed;
3677                         }
3678                         talloc_free(tmp_ctx);
3679                 }
3680         }
3681         
3682         return 0;
3683
3684 failed:
3685         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3686         return -1;
3687 }
3688
3689
3690 /*
3691   commit a transaction
3692  */
3693 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3694 {
3695         int ret, retries=0;
3696         int32_t status;
3697         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3698         struct timeval timeout;
3699         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3700
3701         talloc_set_destructor(h, NULL);
3702
3703         /* our commit strategy is quite complex.
3704
3705            - we first try to commit the changes to all other nodes
3706
3707            - if that works, then we commit locally and we are done
3708
3709            - if a commit on another node fails, then we need to cancel
3710              the transaction, then restart the transaction (thus
3711              opening a window of time for a pending recovery to
3712              complete), then replay the transaction, checking all the
3713              reads and writes (checking that reads give the same data,
3714              and writes succeed). Then we retry the transaction to the
3715              other nodes
3716         */
3717
3718 again:
3719         if (h->m_write == NULL) {
3720                 /* no changes were made */
3721                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3722                 talloc_free(h);
3723                 return 0;
3724         }
3725
3726         /* tell ctdbd to commit to the other nodes */
3727         timeout = timeval_current_ofs(1, 0);
3728         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3729                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
3730                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
3731                            &timeout, NULL);
3732         if (ret != 0 || status != 0) {
3733                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3734                 DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
3735                                      ", retrying after 1 second...\n",
3736                                      (retries==0)?"":"retry "));
3737                 sleep(1);
3738
3739                 if (ret != 0) {
3740                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
3741                 } else {
3742                         /* work out what error code we will give if we 
3743                            have to fail the operation */
3744                         switch ((enum ctdb_trans2_commit_error)status) {
3745                         case CTDB_TRANS2_COMMIT_SUCCESS:
3746                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
3747                         case CTDB_TRANS2_COMMIT_TIMEOUT:
3748                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3749                                 break;
3750                         case CTDB_TRANS2_COMMIT_ALLFAIL:
3751                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
3752                                 break;
3753                         }
3754                 }
3755
3756                 if (++retries == 100) {
3757                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
3758                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
3759                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3760                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3761                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3762                         talloc_free(h);
3763                         return -1;
3764                 }               
3765
3766                 if (ctdb_replay_transaction(h) != 0) {
3767                         DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
3768                                           "transaction on db 0x%08x, "
3769                                           "failure control =%u\n",
3770                                           h->ctdb_db->db_id,
3771                                           (unsigned)failure_control));
3772                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3773                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3774                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3775                         talloc_free(h);
3776                         return -1;
3777                 }
3778                 goto again;
3779         } else {
3780                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3781         }
3782
3783         /* do the real commit locally */
3784         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
3785         if (ret != 0) {
3786                 DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
3787                                   "on db id 0x%08x locally, "
3788                                   "failure_control=%u\n",
3789                                   h->ctdb_db->db_id,
3790                                   (unsigned)failure_control));
3791                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3792                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3793                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
3794                 talloc_free(h);
3795                 return ret;
3796         }
3797
3798         /* tell ctdbd that we are finished with our local commit */
3799         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3800                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
3801                      tdb_null, NULL, NULL, NULL, NULL, NULL);
3802         talloc_free(h);
3803         return 0;
3804 }
3805
3806 /*
3807   recovery daemon ping to main daemon
3808  */
3809 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
3810 {
3811         int ret;
3812         int32_t res;
3813
3814         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
3815                            ctdb, NULL, &res, NULL, NULL);
3816         if (ret != 0 || res != 0) {
3817                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
3818                 return -1;
3819         }
3820
3821         return 0;
3822 }
3823
3824 /* when forking the main daemon and the child process needs to connect back
3825  * to the daemon as a client process, this function can be used to change
3826  * the ctdb context from daemon into client mode
3827  */
3828 int switch_from_server_to_client(struct ctdb_context *ctdb)
3829 {
3830         int ret;
3831
3832         /* shutdown the transport */
3833         if (ctdb->methods) {
3834                 ctdb->methods->shutdown(ctdb);
3835         }
3836
3837         /* get a new event context */
3838         talloc_free(ctdb->ev);
3839         ctdb->ev = event_context_init(ctdb);
3840
3841         close(ctdb->daemon.sd);
3842         ctdb->daemon.sd = -1;
3843
3844         /* initialise ctdb */
3845         ret = ctdb_socket_connect(ctdb);
3846         if (ret != 0) {
3847                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
3848                 return -1;
3849         }
3850
3851          return 0;
3852 }
3853
3854 /*
3855   get the status of running the monitor eventscripts: NULL means never run.
3856  */
3857 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
3858                 struct timeval timeout, uint32_t destnode, 
3859                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
3860                 struct ctdb_scripts_wire **script_status)
3861 {
3862         int ret;
3863         TDB_DATA outdata, indata;
3864         int32_t res;
3865         uint32_t uinttype = type;
3866
3867         indata.dptr = (uint8_t *)&uinttype;
3868         indata.dsize = sizeof(uinttype);
3869
3870         ret = ctdb_control(ctdb, destnode, 0, 
3871                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
3872                            mem_ctx, &outdata, &res, &timeout, NULL);
3873         if (ret != 0 || res != 0) {
3874                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
3875                 return -1;
3876         }
3877
3878         if (outdata.dsize == 0) {
3879                 *script_status = NULL;
3880         } else {
3881                 *script_status = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
3882                 talloc_free(outdata.dptr);
3883         }
3884                     
3885         return 0;
3886 }
3887
3888 /*
3889   tell the main daemon how long it took to lock the reclock file
3890  */
3891 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
3892 {
3893         int ret;
3894         int32_t res;
3895         TDB_DATA data;
3896
3897         data.dptr = (uint8_t *)&latency;
3898         data.dsize = sizeof(latency);
3899
3900         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
3901                            ctdb, NULL, &res, NULL, NULL);
3902         if (ret != 0 || res != 0) {
3903                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
3904                 return -1;
3905         }
3906
3907         return 0;
3908 }
3909
3910 /*
3911   get the name of the reclock file
3912  */
3913 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
3914                          uint32_t destnode, TALLOC_CTX *mem_ctx,
3915                          const char **name)
3916 {
3917         int ret;
3918         int32_t res;
3919         TDB_DATA data;
3920
3921         ret = ctdb_control(ctdb, destnode, 0, 
3922                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
3923                            mem_ctx, &data, &res, &timeout, NULL);
3924         if (ret != 0 || res != 0) {
3925                 return -1;
3926         }
3927
3928         if (data.dsize == 0) {
3929                 *name = NULL;
3930         } else {
3931                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
3932         }
3933         talloc_free(data.dptr);
3934
3935         return 0;
3936 }
3937
3938 /*
3939   set the reclock filename for a node
3940  */
3941 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
3942 {
3943         int ret;
3944         TDB_DATA data;
3945         int32_t res;
3946
3947         if (reclock == NULL) {
3948                 data.dsize = 0;
3949                 data.dptr  = NULL;
3950         } else {
3951                 data.dsize = strlen(reclock) + 1;
3952                 data.dptr  = discard_const(reclock);
3953         }
3954
3955         ret = ctdb_control(ctdb, destnode, 0, 
3956                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
3957                            NULL, NULL, &res, &timeout, NULL);
3958         if (ret != 0 || res != 0) {
3959                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
3960                 return -1;
3961         }
3962
3963         return 0;
3964 }
3965
3966 /*
3967   stop a node
3968  */
3969 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3970 {
3971         int ret;
3972         int32_t res;
3973
3974         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
3975                            ctdb, NULL, &res, &timeout, NULL);
3976         if (ret != 0 || res != 0) {
3977                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
3978                 return -1;
3979         }
3980
3981         return 0;
3982 }
3983
3984 /*
3985   continue a node
3986  */
3987 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3988 {
3989         int ret;
3990
3991         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
3992                            ctdb, NULL, NULL, &timeout, NULL);
3993         if (ret != 0) {
3994                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
3995                 return -1;
3996         }
3997
3998         return 0;
3999 }
4000
4001 /*
4002   set the natgw state for a node
4003  */
4004 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4005 {
4006         int ret;
4007         TDB_DATA data;
4008         int32_t res;
4009
4010         data.dsize = sizeof(natgwstate);
4011         data.dptr  = (uint8_t *)&natgwstate;
4012
4013         ret = ctdb_control(ctdb, destnode, 0, 
4014                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
4015                            NULL, NULL, &res, &timeout, NULL);
4016         if (ret != 0 || res != 0) {
4017                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4018                 return -1;
4019         }
4020
4021         return 0;
4022 }
4023
4024 /*
4025   set the lmaster role for a node
4026  */
4027 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4028 {
4029         int ret;
4030         TDB_DATA data;
4031         int32_t res;
4032
4033         data.dsize = sizeof(lmasterrole);
4034         data.dptr  = (uint8_t *)&lmasterrole;
4035
4036         ret = ctdb_control(ctdb, destnode, 0, 
4037                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
4038                            NULL, NULL, &res, &timeout, NULL);
4039         if (ret != 0 || res != 0) {
4040                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4041                 return -1;
4042         }
4043
4044         return 0;
4045 }
4046
4047 /*
4048   set the recmaster role for a node
4049  */
4050 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4051 {
4052         int ret;
4053         TDB_DATA data;
4054         int32_t res;
4055
4056         data.dsize = sizeof(recmasterrole);
4057         data.dptr  = (uint8_t *)&recmasterrole;
4058
4059         ret = ctdb_control(ctdb, destnode, 0, 
4060                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
4061                            NULL, NULL, &res, &timeout, NULL);
4062         if (ret != 0 || res != 0) {
4063                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4064                 return -1;
4065         }
4066
4067         return 0;
4068 }
4069
4070 /* enable an eventscript
4071  */
4072 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4073 {
4074         int ret;
4075         TDB_DATA data;
4076         int32_t res;
4077
4078         data.dsize = strlen(script) + 1;
4079         data.dptr  = discard_const(script);
4080
4081         ret = ctdb_control(ctdb, destnode, 0, 
4082                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
4083                            NULL, NULL, &res, &timeout, NULL);
4084         if (ret != 0 || res != 0) {
4085                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4086                 return -1;
4087         }
4088
4089         return 0;
4090 }
4091
4092 /* disable an eventscript
4093  */
4094 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4095 {
4096         int ret;
4097         TDB_DATA data;
4098         int32_t res;
4099
4100         data.dsize = strlen(script) + 1;
4101         data.dptr  = discard_const(script);
4102
4103         ret = ctdb_control(ctdb, destnode, 0, 
4104                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
4105                            NULL, NULL, &res, &timeout, NULL);
4106         if (ret != 0 || res != 0) {
4107                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
4108                 return -1;
4109         }
4110
4111         return 0;
4112 }
4113
4114
4115 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
4116 {
4117         int ret;
4118         TDB_DATA data;
4119         int32_t res;
4120
4121         data.dsize = sizeof(*bantime);
4122         data.dptr  = (uint8_t *)bantime;
4123
4124         ret = ctdb_control(ctdb, destnode, 0, 
4125                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
4126                            NULL, NULL, &res, &timeout, NULL);
4127         if (ret != 0 || res != 0) {
4128                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4129                 return -1;
4130         }
4131
4132         return 0;
4133 }
4134
4135
4136 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4137 {
4138         int ret;
4139         TDB_DATA outdata;
4140         int32_t res;
4141         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4142
4143         ret = ctdb_control(ctdb, destnode, 0, 
4144                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4145                            tmp_ctx, &outdata, &res, &timeout, NULL);
4146         if (ret != 0 || res != 0) {
4147                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4148                 talloc_free(tmp_ctx);
4149                 return -1;
4150         }
4151
4152         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4153         talloc_free(tmp_ctx);
4154
4155         return 0;
4156 }
4157
4158
4159 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4160 {
4161         int ret;
4162         int32_t res;
4163         TDB_DATA data;
4164         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4165
4166         data.dptr = (uint8_t*)db_prio;
4167         data.dsize = sizeof(*db_prio);
4168
4169         ret = ctdb_control(ctdb, destnode, 0, 
4170                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4171                            tmp_ctx, NULL, &res, &timeout, NULL);
4172         if (ret != 0 || res != 0) {
4173                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4174                 talloc_free(tmp_ctx);
4175                 return -1;
4176         }
4177
4178         talloc_free(tmp_ctx);
4179
4180         return 0;
4181 }
4182
4183 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4184 {
4185         int ret;
4186         int32_t res;
4187         TDB_DATA data;
4188         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4189
4190         data.dptr = (uint8_t*)&db_id;
4191         data.dsize = sizeof(db_id);
4192
4193         ret = ctdb_control(ctdb, destnode, 0, 
4194                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4195                            tmp_ctx, NULL, &res, &timeout, NULL);
4196         if (ret != 0 || res < 0) {
4197                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4198                 talloc_free(tmp_ctx);
4199                 return -1;
4200         }
4201
4202         if (priority) {
4203                 *priority = res;
4204         }
4205
4206         talloc_free(tmp_ctx);
4207
4208         return 0;
4209 }