ReadOnly: Add printing of the record flags when we are traversing a database to print...
[sahlberg/ctdb.git] / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/tevent/tevent.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
29 #include <stdlib.h>
30 #include "../include/ctdb_private.h"
31 #include "lib/util/dlinklist.h"
32
33 pid_t ctdbd_pid;
34
35 /*
36   allocate a packet for use in client<->daemon communication
37  */
38 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
39                                             TALLOC_CTX *mem_ctx, 
40                                             enum ctdb_operation operation, 
41                                             size_t length, size_t slength,
42                                             const char *type)
43 {
44         int size;
45         struct ctdb_req_header *hdr;
46
47         length = MAX(length, slength);
48         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
49
50         hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
51         if (hdr == NULL) {
52                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
53                          operation, (unsigned)length));
54                 return NULL;
55         }
56         talloc_set_name_const(hdr, type);
57         memset(hdr, 0, slength);
58         hdr->length       = length;
59         hdr->operation    = operation;
60         hdr->ctdb_magic   = CTDB_MAGIC;
61         hdr->ctdb_version = CTDB_VERSION;
62         hdr->srcnode      = ctdb->pnn;
63         if (ctdb->vnn_map) {
64                 hdr->generation = ctdb->vnn_map->generation;
65         }
66
67         return hdr;
68 }
69
70 /*
71   local version of ctdb_call
72 */
73 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
74                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
75                     TDB_DATA *data)
76 {
77         struct ctdb_call_info *c;
78         struct ctdb_registered_call *fn;
79         struct ctdb_context *ctdb = ctdb_db->ctdb;
80         
81         c = talloc(ctdb, struct ctdb_call_info);
82         CTDB_NO_MEMORY(ctdb, c);
83
84         c->key = call->key;
85         c->call_data = &call->call_data;
86         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
87         c->record_data.dsize = data->dsize;
88         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
89         c->new_data = NULL;
90         c->reply_data = NULL;
91         c->status = 0;
92         c->header = header;
93
94         for (fn=ctdb_db->calls;fn;fn=fn->next) {
95                 if (fn->id == call->call_id) break;
96         }
97         if (fn == NULL) {
98                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
99                 talloc_free(c);
100                 return -1;
101         }
102
103         if (fn->fn(c) != 0) {
104                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
105                 talloc_free(c);
106                 return -1;
107         }
108
109         /* we need to force the record to be written out if this was a remote access */
110         if (c->new_data == NULL) {
111                 c->new_data = &c->record_data;
112         }
113
114         if (c->new_data) {
115                 /* XXX check that we always have the lock here? */
116                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
117                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
118                         talloc_free(c);
119                         return -1;
120                 }
121         }
122
123         if (c->reply_data) {
124                 call->reply_data = *c->reply_data;
125
126                 talloc_steal(call, call->reply_data.dptr);
127                 talloc_set_name_const(call->reply_data.dptr, __location__);
128         } else {
129                 call->reply_data.dptr = NULL;
130                 call->reply_data.dsize = 0;
131         }
132         call->status = c->status;
133
134         talloc_free(c);
135
136         return 0;
137 }
138
139
140 /*
141   queue a packet for sending from client to daemon
142 */
143 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
144 {
145         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
146 }
147
148
149 /*
150   called when a CTDB_REPLY_CALL packet comes in in the client
151
152   This packet comes in response to a CTDB_REQ_CALL request packet. It
153   contains any reply data from the call
154 */
155 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
156 {
157         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
158         struct ctdb_client_call_state *state;
159
160         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
161         if (state == NULL) {
162                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
163                 return;
164         }
165
166         if (hdr->reqid != state->reqid) {
167                 /* we found a record  but it was the wrong one */
168                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
169                 return;
170         }
171
172         state->call->reply_data.dptr = c->data;
173         state->call->reply_data.dsize = c->datalen;
174         state->call->status = c->status;
175
176         talloc_steal(state, c);
177
178         state->state = CTDB_CALL_DONE;
179
180         if (state->async.fn) {
181                 state->async.fn(state);
182         }
183 }
184
185 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
186
187 /*
188   this is called in the client, when data comes in from the daemon
189  */
190 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
191 {
192         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
193         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
194         TALLOC_CTX *tmp_ctx;
195
196         /* place the packet as a child of a tmp_ctx. We then use
197            talloc_free() below to free it. If any of the calls want
198            to keep it, then they will steal it somewhere else, and the
199            talloc_free() will be a no-op */
200         tmp_ctx = talloc_new(ctdb);
201         talloc_steal(tmp_ctx, hdr);
202
203         if (cnt == 0) {
204                 DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n"));
205                 exit(0);
206         }
207
208         if (cnt < sizeof(*hdr)) {
209                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
210                 goto done;
211         }
212         if (cnt != hdr->length) {
213                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
214                                (unsigned)hdr->length, (unsigned)cnt);
215                 goto done;
216         }
217
218         if (hdr->ctdb_magic != CTDB_MAGIC) {
219                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
220                 goto done;
221         }
222
223         if (hdr->ctdb_version != CTDB_VERSION) {
224                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
225                 goto done;
226         }
227
228         switch (hdr->operation) {
229         case CTDB_REPLY_CALL:
230                 ctdb_client_reply_call(ctdb, hdr);
231                 break;
232
233         case CTDB_REQ_MESSAGE:
234                 ctdb_request_message(ctdb, hdr);
235                 break;
236
237         case CTDB_REPLY_CONTROL:
238                 ctdb_client_reply_control(ctdb, hdr);
239                 break;
240
241         default:
242                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
243         }
244
245 done:
246         talloc_free(tmp_ctx);
247 }
248
249 /*
250   connect with exponential backoff, thanks Stevens
251 */
252 #define CONNECT_MAXSLEEP 64
253 static int ctdb_connect_retry(struct ctdb_context *ctdb)
254 {
255         struct sockaddr_un addr;
256         int secs;
257         int ret = 0;
258
259         memset(&addr, 0, sizeof(addr));
260         addr.sun_family = AF_UNIX;
261         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
262
263         for (secs = 1; secs <= CONNECT_MAXSLEEP; secs *= 2) {
264                 ret = connect(ctdb->daemon.sd, (struct sockaddr *)&addr,
265                               sizeof(addr));
266                 if ((ret == 0) || (errno != EAGAIN)) {
267                         break;
268                 }
269
270                 if (secs <= (CONNECT_MAXSLEEP / 2)) {
271                         DEBUG(DEBUG_ERR,("connect failed: %s, retry in %d second(s)\n",
272                                          strerror(errno), secs));
273                         sleep(secs);
274                 }
275         }
276
277         return ret;
278 }
279
280 /*
281   connect to a unix domain socket
282 */
283 int ctdb_socket_connect(struct ctdb_context *ctdb)
284 {
285         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
286         if (ctdb->daemon.sd == -1) {
287                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
288                 return -1;
289         }
290
291         set_nonblocking(ctdb->daemon.sd);
292         set_close_on_exec(ctdb->daemon.sd);
293
294         if (ctdb_connect_retry(ctdb) == -1) {
295                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
296                 close(ctdb->daemon.sd);
297                 ctdb->daemon.sd = -1;
298                 return -1;
299         }
300
301         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
302                                               CTDB_DS_ALIGNMENT, 
303                                               ctdb_client_read_cb, ctdb, "to-ctdbd");
304         return 0;
305 }
306
307
308 struct ctdb_record_handle {
309         struct ctdb_db_context *ctdb_db;
310         TDB_DATA key;
311         TDB_DATA *data;
312         struct ctdb_ltdb_header header;
313 };
314
315
316 /*
317   make a recv call to the local ctdb daemon - called from client context
318
319   This is called when the program wants to wait for a ctdb_call to complete and get the 
320   results. This call will block unless the call has already completed.
321 */
322 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
323 {
324         if (state == NULL) {
325                 return -1;
326         }
327
328         while (state->state < CTDB_CALL_DONE) {
329                 event_loop_once(state->ctdb_db->ctdb->ev);
330         }
331         if (state->state != CTDB_CALL_DONE) {
332                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
333                 talloc_free(state);
334                 return -1;
335         }
336
337         if (state->call->reply_data.dsize) {
338                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
339                                                       state->call->reply_data.dptr,
340                                                       state->call->reply_data.dsize);
341                 call->reply_data.dsize = state->call->reply_data.dsize;
342         } else {
343                 call->reply_data.dptr = NULL;
344                 call->reply_data.dsize = 0;
345         }
346         call->status = state->call->status;
347         talloc_free(state);
348
349         return 0;
350 }
351
352
353
354
355 /*
356   destroy a ctdb_call in client
357 */
358 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
359 {
360         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
361         return 0;
362 }
363
364 /*
365   construct an event driven local ctdb_call
366
367   this is used so that locally processed ctdb_call requests are processed
368   in an event driven manner
369 */
370 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
371                                                                   struct ctdb_call *call,
372                                                                   struct ctdb_ltdb_header *header,
373                                                                   TDB_DATA *data)
374 {
375         struct ctdb_client_call_state *state;
376         struct ctdb_context *ctdb = ctdb_db->ctdb;
377         int ret;
378
379         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
380         CTDB_NO_MEMORY_NULL(ctdb, state);
381         state->call = talloc_zero(state, struct ctdb_call);
382         CTDB_NO_MEMORY_NULL(ctdb, state->call);
383
384         talloc_steal(state, data->dptr);
385
386         state->state   = CTDB_CALL_DONE;
387         *(state->call) = *call;
388         state->ctdb_db = ctdb_db;
389
390         ret = ctdb_call_local(ctdb_db, state->call, header, state, data);
391
392         return state;
393 }
394
395 /*
396   make a ctdb call to the local daemon - async send. Called from client context.
397
398   This constructs a ctdb_call request and queues it for processing. 
399   This call never blocks.
400 */
401 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
402                                               struct ctdb_call *call)
403 {
404         struct ctdb_client_call_state *state;
405         struct ctdb_context *ctdb = ctdb_db->ctdb;
406         struct ctdb_ltdb_header header;
407         TDB_DATA data;
408         int ret;
409         size_t len;
410         struct ctdb_req_call *c;
411
412         /* if the domain socket is not yet open, open it */
413         if (ctdb->daemon.sd==-1) {
414                 ctdb_socket_connect(ctdb);
415         }
416
417         ret = ctdb_ltdb_lock(ctdb_db, call->key);
418         if (ret != 0) {
419                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
420                 return NULL;
421         }
422
423         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
424
425         if (ret == 0 && header.dmaster == ctdb->pnn) {
426                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
427                 talloc_free(data.dptr);
428                 ctdb_ltdb_unlock(ctdb_db, call->key);
429                 return state;
430         }
431
432         ctdb_ltdb_unlock(ctdb_db, call->key);
433         talloc_free(data.dptr);
434
435         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
436         if (state == NULL) {
437                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
438                 return NULL;
439         }
440         state->call = talloc_zero(state, struct ctdb_call);
441         if (state->call == NULL) {
442                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
443                 return NULL;
444         }
445
446         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
447         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
448         if (c == NULL) {
449                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
450                 return NULL;
451         }
452
453         state->reqid     = ctdb_reqid_new(ctdb, state);
454         state->ctdb_db = ctdb_db;
455         talloc_set_destructor(state, ctdb_client_call_destructor);
456
457         c->hdr.reqid     = state->reqid;
458         c->flags         = call->flags;
459         c->db_id         = ctdb_db->db_id;
460         c->callid        = call->call_id;
461         c->hopcount      = 0;
462         c->keylen        = call->key.dsize;
463         c->calldatalen   = call->call_data.dsize;
464         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
465         memcpy(&c->data[call->key.dsize], 
466                call->call_data.dptr, call->call_data.dsize);
467         *(state->call)              = *call;
468         state->call->call_data.dptr = &c->data[call->key.dsize];
469         state->call->key.dptr       = &c->data[0];
470
471         state->state  = CTDB_CALL_WAIT;
472
473
474         ctdb_client_queue_pkt(ctdb, &c->hdr);
475
476         return state;
477 }
478
479
480 /*
481   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
482 */
483 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
484 {
485         struct ctdb_client_call_state *state;
486
487         state = ctdb_call_send(ctdb_db, call);
488         return ctdb_call_recv(state, call);
489 }
490
491
492 /*
493   tell the daemon what messaging srvid we will use, and register the message
494   handler function in the client
495 */
496 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
497                              ctdb_msg_fn_t handler,
498                              void *private_data)
499                                     
500 {
501         int res;
502         int32_t status;
503         
504         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
505                            tdb_null, NULL, NULL, &status, NULL, NULL);
506         if (res != 0 || status != 0) {
507                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
508                 return -1;
509         }
510
511         /* also need to register the handler with our own ctdb structure */
512         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
513 }
514
515 /*
516   tell the daemon we no longer want a srvid
517 */
518 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
519 {
520         int res;
521         int32_t status;
522         
523         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
524                            tdb_null, NULL, NULL, &status, NULL, NULL);
525         if (res != 0 || status != 0) {
526                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
527                 return -1;
528         }
529
530         /* also need to register the handler with our own ctdb structure */
531         ctdb_deregister_message_handler(ctdb, srvid, private_data);
532         return 0;
533 }
534
535
536 /*
537   send a message - from client context
538  */
539 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
540                       uint64_t srvid, TDB_DATA data)
541 {
542         struct ctdb_req_message *r;
543         int len, res;
544
545         len = offsetof(struct ctdb_req_message, data) + data.dsize;
546         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
547                                len, struct ctdb_req_message);
548         CTDB_NO_MEMORY(ctdb, r);
549
550         r->hdr.destnode  = pnn;
551         r->srvid         = srvid;
552         r->datalen       = data.dsize;
553         memcpy(&r->data[0], data.dptr, data.dsize);
554         
555         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
556         if (res != 0) {
557                 return res;
558         }
559
560         talloc_free(r);
561         return 0;
562 }
563
564
565 /*
566   cancel a ctdb_fetch_lock operation, releasing the lock
567  */
568 static int fetch_lock_destructor(struct ctdb_record_handle *h)
569 {
570         ctdb_ltdb_unlock(h->ctdb_db, h->key);
571         return 0;
572 }
573
574 /*
575   force the migration of a record to this node
576  */
577 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
578 {
579         struct ctdb_call call;
580         ZERO_STRUCT(call);
581         call.call_id = CTDB_NULL_FUNC;
582         call.key = key;
583         call.flags = CTDB_IMMEDIATE_MIGRATION;
584         return ctdb_call(ctdb_db, &call);
585 }
586
587 /*
588   get a lock on a record, and return the records data. Blocks until it gets the lock
589  */
590 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
591                                            TDB_DATA key, TDB_DATA *data)
592 {
593         int ret;
594         struct ctdb_record_handle *h;
595
596         /*
597           procedure is as follows:
598
599           1) get the chain lock. 
600           2) check if we are dmaster
601           3) if we are the dmaster then return handle 
602           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
603              reply from ctdbd
604           5) when we get the reply, goto (1)
605          */
606
607         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
608         if (h == NULL) {
609                 return NULL;
610         }
611
612         h->ctdb_db = ctdb_db;
613         h->key     = key;
614         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
615         if (h->key.dptr == NULL) {
616                 talloc_free(h);
617                 return NULL;
618         }
619         h->data    = data;
620
621         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
622                  (const char *)key.dptr));
623
624 again:
625         /* step 1 - get the chain lock */
626         ret = ctdb_ltdb_lock(ctdb_db, key);
627         if (ret != 0) {
628                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
629                 talloc_free(h);
630                 return NULL;
631         }
632
633         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
634
635         talloc_set_destructor(h, fetch_lock_destructor);
636
637         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
638
639         /* when torturing, ensure we test the remote path */
640         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
641             random() % 5 == 0) {
642                 h->header.dmaster = (uint32_t)-1;
643         }
644
645
646         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
647
648         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
649                 ctdb_ltdb_unlock(ctdb_db, key);
650                 ret = ctdb_client_force_migration(ctdb_db, key);
651                 if (ret != 0) {
652                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
653                         talloc_free(h);
654                         return NULL;
655                 }
656                 goto again;
657         }
658
659         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
660         return h;
661 }
662
663 /*
664   store some data to the record that was locked with ctdb_fetch_lock()
665 */
666 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
667 {
668         if (h->ctdb_db->persistent) {
669                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
670                 return -1;
671         }
672
673         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
674 }
675
676 /*
677   non-locking fetch of a record
678  */
679 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
680                TDB_DATA key, TDB_DATA *data)
681 {
682         struct ctdb_call call;
683         int ret;
684
685         call.call_id = CTDB_FETCH_FUNC;
686         call.call_data.dptr = NULL;
687         call.call_data.dsize = 0;
688
689         ret = ctdb_call(ctdb_db, &call);
690
691         if (ret == 0) {
692                 *data = call.reply_data;
693                 talloc_steal(mem_ctx, data->dptr);
694         }
695
696         return ret;
697 }
698
699
700
701 /*
702    called when a control completes or timesout to invoke the callback
703    function the user provided
704 */
705 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
706         struct timeval t, void *private_data)
707 {
708         struct ctdb_client_control_state *state;
709         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
710         int ret;
711
712         state = talloc_get_type(private_data, struct ctdb_client_control_state);
713         talloc_steal(tmp_ctx, state);
714
715         ret = ctdb_control_recv(state->ctdb, state, state,
716                         NULL, 
717                         NULL, 
718                         NULL);
719
720         talloc_free(tmp_ctx);
721 }
722
723 /*
724   called when a CTDB_REPLY_CONTROL packet comes in in the client
725
726   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
727   contains any reply data from the control
728 */
729 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
730                                       struct ctdb_req_header *hdr)
731 {
732         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
733         struct ctdb_client_control_state *state;
734
735         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
736         if (state == NULL) {
737                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
738                 return;
739         }
740
741         if (hdr->reqid != state->reqid) {
742                 /* we found a record  but it was the wrong one */
743                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
744                 return;
745         }
746
747         state->outdata.dptr = c->data;
748         state->outdata.dsize = c->datalen;
749         state->status = c->status;
750         if (c->errorlen) {
751                 state->errormsg = talloc_strndup(state, 
752                                                  (char *)&c->data[c->datalen], 
753                                                  c->errorlen);
754         }
755
756         /* state->outdata now uses resources from c so we dont want c
757            to just dissappear from under us while state is still alive
758         */
759         talloc_steal(state, c);
760
761         state->state = CTDB_CONTROL_DONE;
762
763         /* if we had a callback registered for this control, pull the response
764            and call the callback.
765         */
766         if (state->async.fn) {
767                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
768         }
769 }
770
771
772 /*
773   destroy a ctdb_control in client
774 */
775 static int ctdb_control_destructor(struct ctdb_client_control_state *state)     
776 {
777         ctdb_reqid_remove(state->ctdb, state->reqid);
778         return 0;
779 }
780
781
782 /* time out handler for ctdb_control */
783 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
784         struct timeval t, void *private_data)
785 {
786         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
787
788         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
789                          "dstnode:%u\n", state->reqid, state->c->opcode,
790                          state->c->hdr.destnode));
791
792         state->state = CTDB_CONTROL_TIMEOUT;
793
794         /* if we had a callback registered for this control, pull the response
795            and call the callback.
796         */
797         if (state->async.fn) {
798                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
799         }
800 }
801
802 /* async version of send control request */
803 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
804                 uint32_t destnode, uint64_t srvid, 
805                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
806                 TALLOC_CTX *mem_ctx,
807                 struct timeval *timeout,
808                 char **errormsg)
809 {
810         struct ctdb_client_control_state *state;
811         size_t len;
812         struct ctdb_req_control *c;
813         int ret;
814
815         if (errormsg) {
816                 *errormsg = NULL;
817         }
818
819         /* if the domain socket is not yet open, open it */
820         if (ctdb->daemon.sd==-1) {
821                 ctdb_socket_connect(ctdb);
822         }
823
824         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
825         CTDB_NO_MEMORY_NULL(ctdb, state);
826
827         state->ctdb       = ctdb;
828         state->reqid      = ctdb_reqid_new(ctdb, state);
829         state->state      = CTDB_CONTROL_WAIT;
830         state->errormsg   = NULL;
831
832         talloc_set_destructor(state, ctdb_control_destructor);
833
834         len = offsetof(struct ctdb_req_control, data) + data.dsize;
835         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
836                                len, struct ctdb_req_control);
837         state->c            = c;        
838         CTDB_NO_MEMORY_NULL(ctdb, c);
839         c->hdr.reqid        = state->reqid;
840         c->hdr.destnode     = destnode;
841         c->opcode           = opcode;
842         c->client_id        = 0;
843         c->flags            = flags;
844         c->srvid            = srvid;
845         c->datalen          = data.dsize;
846         if (data.dsize) {
847                 memcpy(&c->data[0], data.dptr, data.dsize);
848         }
849
850         /* timeout */
851         if (timeout && !timeval_is_zero(timeout)) {
852                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
853         }
854
855         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
856         if (ret != 0) {
857                 talloc_free(state);
858                 return NULL;
859         }
860
861         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
862                 talloc_free(state);
863                 return NULL;
864         }
865
866         return state;
867 }
868
869
870 /* async version of receive control reply */
871 int ctdb_control_recv(struct ctdb_context *ctdb, 
872                 struct ctdb_client_control_state *state, 
873                 TALLOC_CTX *mem_ctx,
874                 TDB_DATA *outdata, int32_t *status, char **errormsg)
875 {
876         TALLOC_CTX *tmp_ctx;
877
878         if (status != NULL) {
879                 *status = -1;
880         }
881         if (errormsg != NULL) {
882                 *errormsg = NULL;
883         }
884
885         if (state == NULL) {
886                 return -1;
887         }
888
889         /* prevent double free of state */
890         tmp_ctx = talloc_new(ctdb);
891         talloc_steal(tmp_ctx, state);
892
893         /* loop one event at a time until we either timeout or the control
894            completes.
895         */
896         while (state->state == CTDB_CONTROL_WAIT) {
897                 event_loop_once(ctdb->ev);
898         }
899
900         if (state->state != CTDB_CONTROL_DONE) {
901                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
902                 if (state->async.fn) {
903                         state->async.fn(state);
904                 }
905                 talloc_free(tmp_ctx);
906                 return -1;
907         }
908
909         if (state->errormsg) {
910                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
911                 if (errormsg) {
912                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
913                 }
914                 if (state->async.fn) {
915                         state->async.fn(state);
916                 }
917                 talloc_free(tmp_ctx);
918                 return -1;
919         }
920
921         if (outdata) {
922                 *outdata = state->outdata;
923                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
924         }
925
926         if (status) {
927                 *status = state->status;
928         }
929
930         if (state->async.fn) {
931                 state->async.fn(state);
932         }
933
934         talloc_free(tmp_ctx);
935         return 0;
936 }
937
938
939
940 /*
941   send a ctdb control message
942   timeout specifies how long we should wait for a reply.
943   if timeout is NULL we wait indefinitely
944  */
945 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
946                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
947                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
948                  struct timeval *timeout,
949                  char **errormsg)
950 {
951         struct ctdb_client_control_state *state;
952
953         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
954                         flags, data, mem_ctx,
955                         timeout, errormsg);
956         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
957                         errormsg);
958 }
959
960
961
962
963 /*
964   a process exists call. Returns 0 if process exists, -1 otherwise
965  */
966 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
967 {
968         int ret;
969         TDB_DATA data;
970         int32_t status;
971
972         data.dptr = (uint8_t*)&pid;
973         data.dsize = sizeof(pid);
974
975         ret = ctdb_control(ctdb, destnode, 0, 
976                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
977                            NULL, NULL, &status, NULL, NULL);
978         if (ret != 0) {
979                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
980                 return -1;
981         }
982
983         return status;
984 }
985
986 /*
987   get remote statistics
988  */
989 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
990 {
991         int ret;
992         TDB_DATA data;
993         int32_t res;
994
995         ret = ctdb_control(ctdb, destnode, 0, 
996                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
997                            ctdb, &data, &res, NULL, NULL);
998         if (ret != 0 || res != 0) {
999                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
1000                 return -1;
1001         }
1002
1003         if (data.dsize != sizeof(struct ctdb_statistics)) {
1004                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1005                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1006                       return -1;
1007         }
1008
1009         *status = *(struct ctdb_statistics *)data.dptr;
1010         talloc_free(data.dptr);
1011                         
1012         return 0;
1013 }
1014
1015 /*
1016   shutdown a remote ctdb node
1017  */
1018 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1019 {
1020         struct ctdb_client_control_state *state;
1021
1022         state = ctdb_control_send(ctdb, destnode, 0, 
1023                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1024                            NULL, &timeout, NULL);
1025         if (state == NULL) {
1026                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1027                 return -1;
1028         }
1029
1030         return 0;
1031 }
1032
1033 /*
1034   get vnn map from a remote node
1035  */
1036 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1037 {
1038         int ret;
1039         TDB_DATA outdata;
1040         int32_t res;
1041         struct ctdb_vnn_map_wire *map;
1042
1043         ret = ctdb_control(ctdb, destnode, 0, 
1044                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1045                            mem_ctx, &outdata, &res, &timeout, NULL);
1046         if (ret != 0 || res != 0) {
1047                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1048                 return -1;
1049         }
1050         
1051         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1052         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1053             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1054                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1055                 return -1;
1056         }
1057
1058         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1059         CTDB_NO_MEMORY(ctdb, *vnnmap);
1060         (*vnnmap)->generation = map->generation;
1061         (*vnnmap)->size       = map->size;
1062         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1063
1064         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1065         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1066         talloc_free(outdata.dptr);
1067                     
1068         return 0;
1069 }
1070
1071
1072 /*
1073   get the recovery mode of a remote node
1074  */
1075 struct ctdb_client_control_state *
1076 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1077 {
1078         return ctdb_control_send(ctdb, destnode, 0, 
1079                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1080                            mem_ctx, &timeout, NULL);
1081 }
1082
1083 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1084 {
1085         int ret;
1086         int32_t res;
1087
1088         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1089         if (ret != 0) {
1090                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1091                 return -1;
1092         }
1093
1094         if (recmode) {
1095                 *recmode = (uint32_t)res;
1096         }
1097
1098         return 0;
1099 }
1100
1101 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1102 {
1103         struct ctdb_client_control_state *state;
1104
1105         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1106         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1107 }
1108
1109
1110
1111
1112 /*
1113   set the recovery mode of a remote node
1114  */
1115 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1116 {
1117         int ret;
1118         TDB_DATA data;
1119         int32_t res;
1120
1121         data.dsize = sizeof(uint32_t);
1122         data.dptr = (unsigned char *)&recmode;
1123
1124         ret = ctdb_control(ctdb, destnode, 0, 
1125                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1126                            NULL, NULL, &res, &timeout, NULL);
1127         if (ret != 0 || res != 0) {
1128                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1129                 return -1;
1130         }
1131
1132         return 0;
1133 }
1134
1135
1136
1137 /*
1138   get the recovery master of a remote node
1139  */
1140 struct ctdb_client_control_state *
1141 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1142                         struct timeval timeout, uint32_t destnode)
1143 {
1144         return ctdb_control_send(ctdb, destnode, 0, 
1145                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1146                            mem_ctx, &timeout, NULL);
1147 }
1148
1149 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1150 {
1151         int ret;
1152         int32_t res;
1153
1154         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1155         if (ret != 0) {
1156                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1157                 return -1;
1158         }
1159
1160         if (recmaster) {
1161                 *recmaster = (uint32_t)res;
1162         }
1163
1164         return 0;
1165 }
1166
1167 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1168 {
1169         struct ctdb_client_control_state *state;
1170
1171         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1172         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1173 }
1174
1175
1176 /*
1177   set the recovery master of a remote node
1178  */
1179 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1180 {
1181         int ret;
1182         TDB_DATA data;
1183         int32_t res;
1184
1185         ZERO_STRUCT(data);
1186         data.dsize = sizeof(uint32_t);
1187         data.dptr = (unsigned char *)&recmaster;
1188
1189         ret = ctdb_control(ctdb, destnode, 0, 
1190                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1191                            NULL, NULL, &res, &timeout, NULL);
1192         if (ret != 0 || res != 0) {
1193                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1194                 return -1;
1195         }
1196
1197         return 0;
1198 }
1199
1200
1201 /*
1202   get a list of databases off a remote node
1203  */
1204 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1205                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1206 {
1207         int ret;
1208         TDB_DATA outdata;
1209         int32_t res;
1210
1211         ret = ctdb_control(ctdb, destnode, 0, 
1212                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1213                            mem_ctx, &outdata, &res, &timeout, NULL);
1214         if (ret != 0 || res != 0) {
1215                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1216                 return -1;
1217         }
1218
1219         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1220         talloc_free(outdata.dptr);
1221                     
1222         return 0;
1223 }
1224
1225 /*
1226   get a list of nodes (vnn and flags ) from a remote node
1227  */
1228 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1229                 struct timeval timeout, uint32_t destnode, 
1230                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1231 {
1232         int ret;
1233         TDB_DATA outdata;
1234         int32_t res;
1235
1236         ret = ctdb_control(ctdb, destnode, 0, 
1237                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1238                            mem_ctx, &outdata, &res, &timeout, NULL);
1239         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1240                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1241                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1242         }
1243         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1244                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1245                 return -1;
1246         }
1247
1248         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1249         talloc_free(outdata.dptr);
1250                     
1251         return 0;
1252 }
1253
1254 /*
1255   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1256  */
1257 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1258                 struct timeval timeout, uint32_t destnode, 
1259                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1260 {
1261         int ret, i, len;
1262         TDB_DATA outdata;
1263         struct ctdb_node_mapv4 *nodemapv4;
1264         int32_t res;
1265
1266         ret = ctdb_control(ctdb, destnode, 0, 
1267                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1268                            mem_ctx, &outdata, &res, &timeout, NULL);
1269         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1270                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1271                 return -1;
1272         }
1273
1274         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1275
1276         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1277         (*nodemap) = talloc_zero_size(mem_ctx, len);
1278         CTDB_NO_MEMORY(ctdb, (*nodemap));
1279
1280         (*nodemap)->num = nodemapv4->num;
1281         for (i=0; i<nodemapv4->num; i++) {
1282                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1283                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1284                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1285                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1286         }
1287                 
1288         talloc_free(outdata.dptr);
1289                     
1290         return 0;
1291 }
1292
1293 /*
1294   drop the transport, reload the nodes file and restart the transport
1295  */
1296 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1297                     struct timeval timeout, uint32_t destnode)
1298 {
1299         int ret;
1300         int32_t res;
1301
1302         ret = ctdb_control(ctdb, destnode, 0, 
1303                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1304                            NULL, NULL, &res, &timeout, NULL);
1305         if (ret != 0 || res != 0) {
1306                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1307                 return -1;
1308         }
1309
1310         return 0;
1311 }
1312
1313
1314 /*
1315   set vnn map on a node
1316  */
1317 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1318                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1319 {
1320         int ret;
1321         TDB_DATA data;
1322         int32_t res;
1323         struct ctdb_vnn_map_wire *map;
1324         size_t len;
1325
1326         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1327         map = talloc_size(mem_ctx, len);
1328         CTDB_NO_MEMORY(ctdb, map);
1329
1330         map->generation = vnnmap->generation;
1331         map->size = vnnmap->size;
1332         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1333         
1334         data.dsize = len;
1335         data.dptr  = (uint8_t *)map;
1336
1337         ret = ctdb_control(ctdb, destnode, 0, 
1338                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1339                            NULL, NULL, &res, &timeout, NULL);
1340         if (ret != 0 || res != 0) {
1341                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1342                 return -1;
1343         }
1344
1345         talloc_free(map);
1346
1347         return 0;
1348 }
1349
1350
1351 /*
1352   async send for pull database
1353  */
1354 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1355         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1356         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1357 {
1358         TDB_DATA indata;
1359         struct ctdb_control_pulldb *pull;
1360         struct ctdb_client_control_state *state;
1361
1362         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1363         CTDB_NO_MEMORY_NULL(ctdb, pull);
1364
1365         pull->db_id   = dbid;
1366         pull->lmaster = lmaster;
1367
1368         indata.dsize = sizeof(struct ctdb_control_pulldb);
1369         indata.dptr  = (unsigned char *)pull;
1370
1371         state = ctdb_control_send(ctdb, destnode, 0, 
1372                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1373                                   mem_ctx, &timeout, NULL);
1374         talloc_free(pull);
1375
1376         return state;
1377 }
1378
1379 /*
1380   async recv for pull database
1381  */
1382 int ctdb_ctrl_pulldb_recv(
1383         struct ctdb_context *ctdb, 
1384         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1385         TDB_DATA *outdata)
1386 {
1387         int ret;
1388         int32_t res;
1389
1390         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1391         if ( (ret != 0) || (res != 0) ){
1392                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1393                 return -1;
1394         }
1395
1396         return 0;
1397 }
1398
1399 /*
1400   pull all keys and records for a specific database on a node
1401  */
1402 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1403                 uint32_t dbid, uint32_t lmaster, 
1404                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1405                 TDB_DATA *outdata)
1406 {
1407         struct ctdb_client_control_state *state;
1408
1409         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1410                                       timeout);
1411         
1412         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1413 }
1414
1415
1416 /*
1417   change dmaster for all keys in the database to the new value
1418  */
1419 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1420                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1421 {
1422         int ret;
1423         TDB_DATA indata;
1424         int32_t res;
1425
1426         indata.dsize = 2*sizeof(uint32_t);
1427         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1428
1429         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1430         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1431
1432         ret = ctdb_control(ctdb, destnode, 0, 
1433                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1434                            NULL, NULL, &res, &timeout, NULL);
1435         if (ret != 0 || res != 0) {
1436                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1437                 return -1;
1438         }
1439
1440         return 0;
1441 }
1442
1443 /*
1444   ping a node, return number of clients connected
1445  */
1446 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1447 {
1448         int ret;
1449         int32_t res;
1450
1451         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1452                            tdb_null, NULL, NULL, &res, NULL, NULL);
1453         if (ret != 0) {
1454                 return -1;
1455         }
1456         return res;
1457 }
1458
1459 /*
1460   find the real path to a ltdb 
1461  */
1462 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1463                    const char **path)
1464 {
1465         int ret;
1466         int32_t res;
1467         TDB_DATA data;
1468
1469         data.dptr = (uint8_t *)&dbid;
1470         data.dsize = sizeof(dbid);
1471
1472         ret = ctdb_control(ctdb, destnode, 0, 
1473                            CTDB_CONTROL_GETDBPATH, 0, data, 
1474                            mem_ctx, &data, &res, &timeout, NULL);
1475         if (ret != 0 || res != 0) {
1476                 return -1;
1477         }
1478
1479         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1480         if ((*path) == NULL) {
1481                 return -1;
1482         }
1483
1484         talloc_free(data.dptr);
1485
1486         return 0;
1487 }
1488
1489 /*
1490   find the name of a db 
1491  */
1492 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1493                    const char **name)
1494 {
1495         int ret;
1496         int32_t res;
1497         TDB_DATA data;
1498
1499         data.dptr = (uint8_t *)&dbid;
1500         data.dsize = sizeof(dbid);
1501
1502         ret = ctdb_control(ctdb, destnode, 0, 
1503                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1504                            mem_ctx, &data, &res, &timeout, NULL);
1505         if (ret != 0 || res != 0) {
1506                 return -1;
1507         }
1508
1509         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1510         if ((*name) == NULL) {
1511                 return -1;
1512         }
1513
1514         talloc_free(data.dptr);
1515
1516         return 0;
1517 }
1518
1519 /*
1520   get the health status of a db
1521  */
1522 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1523                           struct timeval timeout,
1524                           uint32_t destnode,
1525                           uint32_t dbid, TALLOC_CTX *mem_ctx,
1526                           const char **reason)
1527 {
1528         int ret;
1529         int32_t res;
1530         TDB_DATA data;
1531
1532         data.dptr = (uint8_t *)&dbid;
1533         data.dsize = sizeof(dbid);
1534
1535         ret = ctdb_control(ctdb, destnode, 0,
1536                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1537                            mem_ctx, &data, &res, &timeout, NULL);
1538         if (ret != 0 || res != 0) {
1539                 return -1;
1540         }
1541
1542         if (data.dsize == 0) {
1543                 (*reason) = NULL;
1544                 return 0;
1545         }
1546
1547         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1548         if ((*reason) == NULL) {
1549                 return -1;
1550         }
1551
1552         talloc_free(data.dptr);
1553
1554         return 0;
1555 }
1556
1557 /*
1558   create a database
1559  */
1560 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1561                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1562 {
1563         int ret;
1564         int32_t res;
1565         TDB_DATA data;
1566
1567         data.dptr = discard_const(name);
1568         data.dsize = strlen(name)+1;
1569
1570         ret = ctdb_control(ctdb, destnode, 0, 
1571                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1572                            0, data, 
1573                            mem_ctx, &data, &res, &timeout, NULL);
1574
1575         if (ret != 0 || res != 0) {
1576                 return -1;
1577         }
1578
1579         return 0;
1580 }
1581
1582 /*
1583   get debug level on a node
1584  */
1585 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1586 {
1587         int ret;
1588         int32_t res;
1589         TDB_DATA data;
1590
1591         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1592                            ctdb, &data, &res, NULL, NULL);
1593         if (ret != 0 || res != 0) {
1594                 return -1;
1595         }
1596         if (data.dsize != sizeof(int32_t)) {
1597                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1598                          (unsigned)data.dsize));
1599                 return -1;
1600         }
1601         *level = *(int32_t *)data.dptr;
1602         talloc_free(data.dptr);
1603         return 0;
1604 }
1605
1606 /*
1607   set debug level on a node
1608  */
1609 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1610 {
1611         int ret;
1612         int32_t res;
1613         TDB_DATA data;
1614
1615         data.dptr = (uint8_t *)&level;
1616         data.dsize = sizeof(level);
1617
1618         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1619                            NULL, NULL, &res, NULL, NULL);
1620         if (ret != 0 || res != 0) {
1621                 return -1;
1622         }
1623         return 0;
1624 }
1625
1626
1627 /*
1628   get a list of connected nodes
1629  */
1630 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1631                                 struct timeval timeout,
1632                                 TALLOC_CTX *mem_ctx,
1633                                 uint32_t *num_nodes)
1634 {
1635         struct ctdb_node_map *map=NULL;
1636         int ret, i;
1637         uint32_t *nodes;
1638
1639         *num_nodes = 0;
1640
1641         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1642         if (ret != 0) {
1643                 return NULL;
1644         }
1645
1646         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1647         if (nodes == NULL) {
1648                 return NULL;
1649         }
1650
1651         for (i=0;i<map->num;i++) {
1652                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1653                         nodes[*num_nodes] = map->nodes[i].pnn;
1654                         (*num_nodes)++;
1655                 }
1656         }
1657
1658         return nodes;
1659 }
1660
1661
1662 /*
1663   reset remote status
1664  */
1665 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1666 {
1667         int ret;
1668         int32_t res;
1669
1670         ret = ctdb_control(ctdb, destnode, 0, 
1671                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1672                            NULL, NULL, &res, NULL, NULL);
1673         if (ret != 0 || res != 0) {
1674                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1675                 return -1;
1676         }
1677         return 0;
1678 }
1679
1680 /*
1681   this is the dummy null procedure that all databases support
1682 */
1683 static int ctdb_null_func(struct ctdb_call_info *call)
1684 {
1685         return 0;
1686 }
1687
1688 /*
1689   this is a plain fetch procedure that all databases support
1690 */
1691 static int ctdb_fetch_func(struct ctdb_call_info *call)
1692 {
1693         call->reply_data = &call->record_data;
1694         return 0;
1695 }
1696
1697 /*
1698   this is a plain fetch procedure that all databases support
1699   this returns the full record including the ltdb header
1700 */
1701 static int ctdb_fetch_with_header_func(struct ctdb_call_info *call)
1702 {
1703         call->reply_data = talloc(call, TDB_DATA);
1704         if (call->reply_data == NULL) {
1705                 return -1;
1706         }
1707         call->reply_data->dsize = sizeof(struct ctdb_ltdb_header) + call->record_data.dsize;
1708         call->reply_data->dptr  = talloc_size(call->reply_data, call->reply_data->dsize);
1709         if (call->reply_data->dptr == NULL) {
1710                 return -1;
1711         }
1712         memcpy(call->reply_data->dptr, call->header, sizeof(struct ctdb_ltdb_header));
1713         memcpy(&call->reply_data->dptr[sizeof(struct ctdb_ltdb_header)], call->record_data.dptr, call->record_data.dsize);
1714
1715         return 0;
1716 }
1717
1718 /*
1719   attach to a specific database - client call
1720 */
1721 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, bool persistent, uint32_t tdb_flags)
1722 {
1723         struct ctdb_db_context *ctdb_db;
1724         TDB_DATA data;
1725         int ret;
1726         int32_t res;
1727
1728         ctdb_db = ctdb_db_handle(ctdb, name);
1729         if (ctdb_db) {
1730                 return ctdb_db;
1731         }
1732
1733         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1734         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1735
1736         ctdb_db->ctdb = ctdb;
1737         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1738         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1739
1740         data.dptr = discard_const(name);
1741         data.dsize = strlen(name)+1;
1742
1743         /* tell ctdb daemon to attach */
1744         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1745                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1746                            0, data, ctdb_db, &data, &res, NULL, NULL);
1747         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1748                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1749                 talloc_free(ctdb_db);
1750                 return NULL;
1751         }
1752         
1753         ctdb_db->db_id = *(uint32_t *)data.dptr;
1754         talloc_free(data.dptr);
1755
1756         ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1757         if (ret != 0) {
1758                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1759                 talloc_free(ctdb_db);
1760                 return NULL;
1761         }
1762
1763         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1764         if (ctdb->valgrinding) {
1765                 tdb_flags |= TDB_NOMMAP;
1766         }
1767         tdb_flags |= TDB_DISALLOW_NESTING;
1768
1769         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1770         if (ctdb_db->ltdb == NULL) {
1771                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1772                 talloc_free(ctdb_db);
1773                 return NULL;
1774         }
1775
1776         ctdb_db->persistent = persistent;
1777
1778         DLIST_ADD(ctdb->db_list, ctdb_db);
1779
1780         /* add well known functions */
1781         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1782         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1783         ctdb_set_call(ctdb_db, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1784
1785         return ctdb_db;
1786 }
1787
1788
1789 /*
1790   setup a call for a database
1791  */
1792 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1793 {
1794         struct ctdb_registered_call *call;
1795
1796 #if 0
1797         TDB_DATA data;
1798         int32_t status;
1799         struct ctdb_control_set_call c;
1800         int ret;
1801
1802         /* this is no longer valid with the separate daemon architecture */
1803         c.db_id = ctdb_db->db_id;
1804         c.fn    = fn;
1805         c.id    = id;
1806
1807         data.dptr = (uint8_t *)&c;
1808         data.dsize = sizeof(c);
1809
1810         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1811                            data, NULL, NULL, &status, NULL, NULL);
1812         if (ret != 0 || status != 0) {
1813                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
1814                 return -1;
1815         }
1816 #endif
1817
1818         /* also register locally */
1819         call = talloc(ctdb_db, struct ctdb_registered_call);
1820         call->fn = fn;
1821         call->id = id;
1822
1823         DLIST_ADD(ctdb_db->calls, call);        
1824         return 0;
1825 }
1826
1827
1828 struct traverse_state {
1829         bool done;
1830         uint32_t count;
1831         ctdb_traverse_func fn;
1832         void *private_data;
1833 };
1834
1835 /*
1836   called on each key during a ctdb_traverse
1837  */
1838 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1839 {
1840         struct traverse_state *state = (struct traverse_state *)p;
1841         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
1842         TDB_DATA key;
1843
1844         if (data.dsize < sizeof(uint32_t) ||
1845             d->length != data.dsize) {
1846                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
1847                 state->done = True;
1848                 return;
1849         }
1850
1851         key.dsize = d->keylen;
1852         key.dptr  = &d->data[0];
1853         data.dsize = d->datalen;
1854         data.dptr = &d->data[d->keylen];
1855
1856         if (key.dsize == 0 && data.dsize == 0) {
1857                 /* end of traverse */
1858                 state->done = True;
1859                 return;
1860         }
1861
1862         if (data.dsize == sizeof(struct ctdb_ltdb_header)) {
1863                 /* empty records are deleted records in ctdb */
1864                 return;
1865         }
1866
1867         if (state->fn(ctdb, key, data, state->private_data) != 0) {
1868                 state->done = True;
1869         }
1870
1871         state->count++;
1872 }
1873
1874
1875 /*
1876   start a cluster wide traverse, calling the supplied fn on each record
1877   return the number of records traversed, or -1 on error
1878  */
1879 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
1880 {
1881         TDB_DATA data;
1882         struct ctdb_traverse_start t;
1883         int32_t status;
1884         int ret;
1885         uint64_t srvid = (getpid() | 0xFLL<<60);
1886         struct traverse_state state;
1887
1888         state.done = False;
1889         state.count = 0;
1890         state.private_data = private_data;
1891         state.fn = fn;
1892
1893         ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
1894         if (ret != 0) {
1895                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
1896                 return -1;
1897         }
1898
1899         t.db_id = ctdb_db->db_id;
1900         t.srvid = srvid;
1901         t.reqid = 0;
1902
1903         data.dptr = (uint8_t *)&t;
1904         data.dsize = sizeof(t);
1905
1906         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
1907                            data, NULL, NULL, &status, NULL, NULL);
1908         if (ret != 0 || status != 0) {
1909                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
1910                 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1911                 return -1;
1912         }
1913
1914         while (!state.done) {
1915                 event_loop_once(ctdb_db->ctdb->ev);
1916         }
1917
1918         ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1919         if (ret != 0) {
1920                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
1921                 return -1;
1922         }
1923
1924         return state.count;
1925 }
1926
1927 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
1928 /*
1929   called on each key during a catdb
1930  */
1931 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
1932 {
1933         int i;
1934         FILE *f = (FILE *)p;
1935         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
1936
1937         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
1938         for (i=0;i<key.dsize;i++) {
1939                 if (ISASCII(key.dptr[i])) {
1940                         fprintf(f, "%c", key.dptr[i]);
1941                 } else {
1942                         fprintf(f, "\\%02X", key.dptr[i]);
1943                 }
1944         }
1945         fprintf(f, "\"\n");
1946
1947         fprintf(f, "dmaster: %u\n", h->dmaster);
1948         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
1949         fprintf(f, "flags: 0x%08x", h->flags);
1950         if (h->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) printf(" MIGRATED_WITH_DATA");
1951         if (h->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) printf(" VACUUM_MIGRATED");
1952         if (h->flags & CTDB_REC_FLAG_AUTOMATIC) printf(" AUTOMATIC");
1953         if (h->flags & CTDB_REC_RO_HAVE_DELEGATIONS) printf(" RO_HAVE_DELEGATIONS");
1954         if (h->flags & CTDB_REC_RO_HAVE_READONLY) printf(" RO_HAVE_READONLY");
1955         if (h->flags & CTDB_REC_RO_REVOKING_READONLY) printf(" RO_REVOKING_READONLY");
1956         if (h->flags & CTDB_REC_RO_REVOKE_COMPLETE) printf(" RO_REVOKE_COMPLETE");
1957         fprintf(f, "\n");
1958
1959         fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
1960         for (i=sizeof(*h);i<data.dsize;i++) {
1961                 if (ISASCII(data.dptr[i])) {
1962                         fprintf(f, "%c", data.dptr[i]);
1963                 } else {
1964                         fprintf(f, "\\%02X", data.dptr[i]);
1965                 }
1966         }
1967         fprintf(f, "\"\n");
1968
1969         fprintf(f, "\n");
1970
1971         return 0;
1972 }
1973
1974 /*
1975   convenience function to list all keys to stdout
1976  */
1977 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
1978 {
1979         return ctdb_traverse(ctdb_db, ctdb_dumpdb_record, f);
1980 }
1981
1982 /*
1983   get the pid of a ctdb daemon
1984  */
1985 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
1986 {
1987         int ret;
1988         int32_t res;
1989
1990         ret = ctdb_control(ctdb, destnode, 0, 
1991                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
1992                            NULL, NULL, &res, &timeout, NULL);
1993         if (ret != 0) {
1994                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
1995                 return -1;
1996         }
1997
1998         *pid = res;
1999
2000         return 0;
2001 }
2002
2003
2004 /*
2005   async freeze send control
2006  */
2007 struct ctdb_client_control_state *
2008 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
2009 {
2010         return ctdb_control_send(ctdb, destnode, priority, 
2011                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
2012                            mem_ctx, &timeout, NULL);
2013 }
2014
2015 /* 
2016    async freeze recv control
2017 */
2018 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
2019 {
2020         int ret;
2021         int32_t res;
2022
2023         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
2024         if ( (ret != 0) || (res != 0) ){
2025                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
2026                 return -1;
2027         }
2028
2029         return 0;
2030 }
2031
2032 /*
2033   freeze databases of a certain priority
2034  */
2035 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2036 {
2037         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2038         struct ctdb_client_control_state *state;
2039         int ret;
2040
2041         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
2042         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
2043         talloc_free(tmp_ctx);
2044
2045         return ret;
2046 }
2047
2048 /* Freeze all databases */
2049 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2050 {
2051         int i;
2052
2053         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2054                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2055                         return -1;
2056                 }
2057         }
2058         return 0;
2059 }
2060
2061 /*
2062   thaw databases of a certain priority
2063  */
2064 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2065 {
2066         int ret;
2067         int32_t res;
2068
2069         ret = ctdb_control(ctdb, destnode, priority, 
2070                            CTDB_CONTROL_THAW, 0, tdb_null, 
2071                            NULL, NULL, &res, &timeout, NULL);
2072         if (ret != 0 || res != 0) {
2073                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2074                 return -1;
2075         }
2076
2077         return 0;
2078 }
2079
2080 /* thaw all databases */
2081 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2082 {
2083         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2084 }
2085
2086 /*
2087   get pnn of a node, or -1
2088  */
2089 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2090 {
2091         int ret;
2092         int32_t res;
2093
2094         ret = ctdb_control(ctdb, destnode, 0, 
2095                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2096                            NULL, NULL, &res, &timeout, NULL);
2097         if (ret != 0) {
2098                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2099                 return -1;
2100         }
2101
2102         return res;
2103 }
2104
2105 /*
2106   get the monitoring mode of a remote node
2107  */
2108 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2109 {
2110         int ret;
2111         int32_t res;
2112
2113         ret = ctdb_control(ctdb, destnode, 0, 
2114                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2115                            NULL, NULL, &res, &timeout, NULL);
2116         if (ret != 0) {
2117                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2118                 return -1;
2119         }
2120
2121         *monmode = res;
2122
2123         return 0;
2124 }
2125
2126
2127 /*
2128  set the monitoring mode of a remote node to active
2129  */
2130 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2131 {
2132         int ret;
2133         
2134
2135         ret = ctdb_control(ctdb, destnode, 0, 
2136                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2137                            NULL, NULL,NULL, &timeout, NULL);
2138         if (ret != 0) {
2139                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2140                 return -1;
2141         }
2142
2143         
2144
2145         return 0;
2146 }
2147
2148 /*
2149   set the monitoring mode of a remote node to disable
2150  */
2151 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2152 {
2153         int ret;
2154         
2155
2156         ret = ctdb_control(ctdb, destnode, 0, 
2157                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2158                            NULL, NULL, NULL, &timeout, NULL);
2159         if (ret != 0) {
2160                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2161                 return -1;
2162         }
2163
2164         
2165
2166         return 0;
2167 }
2168
2169
2170
2171 /* 
2172   sent to a node to make it take over an ip address
2173 */
2174 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2175                           uint32_t destnode, struct ctdb_public_ip *ip)
2176 {
2177         TDB_DATA data;
2178         struct ctdb_public_ipv4 ipv4;
2179         int ret;
2180         int32_t res;
2181
2182         if (ip->addr.sa.sa_family == AF_INET) {
2183                 ipv4.pnn = ip->pnn;
2184                 ipv4.sin = ip->addr.ip;
2185
2186                 data.dsize = sizeof(ipv4);
2187                 data.dptr  = (uint8_t *)&ipv4;
2188
2189                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2190                            NULL, &res, &timeout, NULL);
2191         } else {
2192                 data.dsize = sizeof(*ip);
2193                 data.dptr  = (uint8_t *)ip;
2194
2195                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2196                            NULL, &res, &timeout, NULL);
2197         }
2198
2199         if (ret != 0 || res != 0) {
2200                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2201                 return -1;
2202         }
2203
2204         return 0;       
2205 }
2206
2207
2208 /* 
2209   sent to a node to make it release an ip address
2210 */
2211 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2212                          uint32_t destnode, struct ctdb_public_ip *ip)
2213 {
2214         TDB_DATA data;
2215         struct ctdb_public_ipv4 ipv4;
2216         int ret;
2217         int32_t res;
2218
2219         if (ip->addr.sa.sa_family == AF_INET) {
2220                 ipv4.pnn = ip->pnn;
2221                 ipv4.sin = ip->addr.ip;
2222
2223                 data.dsize = sizeof(ipv4);
2224                 data.dptr  = (uint8_t *)&ipv4;
2225
2226                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2227                                    NULL, &res, &timeout, NULL);
2228         } else {
2229                 data.dsize = sizeof(*ip);
2230                 data.dptr  = (uint8_t *)ip;
2231
2232                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2233                                    NULL, &res, &timeout, NULL);
2234         }
2235
2236         if (ret != 0 || res != 0) {
2237                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2238                 return -1;
2239         }
2240
2241         return 0;       
2242 }
2243
2244
2245 /*
2246   get a tunable
2247  */
2248 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2249                           struct timeval timeout, 
2250                           uint32_t destnode,
2251                           const char *name, uint32_t *value)
2252 {
2253         struct ctdb_control_get_tunable *t;
2254         TDB_DATA data, outdata;
2255         int32_t res;
2256         int ret;
2257
2258         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2259         data.dptr  = talloc_size(ctdb, data.dsize);
2260         CTDB_NO_MEMORY(ctdb, data.dptr);
2261
2262         t = (struct ctdb_control_get_tunable *)data.dptr;
2263         t->length = strlen(name)+1;
2264         memcpy(t->name, name, t->length);
2265
2266         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2267                            &outdata, &res, &timeout, NULL);
2268         talloc_free(data.dptr);
2269         if (ret != 0 || res != 0) {
2270                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2271                 return -1;
2272         }
2273
2274         if (outdata.dsize != sizeof(uint32_t)) {
2275                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2276                 talloc_free(outdata.dptr);
2277                 return -1;
2278         }
2279         
2280         *value = *(uint32_t *)outdata.dptr;
2281         talloc_free(outdata.dptr);
2282
2283         return 0;
2284 }
2285
2286 /*
2287   set a tunable
2288  */
2289 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2290                           struct timeval timeout, 
2291                           uint32_t destnode,
2292                           const char *name, uint32_t value)
2293 {
2294         struct ctdb_control_set_tunable *t;
2295         TDB_DATA data;
2296         int32_t res;
2297         int ret;
2298
2299         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2300         data.dptr  = talloc_size(ctdb, data.dsize);
2301         CTDB_NO_MEMORY(ctdb, data.dptr);
2302
2303         t = (struct ctdb_control_set_tunable *)data.dptr;
2304         t->length = strlen(name)+1;
2305         memcpy(t->name, name, t->length);
2306         t->value = value;
2307
2308         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2309                            NULL, &res, &timeout, NULL);
2310         talloc_free(data.dptr);
2311         if (ret != 0 || res != 0) {
2312                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2313                 return -1;
2314         }
2315
2316         return 0;
2317 }
2318
2319 /*
2320   list tunables
2321  */
2322 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2323                             struct timeval timeout, 
2324                             uint32_t destnode,
2325                             TALLOC_CTX *mem_ctx,
2326                             const char ***list, uint32_t *count)
2327 {
2328         TDB_DATA outdata;
2329         int32_t res;
2330         int ret;
2331         struct ctdb_control_list_tunable *t;
2332         char *p, *s, *ptr;
2333
2334         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2335                            mem_ctx, &outdata, &res, &timeout, NULL);
2336         if (ret != 0 || res != 0) {
2337                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2338                 return -1;
2339         }
2340
2341         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2342         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2343             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2344                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2345                 talloc_free(outdata.dptr);
2346                 return -1;              
2347         }
2348         
2349         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2350         CTDB_NO_MEMORY(ctdb, p);
2351
2352         talloc_free(outdata.dptr);
2353         
2354         (*list) = NULL;
2355         (*count) = 0;
2356
2357         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2358                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2359                 CTDB_NO_MEMORY(ctdb, *list);
2360                 (*list)[*count] = talloc_strdup(*list, s);
2361                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2362                 (*count)++;
2363         }
2364
2365         talloc_free(p);
2366
2367         return 0;
2368 }
2369
2370
2371 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2372                                    struct timeval timeout, uint32_t destnode,
2373                                    TALLOC_CTX *mem_ctx,
2374                                    uint32_t flags,
2375                                    struct ctdb_all_public_ips **ips)
2376 {
2377         int ret;
2378         TDB_DATA outdata;
2379         int32_t res;
2380
2381         ret = ctdb_control(ctdb, destnode, 0, 
2382                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2383                            mem_ctx, &outdata, &res, &timeout, NULL);
2384         if (ret == 0 && res == -1) {
2385                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2386                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2387         }
2388         if (ret != 0 || res != 0) {
2389           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2390                 return -1;
2391         }
2392
2393         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2394         talloc_free(outdata.dptr);
2395                     
2396         return 0;
2397 }
2398
2399 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2400                              struct timeval timeout, uint32_t destnode,
2401                              TALLOC_CTX *mem_ctx,
2402                              struct ctdb_all_public_ips **ips)
2403 {
2404         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2405                                               destnode, mem_ctx,
2406                                               0, ips);
2407 }
2408
2409 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2410                         struct timeval timeout, uint32_t destnode, 
2411                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2412 {
2413         int ret, i, len;
2414         TDB_DATA outdata;
2415         int32_t res;
2416         struct ctdb_all_public_ipsv4 *ipsv4;
2417
2418         ret = ctdb_control(ctdb, destnode, 0, 
2419                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2420                            mem_ctx, &outdata, &res, &timeout, NULL);
2421         if (ret != 0 || res != 0) {
2422                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2423                 return -1;
2424         }
2425
2426         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2427         len = offsetof(struct ctdb_all_public_ips, ips) +
2428                 ipsv4->num*sizeof(struct ctdb_public_ip);
2429         *ips = talloc_zero_size(mem_ctx, len);
2430         CTDB_NO_MEMORY(ctdb, *ips);
2431         (*ips)->num = ipsv4->num;
2432         for (i=0; i<ipsv4->num; i++) {
2433                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2434                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2435         }
2436
2437         talloc_free(outdata.dptr);
2438                     
2439         return 0;
2440 }
2441
2442 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2443                                  struct timeval timeout, uint32_t destnode,
2444                                  TALLOC_CTX *mem_ctx,
2445                                  const ctdb_sock_addr *addr,
2446                                  struct ctdb_control_public_ip_info **_info)
2447 {
2448         int ret;
2449         TDB_DATA indata;
2450         TDB_DATA outdata;
2451         int32_t res;
2452         struct ctdb_control_public_ip_info *info;
2453         uint32_t len;
2454         uint32_t i;
2455
2456         indata.dptr = discard_const_p(uint8_t, addr);
2457         indata.dsize = sizeof(*addr);
2458
2459         ret = ctdb_control(ctdb, destnode, 0,
2460                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2461                            mem_ctx, &outdata, &res, &timeout, NULL);
2462         if (ret != 0 || res != 0) {
2463                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2464                                 "failed ret:%d res:%d\n",
2465                                 ret, res));
2466                 return -1;
2467         }
2468
2469         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2470         if (len > outdata.dsize) {
2471                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2472                                 "returned invalid data with size %u > %u\n",
2473                                 (unsigned int)outdata.dsize,
2474                                 (unsigned int)len));
2475                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2476                 return -1;
2477         }
2478
2479         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2480         len += info->num*sizeof(struct ctdb_control_iface_info);
2481
2482         if (len > outdata.dsize) {
2483                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2484                                 "returned invalid data with size %u > %u\n",
2485                                 (unsigned int)outdata.dsize,
2486                                 (unsigned int)len));
2487                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2488                 return -1;
2489         }
2490
2491         /* make sure we null terminate the returned strings */
2492         for (i=0; i < info->num; i++) {
2493                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2494         }
2495
2496         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2497                                                                 outdata.dptr,
2498                                                                 outdata.dsize);
2499         talloc_free(outdata.dptr);
2500         if (*_info == NULL) {
2501                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2502                                 "talloc_memdup size %u failed\n",
2503                                 (unsigned int)outdata.dsize));
2504                 return -1;
2505         }
2506
2507         return 0;
2508 }
2509
2510 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2511                          struct timeval timeout, uint32_t destnode,
2512                          TALLOC_CTX *mem_ctx,
2513                          struct ctdb_control_get_ifaces **_ifaces)
2514 {
2515         int ret;
2516         TDB_DATA outdata;
2517         int32_t res;
2518         struct ctdb_control_get_ifaces *ifaces;
2519         uint32_t len;
2520         uint32_t i;
2521
2522         ret = ctdb_control(ctdb, destnode, 0,
2523                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2524                            mem_ctx, &outdata, &res, &timeout, NULL);
2525         if (ret != 0 || res != 0) {
2526                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2527                                 "failed ret:%d res:%d\n",
2528                                 ret, res));
2529                 return -1;
2530         }
2531
2532         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2533         if (len > outdata.dsize) {
2534                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2535                                 "returned invalid data with size %u > %u\n",
2536                                 (unsigned int)outdata.dsize,
2537                                 (unsigned int)len));
2538                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2539                 return -1;
2540         }
2541
2542         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2543         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2544
2545         if (len > outdata.dsize) {
2546                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2547                                 "returned invalid data with size %u > %u\n",
2548                                 (unsigned int)outdata.dsize,
2549                                 (unsigned int)len));
2550                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2551                 return -1;
2552         }
2553
2554         /* make sure we null terminate the returned strings */
2555         for (i=0; i < ifaces->num; i++) {
2556                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2557         }
2558
2559         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2560                                                                   outdata.dptr,
2561                                                                   outdata.dsize);
2562         talloc_free(outdata.dptr);
2563         if (*_ifaces == NULL) {
2564                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2565                                 "talloc_memdup size %u failed\n",
2566                                 (unsigned int)outdata.dsize));
2567                 return -1;
2568         }
2569
2570         return 0;
2571 }
2572
2573 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2574                              struct timeval timeout, uint32_t destnode,
2575                              TALLOC_CTX *mem_ctx,
2576                              const struct ctdb_control_iface_info *info)
2577 {
2578         int ret;
2579         TDB_DATA indata;
2580         int32_t res;
2581
2582         indata.dptr = discard_const_p(uint8_t, info);
2583         indata.dsize = sizeof(*info);
2584
2585         ret = ctdb_control(ctdb, destnode, 0,
2586                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2587                            mem_ctx, NULL, &res, &timeout, NULL);
2588         if (ret != 0 || res != 0) {
2589                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2590                                 "failed ret:%d res:%d\n",
2591                                 ret, res));
2592                 return -1;
2593         }
2594
2595         return 0;
2596 }
2597
2598 /*
2599   set/clear the permanent disabled bit on a remote node
2600  */
2601 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2602                        uint32_t set, uint32_t clear)
2603 {
2604         int ret;
2605         TDB_DATA data;
2606         struct ctdb_node_map *nodemap=NULL;
2607         struct ctdb_node_flag_change c;
2608         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2609         uint32_t recmaster;
2610         uint32_t *nodes;
2611
2612
2613         /* find the recovery master */
2614         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2615         if (ret != 0) {
2616                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2617                 talloc_free(tmp_ctx);
2618                 return ret;
2619         }
2620
2621
2622         /* read the node flags from the recmaster */
2623         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2624         if (ret != 0) {
2625                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2626                 talloc_free(tmp_ctx);
2627                 return -1;
2628         }
2629         if (destnode >= nodemap->num) {
2630                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2631                 talloc_free(tmp_ctx);
2632                 return -1;
2633         }
2634
2635         c.pnn       = destnode;
2636         c.old_flags = nodemap->nodes[destnode].flags;
2637         c.new_flags = c.old_flags;
2638         c.new_flags |= set;
2639         c.new_flags &= ~clear;
2640
2641         data.dsize = sizeof(c);
2642         data.dptr = (unsigned char *)&c;
2643
2644         /* send the flags update to all connected nodes */
2645         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2646
2647         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2648                                         nodes, 0,
2649                                         timeout, false, data,
2650                                         NULL, NULL,
2651                                         NULL) != 0) {
2652                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2653
2654                 talloc_free(tmp_ctx);
2655                 return -1;
2656         }
2657
2658         talloc_free(tmp_ctx);
2659         return 0;
2660 }
2661
2662
2663 /*
2664   get all tunables
2665  */
2666 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
2667                                struct timeval timeout, 
2668                                uint32_t destnode,
2669                                struct ctdb_tunable *tunables)
2670 {
2671         TDB_DATA outdata;
2672         int ret;
2673         int32_t res;
2674
2675         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2676                            &outdata, &res, &timeout, NULL);
2677         if (ret != 0 || res != 0) {
2678                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2679                 return -1;
2680         }
2681
2682         if (outdata.dsize != sizeof(*tunables)) {
2683                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2684                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2685                 return -1;              
2686         }
2687
2688         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2689         talloc_free(outdata.dptr);
2690         return 0;
2691 }
2692
2693 /*
2694   add a public address to a node
2695  */
2696 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
2697                       struct timeval timeout, 
2698                       uint32_t destnode,
2699                       struct ctdb_control_ip_iface *pub)
2700 {
2701         TDB_DATA data;
2702         int32_t res;
2703         int ret;
2704
2705         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2706         data.dptr  = (unsigned char *)pub;
2707
2708         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2709                            NULL, &res, &timeout, NULL);
2710         if (ret != 0 || res != 0) {
2711                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2712                 return -1;
2713         }
2714
2715         return 0;
2716 }
2717
2718 /*
2719   delete a public address from a node
2720  */
2721 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
2722                       struct timeval timeout, 
2723                       uint32_t destnode,
2724                       struct ctdb_control_ip_iface *pub)
2725 {
2726         TDB_DATA data;
2727         int32_t res;
2728         int ret;
2729
2730         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2731         data.dptr  = (unsigned char *)pub;
2732
2733         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2734                            NULL, &res, &timeout, NULL);
2735         if (ret != 0 || res != 0) {
2736                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2737                 return -1;
2738         }
2739
2740         return 0;
2741 }
2742
2743 /*
2744   kill a tcp connection
2745  */
2746 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2747                       struct timeval timeout, 
2748                       uint32_t destnode,
2749                       struct ctdb_control_killtcp *killtcp)
2750 {
2751         TDB_DATA data;
2752         int32_t res;
2753         int ret;
2754
2755         data.dsize = sizeof(struct ctdb_control_killtcp);
2756         data.dptr  = (unsigned char *)killtcp;
2757
2758         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2759                            NULL, &res, &timeout, NULL);
2760         if (ret != 0 || res != 0) {
2761                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2762                 return -1;
2763         }
2764
2765         return 0;
2766 }
2767
2768 /*
2769   send a gratious arp
2770  */
2771 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
2772                       struct timeval timeout, 
2773                       uint32_t destnode,
2774                       ctdb_sock_addr *addr,
2775                       const char *ifname)
2776 {
2777         TDB_DATA data;
2778         int32_t res;
2779         int ret, len;
2780         struct ctdb_control_gratious_arp *gratious_arp;
2781         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2782
2783
2784         len = strlen(ifname)+1;
2785         gratious_arp = talloc_size(tmp_ctx, 
2786                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2787         CTDB_NO_MEMORY(ctdb, gratious_arp);
2788
2789         gratious_arp->addr = *addr;
2790         gratious_arp->len = len;
2791         memcpy(&gratious_arp->iface[0], ifname, len);
2792
2793
2794         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
2795         data.dptr  = (unsigned char *)gratious_arp;
2796
2797         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
2798                            NULL, &res, &timeout, NULL);
2799         if (ret != 0 || res != 0) {
2800                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
2801                 talloc_free(tmp_ctx);
2802                 return -1;
2803         }
2804
2805         talloc_free(tmp_ctx);
2806         return 0;
2807 }
2808
2809 /*
2810   get a list of all tcp tickles that a node knows about for a particular vnn
2811  */
2812 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
2813                               struct timeval timeout, uint32_t destnode, 
2814                               TALLOC_CTX *mem_ctx, 
2815                               ctdb_sock_addr *addr,
2816                               struct ctdb_control_tcp_tickle_list **list)
2817 {
2818         int ret;
2819         TDB_DATA data, outdata;
2820         int32_t status;
2821
2822         data.dptr = (uint8_t*)addr;
2823         data.dsize = sizeof(ctdb_sock_addr);
2824
2825         ret = ctdb_control(ctdb, destnode, 0, 
2826                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
2827                            mem_ctx, &outdata, &status, NULL, NULL);
2828         if (ret != 0 || status != 0) {
2829                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
2830                 return -1;
2831         }
2832
2833         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
2834
2835         return status;
2836 }
2837
2838 /*
2839   register a server id
2840  */
2841 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
2842                       struct timeval timeout, 
2843                       struct ctdb_server_id *id)
2844 {
2845         TDB_DATA data;
2846         int32_t res;
2847         int ret;
2848
2849         data.dsize = sizeof(struct ctdb_server_id);
2850         data.dptr  = (unsigned char *)id;
2851
2852         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2853                         CTDB_CONTROL_REGISTER_SERVER_ID, 
2854                         0, data, NULL,
2855                         NULL, &res, &timeout, NULL);
2856         if (ret != 0 || res != 0) {
2857                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
2858                 return -1;
2859         }
2860
2861         return 0;
2862 }
2863
2864 /*
2865   unregister a server id
2866  */
2867 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
2868                       struct timeval timeout, 
2869                       struct ctdb_server_id *id)
2870 {
2871         TDB_DATA data;
2872         int32_t res;
2873         int ret;
2874
2875         data.dsize = sizeof(struct ctdb_server_id);
2876         data.dptr  = (unsigned char *)id;
2877
2878         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2879                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
2880                         0, data, NULL,
2881                         NULL, &res, &timeout, NULL);
2882         if (ret != 0 || res != 0) {
2883                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
2884                 return -1;
2885         }
2886
2887         return 0;
2888 }
2889
2890
2891 /*
2892   check if a server id exists
2893
2894   if a server id does exist, return *status == 1, otherwise *status == 0
2895  */
2896 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
2897                       struct timeval timeout, 
2898                       uint32_t destnode,
2899                       struct ctdb_server_id *id,
2900                       uint32_t *status)
2901 {
2902         TDB_DATA data;
2903         int32_t res;
2904         int ret;
2905
2906         data.dsize = sizeof(struct ctdb_server_id);
2907         data.dptr  = (unsigned char *)id;
2908
2909         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
2910                         0, data, NULL,
2911                         NULL, &res, &timeout, NULL);
2912         if (ret != 0) {
2913                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
2914                 return -1;
2915         }
2916
2917         if (res) {
2918                 *status = 1;
2919         } else {
2920                 *status = 0;
2921         }
2922
2923         return 0;
2924 }
2925
2926 /*
2927    get the list of server ids that are registered on a node
2928 */
2929 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
2930                 TALLOC_CTX *mem_ctx,
2931                 struct timeval timeout, uint32_t destnode, 
2932                 struct ctdb_server_id_list **svid_list)
2933 {
2934         int ret;
2935         TDB_DATA outdata;
2936         int32_t res;
2937
2938         ret = ctdb_control(ctdb, destnode, 0, 
2939                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
2940                            mem_ctx, &outdata, &res, &timeout, NULL);
2941         if (ret != 0 || res != 0) {
2942                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
2943                 return -1;
2944         }
2945
2946         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
2947                     
2948         return 0;
2949 }
2950
2951 /*
2952   initialise the ctdb daemon for client applications
2953
2954   NOTE: In current code the daemon does not fork. This is for testing purposes only
2955   and to simplify the code.
2956 */
2957 struct ctdb_context *ctdb_init(struct event_context *ev)
2958 {
2959         int ret;
2960         struct ctdb_context *ctdb;
2961
2962         ctdb = talloc_zero(ev, struct ctdb_context);
2963         if (ctdb == NULL) {
2964                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
2965                 return NULL;
2966         }
2967         ctdb->ev  = ev;
2968         ctdb->idr = idr_init(ctdb);
2969         /* Wrap early to exercise code. */
2970         ctdb->lastid = INT_MAX-200;
2971         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
2972
2973         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
2974         if (ret != 0) {
2975                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
2976                 talloc_free(ctdb);
2977                 return NULL;
2978         }
2979
2980         ctdb->statistics.statistics_start_time = timeval_current();
2981
2982         return ctdb;
2983 }
2984
2985
2986 /*
2987   set some ctdb flags
2988 */
2989 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
2990 {
2991         ctdb->flags |= flags;
2992 }
2993
2994 /*
2995   setup the local socket name
2996 */
2997 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
2998 {
2999         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
3000         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
3001
3002         return 0;
3003 }
3004
3005 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
3006 {
3007         return ctdb->daemon.name;
3008 }
3009
3010 /*
3011   return the pnn of this node
3012 */
3013 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
3014 {
3015         return ctdb->pnn;
3016 }
3017
3018
3019 /*
3020   get the uptime of a remote node
3021  */
3022 struct ctdb_client_control_state *
3023 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3024 {
3025         return ctdb_control_send(ctdb, destnode, 0, 
3026                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
3027                            mem_ctx, &timeout, NULL);
3028 }
3029
3030 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
3031 {
3032         int ret;
3033         int32_t res;
3034         TDB_DATA outdata;
3035
3036         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3037         if (ret != 0 || res != 0) {
3038                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
3039                 return -1;
3040         }
3041
3042         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
3043
3044         return 0;
3045 }
3046
3047 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3048 {
3049         struct ctdb_client_control_state *state;
3050
3051         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3052         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3053 }
3054
3055 /*
3056   send a control to execute the "recovered" event script on a node
3057  */
3058 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3059 {
3060         int ret;
3061         int32_t status;
3062
3063         ret = ctdb_control(ctdb, destnode, 0, 
3064                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
3065                            NULL, NULL, &status, &timeout, NULL);
3066         if (ret != 0 || status != 0) {
3067                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3068                 return -1;
3069         }
3070
3071         return 0;
3072 }
3073
3074 /* 
3075   callback for the async helpers used when sending the same control
3076   to multiple nodes in parallell.
3077 */
3078 static void async_callback(struct ctdb_client_control_state *state)
3079 {
3080         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3081         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3082         int ret;
3083         TDB_DATA outdata;
3084         int32_t res;
3085         uint32_t destnode = state->c->hdr.destnode;
3086
3087         /* one more node has responded with recmode data */
3088         data->count--;
3089
3090         /* if we failed to push the db, then return an error and let
3091            the main loop try again.
3092         */
3093         if (state->state != CTDB_CONTROL_DONE) {
3094                 if ( !data->dont_log_errors) {
3095                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3096                 }
3097                 data->fail_count++;
3098                 if (data->fail_callback) {
3099                         data->fail_callback(ctdb, destnode, res, outdata,
3100                                         data->callback_data);
3101                 }
3102                 return;
3103         }
3104         
3105         state->async.fn = NULL;
3106
3107         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3108         if ((ret != 0) || (res != 0)) {
3109                 if ( !data->dont_log_errors) {
3110                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3111                 }
3112                 data->fail_count++;
3113                 if (data->fail_callback) {
3114                         data->fail_callback(ctdb, destnode, res, outdata,
3115                                         data->callback_data);
3116                 }
3117         }
3118         if ((ret == 0) && (data->callback != NULL)) {
3119                 data->callback(ctdb, destnode, res, outdata,
3120                                         data->callback_data);
3121         }
3122 }
3123
3124
3125 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3126 {
3127         /* set up the callback functions */
3128         state->async.fn = async_callback;
3129         state->async.private_data = data;
3130         
3131         /* one more control to wait for to complete */
3132         data->count++;
3133 }
3134
3135
3136 /* wait for up to the maximum number of seconds allowed
3137    or until all nodes we expect a response from has replied
3138 */
3139 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3140 {
3141         while (data->count > 0) {
3142                 event_loop_once(ctdb->ev);
3143         }
3144         if (data->fail_count != 0) {
3145                 if (!data->dont_log_errors) {
3146                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
3147                                  data->fail_count));
3148                 }
3149                 return -1;
3150         }
3151         return 0;
3152 }
3153
3154
3155 /* 
3156    perform a simple control on the listed nodes
3157    The control cannot return data
3158  */
3159 int ctdb_client_async_control(struct ctdb_context *ctdb,
3160                                 enum ctdb_controls opcode,
3161                                 uint32_t *nodes,
3162                                 uint64_t srvid,
3163                                 struct timeval timeout,
3164                                 bool dont_log_errors,
3165                                 TDB_DATA data,
3166                                 client_async_callback client_callback,
3167                                 client_async_callback fail_callback,
3168                                 void *callback_data)
3169 {
3170         struct client_async_data *async_data;
3171         struct ctdb_client_control_state *state;
3172         int j, num_nodes;
3173
3174         async_data = talloc_zero(ctdb, struct client_async_data);
3175         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3176         async_data->dont_log_errors = dont_log_errors;
3177         async_data->callback = client_callback;
3178         async_data->fail_callback = fail_callback;
3179         async_data->callback_data = callback_data;
3180         async_data->opcode        = opcode;
3181
3182         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3183
3184         /* loop over all nodes and send an async control to each of them */
3185         for (j=0; j<num_nodes; j++) {
3186                 uint32_t pnn = nodes[j];
3187
3188                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
3189                                           0, data, async_data, &timeout, NULL);
3190                 if (state == NULL) {
3191                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3192                         talloc_free(async_data);
3193                         return -1;
3194                 }
3195                 
3196                 ctdb_client_async_add(async_data, state);
3197         }
3198
3199         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3200                 talloc_free(async_data);
3201                 return -1;
3202         }
3203
3204         talloc_free(async_data);
3205         return 0;
3206 }
3207
3208 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3209                                 struct ctdb_vnn_map *vnn_map,
3210                                 TALLOC_CTX *mem_ctx,
3211                                 bool include_self)
3212 {
3213         int i, j, num_nodes;
3214         uint32_t *nodes;
3215
3216         for (i=num_nodes=0;i<vnn_map->size;i++) {
3217                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3218                         continue;
3219                 }
3220                 num_nodes++;
3221         } 
3222
3223         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3224         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3225
3226         for (i=j=0;i<vnn_map->size;i++) {
3227                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3228                         continue;
3229                 }
3230                 nodes[j++] = vnn_map->map[i];
3231         } 
3232
3233         return nodes;
3234 }
3235
3236 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3237                                 struct ctdb_node_map *node_map,
3238                                 TALLOC_CTX *mem_ctx,
3239                                 bool include_self)
3240 {
3241         int i, j, num_nodes;
3242         uint32_t *nodes;
3243
3244         for (i=num_nodes=0;i<node_map->num;i++) {
3245                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3246                         continue;
3247                 }
3248                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3249                         continue;
3250                 }
3251                 num_nodes++;
3252         } 
3253
3254         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3255         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3256
3257         for (i=j=0;i<node_map->num;i++) {
3258                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3259                         continue;
3260                 }
3261                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3262                         continue;
3263                 }
3264                 nodes[j++] = node_map->nodes[i].pnn;
3265         } 
3266
3267         return nodes;
3268 }
3269
3270 uint32_t *list_of_active_nodes_except_pnn(struct ctdb_context *ctdb,
3271                                 struct ctdb_node_map *node_map,
3272                                 TALLOC_CTX *mem_ctx,
3273                                 uint32_t pnn)
3274 {
3275         int i, j, num_nodes;
3276         uint32_t *nodes;
3277
3278         for (i=num_nodes=0;i<node_map->num;i++) {
3279                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3280                         continue;
3281                 }
3282                 if (node_map->nodes[i].pnn == pnn) {
3283                         continue;
3284                 }
3285                 num_nodes++;
3286         } 
3287
3288         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3289         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3290
3291         for (i=j=0;i<node_map->num;i++) {
3292                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3293                         continue;
3294                 }
3295                 if (node_map->nodes[i].pnn == pnn) {
3296                         continue;
3297                 }
3298                 nodes[j++] = node_map->nodes[i].pnn;
3299         } 
3300
3301         return nodes;
3302 }
3303
3304 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3305                                 struct ctdb_node_map *node_map,
3306                                 TALLOC_CTX *mem_ctx,
3307                                 bool include_self)
3308 {
3309         int i, j, num_nodes;
3310         uint32_t *nodes;
3311
3312         for (i=num_nodes=0;i<node_map->num;i++) {
3313                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3314                         continue;
3315                 }
3316                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3317                         continue;
3318                 }
3319                 num_nodes++;
3320         } 
3321
3322         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3323         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3324
3325         for (i=j=0;i<node_map->num;i++) {
3326                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3327                         continue;
3328                 }
3329                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3330                         continue;
3331                 }
3332                 nodes[j++] = node_map->nodes[i].pnn;
3333         } 
3334
3335         return nodes;
3336 }
3337
3338 /* 
3339   this is used to test if a pnn lock exists and if it exists will return
3340   the number of connections that pnn has reported or -1 if that recovery
3341   daemon is not running.
3342 */
3343 int
3344 ctdb_read_pnn_lock(int fd, int32_t pnn)
3345 {
3346         struct flock lock;
3347         char c;
3348
3349         lock.l_type = F_WRLCK;
3350         lock.l_whence = SEEK_SET;
3351         lock.l_start = pnn;
3352         lock.l_len = 1;
3353         lock.l_pid = 0;
3354
3355         if (fcntl(fd, F_GETLK, &lock) != 0) {
3356                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3357                 return -1;
3358         }
3359
3360         if (lock.l_type == F_UNLCK) {
3361                 return -1;
3362         }
3363
3364         if (pread(fd, &c, 1, pnn) == -1) {
3365                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3366                 return -1;
3367         }
3368
3369         return c;
3370 }
3371
3372 /*
3373   get capabilities of a remote node
3374  */
3375 struct ctdb_client_control_state *
3376 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3377 {
3378         return ctdb_control_send(ctdb, destnode, 0, 
3379                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3380                            mem_ctx, &timeout, NULL);
3381 }
3382
3383 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3384 {
3385         int ret;
3386         int32_t res;
3387         TDB_DATA outdata;
3388
3389         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3390         if ( (ret != 0) || (res != 0) ) {
3391                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3392                 return -1;
3393         }
3394
3395         if (capabilities) {
3396                 *capabilities = *((uint32_t *)outdata.dptr);
3397         }
3398
3399         return 0;
3400 }
3401
3402 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3403 {
3404         struct ctdb_client_control_state *state;
3405         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3406         int ret;
3407
3408         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3409         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3410         talloc_free(tmp_ctx);
3411         return ret;
3412 }
3413
3414 /**
3415  * check whether a transaction is active on a given db on a given node
3416  */
3417 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
3418                                      uint32_t destnode,
3419                                      uint32_t db_id)
3420 {
3421         int32_t status;
3422         int ret;
3423         TDB_DATA indata;
3424
3425         indata.dptr = (uint8_t *)&db_id;
3426         indata.dsize = sizeof(db_id);
3427
3428         ret = ctdb_control(ctdb, destnode, 0,
3429                            CTDB_CONTROL_TRANS2_ACTIVE,
3430                            0, indata, NULL, NULL, &status,
3431                            NULL, NULL);
3432
3433         if (ret != 0) {
3434                 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
3435                 return -1;
3436         }
3437
3438         return status;
3439 }
3440
3441
3442 struct ctdb_transaction_handle {
3443         struct ctdb_db_context *ctdb_db;
3444         bool in_replay;
3445         /*
3446          * we store the reads and writes done under a transaction:
3447          * - one list stores both reads and writes (m_all),
3448          * - the other just writes (m_write)
3449          */
3450         struct ctdb_marshall_buffer *m_all;
3451         struct ctdb_marshall_buffer *m_write;
3452 };
3453
3454 /* start a transaction on a database */
3455 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3456 {
3457         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3458         return 0;
3459 }
3460
3461 /* start a transaction on a database */
3462 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
3463 {
3464         struct ctdb_record_handle *rh;
3465         TDB_DATA key;
3466         TDB_DATA data;
3467         struct ctdb_ltdb_header header;
3468         TALLOC_CTX *tmp_ctx;
3469         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
3470         int ret;
3471         struct ctdb_db_context *ctdb_db = h->ctdb_db;
3472         pid_t pid;
3473         int32_t status;
3474
3475         key.dptr = discard_const(keyname);
3476         key.dsize = strlen(keyname);
3477
3478         if (!ctdb_db->persistent) {
3479                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
3480                 return -1;
3481         }
3482
3483 again:
3484         tmp_ctx = talloc_new(h);
3485
3486         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
3487         if (rh == NULL) {
3488                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
3489                 talloc_free(tmp_ctx);
3490                 return -1;
3491         }
3492
3493         status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
3494                                               CTDB_CURRENT_NODE,
3495                                               ctdb_db->db_id);
3496         if (status == 1) {
3497                 unsigned long int usec = (1000 + random()) % 100000;
3498                 DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
3499                                     "on db_id[0x%08x]. waiting for %lu "
3500                                     "microseconds\n",
3501                                     ctdb_db->db_id, usec));
3502                 talloc_free(tmp_ctx);
3503                 usleep(usec);
3504                 goto again;
3505         }
3506
3507         /*
3508          * store the pid in the database:
3509          * it is not enough that the node is dmaster...
3510          */
3511         pid = getpid();
3512         data.dptr = (unsigned char *)&pid;
3513         data.dsize = sizeof(pid_t);
3514         rh->header.rsn++;
3515         rh->header.dmaster = ctdb_db->ctdb->pnn;
3516         ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
3517         if (ret != 0) {
3518                 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
3519                                   "transaction record\n"));
3520                 talloc_free(tmp_ctx);
3521                 return -1;
3522         }
3523
3524         talloc_free(rh);
3525
3526         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3527         if (ret != 0) {
3528                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
3529                 talloc_free(tmp_ctx);
3530                 return -1;
3531         }
3532
3533         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
3534         if (ret != 0) {
3535                 DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
3536                                  "lock record inside transaction\n"));
3537                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3538                 talloc_free(tmp_ctx);
3539                 goto again;
3540         }
3541
3542         if (header.dmaster != ctdb_db->ctdb->pnn) {
3543                 DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
3544                                    "transaction lock record\n"));
3545                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3546                 talloc_free(tmp_ctx);
3547                 goto again;
3548         }
3549
3550         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
3551                 DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
3552                                     "the transaction lock record\n"));
3553                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3554                 talloc_free(tmp_ctx);
3555                 goto again;
3556         }
3557
3558         talloc_free(tmp_ctx);
3559
3560         return 0;
3561 }
3562
3563
3564 /* start a transaction on a database */
3565 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3566                                                        TALLOC_CTX *mem_ctx)
3567 {
3568         struct ctdb_transaction_handle *h;
3569         int ret;
3570
3571         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3572         if (h == NULL) {
3573                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
3574                 return NULL;
3575         }
3576
3577         h->ctdb_db = ctdb_db;
3578
3579         ret = ctdb_transaction_fetch_start(h);
3580         if (ret != 0) {
3581                 talloc_free(h);
3582                 return NULL;
3583         }
3584
3585         talloc_set_destructor(h, ctdb_transaction_destructor);
3586
3587         return h;
3588 }
3589
3590
3591
3592 /*
3593   fetch a record inside a transaction
3594  */
3595 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
3596                            TALLOC_CTX *mem_ctx, 
3597                            TDB_DATA key, TDB_DATA *data)
3598 {
3599         struct ctdb_ltdb_header header;
3600         int ret;
3601
3602         ZERO_STRUCT(header);
3603
3604         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3605         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3606                 /* record doesn't exist yet */
3607                 *data = tdb_null;
3608                 ret = 0;
3609         }
3610         
3611         if (ret != 0) {
3612                 return ret;
3613         }
3614
3615         if (!h->in_replay) {
3616                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3617                 if (h->m_all == NULL) {
3618                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3619                         return -1;
3620                 }
3621         }
3622
3623         return 0;
3624 }
3625
3626 /*
3627   stores a record inside a transaction
3628  */
3629 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
3630                            TDB_DATA key, TDB_DATA data)
3631 {
3632         TALLOC_CTX *tmp_ctx = talloc_new(h);
3633         struct ctdb_ltdb_header header;
3634         TDB_DATA olddata;
3635         int ret;
3636
3637         ZERO_STRUCT(header);
3638
3639         /* we need the header so we can update the RSN */
3640         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3641         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3642                 /* the record doesn't exist - create one with us as dmaster.
3643                    This is only safe because we are in a transaction and this
3644                    is a persistent database */
3645                 ZERO_STRUCT(header);
3646         } else if (ret != 0) {
3647                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3648                 talloc_free(tmp_ctx);
3649                 return ret;
3650         }
3651
3652         if (data.dsize == olddata.dsize &&
3653             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3654                 /* save writing the same data */
3655                 talloc_free(tmp_ctx);
3656                 return 0;
3657         }
3658
3659         header.dmaster = h->ctdb_db->ctdb->pnn;
3660         header.rsn++;
3661
3662         if (!h->in_replay) {
3663                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3664                 if (h->m_all == NULL) {
3665                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3666                         talloc_free(tmp_ctx);
3667                         return -1;
3668                 }
3669         }               
3670
3671         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3672         if (h->m_write == NULL) {
3673                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3674                 talloc_free(tmp_ctx);
3675                 return -1;
3676         }
3677         
3678         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3679
3680         talloc_free(tmp_ctx);
3681         
3682         return ret;
3683 }
3684
3685 /*
3686   replay a transaction
3687  */
3688 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3689 {
3690         int ret, i;
3691         struct ctdb_rec_data *rec = NULL;
3692
3693         h->in_replay = true;
3694         talloc_free(h->m_write);
3695         h->m_write = NULL;
3696
3697         ret = ctdb_transaction_fetch_start(h);
3698         if (ret != 0) {
3699                 return ret;
3700         }
3701
3702         for (i=0;i<h->m_all->count;i++) {
3703                 TDB_DATA key, data;
3704
3705                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3706                 if (rec == NULL) {
3707                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3708                         goto failed;
3709                 }
3710
3711                 if (rec->reqid == 0) {
3712                         /* its a store */
3713                         if (ctdb_transaction_store(h, key, data) != 0) {
3714                                 goto failed;
3715                         }
3716                 } else {
3717                         TDB_DATA data2;
3718                         TALLOC_CTX *tmp_ctx = talloc_new(h);
3719
3720                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3721                                 talloc_free(tmp_ctx);
3722                                 goto failed;
3723                         }
3724                         if (data2.dsize != data.dsize ||
3725                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3726                                 /* the record has changed on us - we have to give up */
3727                                 talloc_free(tmp_ctx);
3728                                 goto failed;
3729                         }
3730                         talloc_free(tmp_ctx);
3731                 }
3732         }
3733         
3734         return 0;
3735
3736 failed:
3737         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3738         return -1;
3739 }
3740
3741
3742 /*
3743   commit a transaction
3744  */
3745 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3746 {
3747         int ret, retries=0;
3748         int32_t status;
3749         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3750         struct timeval timeout;
3751         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3752
3753         talloc_set_destructor(h, NULL);
3754
3755         /* our commit strategy is quite complex.
3756
3757            - we first try to commit the changes to all other nodes
3758
3759            - if that works, then we commit locally and we are done
3760
3761            - if a commit on another node fails, then we need to cancel
3762              the transaction, then restart the transaction (thus
3763              opening a window of time for a pending recovery to
3764              complete), then replay the transaction, checking all the
3765              reads and writes (checking that reads give the same data,
3766              and writes succeed). Then we retry the transaction to the
3767              other nodes
3768         */
3769
3770 again:
3771         if (h->m_write == NULL) {
3772                 /* no changes were made */
3773                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3774                 talloc_free(h);
3775                 return 0;
3776         }
3777
3778         /* tell ctdbd to commit to the other nodes */
3779         timeout = timeval_current_ofs(1, 0);
3780         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3781                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
3782                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
3783                            &timeout, NULL);
3784         if (ret != 0 || status != 0) {
3785                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3786                 DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
3787                                      ", retrying after 1 second...\n",
3788                                      (retries==0)?"":"retry "));
3789                 sleep(1);
3790
3791                 if (ret != 0) {
3792                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
3793                 } else {
3794                         /* work out what error code we will give if we 
3795                            have to fail the operation */
3796                         switch ((enum ctdb_trans2_commit_error)status) {
3797                         case CTDB_TRANS2_COMMIT_SUCCESS:
3798                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
3799                         case CTDB_TRANS2_COMMIT_TIMEOUT:
3800                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3801                                 break;
3802                         case CTDB_TRANS2_COMMIT_ALLFAIL:
3803                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
3804                                 break;
3805                         }
3806                 }
3807
3808                 if (++retries == 100) {
3809                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
3810                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
3811                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3812                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3813                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3814                         talloc_free(h);
3815                         return -1;
3816                 }               
3817
3818                 if (ctdb_replay_transaction(h) != 0) {
3819                         DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
3820                                           "transaction on db 0x%08x, "
3821                                           "failure control =%u\n",
3822                                           h->ctdb_db->db_id,
3823                                           (unsigned)failure_control));
3824                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3825                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3826                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3827                         talloc_free(h);
3828                         return -1;
3829                 }
3830                 goto again;
3831         } else {
3832                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3833         }
3834
3835         /* do the real commit locally */
3836         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
3837         if (ret != 0) {
3838                 DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
3839                                   "on db id 0x%08x locally, "
3840                                   "failure_control=%u\n",
3841                                   h->ctdb_db->db_id,
3842                                   (unsigned)failure_control));
3843                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3844                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3845                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
3846                 talloc_free(h);
3847                 return ret;
3848         }
3849
3850         /* tell ctdbd that we are finished with our local commit */
3851         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3852                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
3853                      tdb_null, NULL, NULL, NULL, NULL, NULL);
3854         talloc_free(h);
3855         return 0;
3856 }
3857
3858 /*
3859   recovery daemon ping to main daemon
3860  */
3861 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
3862 {
3863         int ret;
3864         int32_t res;
3865
3866         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
3867                            ctdb, NULL, &res, NULL, NULL);
3868         if (ret != 0 || res != 0) {
3869                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
3870                 return -1;
3871         }
3872
3873         return 0;
3874 }
3875
3876 /* when forking the main daemon and the child process needs to connect back
3877  * to the daemon as a client process, this function can be used to change
3878  * the ctdb context from daemon into client mode
3879  */
3880 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
3881 {
3882         int ret;
3883         va_list ap;
3884
3885         /* Add extra information so we can identify this in the logs */
3886         va_start(ap, fmt);
3887         debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
3888         va_end(ap);
3889
3890         /* shutdown the transport */
3891         if (ctdb->methods) {
3892                 ctdb->methods->shutdown(ctdb);
3893         }
3894
3895         /* get a new event context */
3896         talloc_free(ctdb->ev);
3897         ctdb->ev = event_context_init(ctdb);
3898         tevent_loop_allow_nesting(ctdb->ev);
3899
3900         close(ctdb->daemon.sd);
3901         ctdb->daemon.sd = -1;
3902
3903         /* the client does not need to be realtime */
3904         if (ctdb->do_setsched) {
3905                 ctdb_restore_scheduler(ctdb);
3906         }
3907
3908         /* initialise ctdb */
3909         ret = ctdb_socket_connect(ctdb);
3910         if (ret != 0) {
3911                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
3912                 return -1;
3913         }
3914
3915          return 0;
3916 }
3917
3918 /*
3919   get the status of running the monitor eventscripts: NULL means never run.
3920  */
3921 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
3922                 struct timeval timeout, uint32_t destnode, 
3923                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
3924                 struct ctdb_scripts_wire **script_status)
3925 {
3926         int ret;
3927         TDB_DATA outdata, indata;
3928         int32_t res;
3929         uint32_t uinttype = type;
3930
3931         indata.dptr = (uint8_t *)&uinttype;
3932         indata.dsize = sizeof(uinttype);
3933
3934         ret = ctdb_control(ctdb, destnode, 0, 
3935                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
3936                            mem_ctx, &outdata, &res, &timeout, NULL);
3937         if (ret != 0 || res != 0) {
3938                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
3939                 return -1;
3940         }
3941
3942         if (outdata.dsize == 0) {
3943                 *script_status = NULL;
3944         } else {
3945                 *script_status = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
3946                 talloc_free(outdata.dptr);
3947         }
3948                     
3949         return 0;
3950 }
3951
3952 /*
3953   tell the main daemon how long it took to lock the reclock file
3954  */
3955 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
3956 {
3957         int ret;
3958         int32_t res;
3959         TDB_DATA data;
3960
3961         data.dptr = (uint8_t *)&latency;
3962         data.dsize = sizeof(latency);
3963
3964         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
3965                            ctdb, NULL, &res, NULL, NULL);
3966         if (ret != 0 || res != 0) {
3967                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
3968                 return -1;
3969         }
3970
3971         return 0;
3972 }
3973
3974 /*
3975   get the name of the reclock file
3976  */
3977 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
3978                          uint32_t destnode, TALLOC_CTX *mem_ctx,
3979                          const char **name)
3980 {
3981         int ret;
3982         int32_t res;
3983         TDB_DATA data;
3984
3985         ret = ctdb_control(ctdb, destnode, 0, 
3986                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
3987                            mem_ctx, &data, &res, &timeout, NULL);
3988         if (ret != 0 || res != 0) {
3989                 return -1;
3990         }
3991
3992         if (data.dsize == 0) {
3993                 *name = NULL;
3994         } else {
3995                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
3996         }
3997         talloc_free(data.dptr);
3998
3999         return 0;
4000 }
4001
4002 /*
4003   set the reclock filename for a node
4004  */
4005 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
4006 {
4007         int ret;
4008         TDB_DATA data;
4009         int32_t res;
4010
4011         if (reclock == NULL) {
4012                 data.dsize = 0;
4013                 data.dptr  = NULL;
4014         } else {
4015                 data.dsize = strlen(reclock) + 1;
4016                 data.dptr  = discard_const(reclock);
4017         }
4018
4019         ret = ctdb_control(ctdb, destnode, 0, 
4020                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
4021                            NULL, NULL, &res, &timeout, NULL);
4022         if (ret != 0 || res != 0) {
4023                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
4024                 return -1;
4025         }
4026
4027         return 0;
4028 }
4029
4030 /*
4031   stop a node
4032  */
4033 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4034 {
4035         int ret;
4036         int32_t res;
4037
4038         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
4039                            ctdb, NULL, &res, &timeout, NULL);
4040         if (ret != 0 || res != 0) {
4041                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
4042                 return -1;
4043         }
4044
4045         return 0;
4046 }
4047
4048 /*
4049   continue a node
4050  */
4051 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4052 {
4053         int ret;
4054
4055         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
4056                            ctdb, NULL, NULL, &timeout, NULL);
4057         if (ret != 0) {
4058                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4059                 return -1;
4060         }
4061
4062         return 0;
4063 }
4064
4065 /*
4066   set the natgw state for a node
4067  */
4068 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4069 {
4070         int ret;
4071         TDB_DATA data;
4072         int32_t res;
4073
4074         data.dsize = sizeof(natgwstate);
4075         data.dptr  = (uint8_t *)&natgwstate;
4076
4077         ret = ctdb_control(ctdb, destnode, 0, 
4078                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
4079                            NULL, NULL, &res, &timeout, NULL);
4080         if (ret != 0 || res != 0) {
4081                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4082                 return -1;
4083         }
4084
4085         return 0;
4086 }
4087
4088 /*
4089   set the lmaster role for a node
4090  */
4091 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4092 {
4093         int ret;
4094         TDB_DATA data;
4095         int32_t res;
4096
4097         data.dsize = sizeof(lmasterrole);
4098         data.dptr  = (uint8_t *)&lmasterrole;
4099
4100         ret = ctdb_control(ctdb, destnode, 0, 
4101                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
4102                            NULL, NULL, &res, &timeout, NULL);
4103         if (ret != 0 || res != 0) {
4104                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4105                 return -1;
4106         }
4107
4108         return 0;
4109 }
4110
4111 /*
4112   set the recmaster role for a node
4113  */
4114 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4115 {
4116         int ret;
4117         TDB_DATA data;
4118         int32_t res;
4119
4120         data.dsize = sizeof(recmasterrole);
4121         data.dptr  = (uint8_t *)&recmasterrole;
4122
4123         ret = ctdb_control(ctdb, destnode, 0, 
4124                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
4125                            NULL, NULL, &res, &timeout, NULL);
4126         if (ret != 0 || res != 0) {
4127                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4128                 return -1;
4129         }
4130
4131         return 0;
4132 }
4133
4134 /* enable an eventscript
4135  */
4136 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4137 {
4138         int ret;
4139         TDB_DATA data;
4140         int32_t res;
4141
4142         data.dsize = strlen(script) + 1;
4143         data.dptr  = discard_const(script);
4144
4145         ret = ctdb_control(ctdb, destnode, 0, 
4146                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
4147                            NULL, NULL, &res, &timeout, NULL);
4148         if (ret != 0 || res != 0) {
4149                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4150                 return -1;
4151         }
4152
4153         return 0;
4154 }
4155
4156 /* disable an eventscript
4157  */
4158 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4159 {
4160         int ret;
4161         TDB_DATA data;
4162         int32_t res;
4163
4164         data.dsize = strlen(script) + 1;
4165         data.dptr  = discard_const(script);
4166
4167         ret = ctdb_control(ctdb, destnode, 0, 
4168                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
4169                            NULL, NULL, &res, &timeout, NULL);
4170         if (ret != 0 || res != 0) {
4171                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
4172                 return -1;
4173         }
4174
4175         return 0;
4176 }
4177
4178
4179 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
4180 {
4181         int ret;
4182         TDB_DATA data;
4183         int32_t res;
4184
4185         data.dsize = sizeof(*bantime);
4186         data.dptr  = (uint8_t *)bantime;
4187
4188         ret = ctdb_control(ctdb, destnode, 0, 
4189                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
4190                            NULL, NULL, &res, &timeout, NULL);
4191         if (ret != 0 || res != 0) {
4192                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4193                 return -1;
4194         }
4195
4196         return 0;
4197 }
4198
4199
4200 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4201 {
4202         int ret;
4203         TDB_DATA outdata;
4204         int32_t res;
4205         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4206
4207         ret = ctdb_control(ctdb, destnode, 0, 
4208                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4209                            tmp_ctx, &outdata, &res, &timeout, NULL);
4210         if (ret != 0 || res != 0) {
4211                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4212                 talloc_free(tmp_ctx);
4213                 return -1;
4214         }
4215
4216         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4217         talloc_free(tmp_ctx);
4218
4219         return 0;
4220 }
4221
4222
4223 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4224 {
4225         int ret;
4226         int32_t res;
4227         TDB_DATA data;
4228         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4229
4230         data.dptr = (uint8_t*)db_prio;
4231         data.dsize = sizeof(*db_prio);
4232
4233         ret = ctdb_control(ctdb, destnode, 0, 
4234                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4235                            tmp_ctx, NULL, &res, &timeout, NULL);
4236         if (ret != 0 || res != 0) {
4237                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4238                 talloc_free(tmp_ctx);
4239                 return -1;
4240         }
4241
4242         talloc_free(tmp_ctx);
4243
4244         return 0;
4245 }
4246
4247 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4248 {
4249         int ret;
4250         int32_t res;
4251         TDB_DATA data;
4252         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4253
4254         data.dptr = (uint8_t*)&db_id;
4255         data.dsize = sizeof(db_id);
4256
4257         ret = ctdb_control(ctdb, destnode, 0, 
4258                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4259                            tmp_ctx, NULL, &res, &timeout, NULL);
4260         if (ret != 0 || res < 0) {
4261                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4262                 talloc_free(tmp_ctx);
4263                 return -1;
4264         }
4265
4266         if (priority) {
4267                 *priority = res;
4268         }
4269
4270         talloc_free(tmp_ctx);
4271
4272         return 0;
4273 }
4274
4275 int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_statistics_wire **stats)
4276 {
4277         int ret;
4278         TDB_DATA outdata;
4279         int32_t res;
4280
4281         ret = ctdb_control(ctdb, destnode, 0, 
4282                            CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null, 
4283                            mem_ctx, &outdata, &res, &timeout, NULL);
4284         if (ret != 0 || res != 0 || outdata.dsize == 0) {
4285                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
4286                 return -1;
4287         }
4288
4289         *stats = (struct ctdb_statistics_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4290         talloc_free(outdata.dptr);
4291                     
4292         return 0;
4293 }
4294
4295 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)
4296 {
4297         if (h == NULL) {
4298                 return NULL;
4299         }
4300
4301         return &h->header;
4302 }
4303
4304
4305 struct ctdb_client_control_state *
4306 ctdb_ctrl_updaterecord_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4307 {
4308         struct ctdb_client_control_state *handle;
4309         struct ctdb_marshall_buffer *m;
4310         struct ctdb_rec_data *rec;
4311         TDB_DATA outdata;
4312
4313         m = talloc_zero(mem_ctx, struct ctdb_marshall_buffer);
4314         if (m == NULL) {
4315                 DEBUG(DEBUG_ERR, ("Failed to allocate marshall buffer for update record\n"));
4316                 return NULL;
4317         }
4318
4319         m->db_id = ctdb_db->db_id;
4320
4321         rec = ctdb_marshall_record(m, 0, key, header, data);
4322         if (rec == NULL) {
4323                 DEBUG(DEBUG_ERR,("Failed to marshall record for update record\n"));
4324                 talloc_free(m);
4325                 return NULL;
4326         }
4327         m = talloc_realloc_size(mem_ctx, m, rec->length + offsetof(struct ctdb_marshall_buffer, data));
4328         if (m == NULL) {
4329                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata\n"));
4330                 talloc_free(m);
4331                 return NULL;
4332         }
4333         m->count++;
4334         memcpy((uint8_t *)m + offsetof(struct ctdb_marshall_buffer, data), rec, rec->length);
4335
4336
4337         outdata.dptr = (uint8_t *)m;
4338         outdata.dsize = talloc_get_size(m);
4339
4340         handle = ctdb_control_send(ctdb, destnode, 0, 
4341                            CTDB_CONTROL_UPDATE_RECORD, 0, outdata,
4342                            mem_ctx, &timeout, NULL);
4343         talloc_free(m);
4344         return handle;
4345 }
4346
4347 int ctdb_ctrl_updaterecord_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4348 {
4349         int ret;
4350         int32_t res;
4351
4352         ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL);
4353         if ( (ret != 0) || (res != 0) ){
4354                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_update_record_recv failed\n"));
4355                 return -1;
4356         }
4357
4358         return 0;
4359 }
4360
4361 int
4362 ctdb_ctrl_updaterecord(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4363 {
4364         struct ctdb_client_control_state *state;
4365
4366         state = ctdb_ctrl_updaterecord_send(ctdb, mem_ctx, timeout, destnode, ctdb_db, key, header, data);
4367         return ctdb_ctrl_updaterecord_recv(ctdb, state);
4368 }
4369