ReadOnly records: Add a new RPC function FETCH_WITH_HEADER.
[ctdb.git] / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/tevent/tevent.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
29 #include <stdlib.h>
30 #include "../include/ctdb_private.h"
31 #include "lib/util/dlinklist.h"
32
33 pid_t ctdbd_pid;
34
35 /*
36   allocate a packet for use in client<->daemon communication
37  */
38 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
39                                             TALLOC_CTX *mem_ctx, 
40                                             enum ctdb_operation operation, 
41                                             size_t length, size_t slength,
42                                             const char *type)
43 {
44         int size;
45         struct ctdb_req_header *hdr;
46
47         length = MAX(length, slength);
48         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
49
50         hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
51         if (hdr == NULL) {
52                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
53                          operation, (unsigned)length));
54                 return NULL;
55         }
56         talloc_set_name_const(hdr, type);
57         memset(hdr, 0, slength);
58         hdr->length       = length;
59         hdr->operation    = operation;
60         hdr->ctdb_magic   = CTDB_MAGIC;
61         hdr->ctdb_version = CTDB_VERSION;
62         hdr->srcnode      = ctdb->pnn;
63         if (ctdb->vnn_map) {
64                 hdr->generation = ctdb->vnn_map->generation;
65         }
66
67         return hdr;
68 }
69
70 /*
71   local version of ctdb_call
72 */
73 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
74                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
75                     TDB_DATA *data)
76 {
77         struct ctdb_call_info *c;
78         struct ctdb_registered_call *fn;
79         struct ctdb_context *ctdb = ctdb_db->ctdb;
80         
81         c = talloc(ctdb, struct ctdb_call_info);
82         CTDB_NO_MEMORY(ctdb, c);
83
84         c->key = call->key;
85         c->call_data = &call->call_data;
86         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
87         c->record_data.dsize = data->dsize;
88         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
89         c->new_data = NULL;
90         c->reply_data = NULL;
91         c->status = 0;
92         c->header = header;
93
94         for (fn=ctdb_db->calls;fn;fn=fn->next) {
95                 if (fn->id == call->call_id) break;
96         }
97         if (fn == NULL) {
98                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
99                 talloc_free(c);
100                 return -1;
101         }
102
103         if (fn->fn(c) != 0) {
104                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
105                 talloc_free(c);
106                 return -1;
107         }
108
109         /* we need to force the record to be written out if this was a remote access */
110         if (c->new_data == NULL) {
111                 c->new_data = &c->record_data;
112         }
113
114         if (c->new_data) {
115                 /* XXX check that we always have the lock here? */
116                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
117                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
118                         talloc_free(c);
119                         return -1;
120                 }
121         }
122
123         if (c->reply_data) {
124                 call->reply_data = *c->reply_data;
125
126                 talloc_steal(call, call->reply_data.dptr);
127                 talloc_set_name_const(call->reply_data.dptr, __location__);
128         } else {
129                 call->reply_data.dptr = NULL;
130                 call->reply_data.dsize = 0;
131         }
132         call->status = c->status;
133
134         talloc_free(c);
135
136         return 0;
137 }
138
139
140 /*
141   queue a packet for sending from client to daemon
142 */
143 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
144 {
145         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
146 }
147
148
149 /*
150   called when a CTDB_REPLY_CALL packet comes in in the client
151
152   This packet comes in response to a CTDB_REQ_CALL request packet. It
153   contains any reply data from the call
154 */
155 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
156 {
157         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
158         struct ctdb_client_call_state *state;
159
160         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
161         if (state == NULL) {
162                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
163                 return;
164         }
165
166         if (hdr->reqid != state->reqid) {
167                 /* we found a record  but it was the wrong one */
168                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
169                 return;
170         }
171
172         state->call->reply_data.dptr = c->data;
173         state->call->reply_data.dsize = c->datalen;
174         state->call->status = c->status;
175
176         talloc_steal(state, c);
177
178         state->state = CTDB_CALL_DONE;
179
180         if (state->async.fn) {
181                 state->async.fn(state);
182         }
183 }
184
185 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
186
187 /*
188   this is called in the client, when data comes in from the daemon
189  */
190 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
191 {
192         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
193         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
194         TALLOC_CTX *tmp_ctx;
195
196         /* place the packet as a child of a tmp_ctx. We then use
197            talloc_free() below to free it. If any of the calls want
198            to keep it, then they will steal it somewhere else, and the
199            talloc_free() will be a no-op */
200         tmp_ctx = talloc_new(ctdb);
201         talloc_steal(tmp_ctx, hdr);
202
203         if (cnt == 0) {
204                 DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n"));
205                 exit(0);
206         }
207
208         if (cnt < sizeof(*hdr)) {
209                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
210                 goto done;
211         }
212         if (cnt != hdr->length) {
213                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
214                                (unsigned)hdr->length, (unsigned)cnt);
215                 goto done;
216         }
217
218         if (hdr->ctdb_magic != CTDB_MAGIC) {
219                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
220                 goto done;
221         }
222
223         if (hdr->ctdb_version != CTDB_VERSION) {
224                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
225                 goto done;
226         }
227
228         switch (hdr->operation) {
229         case CTDB_REPLY_CALL:
230                 ctdb_client_reply_call(ctdb, hdr);
231                 break;
232
233         case CTDB_REQ_MESSAGE:
234                 ctdb_request_message(ctdb, hdr);
235                 break;
236
237         case CTDB_REPLY_CONTROL:
238                 ctdb_client_reply_control(ctdb, hdr);
239                 break;
240
241         default:
242                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
243         }
244
245 done:
246         talloc_free(tmp_ctx);
247 }
248
249 /*
250   connect to a unix domain socket
251 */
252 int ctdb_socket_connect(struct ctdb_context *ctdb)
253 {
254         struct sockaddr_un addr;
255
256         memset(&addr, 0, sizeof(addr));
257         addr.sun_family = AF_UNIX;
258         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
259
260         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
261         if (ctdb->daemon.sd == -1) {
262                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
263                 return -1;
264         }
265
266         set_nonblocking(ctdb->daemon.sd);
267         set_close_on_exec(ctdb->daemon.sd);
268         
269         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
270                 close(ctdb->daemon.sd);
271                 ctdb->daemon.sd = -1;
272                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
273                 return -1;
274         }
275
276         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
277                                               CTDB_DS_ALIGNMENT, 
278                                               ctdb_client_read_cb, ctdb, "to-ctdbd");
279         return 0;
280 }
281
282
283 struct ctdb_record_handle {
284         struct ctdb_db_context *ctdb_db;
285         TDB_DATA key;
286         TDB_DATA *data;
287         struct ctdb_ltdb_header header;
288 };
289
290
291 /*
292   make a recv call to the local ctdb daemon - called from client context
293
294   This is called when the program wants to wait for a ctdb_call to complete and get the 
295   results. This call will block unless the call has already completed.
296 */
297 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
298 {
299         if (state == NULL) {
300                 return -1;
301         }
302
303         while (state->state < CTDB_CALL_DONE) {
304                 event_loop_once(state->ctdb_db->ctdb->ev);
305         }
306         if (state->state != CTDB_CALL_DONE) {
307                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
308                 talloc_free(state);
309                 return -1;
310         }
311
312         if (state->call->reply_data.dsize) {
313                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
314                                                       state->call->reply_data.dptr,
315                                                       state->call->reply_data.dsize);
316                 call->reply_data.dsize = state->call->reply_data.dsize;
317         } else {
318                 call->reply_data.dptr = NULL;
319                 call->reply_data.dsize = 0;
320         }
321         call->status = state->call->status;
322         talloc_free(state);
323
324         return 0;
325 }
326
327
328
329
330 /*
331   destroy a ctdb_call in client
332 */
333 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
334 {
335         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
336         return 0;
337 }
338
339 /*
340   construct an event driven local ctdb_call
341
342   this is used so that locally processed ctdb_call requests are processed
343   in an event driven manner
344 */
345 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
346                                                                   struct ctdb_call *call,
347                                                                   struct ctdb_ltdb_header *header,
348                                                                   TDB_DATA *data)
349 {
350         struct ctdb_client_call_state *state;
351         struct ctdb_context *ctdb = ctdb_db->ctdb;
352         int ret;
353
354         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
355         CTDB_NO_MEMORY_NULL(ctdb, state);
356         state->call = talloc_zero(state, struct ctdb_call);
357         CTDB_NO_MEMORY_NULL(ctdb, state->call);
358
359         talloc_steal(state, data->dptr);
360
361         state->state   = CTDB_CALL_DONE;
362         *(state->call) = *call;
363         state->ctdb_db = ctdb_db;
364
365         ret = ctdb_call_local(ctdb_db, state->call, header, state, data);
366
367         return state;
368 }
369
370 /*
371   make a ctdb call to the local daemon - async send. Called from client context.
372
373   This constructs a ctdb_call request and queues it for processing. 
374   This call never blocks.
375 */
376 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
377                                               struct ctdb_call *call)
378 {
379         struct ctdb_client_call_state *state;
380         struct ctdb_context *ctdb = ctdb_db->ctdb;
381         struct ctdb_ltdb_header header;
382         TDB_DATA data;
383         int ret;
384         size_t len;
385         struct ctdb_req_call *c;
386
387         /* if the domain socket is not yet open, open it */
388         if (ctdb->daemon.sd==-1) {
389                 ctdb_socket_connect(ctdb);
390         }
391
392         ret = ctdb_ltdb_lock(ctdb_db, call->key);
393         if (ret != 0) {
394                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
395                 return NULL;
396         }
397
398         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
399
400         if (ret == 0 && header.dmaster == ctdb->pnn) {
401                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
402                 talloc_free(data.dptr);
403                 ctdb_ltdb_unlock(ctdb_db, call->key);
404                 return state;
405         }
406
407         ctdb_ltdb_unlock(ctdb_db, call->key);
408         talloc_free(data.dptr);
409
410         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
411         if (state == NULL) {
412                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
413                 return NULL;
414         }
415         state->call = talloc_zero(state, struct ctdb_call);
416         if (state->call == NULL) {
417                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
418                 return NULL;
419         }
420
421         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
422         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
423         if (c == NULL) {
424                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
425                 return NULL;
426         }
427
428         state->reqid     = ctdb_reqid_new(ctdb, state);
429         state->ctdb_db = ctdb_db;
430         talloc_set_destructor(state, ctdb_client_call_destructor);
431
432         c->hdr.reqid     = state->reqid;
433         c->flags         = call->flags;
434         c->db_id         = ctdb_db->db_id;
435         c->callid        = call->call_id;
436         c->hopcount      = 0;
437         c->keylen        = call->key.dsize;
438         c->calldatalen   = call->call_data.dsize;
439         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
440         memcpy(&c->data[call->key.dsize], 
441                call->call_data.dptr, call->call_data.dsize);
442         *(state->call)              = *call;
443         state->call->call_data.dptr = &c->data[call->key.dsize];
444         state->call->key.dptr       = &c->data[0];
445
446         state->state  = CTDB_CALL_WAIT;
447
448
449         ctdb_client_queue_pkt(ctdb, &c->hdr);
450
451         return state;
452 }
453
454
455 /*
456   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
457 */
458 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
459 {
460         struct ctdb_client_call_state *state;
461
462         state = ctdb_call_send(ctdb_db, call);
463         return ctdb_call_recv(state, call);
464 }
465
466
467 /*
468   tell the daemon what messaging srvid we will use, and register the message
469   handler function in the client
470 */
471 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
472                              ctdb_msg_fn_t handler,
473                              void *private_data)
474                                     
475 {
476         int res;
477         int32_t status;
478         
479         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
480                            tdb_null, NULL, NULL, &status, NULL, NULL);
481         if (res != 0 || status != 0) {
482                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
483                 return -1;
484         }
485
486         /* also need to register the handler with our own ctdb structure */
487         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
488 }
489
490 /*
491   tell the daemon we no longer want a srvid
492 */
493 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
494 {
495         int res;
496         int32_t status;
497         
498         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
499                            tdb_null, NULL, NULL, &status, NULL, NULL);
500         if (res != 0 || status != 0) {
501                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
502                 return -1;
503         }
504
505         /* also need to register the handler with our own ctdb structure */
506         ctdb_deregister_message_handler(ctdb, srvid, private_data);
507         return 0;
508 }
509
510
511 /*
512   send a message - from client context
513  */
514 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
515                       uint64_t srvid, TDB_DATA data)
516 {
517         struct ctdb_req_message *r;
518         int len, res;
519
520         len = offsetof(struct ctdb_req_message, data) + data.dsize;
521         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
522                                len, struct ctdb_req_message);
523         CTDB_NO_MEMORY(ctdb, r);
524
525         r->hdr.destnode  = pnn;
526         r->srvid         = srvid;
527         r->datalen       = data.dsize;
528         memcpy(&r->data[0], data.dptr, data.dsize);
529         
530         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
531         if (res != 0) {
532                 return res;
533         }
534
535         talloc_free(r);
536         return 0;
537 }
538
539
540 /*
541   cancel a ctdb_fetch_lock operation, releasing the lock
542  */
543 static int fetch_lock_destructor(struct ctdb_record_handle *h)
544 {
545         ctdb_ltdb_unlock(h->ctdb_db, h->key);
546         return 0;
547 }
548
549 /*
550   force the migration of a record to this node
551  */
552 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
553 {
554         struct ctdb_call call;
555         ZERO_STRUCT(call);
556         call.call_id = CTDB_NULL_FUNC;
557         call.key = key;
558         call.flags = CTDB_IMMEDIATE_MIGRATION;
559         return ctdb_call(ctdb_db, &call);
560 }
561
562 /*
563   get a lock on a record, and return the records data. Blocks until it gets the lock
564  */
565 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
566                                            TDB_DATA key, TDB_DATA *data)
567 {
568         int ret;
569         struct ctdb_record_handle *h;
570
571         /*
572           procedure is as follows:
573
574           1) get the chain lock. 
575           2) check if we are dmaster
576           3) if we are the dmaster then return handle 
577           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
578              reply from ctdbd
579           5) when we get the reply, goto (1)
580          */
581
582         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
583         if (h == NULL) {
584                 return NULL;
585         }
586
587         h->ctdb_db = ctdb_db;
588         h->key     = key;
589         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
590         if (h->key.dptr == NULL) {
591                 talloc_free(h);
592                 return NULL;
593         }
594         h->data    = data;
595
596         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
597                  (const char *)key.dptr));
598
599 again:
600         /* step 1 - get the chain lock */
601         ret = ctdb_ltdb_lock(ctdb_db, key);
602         if (ret != 0) {
603                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
604                 talloc_free(h);
605                 return NULL;
606         }
607
608         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
609
610         talloc_set_destructor(h, fetch_lock_destructor);
611
612         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
613
614         /* when torturing, ensure we test the remote path */
615         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
616             random() % 5 == 0) {
617                 h->header.dmaster = (uint32_t)-1;
618         }
619
620
621         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
622
623         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
624                 ctdb_ltdb_unlock(ctdb_db, key);
625                 ret = ctdb_client_force_migration(ctdb_db, key);
626                 if (ret != 0) {
627                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
628                         talloc_free(h);
629                         return NULL;
630                 }
631                 goto again;
632         }
633
634         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
635         return h;
636 }
637
638 /*
639   store some data to the record that was locked with ctdb_fetch_lock()
640 */
641 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
642 {
643         if (h->ctdb_db->persistent) {
644                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
645                 return -1;
646         }
647
648         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
649 }
650
651 /*
652   non-locking fetch of a record
653  */
654 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
655                TDB_DATA key, TDB_DATA *data)
656 {
657         struct ctdb_call call;
658         int ret;
659
660         call.call_id = CTDB_FETCH_FUNC;
661         call.call_data.dptr = NULL;
662         call.call_data.dsize = 0;
663
664         ret = ctdb_call(ctdb_db, &call);
665
666         if (ret == 0) {
667                 *data = call.reply_data;
668                 talloc_steal(mem_ctx, data->dptr);
669         }
670
671         return ret;
672 }
673
674
675
676 /*
677    called when a control completes or timesout to invoke the callback
678    function the user provided
679 */
680 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
681         struct timeval t, void *private_data)
682 {
683         struct ctdb_client_control_state *state;
684         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
685         int ret;
686
687         state = talloc_get_type(private_data, struct ctdb_client_control_state);
688         talloc_steal(tmp_ctx, state);
689
690         ret = ctdb_control_recv(state->ctdb, state, state,
691                         NULL, 
692                         NULL, 
693                         NULL);
694
695         talloc_free(tmp_ctx);
696 }
697
698 /*
699   called when a CTDB_REPLY_CONTROL packet comes in in the client
700
701   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
702   contains any reply data from the control
703 */
704 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
705                                       struct ctdb_req_header *hdr)
706 {
707         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
708         struct ctdb_client_control_state *state;
709
710         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
711         if (state == NULL) {
712                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
713                 return;
714         }
715
716         if (hdr->reqid != state->reqid) {
717                 /* we found a record  but it was the wrong one */
718                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
719                 return;
720         }
721
722         state->outdata.dptr = c->data;
723         state->outdata.dsize = c->datalen;
724         state->status = c->status;
725         if (c->errorlen) {
726                 state->errormsg = talloc_strndup(state, 
727                                                  (char *)&c->data[c->datalen], 
728                                                  c->errorlen);
729         }
730
731         /* state->outdata now uses resources from c so we dont want c
732            to just dissappear from under us while state is still alive
733         */
734         talloc_steal(state, c);
735
736         state->state = CTDB_CONTROL_DONE;
737
738         /* if we had a callback registered for this control, pull the response
739            and call the callback.
740         */
741         if (state->async.fn) {
742                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
743         }
744 }
745
746
747 /*
748   destroy a ctdb_control in client
749 */
750 static int ctdb_control_destructor(struct ctdb_client_control_state *state)     
751 {
752         ctdb_reqid_remove(state->ctdb, state->reqid);
753         return 0;
754 }
755
756
757 /* time out handler for ctdb_control */
758 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
759         struct timeval t, void *private_data)
760 {
761         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
762
763         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
764                          "dstnode:%u\n", state->reqid, state->c->opcode,
765                          state->c->hdr.destnode));
766
767         state->state = CTDB_CONTROL_TIMEOUT;
768
769         /* if we had a callback registered for this control, pull the response
770            and call the callback.
771         */
772         if (state->async.fn) {
773                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
774         }
775 }
776
777 /* async version of send control request */
778 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
779                 uint32_t destnode, uint64_t srvid, 
780                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
781                 TALLOC_CTX *mem_ctx,
782                 struct timeval *timeout,
783                 char **errormsg)
784 {
785         struct ctdb_client_control_state *state;
786         size_t len;
787         struct ctdb_req_control *c;
788         int ret;
789
790         if (errormsg) {
791                 *errormsg = NULL;
792         }
793
794         /* if the domain socket is not yet open, open it */
795         if (ctdb->daemon.sd==-1) {
796                 ctdb_socket_connect(ctdb);
797         }
798
799         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
800         CTDB_NO_MEMORY_NULL(ctdb, state);
801
802         state->ctdb       = ctdb;
803         state->reqid      = ctdb_reqid_new(ctdb, state);
804         state->state      = CTDB_CONTROL_WAIT;
805         state->errormsg   = NULL;
806
807         talloc_set_destructor(state, ctdb_control_destructor);
808
809         len = offsetof(struct ctdb_req_control, data) + data.dsize;
810         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
811                                len, struct ctdb_req_control);
812         state->c            = c;        
813         CTDB_NO_MEMORY_NULL(ctdb, c);
814         c->hdr.reqid        = state->reqid;
815         c->hdr.destnode     = destnode;
816         c->opcode           = opcode;
817         c->client_id        = 0;
818         c->flags            = flags;
819         c->srvid            = srvid;
820         c->datalen          = data.dsize;
821         if (data.dsize) {
822                 memcpy(&c->data[0], data.dptr, data.dsize);
823         }
824
825         /* timeout */
826         if (timeout && !timeval_is_zero(timeout)) {
827                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
828         }
829
830         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
831         if (ret != 0) {
832                 talloc_free(state);
833                 return NULL;
834         }
835
836         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
837                 talloc_free(state);
838                 return NULL;
839         }
840
841         return state;
842 }
843
844
845 /* async version of receive control reply */
846 int ctdb_control_recv(struct ctdb_context *ctdb, 
847                 struct ctdb_client_control_state *state, 
848                 TALLOC_CTX *mem_ctx,
849                 TDB_DATA *outdata, int32_t *status, char **errormsg)
850 {
851         TALLOC_CTX *tmp_ctx;
852
853         if (status != NULL) {
854                 *status = -1;
855         }
856         if (errormsg != NULL) {
857                 *errormsg = NULL;
858         }
859
860         if (state == NULL) {
861                 return -1;
862         }
863
864         /* prevent double free of state */
865         tmp_ctx = talloc_new(ctdb);
866         talloc_steal(tmp_ctx, state);
867
868         /* loop one event at a time until we either timeout or the control
869            completes.
870         */
871         while (state->state == CTDB_CONTROL_WAIT) {
872                 event_loop_once(ctdb->ev);
873         }
874
875         if (state->state != CTDB_CONTROL_DONE) {
876                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
877                 if (state->async.fn) {
878                         state->async.fn(state);
879                 }
880                 talloc_free(tmp_ctx);
881                 return -1;
882         }
883
884         if (state->errormsg) {
885                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
886                 if (errormsg) {
887                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
888                 }
889                 if (state->async.fn) {
890                         state->async.fn(state);
891                 }
892                 talloc_free(tmp_ctx);
893                 return -1;
894         }
895
896         if (outdata) {
897                 *outdata = state->outdata;
898                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
899         }
900
901         if (status) {
902                 *status = state->status;
903         }
904
905         if (state->async.fn) {
906                 state->async.fn(state);
907         }
908
909         talloc_free(tmp_ctx);
910         return 0;
911 }
912
913
914
915 /*
916   send a ctdb control message
917   timeout specifies how long we should wait for a reply.
918   if timeout is NULL we wait indefinitely
919  */
920 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
921                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
922                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
923                  struct timeval *timeout,
924                  char **errormsg)
925 {
926         struct ctdb_client_control_state *state;
927
928         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
929                         flags, data, mem_ctx,
930                         timeout, errormsg);
931         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
932                         errormsg);
933 }
934
935
936
937
938 /*
939   a process exists call. Returns 0 if process exists, -1 otherwise
940  */
941 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
942 {
943         int ret;
944         TDB_DATA data;
945         int32_t status;
946
947         data.dptr = (uint8_t*)&pid;
948         data.dsize = sizeof(pid);
949
950         ret = ctdb_control(ctdb, destnode, 0, 
951                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
952                            NULL, NULL, &status, NULL, NULL);
953         if (ret != 0) {
954                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
955                 return -1;
956         }
957
958         return status;
959 }
960
961 /*
962   get remote statistics
963  */
964 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
965 {
966         int ret;
967         TDB_DATA data;
968         int32_t res;
969
970         ret = ctdb_control(ctdb, destnode, 0, 
971                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
972                            ctdb, &data, &res, NULL, NULL);
973         if (ret != 0 || res != 0) {
974                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
975                 return -1;
976         }
977
978         if (data.dsize != sizeof(struct ctdb_statistics)) {
979                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
980                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
981                       return -1;
982         }
983
984         *status = *(struct ctdb_statistics *)data.dptr;
985         talloc_free(data.dptr);
986                         
987         return 0;
988 }
989
990 /*
991   shutdown a remote ctdb node
992  */
993 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
994 {
995         struct ctdb_client_control_state *state;
996
997         state = ctdb_control_send(ctdb, destnode, 0, 
998                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
999                            NULL, &timeout, NULL);
1000         if (state == NULL) {
1001                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1002                 return -1;
1003         }
1004
1005         return 0;
1006 }
1007
1008 /*
1009   get vnn map from a remote node
1010  */
1011 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1012 {
1013         int ret;
1014         TDB_DATA outdata;
1015         int32_t res;
1016         struct ctdb_vnn_map_wire *map;
1017
1018         ret = ctdb_control(ctdb, destnode, 0, 
1019                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1020                            mem_ctx, &outdata, &res, &timeout, NULL);
1021         if (ret != 0 || res != 0) {
1022                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1023                 return -1;
1024         }
1025         
1026         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1027         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1028             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1029                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1030                 return -1;
1031         }
1032
1033         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1034         CTDB_NO_MEMORY(ctdb, *vnnmap);
1035         (*vnnmap)->generation = map->generation;
1036         (*vnnmap)->size       = map->size;
1037         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1038
1039         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1040         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1041         talloc_free(outdata.dptr);
1042                     
1043         return 0;
1044 }
1045
1046
1047 /*
1048   get the recovery mode of a remote node
1049  */
1050 struct ctdb_client_control_state *
1051 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1052 {
1053         return ctdb_control_send(ctdb, destnode, 0, 
1054                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1055                            mem_ctx, &timeout, NULL);
1056 }
1057
1058 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1059 {
1060         int ret;
1061         int32_t res;
1062
1063         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1064         if (ret != 0) {
1065                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1066                 return -1;
1067         }
1068
1069         if (recmode) {
1070                 *recmode = (uint32_t)res;
1071         }
1072
1073         return 0;
1074 }
1075
1076 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1077 {
1078         struct ctdb_client_control_state *state;
1079
1080         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1081         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1082 }
1083
1084
1085
1086
1087 /*
1088   set the recovery mode of a remote node
1089  */
1090 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1091 {
1092         int ret;
1093         TDB_DATA data;
1094         int32_t res;
1095
1096         data.dsize = sizeof(uint32_t);
1097         data.dptr = (unsigned char *)&recmode;
1098
1099         ret = ctdb_control(ctdb, destnode, 0, 
1100                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1101                            NULL, NULL, &res, &timeout, NULL);
1102         if (ret != 0 || res != 0) {
1103                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1104                 return -1;
1105         }
1106
1107         return 0;
1108 }
1109
1110
1111
1112 /*
1113   get the recovery master of a remote node
1114  */
1115 struct ctdb_client_control_state *
1116 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1117                         struct timeval timeout, uint32_t destnode)
1118 {
1119         return ctdb_control_send(ctdb, destnode, 0, 
1120                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1121                            mem_ctx, &timeout, NULL);
1122 }
1123
1124 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1125 {
1126         int ret;
1127         int32_t res;
1128
1129         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1130         if (ret != 0) {
1131                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1132                 return -1;
1133         }
1134
1135         if (recmaster) {
1136                 *recmaster = (uint32_t)res;
1137         }
1138
1139         return 0;
1140 }
1141
1142 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1143 {
1144         struct ctdb_client_control_state *state;
1145
1146         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1147         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1148 }
1149
1150
1151 /*
1152   set the recovery master of a remote node
1153  */
1154 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1155 {
1156         int ret;
1157         TDB_DATA data;
1158         int32_t res;
1159
1160         ZERO_STRUCT(data);
1161         data.dsize = sizeof(uint32_t);
1162         data.dptr = (unsigned char *)&recmaster;
1163
1164         ret = ctdb_control(ctdb, destnode, 0, 
1165                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1166                            NULL, NULL, &res, &timeout, NULL);
1167         if (ret != 0 || res != 0) {
1168                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1169                 return -1;
1170         }
1171
1172         return 0;
1173 }
1174
1175
1176 /*
1177   get a list of databases off a remote node
1178  */
1179 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1180                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1181 {
1182         int ret;
1183         TDB_DATA outdata;
1184         int32_t res;
1185
1186         ret = ctdb_control(ctdb, destnode, 0, 
1187                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1188                            mem_ctx, &outdata, &res, &timeout, NULL);
1189         if (ret != 0 || res != 0) {
1190                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1191                 return -1;
1192         }
1193
1194         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1195         talloc_free(outdata.dptr);
1196                     
1197         return 0;
1198 }
1199
1200 /*
1201   get a list of nodes (vnn and flags ) from a remote node
1202  */
1203 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1204                 struct timeval timeout, uint32_t destnode, 
1205                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1206 {
1207         int ret;
1208         TDB_DATA outdata;
1209         int32_t res;
1210
1211         ret = ctdb_control(ctdb, destnode, 0, 
1212                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1213                            mem_ctx, &outdata, &res, &timeout, NULL);
1214         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1215                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1216                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1217         }
1218         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1219                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1220                 return -1;
1221         }
1222
1223         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1224         talloc_free(outdata.dptr);
1225                     
1226         return 0;
1227 }
1228
1229 /*
1230   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1231  */
1232 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1233                 struct timeval timeout, uint32_t destnode, 
1234                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1235 {
1236         int ret, i, len;
1237         TDB_DATA outdata;
1238         struct ctdb_node_mapv4 *nodemapv4;
1239         int32_t res;
1240
1241         ret = ctdb_control(ctdb, destnode, 0, 
1242                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1243                            mem_ctx, &outdata, &res, &timeout, NULL);
1244         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1245                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1246                 return -1;
1247         }
1248
1249         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1250
1251         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1252         (*nodemap) = talloc_zero_size(mem_ctx, len);
1253         CTDB_NO_MEMORY(ctdb, (*nodemap));
1254
1255         (*nodemap)->num = nodemapv4->num;
1256         for (i=0; i<nodemapv4->num; i++) {
1257                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1258                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1259                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1260                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1261         }
1262                 
1263         talloc_free(outdata.dptr);
1264                     
1265         return 0;
1266 }
1267
1268 /*
1269   drop the transport, reload the nodes file and restart the transport
1270  */
1271 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1272                     struct timeval timeout, uint32_t destnode)
1273 {
1274         int ret;
1275         int32_t res;
1276
1277         ret = ctdb_control(ctdb, destnode, 0, 
1278                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1279                            NULL, NULL, &res, &timeout, NULL);
1280         if (ret != 0 || res != 0) {
1281                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1282                 return -1;
1283         }
1284
1285         return 0;
1286 }
1287
1288
1289 /*
1290   set vnn map on a node
1291  */
1292 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1293                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1294 {
1295         int ret;
1296         TDB_DATA data;
1297         int32_t res;
1298         struct ctdb_vnn_map_wire *map;
1299         size_t len;
1300
1301         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1302         map = talloc_size(mem_ctx, len);
1303         CTDB_NO_MEMORY(ctdb, map);
1304
1305         map->generation = vnnmap->generation;
1306         map->size = vnnmap->size;
1307         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1308         
1309         data.dsize = len;
1310         data.dptr  = (uint8_t *)map;
1311
1312         ret = ctdb_control(ctdb, destnode, 0, 
1313                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1314                            NULL, NULL, &res, &timeout, NULL);
1315         if (ret != 0 || res != 0) {
1316                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1317                 return -1;
1318         }
1319
1320         talloc_free(map);
1321
1322         return 0;
1323 }
1324
1325
1326 /*
1327   async send for pull database
1328  */
1329 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1330         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1331         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1332 {
1333         TDB_DATA indata;
1334         struct ctdb_control_pulldb *pull;
1335         struct ctdb_client_control_state *state;
1336
1337         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1338         CTDB_NO_MEMORY_NULL(ctdb, pull);
1339
1340         pull->db_id   = dbid;
1341         pull->lmaster = lmaster;
1342
1343         indata.dsize = sizeof(struct ctdb_control_pulldb);
1344         indata.dptr  = (unsigned char *)pull;
1345
1346         state = ctdb_control_send(ctdb, destnode, 0, 
1347                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1348                                   mem_ctx, &timeout, NULL);
1349         talloc_free(pull);
1350
1351         return state;
1352 }
1353
1354 /*
1355   async recv for pull database
1356  */
1357 int ctdb_ctrl_pulldb_recv(
1358         struct ctdb_context *ctdb, 
1359         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1360         TDB_DATA *outdata)
1361 {
1362         int ret;
1363         int32_t res;
1364
1365         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1366         if ( (ret != 0) || (res != 0) ){
1367                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1368                 return -1;
1369         }
1370
1371         return 0;
1372 }
1373
1374 /*
1375   pull all keys and records for a specific database on a node
1376  */
1377 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1378                 uint32_t dbid, uint32_t lmaster, 
1379                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1380                 TDB_DATA *outdata)
1381 {
1382         struct ctdb_client_control_state *state;
1383
1384         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1385                                       timeout);
1386         
1387         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1388 }
1389
1390
1391 /*
1392   change dmaster for all keys in the database to the new value
1393  */
1394 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1395                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1396 {
1397         int ret;
1398         TDB_DATA indata;
1399         int32_t res;
1400
1401         indata.dsize = 2*sizeof(uint32_t);
1402         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1403
1404         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1405         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1406
1407         ret = ctdb_control(ctdb, destnode, 0, 
1408                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1409                            NULL, NULL, &res, &timeout, NULL);
1410         if (ret != 0 || res != 0) {
1411                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1412                 return -1;
1413         }
1414
1415         return 0;
1416 }
1417
1418 /*
1419   ping a node, return number of clients connected
1420  */
1421 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1422 {
1423         int ret;
1424         int32_t res;
1425
1426         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1427                            tdb_null, NULL, NULL, &res, NULL, NULL);
1428         if (ret != 0) {
1429                 return -1;
1430         }
1431         return res;
1432 }
1433
1434 /*
1435   find the real path to a ltdb 
1436  */
1437 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1438                    const char **path)
1439 {
1440         int ret;
1441         int32_t res;
1442         TDB_DATA data;
1443
1444         data.dptr = (uint8_t *)&dbid;
1445         data.dsize = sizeof(dbid);
1446
1447         ret = ctdb_control(ctdb, destnode, 0, 
1448                            CTDB_CONTROL_GETDBPATH, 0, data, 
1449                            mem_ctx, &data, &res, &timeout, NULL);
1450         if (ret != 0 || res != 0) {
1451                 return -1;
1452         }
1453
1454         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1455         if ((*path) == NULL) {
1456                 return -1;
1457         }
1458
1459         talloc_free(data.dptr);
1460
1461         return 0;
1462 }
1463
1464 /*
1465   find the name of a db 
1466  */
1467 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1468                    const char **name)
1469 {
1470         int ret;
1471         int32_t res;
1472         TDB_DATA data;
1473
1474         data.dptr = (uint8_t *)&dbid;
1475         data.dsize = sizeof(dbid);
1476
1477         ret = ctdb_control(ctdb, destnode, 0, 
1478                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1479                            mem_ctx, &data, &res, &timeout, NULL);
1480         if (ret != 0 || res != 0) {
1481                 return -1;
1482         }
1483
1484         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1485         if ((*name) == NULL) {
1486                 return -1;
1487         }
1488
1489         talloc_free(data.dptr);
1490
1491         return 0;
1492 }
1493
1494 /*
1495   get the health status of a db
1496  */
1497 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1498                           struct timeval timeout,
1499                           uint32_t destnode,
1500                           uint32_t dbid, TALLOC_CTX *mem_ctx,
1501                           const char **reason)
1502 {
1503         int ret;
1504         int32_t res;
1505         TDB_DATA data;
1506
1507         data.dptr = (uint8_t *)&dbid;
1508         data.dsize = sizeof(dbid);
1509
1510         ret = ctdb_control(ctdb, destnode, 0,
1511                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1512                            mem_ctx, &data, &res, &timeout, NULL);
1513         if (ret != 0 || res != 0) {
1514                 return -1;
1515         }
1516
1517         if (data.dsize == 0) {
1518                 (*reason) = NULL;
1519                 return 0;
1520         }
1521
1522         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1523         if ((*reason) == NULL) {
1524                 return -1;
1525         }
1526
1527         talloc_free(data.dptr);
1528
1529         return 0;
1530 }
1531
1532 /*
1533   create a database
1534  */
1535 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1536                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1537 {
1538         int ret;
1539         int32_t res;
1540         TDB_DATA data;
1541
1542         data.dptr = discard_const(name);
1543         data.dsize = strlen(name)+1;
1544
1545         ret = ctdb_control(ctdb, destnode, 0, 
1546                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1547                            0, data, 
1548                            mem_ctx, &data, &res, &timeout, NULL);
1549
1550         if (ret != 0 || res != 0) {
1551                 return -1;
1552         }
1553
1554         return 0;
1555 }
1556
1557 /*
1558   get debug level on a node
1559  */
1560 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1561 {
1562         int ret;
1563         int32_t res;
1564         TDB_DATA data;
1565
1566         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1567                            ctdb, &data, &res, NULL, NULL);
1568         if (ret != 0 || res != 0) {
1569                 return -1;
1570         }
1571         if (data.dsize != sizeof(int32_t)) {
1572                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1573                          (unsigned)data.dsize));
1574                 return -1;
1575         }
1576         *level = *(int32_t *)data.dptr;
1577         talloc_free(data.dptr);
1578         return 0;
1579 }
1580
1581 /*
1582   set debug level on a node
1583  */
1584 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1585 {
1586         int ret;
1587         int32_t res;
1588         TDB_DATA data;
1589
1590         data.dptr = (uint8_t *)&level;
1591         data.dsize = sizeof(level);
1592
1593         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1594                            NULL, NULL, &res, NULL, NULL);
1595         if (ret != 0 || res != 0) {
1596                 return -1;
1597         }
1598         return 0;
1599 }
1600
1601
1602 /*
1603   get a list of connected nodes
1604  */
1605 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1606                                 struct timeval timeout,
1607                                 TALLOC_CTX *mem_ctx,
1608                                 uint32_t *num_nodes)
1609 {
1610         struct ctdb_node_map *map=NULL;
1611         int ret, i;
1612         uint32_t *nodes;
1613
1614         *num_nodes = 0;
1615
1616         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1617         if (ret != 0) {
1618                 return NULL;
1619         }
1620
1621         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1622         if (nodes == NULL) {
1623                 return NULL;
1624         }
1625
1626         for (i=0;i<map->num;i++) {
1627                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1628                         nodes[*num_nodes] = map->nodes[i].pnn;
1629                         (*num_nodes)++;
1630                 }
1631         }
1632
1633         return nodes;
1634 }
1635
1636
1637 /*
1638   reset remote status
1639  */
1640 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1641 {
1642         int ret;
1643         int32_t res;
1644
1645         ret = ctdb_control(ctdb, destnode, 0, 
1646                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1647                            NULL, NULL, &res, NULL, NULL);
1648         if (ret != 0 || res != 0) {
1649                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1650                 return -1;
1651         }
1652         return 0;
1653 }
1654
1655 /*
1656   this is the dummy null procedure that all databases support
1657 */
1658 static int ctdb_null_func(struct ctdb_call_info *call)
1659 {
1660         return 0;
1661 }
1662
1663 /*
1664   this is a plain fetch procedure that all databases support
1665 */
1666 static int ctdb_fetch_func(struct ctdb_call_info *call)
1667 {
1668         call->reply_data = &call->record_data;
1669         return 0;
1670 }
1671
1672 /*
1673   this is a plain fetch procedure that all databases support
1674   this returns the full record including the ltdb header
1675 */
1676 static int ctdb_fetch_with_header_func(struct ctdb_call_info *call)
1677 {
1678         call->reply_data = talloc(call, TDB_DATA);
1679         if (call->reply_data == NULL) {
1680                 return -1;
1681         }
1682         call->reply_data->dsize = sizeof(struct ctdb_ltdb_header) + call->record_data.dsize;
1683         call->reply_data->dptr  = talloc_size(call->reply_data, call->reply_data->dsize);
1684         if (call->reply_data->dptr == NULL) {
1685                 return -1;
1686         }
1687         memcpy(call->reply_data->dptr, call->header, sizeof(struct ctdb_ltdb_header));
1688         memcpy(&call->reply_data->dptr[sizeof(struct ctdb_ltdb_header)], call->record_data.dptr, call->record_data.dsize);
1689
1690         return 0;
1691 }
1692
1693 /*
1694   attach to a specific database - client call
1695 */
1696 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, bool persistent, uint32_t tdb_flags)
1697 {
1698         struct ctdb_db_context *ctdb_db;
1699         TDB_DATA data;
1700         int ret;
1701         int32_t res;
1702
1703         ctdb_db = ctdb_db_handle(ctdb, name);
1704         if (ctdb_db) {
1705                 return ctdb_db;
1706         }
1707
1708         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1709         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1710
1711         ctdb_db->ctdb = ctdb;
1712         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1713         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1714
1715         data.dptr = discard_const(name);
1716         data.dsize = strlen(name)+1;
1717
1718         /* tell ctdb daemon to attach */
1719         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1720                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1721                            0, data, ctdb_db, &data, &res, NULL, NULL);
1722         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1723                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1724                 talloc_free(ctdb_db);
1725                 return NULL;
1726         }
1727         
1728         ctdb_db->db_id = *(uint32_t *)data.dptr;
1729         talloc_free(data.dptr);
1730
1731         ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1732         if (ret != 0) {
1733                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1734                 talloc_free(ctdb_db);
1735                 return NULL;
1736         }
1737
1738         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1739         if (ctdb->valgrinding) {
1740                 tdb_flags |= TDB_NOMMAP;
1741         }
1742         tdb_flags |= TDB_DISALLOW_NESTING;
1743
1744         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1745         if (ctdb_db->ltdb == NULL) {
1746                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1747                 talloc_free(ctdb_db);
1748                 return NULL;
1749         }
1750
1751         ctdb_db->persistent = persistent;
1752
1753         DLIST_ADD(ctdb->db_list, ctdb_db);
1754
1755         /* add well known functions */
1756         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1757         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1758         ctdb_set_call(ctdb_db, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1759
1760         return ctdb_db;
1761 }
1762
1763
1764 /*
1765   setup a call for a database
1766  */
1767 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1768 {
1769         struct ctdb_registered_call *call;
1770
1771 #if 0
1772         TDB_DATA data;
1773         int32_t status;
1774         struct ctdb_control_set_call c;
1775         int ret;
1776
1777         /* this is no longer valid with the separate daemon architecture */
1778         c.db_id = ctdb_db->db_id;
1779         c.fn    = fn;
1780         c.id    = id;
1781
1782         data.dptr = (uint8_t *)&c;
1783         data.dsize = sizeof(c);
1784
1785         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1786                            data, NULL, NULL, &status, NULL, NULL);
1787         if (ret != 0 || status != 0) {
1788                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
1789                 return -1;
1790         }
1791 #endif
1792
1793         /* also register locally */
1794         call = talloc(ctdb_db, struct ctdb_registered_call);
1795         call->fn = fn;
1796         call->id = id;
1797
1798         DLIST_ADD(ctdb_db->calls, call);        
1799         return 0;
1800 }
1801
1802
1803 struct traverse_state {
1804         bool done;
1805         uint32_t count;
1806         ctdb_traverse_func fn;
1807         void *private_data;
1808 };
1809
1810 /*
1811   called on each key during a ctdb_traverse
1812  */
1813 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1814 {
1815         struct traverse_state *state = (struct traverse_state *)p;
1816         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
1817         TDB_DATA key;
1818
1819         if (data.dsize < sizeof(uint32_t) ||
1820             d->length != data.dsize) {
1821                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
1822                 state->done = True;
1823                 return;
1824         }
1825
1826         key.dsize = d->keylen;
1827         key.dptr  = &d->data[0];
1828         data.dsize = d->datalen;
1829         data.dptr = &d->data[d->keylen];
1830
1831         if (key.dsize == 0 && data.dsize == 0) {
1832                 /* end of traverse */
1833                 state->done = True;
1834                 return;
1835         }
1836
1837         if (data.dsize == sizeof(struct ctdb_ltdb_header)) {
1838                 /* empty records are deleted records in ctdb */
1839                 return;
1840         }
1841
1842         if (state->fn(ctdb, key, data, state->private_data) != 0) {
1843                 state->done = True;
1844         }
1845
1846         state->count++;
1847 }
1848
1849
1850 /*
1851   start a cluster wide traverse, calling the supplied fn on each record
1852   return the number of records traversed, or -1 on error
1853  */
1854 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
1855 {
1856         TDB_DATA data;
1857         struct ctdb_traverse_start t;
1858         int32_t status;
1859         int ret;
1860         uint64_t srvid = (getpid() | 0xFLL<<60);
1861         struct traverse_state state;
1862
1863         state.done = False;
1864         state.count = 0;
1865         state.private_data = private_data;
1866         state.fn = fn;
1867
1868         ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
1869         if (ret != 0) {
1870                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
1871                 return -1;
1872         }
1873
1874         t.db_id = ctdb_db->db_id;
1875         t.srvid = srvid;
1876         t.reqid = 0;
1877
1878         data.dptr = (uint8_t *)&t;
1879         data.dsize = sizeof(t);
1880
1881         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
1882                            data, NULL, NULL, &status, NULL, NULL);
1883         if (ret != 0 || status != 0) {
1884                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
1885                 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1886                 return -1;
1887         }
1888
1889         while (!state.done) {
1890                 event_loop_once(ctdb_db->ctdb->ev);
1891         }
1892
1893         ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1894         if (ret != 0) {
1895                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
1896                 return -1;
1897         }
1898
1899         return state.count;
1900 }
1901
1902 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
1903 /*
1904   called on each key during a catdb
1905  */
1906 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
1907 {
1908         int i;
1909         FILE *f = (FILE *)p;
1910         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
1911
1912         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
1913         for (i=0;i<key.dsize;i++) {
1914                 if (ISASCII(key.dptr[i])) {
1915                         fprintf(f, "%c", key.dptr[i]);
1916                 } else {
1917                         fprintf(f, "\\%02X", key.dptr[i]);
1918                 }
1919         }
1920         fprintf(f, "\"\n");
1921
1922         fprintf(f, "dmaster: %u\n", h->dmaster);
1923         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
1924
1925         fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
1926         for (i=sizeof(*h);i<data.dsize;i++) {
1927                 if (ISASCII(data.dptr[i])) {
1928                         fprintf(f, "%c", data.dptr[i]);
1929                 } else {
1930                         fprintf(f, "\\%02X", data.dptr[i]);
1931                 }
1932         }
1933         fprintf(f, "\"\n");
1934
1935         fprintf(f, "\n");
1936
1937         return 0;
1938 }
1939
1940 /*
1941   convenience function to list all keys to stdout
1942  */
1943 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
1944 {
1945         return ctdb_traverse(ctdb_db, ctdb_dumpdb_record, f);
1946 }
1947
1948 /*
1949   get the pid of a ctdb daemon
1950  */
1951 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
1952 {
1953         int ret;
1954         int32_t res;
1955
1956         ret = ctdb_control(ctdb, destnode, 0, 
1957                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
1958                            NULL, NULL, &res, &timeout, NULL);
1959         if (ret != 0) {
1960                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
1961                 return -1;
1962         }
1963
1964         *pid = res;
1965
1966         return 0;
1967 }
1968
1969
1970 /*
1971   async freeze send control
1972  */
1973 struct ctdb_client_control_state *
1974 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
1975 {
1976         return ctdb_control_send(ctdb, destnode, priority, 
1977                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
1978                            mem_ctx, &timeout, NULL);
1979 }
1980
1981 /* 
1982    async freeze recv control
1983 */
1984 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
1985 {
1986         int ret;
1987         int32_t res;
1988
1989         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1990         if ( (ret != 0) || (res != 0) ){
1991                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
1992                 return -1;
1993         }
1994
1995         return 0;
1996 }
1997
1998 /*
1999   freeze databases of a certain priority
2000  */
2001 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2002 {
2003         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2004         struct ctdb_client_control_state *state;
2005         int ret;
2006
2007         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
2008         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
2009         talloc_free(tmp_ctx);
2010
2011         return ret;
2012 }
2013
2014 /* Freeze all databases */
2015 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2016 {
2017         int i;
2018
2019         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2020                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2021                         return -1;
2022                 }
2023         }
2024         return 0;
2025 }
2026
2027 /*
2028   thaw databases of a certain priority
2029  */
2030 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2031 {
2032         int ret;
2033         int32_t res;
2034
2035         ret = ctdb_control(ctdb, destnode, priority, 
2036                            CTDB_CONTROL_THAW, 0, tdb_null, 
2037                            NULL, NULL, &res, &timeout, NULL);
2038         if (ret != 0 || res != 0) {
2039                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2040                 return -1;
2041         }
2042
2043         return 0;
2044 }
2045
2046 /* thaw all databases */
2047 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2048 {
2049         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2050 }
2051
2052 /*
2053   get pnn of a node, or -1
2054  */
2055 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2056 {
2057         int ret;
2058         int32_t res;
2059
2060         ret = ctdb_control(ctdb, destnode, 0, 
2061                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2062                            NULL, NULL, &res, &timeout, NULL);
2063         if (ret != 0) {
2064                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2065                 return -1;
2066         }
2067
2068         return res;
2069 }
2070
2071 /*
2072   get the monitoring mode of a remote node
2073  */
2074 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2075 {
2076         int ret;
2077         int32_t res;
2078
2079         ret = ctdb_control(ctdb, destnode, 0, 
2080                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2081                            NULL, NULL, &res, &timeout, NULL);
2082         if (ret != 0) {
2083                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2084                 return -1;
2085         }
2086
2087         *monmode = res;
2088
2089         return 0;
2090 }
2091
2092
2093 /*
2094  set the monitoring mode of a remote node to active
2095  */
2096 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2097 {
2098         int ret;
2099         
2100
2101         ret = ctdb_control(ctdb, destnode, 0, 
2102                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2103                            NULL, NULL,NULL, &timeout, NULL);
2104         if (ret != 0) {
2105                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2106                 return -1;
2107         }
2108
2109         
2110
2111         return 0;
2112 }
2113
2114 /*
2115   set the monitoring mode of a remote node to disable
2116  */
2117 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2118 {
2119         int ret;
2120         
2121
2122         ret = ctdb_control(ctdb, destnode, 0, 
2123                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2124                            NULL, NULL, NULL, &timeout, NULL);
2125         if (ret != 0) {
2126                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2127                 return -1;
2128         }
2129
2130         
2131
2132         return 0;
2133 }
2134
2135
2136
2137 /* 
2138   sent to a node to make it take over an ip address
2139 */
2140 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2141                           uint32_t destnode, struct ctdb_public_ip *ip)
2142 {
2143         TDB_DATA data;
2144         struct ctdb_public_ipv4 ipv4;
2145         int ret;
2146         int32_t res;
2147
2148         if (ip->addr.sa.sa_family == AF_INET) {
2149                 ipv4.pnn = ip->pnn;
2150                 ipv4.sin = ip->addr.ip;
2151
2152                 data.dsize = sizeof(ipv4);
2153                 data.dptr  = (uint8_t *)&ipv4;
2154
2155                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2156                            NULL, &res, &timeout, NULL);
2157         } else {
2158                 data.dsize = sizeof(*ip);
2159                 data.dptr  = (uint8_t *)ip;
2160
2161                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2162                            NULL, &res, &timeout, NULL);
2163         }
2164
2165         if (ret != 0 || res != 0) {
2166                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2167                 return -1;
2168         }
2169
2170         return 0;       
2171 }
2172
2173
2174 /* 
2175   sent to a node to make it release an ip address
2176 */
2177 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2178                          uint32_t destnode, struct ctdb_public_ip *ip)
2179 {
2180         TDB_DATA data;
2181         struct ctdb_public_ipv4 ipv4;
2182         int ret;
2183         int32_t res;
2184
2185         if (ip->addr.sa.sa_family == AF_INET) {
2186                 ipv4.pnn = ip->pnn;
2187                 ipv4.sin = ip->addr.ip;
2188
2189                 data.dsize = sizeof(ipv4);
2190                 data.dptr  = (uint8_t *)&ipv4;
2191
2192                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2193                                    NULL, &res, &timeout, NULL);
2194         } else {
2195                 data.dsize = sizeof(*ip);
2196                 data.dptr  = (uint8_t *)ip;
2197
2198                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2199                                    NULL, &res, &timeout, NULL);
2200         }
2201
2202         if (ret != 0 || res != 0) {
2203                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2204                 return -1;
2205         }
2206
2207         return 0;       
2208 }
2209
2210
2211 /*
2212   get a tunable
2213  */
2214 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2215                           struct timeval timeout, 
2216                           uint32_t destnode,
2217                           const char *name, uint32_t *value)
2218 {
2219         struct ctdb_control_get_tunable *t;
2220         TDB_DATA data, outdata;
2221         int32_t res;
2222         int ret;
2223
2224         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2225         data.dptr  = talloc_size(ctdb, data.dsize);
2226         CTDB_NO_MEMORY(ctdb, data.dptr);
2227
2228         t = (struct ctdb_control_get_tunable *)data.dptr;
2229         t->length = strlen(name)+1;
2230         memcpy(t->name, name, t->length);
2231
2232         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2233                            &outdata, &res, &timeout, NULL);
2234         talloc_free(data.dptr);
2235         if (ret != 0 || res != 0) {
2236                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2237                 return -1;
2238         }
2239
2240         if (outdata.dsize != sizeof(uint32_t)) {
2241                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2242                 talloc_free(outdata.dptr);
2243                 return -1;
2244         }
2245         
2246         *value = *(uint32_t *)outdata.dptr;
2247         talloc_free(outdata.dptr);
2248
2249         return 0;
2250 }
2251
2252 /*
2253   set a tunable
2254  */
2255 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2256                           struct timeval timeout, 
2257                           uint32_t destnode,
2258                           const char *name, uint32_t value)
2259 {
2260         struct ctdb_control_set_tunable *t;
2261         TDB_DATA data;
2262         int32_t res;
2263         int ret;
2264
2265         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2266         data.dptr  = talloc_size(ctdb, data.dsize);
2267         CTDB_NO_MEMORY(ctdb, data.dptr);
2268
2269         t = (struct ctdb_control_set_tunable *)data.dptr;
2270         t->length = strlen(name)+1;
2271         memcpy(t->name, name, t->length);
2272         t->value = value;
2273
2274         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2275                            NULL, &res, &timeout, NULL);
2276         talloc_free(data.dptr);
2277         if (ret != 0 || res != 0) {
2278                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2279                 return -1;
2280         }
2281
2282         return 0;
2283 }
2284
2285 /*
2286   list tunables
2287  */
2288 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2289                             struct timeval timeout, 
2290                             uint32_t destnode,
2291                             TALLOC_CTX *mem_ctx,
2292                             const char ***list, uint32_t *count)
2293 {
2294         TDB_DATA outdata;
2295         int32_t res;
2296         int ret;
2297         struct ctdb_control_list_tunable *t;
2298         char *p, *s, *ptr;
2299
2300         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2301                            mem_ctx, &outdata, &res, &timeout, NULL);
2302         if (ret != 0 || res != 0) {
2303                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2304                 return -1;
2305         }
2306
2307         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2308         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2309             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2310                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2311                 talloc_free(outdata.dptr);
2312                 return -1;              
2313         }
2314         
2315         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2316         CTDB_NO_MEMORY(ctdb, p);
2317
2318         talloc_free(outdata.dptr);
2319         
2320         (*list) = NULL;
2321         (*count) = 0;
2322
2323         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2324                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2325                 CTDB_NO_MEMORY(ctdb, *list);
2326                 (*list)[*count] = talloc_strdup(*list, s);
2327                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2328                 (*count)++;
2329         }
2330
2331         talloc_free(p);
2332
2333         return 0;
2334 }
2335
2336
2337 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2338                                    struct timeval timeout, uint32_t destnode,
2339                                    TALLOC_CTX *mem_ctx,
2340                                    uint32_t flags,
2341                                    struct ctdb_all_public_ips **ips)
2342 {
2343         int ret;
2344         TDB_DATA outdata;
2345         int32_t res;
2346
2347         ret = ctdb_control(ctdb, destnode, 0, 
2348                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2349                            mem_ctx, &outdata, &res, &timeout, NULL);
2350         if (ret == 0 && res == -1) {
2351                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2352                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2353         }
2354         if (ret != 0 || res != 0) {
2355           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2356                 return -1;
2357         }
2358
2359         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2360         talloc_free(outdata.dptr);
2361                     
2362         return 0;
2363 }
2364
2365 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2366                              struct timeval timeout, uint32_t destnode,
2367                              TALLOC_CTX *mem_ctx,
2368                              struct ctdb_all_public_ips **ips)
2369 {
2370         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2371                                               destnode, mem_ctx,
2372                                               0, ips);
2373 }
2374
2375 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2376                         struct timeval timeout, uint32_t destnode, 
2377                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2378 {
2379         int ret, i, len;
2380         TDB_DATA outdata;
2381         int32_t res;
2382         struct ctdb_all_public_ipsv4 *ipsv4;
2383
2384         ret = ctdb_control(ctdb, destnode, 0, 
2385                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2386                            mem_ctx, &outdata, &res, &timeout, NULL);
2387         if (ret != 0 || res != 0) {
2388                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2389                 return -1;
2390         }
2391
2392         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2393         len = offsetof(struct ctdb_all_public_ips, ips) +
2394                 ipsv4->num*sizeof(struct ctdb_public_ip);
2395         *ips = talloc_zero_size(mem_ctx, len);
2396         CTDB_NO_MEMORY(ctdb, *ips);
2397         (*ips)->num = ipsv4->num;
2398         for (i=0; i<ipsv4->num; i++) {
2399                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2400                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2401         }
2402
2403         talloc_free(outdata.dptr);
2404                     
2405         return 0;
2406 }
2407
2408 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2409                                  struct timeval timeout, uint32_t destnode,
2410                                  TALLOC_CTX *mem_ctx,
2411                                  const ctdb_sock_addr *addr,
2412                                  struct ctdb_control_public_ip_info **_info)
2413 {
2414         int ret;
2415         TDB_DATA indata;
2416         TDB_DATA outdata;
2417         int32_t res;
2418         struct ctdb_control_public_ip_info *info;
2419         uint32_t len;
2420         uint32_t i;
2421
2422         indata.dptr = discard_const_p(uint8_t, addr);
2423         indata.dsize = sizeof(*addr);
2424
2425         ret = ctdb_control(ctdb, destnode, 0,
2426                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2427                            mem_ctx, &outdata, &res, &timeout, NULL);
2428         if (ret != 0 || res != 0) {
2429                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2430                                 "failed ret:%d res:%d\n",
2431                                 ret, res));
2432                 return -1;
2433         }
2434
2435         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2436         if (len > outdata.dsize) {
2437                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2438                                 "returned invalid data with size %u > %u\n",
2439                                 (unsigned int)outdata.dsize,
2440                                 (unsigned int)len));
2441                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2442                 return -1;
2443         }
2444
2445         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2446         len += info->num*sizeof(struct ctdb_control_iface_info);
2447
2448         if (len > outdata.dsize) {
2449                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2450                                 "returned invalid data with size %u > %u\n",
2451                                 (unsigned int)outdata.dsize,
2452                                 (unsigned int)len));
2453                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2454                 return -1;
2455         }
2456
2457         /* make sure we null terminate the returned strings */
2458         for (i=0; i < info->num; i++) {
2459                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2460         }
2461
2462         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2463                                                                 outdata.dptr,
2464                                                                 outdata.dsize);
2465         talloc_free(outdata.dptr);
2466         if (*_info == NULL) {
2467                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2468                                 "talloc_memdup size %u failed\n",
2469                                 (unsigned int)outdata.dsize));
2470                 return -1;
2471         }
2472
2473         return 0;
2474 }
2475
2476 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2477                          struct timeval timeout, uint32_t destnode,
2478                          TALLOC_CTX *mem_ctx,
2479                          struct ctdb_control_get_ifaces **_ifaces)
2480 {
2481         int ret;
2482         TDB_DATA outdata;
2483         int32_t res;
2484         struct ctdb_control_get_ifaces *ifaces;
2485         uint32_t len;
2486         uint32_t i;
2487
2488         ret = ctdb_control(ctdb, destnode, 0,
2489                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2490                            mem_ctx, &outdata, &res, &timeout, NULL);
2491         if (ret != 0 || res != 0) {
2492                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2493                                 "failed ret:%d res:%d\n",
2494                                 ret, res));
2495                 return -1;
2496         }
2497
2498         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2499         if (len > outdata.dsize) {
2500                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2501                                 "returned invalid data with size %u > %u\n",
2502                                 (unsigned int)outdata.dsize,
2503                                 (unsigned int)len));
2504                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2505                 return -1;
2506         }
2507
2508         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2509         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2510
2511         if (len > outdata.dsize) {
2512                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2513                                 "returned invalid data with size %u > %u\n",
2514                                 (unsigned int)outdata.dsize,
2515                                 (unsigned int)len));
2516                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2517                 return -1;
2518         }
2519
2520         /* make sure we null terminate the returned strings */
2521         for (i=0; i < ifaces->num; i++) {
2522                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2523         }
2524
2525         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2526                                                                   outdata.dptr,
2527                                                                   outdata.dsize);
2528         talloc_free(outdata.dptr);
2529         if (*_ifaces == NULL) {
2530                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2531                                 "talloc_memdup size %u failed\n",
2532                                 (unsigned int)outdata.dsize));
2533                 return -1;
2534         }
2535
2536         return 0;
2537 }
2538
2539 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2540                              struct timeval timeout, uint32_t destnode,
2541                              TALLOC_CTX *mem_ctx,
2542                              const struct ctdb_control_iface_info *info)
2543 {
2544         int ret;
2545         TDB_DATA indata;
2546         int32_t res;
2547
2548         indata.dptr = discard_const_p(uint8_t, info);
2549         indata.dsize = sizeof(*info);
2550
2551         ret = ctdb_control(ctdb, destnode, 0,
2552                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2553                            mem_ctx, NULL, &res, &timeout, NULL);
2554         if (ret != 0 || res != 0) {
2555                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2556                                 "failed ret:%d res:%d\n",
2557                                 ret, res));
2558                 return -1;
2559         }
2560
2561         return 0;
2562 }
2563
2564 /*
2565   set/clear the permanent disabled bit on a remote node
2566  */
2567 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2568                        uint32_t set, uint32_t clear)
2569 {
2570         int ret;
2571         TDB_DATA data;
2572         struct ctdb_node_map *nodemap=NULL;
2573         struct ctdb_node_flag_change c;
2574         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2575         uint32_t recmaster;
2576         uint32_t *nodes;
2577
2578
2579         /* find the recovery master */
2580         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2581         if (ret != 0) {
2582                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2583                 talloc_free(tmp_ctx);
2584                 return ret;
2585         }
2586
2587
2588         /* read the node flags from the recmaster */
2589         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2590         if (ret != 0) {
2591                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2592                 talloc_free(tmp_ctx);
2593                 return -1;
2594         }
2595         if (destnode >= nodemap->num) {
2596                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2597                 talloc_free(tmp_ctx);
2598                 return -1;
2599         }
2600
2601         c.pnn       = destnode;
2602         c.old_flags = nodemap->nodes[destnode].flags;
2603         c.new_flags = c.old_flags;
2604         c.new_flags |= set;
2605         c.new_flags &= ~clear;
2606
2607         data.dsize = sizeof(c);
2608         data.dptr = (unsigned char *)&c;
2609
2610         /* send the flags update to all connected nodes */
2611         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2612
2613         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2614                                         nodes, 0,
2615                                         timeout, false, data,
2616                                         NULL, NULL,
2617                                         NULL) != 0) {
2618                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2619
2620                 talloc_free(tmp_ctx);
2621                 return -1;
2622         }
2623
2624         talloc_free(tmp_ctx);
2625         return 0;
2626 }
2627
2628
2629 /*
2630   get all tunables
2631  */
2632 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
2633                                struct timeval timeout, 
2634                                uint32_t destnode,
2635                                struct ctdb_tunable *tunables)
2636 {
2637         TDB_DATA outdata;
2638         int ret;
2639         int32_t res;
2640
2641         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2642                            &outdata, &res, &timeout, NULL);
2643         if (ret != 0 || res != 0) {
2644                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2645                 return -1;
2646         }
2647
2648         if (outdata.dsize != sizeof(*tunables)) {
2649                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2650                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2651                 return -1;              
2652         }
2653
2654         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2655         talloc_free(outdata.dptr);
2656         return 0;
2657 }
2658
2659 /*
2660   add a public address to a node
2661  */
2662 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
2663                       struct timeval timeout, 
2664                       uint32_t destnode,
2665                       struct ctdb_control_ip_iface *pub)
2666 {
2667         TDB_DATA data;
2668         int32_t res;
2669         int ret;
2670
2671         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2672         data.dptr  = (unsigned char *)pub;
2673
2674         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2675                            NULL, &res, &timeout, NULL);
2676         if (ret != 0 || res != 0) {
2677                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2678                 return -1;
2679         }
2680
2681         return 0;
2682 }
2683
2684 /*
2685   delete a public address from a node
2686  */
2687 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
2688                       struct timeval timeout, 
2689                       uint32_t destnode,
2690                       struct ctdb_control_ip_iface *pub)
2691 {
2692         TDB_DATA data;
2693         int32_t res;
2694         int ret;
2695
2696         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2697         data.dptr  = (unsigned char *)pub;
2698
2699         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2700                            NULL, &res, &timeout, NULL);
2701         if (ret != 0 || res != 0) {
2702                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2703                 return -1;
2704         }
2705
2706         return 0;
2707 }
2708
2709 /*
2710   kill a tcp connection
2711  */
2712 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2713                       struct timeval timeout, 
2714                       uint32_t destnode,
2715                       struct ctdb_control_killtcp *killtcp)
2716 {
2717         TDB_DATA data;
2718         int32_t res;
2719         int ret;
2720
2721         data.dsize = sizeof(struct ctdb_control_killtcp);
2722         data.dptr  = (unsigned char *)killtcp;
2723
2724         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2725                            NULL, &res, &timeout, NULL);
2726         if (ret != 0 || res != 0) {
2727                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2728                 return -1;
2729         }
2730
2731         return 0;
2732 }
2733
2734 /*
2735   send a gratious arp
2736  */
2737 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
2738                       struct timeval timeout, 
2739                       uint32_t destnode,
2740                       ctdb_sock_addr *addr,
2741                       const char *ifname)
2742 {
2743         TDB_DATA data;
2744         int32_t res;
2745         int ret, len;
2746         struct ctdb_control_gratious_arp *gratious_arp;
2747         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2748
2749
2750         len = strlen(ifname)+1;
2751         gratious_arp = talloc_size(tmp_ctx, 
2752                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2753         CTDB_NO_MEMORY(ctdb, gratious_arp);
2754
2755         gratious_arp->addr = *addr;
2756         gratious_arp->len = len;
2757         memcpy(&gratious_arp->iface[0], ifname, len);
2758
2759
2760         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
2761         data.dptr  = (unsigned char *)gratious_arp;
2762
2763         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
2764                            NULL, &res, &timeout, NULL);
2765         if (ret != 0 || res != 0) {
2766                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
2767                 talloc_free(tmp_ctx);
2768                 return -1;
2769         }
2770
2771         talloc_free(tmp_ctx);
2772         return 0;
2773 }
2774
2775 /*
2776   get a list of all tcp tickles that a node knows about for a particular vnn
2777  */
2778 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
2779                               struct timeval timeout, uint32_t destnode, 
2780                               TALLOC_CTX *mem_ctx, 
2781                               ctdb_sock_addr *addr,
2782                               struct ctdb_control_tcp_tickle_list **list)
2783 {
2784         int ret;
2785         TDB_DATA data, outdata;
2786         int32_t status;
2787
2788         data.dptr = (uint8_t*)addr;
2789         data.dsize = sizeof(ctdb_sock_addr);
2790
2791         ret = ctdb_control(ctdb, destnode, 0, 
2792                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
2793                            mem_ctx, &outdata, &status, NULL, NULL);
2794         if (ret != 0 || status != 0) {
2795                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
2796                 return -1;
2797         }
2798
2799         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
2800
2801         return status;
2802 }
2803
2804 /*
2805   register a server id
2806  */
2807 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
2808                       struct timeval timeout, 
2809                       struct ctdb_server_id *id)
2810 {
2811         TDB_DATA data;
2812         int32_t res;
2813         int ret;
2814
2815         data.dsize = sizeof(struct ctdb_server_id);
2816         data.dptr  = (unsigned char *)id;
2817
2818         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2819                         CTDB_CONTROL_REGISTER_SERVER_ID, 
2820                         0, data, NULL,
2821                         NULL, &res, &timeout, NULL);
2822         if (ret != 0 || res != 0) {
2823                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
2824                 return -1;
2825         }
2826
2827         return 0;
2828 }
2829
2830 /*
2831   unregister a server id
2832  */
2833 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
2834                       struct timeval timeout, 
2835                       struct ctdb_server_id *id)
2836 {
2837         TDB_DATA data;
2838         int32_t res;
2839         int ret;
2840
2841         data.dsize = sizeof(struct ctdb_server_id);
2842         data.dptr  = (unsigned char *)id;
2843
2844         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2845                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
2846                         0, data, NULL,
2847                         NULL, &res, &timeout, NULL);
2848         if (ret != 0 || res != 0) {
2849                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
2850                 return -1;
2851         }
2852
2853         return 0;
2854 }
2855
2856
2857 /*
2858   check if a server id exists
2859
2860   if a server id does exist, return *status == 1, otherwise *status == 0
2861  */
2862 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
2863                       struct timeval timeout, 
2864                       uint32_t destnode,
2865                       struct ctdb_server_id *id,
2866                       uint32_t *status)
2867 {
2868         TDB_DATA data;
2869         int32_t res;
2870         int ret;
2871
2872         data.dsize = sizeof(struct ctdb_server_id);
2873         data.dptr  = (unsigned char *)id;
2874
2875         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
2876                         0, data, NULL,
2877                         NULL, &res, &timeout, NULL);
2878         if (ret != 0) {
2879                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
2880                 return -1;
2881         }
2882
2883         if (res) {
2884                 *status = 1;
2885         } else {
2886                 *status = 0;
2887         }
2888
2889         return 0;
2890 }
2891
2892 /*
2893    get the list of server ids that are registered on a node
2894 */
2895 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
2896                 TALLOC_CTX *mem_ctx,
2897                 struct timeval timeout, uint32_t destnode, 
2898                 struct ctdb_server_id_list **svid_list)
2899 {
2900         int ret;
2901         TDB_DATA outdata;
2902         int32_t res;
2903
2904         ret = ctdb_control(ctdb, destnode, 0, 
2905                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
2906                            mem_ctx, &outdata, &res, &timeout, NULL);
2907         if (ret != 0 || res != 0) {
2908                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
2909                 return -1;
2910         }
2911
2912         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
2913                     
2914         return 0;
2915 }
2916
2917 /*
2918   initialise the ctdb daemon for client applications
2919
2920   NOTE: In current code the daemon does not fork. This is for testing purposes only
2921   and to simplify the code.
2922 */
2923 struct ctdb_context *ctdb_init(struct event_context *ev)
2924 {
2925         int ret;
2926         struct ctdb_context *ctdb;
2927
2928         ctdb = talloc_zero(ev, struct ctdb_context);
2929         if (ctdb == NULL) {
2930                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
2931                 return NULL;
2932         }
2933         ctdb->ev  = ev;
2934         ctdb->idr = idr_init(ctdb);
2935         /* Wrap early to exercise code. */
2936         ctdb->lastid = INT_MAX-200;
2937         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
2938
2939         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
2940         if (ret != 0) {
2941                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
2942                 talloc_free(ctdb);
2943                 return NULL;
2944         }
2945
2946         ctdb->statistics.statistics_start_time = timeval_current();
2947
2948         return ctdb;
2949 }
2950
2951
2952 /*
2953   set some ctdb flags
2954 */
2955 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
2956 {
2957         ctdb->flags |= flags;
2958 }
2959
2960 /*
2961   setup the local socket name
2962 */
2963 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
2964 {
2965         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
2966         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
2967
2968         return 0;
2969 }
2970
2971 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
2972 {
2973         return ctdb->daemon.name;
2974 }
2975
2976 /*
2977   return the pnn of this node
2978 */
2979 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
2980 {
2981         return ctdb->pnn;
2982 }
2983
2984
2985 /*
2986   get the uptime of a remote node
2987  */
2988 struct ctdb_client_control_state *
2989 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2990 {
2991         return ctdb_control_send(ctdb, destnode, 0, 
2992                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
2993                            mem_ctx, &timeout, NULL);
2994 }
2995
2996 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
2997 {
2998         int ret;
2999         int32_t res;
3000         TDB_DATA outdata;
3001
3002         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3003         if (ret != 0 || res != 0) {
3004                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
3005                 return -1;
3006         }
3007
3008         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
3009
3010         return 0;
3011 }
3012
3013 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3014 {
3015         struct ctdb_client_control_state *state;
3016
3017         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3018         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3019 }
3020
3021 /*
3022   send a control to execute the "recovered" event script on a node
3023  */
3024 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3025 {
3026         int ret;
3027         int32_t status;
3028
3029         ret = ctdb_control(ctdb, destnode, 0, 
3030                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
3031                            NULL, NULL, &status, &timeout, NULL);
3032         if (ret != 0 || status != 0) {
3033                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3034                 return -1;
3035         }
3036
3037         return 0;
3038 }
3039
3040 /* 
3041   callback for the async helpers used when sending the same control
3042   to multiple nodes in parallell.
3043 */
3044 static void async_callback(struct ctdb_client_control_state *state)
3045 {
3046         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3047         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3048         int ret;
3049         TDB_DATA outdata;
3050         int32_t res;
3051         uint32_t destnode = state->c->hdr.destnode;
3052
3053         /* one more node has responded with recmode data */
3054         data->count--;
3055
3056         /* if we failed to push the db, then return an error and let
3057            the main loop try again.
3058         */
3059         if (state->state != CTDB_CONTROL_DONE) {
3060                 if ( !data->dont_log_errors) {
3061                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3062                 }
3063                 data->fail_count++;
3064                 if (data->fail_callback) {
3065                         data->fail_callback(ctdb, destnode, res, outdata,
3066                                         data->callback_data);
3067                 }
3068                 return;
3069         }
3070         
3071         state->async.fn = NULL;
3072
3073         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3074         if ((ret != 0) || (res != 0)) {
3075                 if ( !data->dont_log_errors) {
3076                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3077                 }
3078                 data->fail_count++;
3079                 if (data->fail_callback) {
3080                         data->fail_callback(ctdb, destnode, res, outdata,
3081                                         data->callback_data);
3082                 }
3083         }
3084         if ((ret == 0) && (data->callback != NULL)) {
3085                 data->callback(ctdb, destnode, res, outdata,
3086                                         data->callback_data);
3087         }
3088 }
3089
3090
3091 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3092 {
3093         /* set up the callback functions */
3094         state->async.fn = async_callback;
3095         state->async.private_data = data;
3096         
3097         /* one more control to wait for to complete */
3098         data->count++;
3099 }
3100
3101
3102 /* wait for up to the maximum number of seconds allowed
3103    or until all nodes we expect a response from has replied
3104 */
3105 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3106 {
3107         while (data->count > 0) {
3108                 event_loop_once(ctdb->ev);
3109         }
3110         if (data->fail_count != 0) {
3111                 if (!data->dont_log_errors) {
3112                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
3113                                  data->fail_count));
3114                 }
3115                 return -1;
3116         }
3117         return 0;
3118 }
3119
3120
3121 /* 
3122    perform a simple control on the listed nodes
3123    The control cannot return data
3124  */
3125 int ctdb_client_async_control(struct ctdb_context *ctdb,
3126                                 enum ctdb_controls opcode,
3127                                 uint32_t *nodes,
3128                                 uint64_t srvid,
3129                                 struct timeval timeout,
3130                                 bool dont_log_errors,
3131                                 TDB_DATA data,
3132                                 client_async_callback client_callback,
3133                                 client_async_callback fail_callback,
3134                                 void *callback_data)
3135 {
3136         struct client_async_data *async_data;
3137         struct ctdb_client_control_state *state;
3138         int j, num_nodes;
3139
3140         async_data = talloc_zero(ctdb, struct client_async_data);
3141         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3142         async_data->dont_log_errors = dont_log_errors;
3143         async_data->callback = client_callback;
3144         async_data->fail_callback = fail_callback;
3145         async_data->callback_data = callback_data;
3146         async_data->opcode        = opcode;
3147
3148         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3149
3150         /* loop over all nodes and send an async control to each of them */
3151         for (j=0; j<num_nodes; j++) {
3152                 uint32_t pnn = nodes[j];
3153
3154                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
3155                                           0, data, async_data, &timeout, NULL);
3156                 if (state == NULL) {
3157                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3158                         talloc_free(async_data);
3159                         return -1;
3160                 }
3161                 
3162                 ctdb_client_async_add(async_data, state);
3163         }
3164
3165         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3166                 talloc_free(async_data);
3167                 return -1;
3168         }
3169
3170         talloc_free(async_data);
3171         return 0;
3172 }
3173
3174 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3175                                 struct ctdb_vnn_map *vnn_map,
3176                                 TALLOC_CTX *mem_ctx,
3177                                 bool include_self)
3178 {
3179         int i, j, num_nodes;
3180         uint32_t *nodes;
3181
3182         for (i=num_nodes=0;i<vnn_map->size;i++) {
3183                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3184                         continue;
3185                 }
3186                 num_nodes++;
3187         } 
3188
3189         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3190         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3191
3192         for (i=j=0;i<vnn_map->size;i++) {
3193                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3194                         continue;
3195                 }
3196                 nodes[j++] = vnn_map->map[i];
3197         } 
3198
3199         return nodes;
3200 }
3201
3202 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3203                                 struct ctdb_node_map *node_map,
3204                                 TALLOC_CTX *mem_ctx,
3205                                 bool include_self)
3206 {
3207         int i, j, num_nodes;
3208         uint32_t *nodes;
3209
3210         for (i=num_nodes=0;i<node_map->num;i++) {
3211                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3212                         continue;
3213                 }
3214                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3215                         continue;
3216                 }
3217                 num_nodes++;
3218         } 
3219
3220         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3221         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3222
3223         for (i=j=0;i<node_map->num;i++) {
3224                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3225                         continue;
3226                 }
3227                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3228                         continue;
3229                 }
3230                 nodes[j++] = node_map->nodes[i].pnn;
3231         } 
3232
3233         return nodes;
3234 }
3235
3236 uint32_t *list_of_active_nodes_except_pnn(struct ctdb_context *ctdb,
3237                                 struct ctdb_node_map *node_map,
3238                                 TALLOC_CTX *mem_ctx,
3239                                 uint32_t pnn)
3240 {
3241         int i, j, num_nodes;
3242         uint32_t *nodes;
3243
3244         for (i=num_nodes=0;i<node_map->num;i++) {
3245                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3246                         continue;
3247                 }
3248                 if (node_map->nodes[i].pnn == pnn) {
3249                         continue;
3250                 }
3251                 num_nodes++;
3252         } 
3253
3254         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3255         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3256
3257         for (i=j=0;i<node_map->num;i++) {
3258                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3259                         continue;
3260                 }
3261                 if (node_map->nodes[i].pnn == pnn) {
3262                         continue;
3263                 }
3264                 nodes[j++] = node_map->nodes[i].pnn;
3265         } 
3266
3267         return nodes;
3268 }
3269
3270 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3271                                 struct ctdb_node_map *node_map,
3272                                 TALLOC_CTX *mem_ctx,
3273                                 bool include_self)
3274 {
3275         int i, j, num_nodes;
3276         uint32_t *nodes;
3277
3278         for (i=num_nodes=0;i<node_map->num;i++) {
3279                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3280                         continue;
3281                 }
3282                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3283                         continue;
3284                 }
3285                 num_nodes++;
3286         } 
3287
3288         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3289         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3290
3291         for (i=j=0;i<node_map->num;i++) {
3292                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3293                         continue;
3294                 }
3295                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3296                         continue;
3297                 }
3298                 nodes[j++] = node_map->nodes[i].pnn;
3299         } 
3300
3301         return nodes;
3302 }
3303
3304 /* 
3305   this is used to test if a pnn lock exists and if it exists will return
3306   the number of connections that pnn has reported or -1 if that recovery
3307   daemon is not running.
3308 */
3309 int
3310 ctdb_read_pnn_lock(int fd, int32_t pnn)
3311 {
3312         struct flock lock;
3313         char c;
3314
3315         lock.l_type = F_WRLCK;
3316         lock.l_whence = SEEK_SET;
3317         lock.l_start = pnn;
3318         lock.l_len = 1;
3319         lock.l_pid = 0;
3320
3321         if (fcntl(fd, F_GETLK, &lock) != 0) {
3322                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3323                 return -1;
3324         }
3325
3326         if (lock.l_type == F_UNLCK) {
3327                 return -1;
3328         }
3329
3330         if (pread(fd, &c, 1, pnn) == -1) {
3331                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3332                 return -1;
3333         }
3334
3335         return c;
3336 }
3337
3338 /*
3339   get capabilities of a remote node
3340  */
3341 struct ctdb_client_control_state *
3342 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3343 {
3344         return ctdb_control_send(ctdb, destnode, 0, 
3345                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3346                            mem_ctx, &timeout, NULL);
3347 }
3348
3349 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3350 {
3351         int ret;
3352         int32_t res;
3353         TDB_DATA outdata;
3354
3355         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3356         if ( (ret != 0) || (res != 0) ) {
3357                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3358                 return -1;
3359         }
3360
3361         if (capabilities) {
3362                 *capabilities = *((uint32_t *)outdata.dptr);
3363         }
3364
3365         return 0;
3366 }
3367
3368 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3369 {
3370         struct ctdb_client_control_state *state;
3371         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3372         int ret;
3373
3374         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3375         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3376         talloc_free(tmp_ctx);
3377         return ret;
3378 }
3379
3380 /**
3381  * check whether a transaction is active on a given db on a given node
3382  */
3383 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
3384                                      uint32_t destnode,
3385                                      uint32_t db_id)
3386 {
3387         int32_t status;
3388         int ret;
3389         TDB_DATA indata;
3390
3391         indata.dptr = (uint8_t *)&db_id;
3392         indata.dsize = sizeof(db_id);
3393
3394         ret = ctdb_control(ctdb, destnode, 0,
3395                            CTDB_CONTROL_TRANS2_ACTIVE,
3396                            0, indata, NULL, NULL, &status,
3397                            NULL, NULL);
3398
3399         if (ret != 0) {
3400                 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
3401                 return -1;
3402         }
3403
3404         return status;
3405 }
3406
3407
3408 struct ctdb_transaction_handle {
3409         struct ctdb_db_context *ctdb_db;
3410         bool in_replay;
3411         /*
3412          * we store the reads and writes done under a transaction:
3413          * - one list stores both reads and writes (m_all),
3414          * - the other just writes (m_write)
3415          */
3416         struct ctdb_marshall_buffer *m_all;
3417         struct ctdb_marshall_buffer *m_write;
3418 };
3419
3420 /* start a transaction on a database */
3421 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3422 {
3423         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3424         return 0;
3425 }
3426
3427 /* start a transaction on a database */
3428 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
3429 {
3430         struct ctdb_record_handle *rh;
3431         TDB_DATA key;
3432         TDB_DATA data;
3433         struct ctdb_ltdb_header header;
3434         TALLOC_CTX *tmp_ctx;
3435         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
3436         int ret;
3437         struct ctdb_db_context *ctdb_db = h->ctdb_db;
3438         pid_t pid;
3439         int32_t status;
3440
3441         key.dptr = discard_const(keyname);
3442         key.dsize = strlen(keyname);
3443
3444         if (!ctdb_db->persistent) {
3445                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
3446                 return -1;
3447         }
3448
3449 again:
3450         tmp_ctx = talloc_new(h);
3451
3452         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
3453         if (rh == NULL) {
3454                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
3455                 talloc_free(tmp_ctx);
3456                 return -1;
3457         }
3458
3459         status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
3460                                               CTDB_CURRENT_NODE,
3461                                               ctdb_db->db_id);
3462         if (status == 1) {
3463                 unsigned long int usec = (1000 + random()) % 100000;
3464                 DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
3465                                     "on db_id[0x%08x]. waiting for %lu "
3466                                     "microseconds\n",
3467                                     ctdb_db->db_id, usec));
3468                 talloc_free(tmp_ctx);
3469                 usleep(usec);
3470                 goto again;
3471         }
3472
3473         /*
3474          * store the pid in the database:
3475          * it is not enough that the node is dmaster...
3476          */
3477         pid = getpid();
3478         data.dptr = (unsigned char *)&pid;
3479         data.dsize = sizeof(pid_t);
3480         rh->header.rsn++;
3481         rh->header.dmaster = ctdb_db->ctdb->pnn;
3482         ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
3483         if (ret != 0) {
3484                 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
3485                                   "transaction record\n"));
3486                 talloc_free(tmp_ctx);
3487                 return -1;
3488         }
3489
3490         talloc_free(rh);
3491
3492         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3493         if (ret != 0) {
3494                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
3495                 talloc_free(tmp_ctx);
3496                 return -1;
3497         }
3498
3499         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
3500         if (ret != 0) {
3501                 DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
3502                                  "lock record inside transaction\n"));
3503                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3504                 talloc_free(tmp_ctx);
3505                 goto again;
3506         }
3507
3508         if (header.dmaster != ctdb_db->ctdb->pnn) {
3509                 DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
3510                                    "transaction lock record\n"));
3511                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3512                 talloc_free(tmp_ctx);
3513                 goto again;
3514         }
3515
3516         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
3517                 DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
3518                                     "the transaction lock record\n"));
3519                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3520                 talloc_free(tmp_ctx);
3521                 goto again;
3522         }
3523
3524         talloc_free(tmp_ctx);
3525
3526         return 0;
3527 }
3528
3529
3530 /* start a transaction on a database */
3531 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3532                                                        TALLOC_CTX *mem_ctx)
3533 {
3534         struct ctdb_transaction_handle *h;
3535         int ret;
3536
3537         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3538         if (h == NULL) {
3539                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
3540                 return NULL;
3541         }
3542
3543         h->ctdb_db = ctdb_db;
3544
3545         ret = ctdb_transaction_fetch_start(h);
3546         if (ret != 0) {
3547                 talloc_free(h);
3548                 return NULL;
3549         }
3550
3551         talloc_set_destructor(h, ctdb_transaction_destructor);
3552
3553         return h;
3554 }
3555
3556
3557
3558 /*
3559   fetch a record inside a transaction
3560  */
3561 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
3562                            TALLOC_CTX *mem_ctx, 
3563                            TDB_DATA key, TDB_DATA *data)
3564 {
3565         struct ctdb_ltdb_header header;
3566         int ret;
3567
3568         ZERO_STRUCT(header);
3569
3570         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3571         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3572                 /* record doesn't exist yet */
3573                 *data = tdb_null;
3574                 ret = 0;
3575         }
3576         
3577         if (ret != 0) {
3578                 return ret;
3579         }
3580
3581         if (!h->in_replay) {
3582                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3583                 if (h->m_all == NULL) {
3584                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3585                         return -1;
3586                 }
3587         }
3588
3589         return 0;
3590 }
3591
3592 /*
3593   stores a record inside a transaction
3594  */
3595 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
3596                            TDB_DATA key, TDB_DATA data)
3597 {
3598         TALLOC_CTX *tmp_ctx = talloc_new(h);
3599         struct ctdb_ltdb_header header;
3600         TDB_DATA olddata;
3601         int ret;
3602
3603         ZERO_STRUCT(header);
3604
3605         /* we need the header so we can update the RSN */
3606         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3607         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3608                 /* the record doesn't exist - create one with us as dmaster.
3609                    This is only safe because we are in a transaction and this
3610                    is a persistent database */
3611                 ZERO_STRUCT(header);
3612         } else if (ret != 0) {
3613                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3614                 talloc_free(tmp_ctx);
3615                 return ret;
3616         }
3617
3618         if (data.dsize == olddata.dsize &&
3619             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3620                 /* save writing the same data */
3621                 talloc_free(tmp_ctx);
3622                 return 0;
3623         }
3624
3625         header.dmaster = h->ctdb_db->ctdb->pnn;
3626         header.rsn++;
3627
3628         if (!h->in_replay) {
3629                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3630                 if (h->m_all == NULL) {
3631                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3632                         talloc_free(tmp_ctx);
3633                         return -1;
3634                 }
3635         }               
3636
3637         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3638         if (h->m_write == NULL) {
3639                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3640                 talloc_free(tmp_ctx);
3641                 return -1;
3642         }
3643         
3644         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3645
3646         talloc_free(tmp_ctx);
3647         
3648         return ret;
3649 }
3650
3651 /*
3652   replay a transaction
3653  */
3654 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3655 {
3656         int ret, i;
3657         struct ctdb_rec_data *rec = NULL;
3658
3659         h->in_replay = true;
3660         talloc_free(h->m_write);
3661         h->m_write = NULL;
3662
3663         ret = ctdb_transaction_fetch_start(h);
3664         if (ret != 0) {
3665                 return ret;
3666         }
3667
3668         for (i=0;i<h->m_all->count;i++) {
3669                 TDB_DATA key, data;
3670
3671                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3672                 if (rec == NULL) {
3673                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3674                         goto failed;
3675                 }
3676
3677                 if (rec->reqid == 0) {
3678                         /* its a store */
3679                         if (ctdb_transaction_store(h, key, data) != 0) {
3680                                 goto failed;
3681                         }
3682                 } else {
3683                         TDB_DATA data2;
3684                         TALLOC_CTX *tmp_ctx = talloc_new(h);
3685
3686                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3687                                 talloc_free(tmp_ctx);
3688                                 goto failed;
3689                         }
3690                         if (data2.dsize != data.dsize ||
3691                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3692                                 /* the record has changed on us - we have to give up */
3693                                 talloc_free(tmp_ctx);
3694                                 goto failed;
3695                         }
3696                         talloc_free(tmp_ctx);
3697                 }
3698         }
3699         
3700         return 0;
3701
3702 failed:
3703         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3704         return -1;
3705 }
3706
3707
3708 /*
3709   commit a transaction
3710  */
3711 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3712 {
3713         int ret, retries=0;
3714         int32_t status;
3715         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3716         struct timeval timeout;
3717         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3718
3719         talloc_set_destructor(h, NULL);
3720
3721         /* our commit strategy is quite complex.
3722
3723            - we first try to commit the changes to all other nodes
3724
3725            - if that works, then we commit locally and we are done
3726
3727            - if a commit on another node fails, then we need to cancel
3728              the transaction, then restart the transaction (thus
3729              opening a window of time for a pending recovery to
3730              complete), then replay the transaction, checking all the
3731              reads and writes (checking that reads give the same data,
3732              and writes succeed). Then we retry the transaction to the
3733              other nodes
3734         */
3735
3736 again:
3737         if (h->m_write == NULL) {
3738                 /* no changes were made */
3739                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3740                 talloc_free(h);
3741                 return 0;
3742         }
3743
3744         /* tell ctdbd to commit to the other nodes */
3745         timeout = timeval_current_ofs(1, 0);
3746         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3747                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
3748                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
3749                            &timeout, NULL);
3750         if (ret != 0 || status != 0) {
3751                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3752                 DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
3753                                      ", retrying after 1 second...\n",
3754                                      (retries==0)?"":"retry "));
3755                 sleep(1);
3756
3757                 if (ret != 0) {
3758                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
3759                 } else {
3760                         /* work out what error code we will give if we 
3761                            have to fail the operation */
3762                         switch ((enum ctdb_trans2_commit_error)status) {
3763                         case CTDB_TRANS2_COMMIT_SUCCESS:
3764                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
3765                         case CTDB_TRANS2_COMMIT_TIMEOUT:
3766                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3767                                 break;
3768                         case CTDB_TRANS2_COMMIT_ALLFAIL:
3769                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
3770                                 break;
3771                         }
3772                 }
3773
3774                 if (++retries == 100) {
3775                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
3776                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
3777                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3778                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3779                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3780                         talloc_free(h);
3781                         return -1;
3782                 }               
3783
3784                 if (ctdb_replay_transaction(h) != 0) {
3785                         DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
3786                                           "transaction on db 0x%08x, "
3787                                           "failure control =%u\n",
3788                                           h->ctdb_db->db_id,
3789                                           (unsigned)failure_control));
3790                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3791                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3792                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3793                         talloc_free(h);
3794                         return -1;
3795                 }
3796                 goto again;
3797         } else {
3798                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3799         }
3800
3801         /* do the real commit locally */
3802         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
3803         if (ret != 0) {
3804                 DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
3805                                   "on db id 0x%08x locally, "
3806                                   "failure_control=%u\n",
3807                                   h->ctdb_db->db_id,
3808                                   (unsigned)failure_control));
3809                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3810                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3811                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
3812                 talloc_free(h);
3813                 return ret;
3814         }
3815
3816         /* tell ctdbd that we are finished with our local commit */
3817         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3818                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
3819                      tdb_null, NULL, NULL, NULL, NULL, NULL);
3820         talloc_free(h);
3821         return 0;
3822 }
3823
3824 /*
3825   recovery daemon ping to main daemon
3826  */
3827 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
3828 {
3829         int ret;
3830         int32_t res;
3831
3832         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
3833                            ctdb, NULL, &res, NULL, NULL);
3834         if (ret != 0 || res != 0) {
3835                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
3836                 return -1;
3837         }
3838
3839         return 0;
3840 }
3841
3842 /* when forking the main daemon and the child process needs to connect back
3843  * to the daemon as a client process, this function can be used to change
3844  * the ctdb context from daemon into client mode
3845  */
3846 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
3847 {
3848         int ret;
3849         va_list ap;
3850
3851         /* Add extra information so we can identify this in the logs */
3852         va_start(ap, fmt);
3853         debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
3854         va_end(ap);
3855
3856         /* shutdown the transport */
3857         if (ctdb->methods) {
3858                 ctdb->methods->shutdown(ctdb);
3859         }
3860
3861         /* get a new event context */
3862         talloc_free(ctdb->ev);
3863         ctdb->ev = event_context_init(ctdb);
3864         tevent_loop_allow_nesting(ctdb->ev);
3865
3866         close(ctdb->daemon.sd);
3867         ctdb->daemon.sd = -1;
3868
3869         /* the client does not need to be realtime */
3870         if (ctdb->do_setsched) {
3871                 ctdb_restore_scheduler(ctdb);
3872         }
3873
3874         /* initialise ctdb */
3875         ret = ctdb_socket_connect(ctdb);
3876         if (ret != 0) {
3877                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
3878                 return -1;
3879         }
3880
3881          return 0;
3882 }
3883
3884 /*
3885   get the status of running the monitor eventscripts: NULL means never run.
3886  */
3887 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
3888                 struct timeval timeout, uint32_t destnode, 
3889                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
3890                 struct ctdb_scripts_wire **script_status)
3891 {
3892         int ret;
3893         TDB_DATA outdata, indata;
3894         int32_t res;
3895         uint32_t uinttype = type;
3896
3897         indata.dptr = (uint8_t *)&uinttype;
3898         indata.dsize = sizeof(uinttype);
3899
3900         ret = ctdb_control(ctdb, destnode, 0, 
3901                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
3902                            mem_ctx, &outdata, &res, &timeout, NULL);
3903         if (ret != 0 || res != 0) {
3904                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
3905                 return -1;
3906         }
3907
3908         if (outdata.dsize == 0) {
3909                 *script_status = NULL;
3910         } else {
3911                 *script_status = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
3912                 talloc_free(outdata.dptr);
3913         }
3914                     
3915         return 0;
3916 }
3917
3918 /*
3919   tell the main daemon how long it took to lock the reclock file
3920  */
3921 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
3922 {
3923         int ret;
3924         int32_t res;
3925         TDB_DATA data;
3926
3927         data.dptr = (uint8_t *)&latency;
3928         data.dsize = sizeof(latency);
3929
3930         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
3931                            ctdb, NULL, &res, NULL, NULL);
3932         if (ret != 0 || res != 0) {
3933                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
3934                 return -1;
3935         }
3936
3937         return 0;
3938 }
3939
3940 /*
3941   get the name of the reclock file
3942  */
3943 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
3944                          uint32_t destnode, TALLOC_CTX *mem_ctx,
3945                          const char **name)
3946 {
3947         int ret;
3948         int32_t res;
3949         TDB_DATA data;
3950
3951         ret = ctdb_control(ctdb, destnode, 0, 
3952                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
3953                            mem_ctx, &data, &res, &timeout, NULL);
3954         if (ret != 0 || res != 0) {
3955                 return -1;
3956         }
3957
3958         if (data.dsize == 0) {
3959                 *name = NULL;
3960         } else {
3961                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
3962         }
3963         talloc_free(data.dptr);
3964
3965         return 0;
3966 }
3967
3968 /*
3969   set the reclock filename for a node
3970  */
3971 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
3972 {
3973         int ret;
3974         TDB_DATA data;
3975         int32_t res;
3976
3977         if (reclock == NULL) {
3978                 data.dsize = 0;
3979                 data.dptr  = NULL;
3980         } else {
3981                 data.dsize = strlen(reclock) + 1;
3982                 data.dptr  = discard_const(reclock);
3983         }
3984
3985         ret = ctdb_control(ctdb, destnode, 0, 
3986                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
3987                            NULL, NULL, &res, &timeout, NULL);
3988         if (ret != 0 || res != 0) {
3989                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
3990                 return -1;
3991         }
3992
3993         return 0;
3994 }
3995
3996 /*
3997   stop a node
3998  */
3999 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4000 {
4001         int ret;
4002         int32_t res;
4003
4004         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
4005                            ctdb, NULL, &res, &timeout, NULL);
4006         if (ret != 0 || res != 0) {
4007                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
4008                 return -1;
4009         }
4010
4011         return 0;
4012 }
4013
4014 /*
4015   continue a node
4016  */
4017 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4018 {
4019         int ret;
4020
4021         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
4022                            ctdb, NULL, NULL, &timeout, NULL);
4023         if (ret != 0) {
4024                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4025                 return -1;
4026         }
4027
4028         return 0;
4029 }
4030
4031 /*
4032   set the natgw state for a node
4033  */
4034 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4035 {
4036         int ret;
4037         TDB_DATA data;
4038         int32_t res;
4039
4040         data.dsize = sizeof(natgwstate);
4041         data.dptr  = (uint8_t *)&natgwstate;
4042
4043         ret = ctdb_control(ctdb, destnode, 0, 
4044                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
4045                            NULL, NULL, &res, &timeout, NULL);
4046         if (ret != 0 || res != 0) {
4047                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4048                 return -1;
4049         }
4050
4051         return 0;
4052 }
4053
4054 /*
4055   set the lmaster role for a node
4056  */
4057 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4058 {
4059         int ret;
4060         TDB_DATA data;
4061         int32_t res;
4062
4063         data.dsize = sizeof(lmasterrole);
4064         data.dptr  = (uint8_t *)&lmasterrole;
4065
4066         ret = ctdb_control(ctdb, destnode, 0, 
4067                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
4068                            NULL, NULL, &res, &timeout, NULL);
4069         if (ret != 0 || res != 0) {
4070                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4071                 return -1;
4072         }
4073
4074         return 0;
4075 }
4076
4077 /*
4078   set the recmaster role for a node
4079  */
4080 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4081 {
4082         int ret;
4083         TDB_DATA data;
4084         int32_t res;
4085
4086         data.dsize = sizeof(recmasterrole);
4087         data.dptr  = (uint8_t *)&recmasterrole;
4088
4089         ret = ctdb_control(ctdb, destnode, 0, 
4090                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
4091                            NULL, NULL, &res, &timeout, NULL);
4092         if (ret != 0 || res != 0) {
4093                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4094                 return -1;
4095         }
4096
4097         return 0;
4098 }
4099
4100 /* enable an eventscript
4101  */
4102 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4103 {
4104         int ret;
4105         TDB_DATA data;
4106         int32_t res;
4107
4108         data.dsize = strlen(script) + 1;
4109         data.dptr  = discard_const(script);
4110
4111         ret = ctdb_control(ctdb, destnode, 0, 
4112                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
4113                            NULL, NULL, &res, &timeout, NULL);
4114         if (ret != 0 || res != 0) {
4115                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4116                 return -1;
4117         }
4118
4119         return 0;
4120 }
4121
4122 /* disable an eventscript
4123  */
4124 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4125 {
4126         int ret;
4127         TDB_DATA data;
4128         int32_t res;
4129
4130         data.dsize = strlen(script) + 1;
4131         data.dptr  = discard_const(script);
4132
4133         ret = ctdb_control(ctdb, destnode, 0, 
4134                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
4135                            NULL, NULL, &res, &timeout, NULL);
4136         if (ret != 0 || res != 0) {
4137                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
4138                 return -1;
4139         }
4140
4141         return 0;
4142 }
4143
4144
4145 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
4146 {
4147         int ret;
4148         TDB_DATA data;
4149         int32_t res;
4150
4151         data.dsize = sizeof(*bantime);
4152         data.dptr  = (uint8_t *)bantime;
4153
4154         ret = ctdb_control(ctdb, destnode, 0, 
4155                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
4156                            NULL, NULL, &res, &timeout, NULL);
4157         if (ret != 0 || res != 0) {
4158                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4159                 return -1;
4160         }
4161
4162         return 0;
4163 }
4164
4165
4166 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4167 {
4168         int ret;
4169         TDB_DATA outdata;
4170         int32_t res;
4171         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4172
4173         ret = ctdb_control(ctdb, destnode, 0, 
4174                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4175                            tmp_ctx, &outdata, &res, &timeout, NULL);
4176         if (ret != 0 || res != 0) {
4177                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4178                 talloc_free(tmp_ctx);
4179                 return -1;
4180         }
4181
4182         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4183         talloc_free(tmp_ctx);
4184
4185         return 0;
4186 }
4187
4188
4189 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4190 {
4191         int ret;
4192         int32_t res;
4193         TDB_DATA data;
4194         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4195
4196         data.dptr = (uint8_t*)db_prio;
4197         data.dsize = sizeof(*db_prio);
4198
4199         ret = ctdb_control(ctdb, destnode, 0, 
4200                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4201                            tmp_ctx, NULL, &res, &timeout, NULL);
4202         if (ret != 0 || res != 0) {
4203                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4204                 talloc_free(tmp_ctx);
4205                 return -1;
4206         }
4207
4208         talloc_free(tmp_ctx);
4209
4210         return 0;
4211 }
4212
4213 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4214 {
4215         int ret;
4216         int32_t res;
4217         TDB_DATA data;
4218         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4219
4220         data.dptr = (uint8_t*)&db_id;
4221         data.dsize = sizeof(db_id);
4222
4223         ret = ctdb_control(ctdb, destnode, 0, 
4224                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4225                            tmp_ctx, NULL, &res, &timeout, NULL);
4226         if (ret != 0 || res < 0) {
4227                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4228                 talloc_free(tmp_ctx);
4229                 return -1;
4230         }
4231
4232         if (priority) {
4233                 *priority = res;
4234         }
4235
4236         talloc_free(tmp_ctx);
4237
4238         return 0;
4239 }
4240
4241 int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_statistics_wire **stats)
4242 {
4243         int ret;
4244         TDB_DATA outdata;
4245         int32_t res;
4246
4247         ret = ctdb_control(ctdb, destnode, 0, 
4248                            CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null, 
4249                            mem_ctx, &outdata, &res, &timeout, NULL);
4250         if (ret != 0 || res != 0 || outdata.dsize == 0) {
4251                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
4252                 return -1;
4253         }
4254
4255         *stats = (struct ctdb_statistics_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4256         talloc_free(outdata.dptr);
4257                     
4258         return 0;
4259 }
4260
4261 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)
4262 {
4263         if (h == NULL) {
4264                 return NULL;
4265         }
4266
4267         return &h->header;
4268 }