4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/tevent/tevent.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
30 #include "../include/ctdb_private.h"
31 #include "lib/util/dlinklist.h"
36 allocate a packet for use in client<->daemon communication
38 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
40 enum ctdb_operation operation,
41 size_t length, size_t slength,
45 struct ctdb_req_header *hdr;
47 length = MAX(length, slength);
48 size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
50 hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
52 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
53 operation, (unsigned)length));
56 talloc_set_name_const(hdr, type);
57 memset(hdr, 0, slength);
59 hdr->operation = operation;
60 hdr->ctdb_magic = CTDB_MAGIC;
61 hdr->ctdb_version = CTDB_VERSION;
62 hdr->srcnode = ctdb->pnn;
64 hdr->generation = ctdb->vnn_map->generation;
71 local version of ctdb_call
73 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
74 struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
77 struct ctdb_call_info *c;
78 struct ctdb_registered_call *fn;
79 struct ctdb_context *ctdb = ctdb_db->ctdb;
81 c = talloc(ctdb, struct ctdb_call_info);
82 CTDB_NO_MEMORY(ctdb, c);
85 c->call_data = &call->call_data;
86 c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
87 c->record_data.dsize = data->dsize;
88 CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
94 for (fn=ctdb_db->calls;fn;fn=fn->next) {
95 if (fn->id == call->call_id) break;
98 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
103 if (fn->fn(c) != 0) {
104 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
109 /* we need to force the record to be written out if this was a remote access */
110 if (c->new_data == NULL) {
111 c->new_data = &c->record_data;
115 /* XXX check that we always have the lock here? */
116 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
117 ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
124 call->reply_data = *c->reply_data;
126 talloc_steal(call, call->reply_data.dptr);
127 talloc_set_name_const(call->reply_data.dptr, __location__);
129 call->reply_data.dptr = NULL;
130 call->reply_data.dsize = 0;
132 call->status = c->status;
141 queue a packet for sending from client to daemon
143 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
145 return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
150 called when a CTDB_REPLY_CALL packet comes in in the client
152 This packet comes in response to a CTDB_REQ_CALL request packet. It
153 contains any reply data from the call
155 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
157 struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
158 struct ctdb_client_call_state *state;
160 state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
162 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
166 if (hdr->reqid != state->reqid) {
167 /* we found a record but it was the wrong one */
168 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
172 state->call->reply_data.dptr = c->data;
173 state->call->reply_data.dsize = c->datalen;
174 state->call->status = c->status;
176 talloc_steal(state, c);
178 state->state = CTDB_CALL_DONE;
180 if (state->async.fn) {
181 state->async.fn(state);
185 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
188 this is called in the client, when data comes in from the daemon
190 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
192 struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
193 struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
196 /* place the packet as a child of a tmp_ctx. We then use
197 talloc_free() below to free it. If any of the calls want
198 to keep it, then they will steal it somewhere else, and the
199 talloc_free() will be a no-op */
200 tmp_ctx = talloc_new(ctdb);
201 talloc_steal(tmp_ctx, hdr);
204 DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n"));
208 if (cnt < sizeof(*hdr)) {
209 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
212 if (cnt != hdr->length) {
213 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n",
214 (unsigned)hdr->length, (unsigned)cnt);
218 if (hdr->ctdb_magic != CTDB_MAGIC) {
219 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
223 if (hdr->ctdb_version != CTDB_VERSION) {
224 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
228 switch (hdr->operation) {
229 case CTDB_REPLY_CALL:
230 ctdb_client_reply_call(ctdb, hdr);
233 case CTDB_REQ_MESSAGE:
234 ctdb_request_message(ctdb, hdr);
237 case CTDB_REPLY_CONTROL:
238 ctdb_client_reply_control(ctdb, hdr);
242 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
246 talloc_free(tmp_ctx);
250 connect to a unix domain socket
252 int ctdb_socket_connect(struct ctdb_context *ctdb)
254 struct sockaddr_un addr;
256 memset(&addr, 0, sizeof(addr));
257 addr.sun_family = AF_UNIX;
258 strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
260 ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
261 if (ctdb->daemon.sd == -1) {
262 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
266 set_nonblocking(ctdb->daemon.sd);
267 set_close_on_exec(ctdb->daemon.sd);
269 if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
270 close(ctdb->daemon.sd);
271 ctdb->daemon.sd = -1;
272 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
276 ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd,
278 ctdb_client_read_cb, ctdb, "to-ctdbd");
283 struct ctdb_record_handle {
284 struct ctdb_db_context *ctdb_db;
287 struct ctdb_ltdb_header header;
292 make a recv call to the local ctdb daemon - called from client context
294 This is called when the program wants to wait for a ctdb_call to complete and get the
295 results. This call will block unless the call has already completed.
297 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
303 while (state->state < CTDB_CALL_DONE) {
304 event_loop_once(state->ctdb_db->ctdb->ev);
306 if (state->state != CTDB_CALL_DONE) {
307 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
312 if (state->call->reply_data.dsize) {
313 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
314 state->call->reply_data.dptr,
315 state->call->reply_data.dsize);
316 call->reply_data.dsize = state->call->reply_data.dsize;
318 call->reply_data.dptr = NULL;
319 call->reply_data.dsize = 0;
321 call->status = state->call->status;
331 destroy a ctdb_call in client
333 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)
335 ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
340 construct an event driven local ctdb_call
342 this is used so that locally processed ctdb_call requests are processed
343 in an event driven manner
345 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db,
346 struct ctdb_call *call,
347 struct ctdb_ltdb_header *header,
350 struct ctdb_client_call_state *state;
351 struct ctdb_context *ctdb = ctdb_db->ctdb;
354 state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
355 CTDB_NO_MEMORY_NULL(ctdb, state);
356 state->call = talloc_zero(state, struct ctdb_call);
357 CTDB_NO_MEMORY_NULL(ctdb, state->call);
359 talloc_steal(state, data->dptr);
361 state->state = CTDB_CALL_DONE;
362 *(state->call) = *call;
363 state->ctdb_db = ctdb_db;
365 ret = ctdb_call_local(ctdb_db, state->call, header, state, data);
371 make a ctdb call to the local daemon - async send. Called from client context.
373 This constructs a ctdb_call request and queues it for processing.
374 This call never blocks.
376 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db,
377 struct ctdb_call *call)
379 struct ctdb_client_call_state *state;
380 struct ctdb_context *ctdb = ctdb_db->ctdb;
381 struct ctdb_ltdb_header header;
385 struct ctdb_req_call *c;
387 /* if the domain socket is not yet open, open it */
388 if (ctdb->daemon.sd==-1) {
389 ctdb_socket_connect(ctdb);
392 ret = ctdb_ltdb_lock(ctdb_db, call->key);
394 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
398 ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
400 if (ret == 0 && header.dmaster == ctdb->pnn) {
401 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
402 talloc_free(data.dptr);
403 ctdb_ltdb_unlock(ctdb_db, call->key);
407 ctdb_ltdb_unlock(ctdb_db, call->key);
408 talloc_free(data.dptr);
410 state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
412 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
415 state->call = talloc_zero(state, struct ctdb_call);
416 if (state->call == NULL) {
417 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
421 len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
422 c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
424 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
428 state->reqid = ctdb_reqid_new(ctdb, state);
429 state->ctdb_db = ctdb_db;
430 talloc_set_destructor(state, ctdb_client_call_destructor);
432 c->hdr.reqid = state->reqid;
433 c->flags = call->flags;
434 c->db_id = ctdb_db->db_id;
435 c->callid = call->call_id;
437 c->keylen = call->key.dsize;
438 c->calldatalen = call->call_data.dsize;
439 memcpy(&c->data[0], call->key.dptr, call->key.dsize);
440 memcpy(&c->data[call->key.dsize],
441 call->call_data.dptr, call->call_data.dsize);
442 *(state->call) = *call;
443 state->call->call_data.dptr = &c->data[call->key.dsize];
444 state->call->key.dptr = &c->data[0];
446 state->state = CTDB_CALL_WAIT;
449 ctdb_client_queue_pkt(ctdb, &c->hdr);
456 full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
458 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
460 struct ctdb_client_call_state *state;
462 state = ctdb_call_send(ctdb_db, call);
463 return ctdb_call_recv(state, call);
468 tell the daemon what messaging srvid we will use, and register the message
469 handler function in the client
471 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
472 ctdb_msg_fn_t handler,
479 res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0,
480 tdb_null, NULL, NULL, &status, NULL, NULL);
481 if (res != 0 || status != 0) {
482 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
486 /* also need to register the handler with our own ctdb structure */
487 return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
491 tell the daemon we no longer want a srvid
493 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
498 res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0,
499 tdb_null, NULL, NULL, &status, NULL, NULL);
500 if (res != 0 || status != 0) {
501 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
505 /* also need to register the handler with our own ctdb structure */
506 ctdb_deregister_message_handler(ctdb, srvid, private_data);
512 send a message - from client context
514 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
515 uint64_t srvid, TDB_DATA data)
517 struct ctdb_req_message *r;
520 len = offsetof(struct ctdb_req_message, data) + data.dsize;
521 r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE,
522 len, struct ctdb_req_message);
523 CTDB_NO_MEMORY(ctdb, r);
525 r->hdr.destnode = pnn;
527 r->datalen = data.dsize;
528 memcpy(&r->data[0], data.dptr, data.dsize);
530 res = ctdb_client_queue_pkt(ctdb, &r->hdr);
541 cancel a ctdb_fetch_lock operation, releasing the lock
543 static int fetch_lock_destructor(struct ctdb_record_handle *h)
545 ctdb_ltdb_unlock(h->ctdb_db, h->key);
550 force the migration of a record to this node
552 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
554 struct ctdb_call call;
556 call.call_id = CTDB_NULL_FUNC;
558 call.flags = CTDB_IMMEDIATE_MIGRATION;
559 return ctdb_call(ctdb_db, &call);
563 get a lock on a record, and return the records data. Blocks until it gets the lock
565 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx,
566 TDB_DATA key, TDB_DATA *data)
569 struct ctdb_record_handle *h;
572 procedure is as follows:
574 1) get the chain lock.
575 2) check if we are dmaster
576 3) if we are the dmaster then return handle
577 4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
579 5) when we get the reply, goto (1)
582 h = talloc_zero(mem_ctx, struct ctdb_record_handle);
587 h->ctdb_db = ctdb_db;
589 h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
590 if (h->key.dptr == NULL) {
596 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize,
597 (const char *)key.dptr));
600 /* step 1 - get the chain lock */
601 ret = ctdb_ltdb_lock(ctdb_db, key);
603 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
608 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
610 talloc_set_destructor(h, fetch_lock_destructor);
612 ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
614 /* when torturing, ensure we test the remote path */
615 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
617 h->header.dmaster = (uint32_t)-1;
621 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
623 if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
624 ctdb_ltdb_unlock(ctdb_db, key);
625 ret = ctdb_client_force_migration(ctdb_db, key);
627 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
634 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
639 store some data to the record that was locked with ctdb_fetch_lock()
641 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
643 if (h->ctdb_db->persistent) {
644 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
648 return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
652 non-locking fetch of a record
654 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx,
655 TDB_DATA key, TDB_DATA *data)
657 struct ctdb_call call;
660 call.call_id = CTDB_FETCH_FUNC;
661 call.call_data.dptr = NULL;
662 call.call_data.dsize = 0;
664 ret = ctdb_call(ctdb_db, &call);
667 *data = call.reply_data;
668 talloc_steal(mem_ctx, data->dptr);
677 called when a control completes or timesout to invoke the callback
678 function the user provided
680 static void invoke_control_callback(struct event_context *ev, struct timed_event *te,
681 struct timeval t, void *private_data)
683 struct ctdb_client_control_state *state;
684 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
687 state = talloc_get_type(private_data, struct ctdb_client_control_state);
688 talloc_steal(tmp_ctx, state);
690 ret = ctdb_control_recv(state->ctdb, state, state,
695 talloc_free(tmp_ctx);
699 called when a CTDB_REPLY_CONTROL packet comes in in the client
701 This packet comes in response to a CTDB_REQ_CONTROL request packet. It
702 contains any reply data from the control
704 static void ctdb_client_reply_control(struct ctdb_context *ctdb,
705 struct ctdb_req_header *hdr)
707 struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
708 struct ctdb_client_control_state *state;
710 state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
712 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
716 if (hdr->reqid != state->reqid) {
717 /* we found a record but it was the wrong one */
718 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
722 state->outdata.dptr = c->data;
723 state->outdata.dsize = c->datalen;
724 state->status = c->status;
726 state->errormsg = talloc_strndup(state,
727 (char *)&c->data[c->datalen],
731 /* state->outdata now uses resources from c so we dont want c
732 to just dissappear from under us while state is still alive
734 talloc_steal(state, c);
736 state->state = CTDB_CONTROL_DONE;
738 /* if we had a callback registered for this control, pull the response
739 and call the callback.
741 if (state->async.fn) {
742 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
748 destroy a ctdb_control in client
750 static int ctdb_control_destructor(struct ctdb_client_control_state *state)
752 ctdb_reqid_remove(state->ctdb, state->reqid);
757 /* time out handler for ctdb_control */
758 static void control_timeout_func(struct event_context *ev, struct timed_event *te,
759 struct timeval t, void *private_data)
761 struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
763 DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
764 "dstnode:%u\n", state->reqid, state->c->opcode,
765 state->c->hdr.destnode));
767 state->state = CTDB_CONTROL_TIMEOUT;
769 /* if we had a callback registered for this control, pull the response
770 and call the callback.
772 if (state->async.fn) {
773 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
777 /* async version of send control request */
778 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb,
779 uint32_t destnode, uint64_t srvid,
780 uint32_t opcode, uint32_t flags, TDB_DATA data,
782 struct timeval *timeout,
785 struct ctdb_client_control_state *state;
787 struct ctdb_req_control *c;
794 /* if the domain socket is not yet open, open it */
795 if (ctdb->daemon.sd==-1) {
796 ctdb_socket_connect(ctdb);
799 state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
800 CTDB_NO_MEMORY_NULL(ctdb, state);
803 state->reqid = ctdb_reqid_new(ctdb, state);
804 state->state = CTDB_CONTROL_WAIT;
805 state->errormsg = NULL;
807 talloc_set_destructor(state, ctdb_control_destructor);
809 len = offsetof(struct ctdb_req_control, data) + data.dsize;
810 c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL,
811 len, struct ctdb_req_control);
813 CTDB_NO_MEMORY_NULL(ctdb, c);
814 c->hdr.reqid = state->reqid;
815 c->hdr.destnode = destnode;
820 c->datalen = data.dsize;
822 memcpy(&c->data[0], data.dptr, data.dsize);
826 if (timeout && !timeval_is_zero(timeout)) {
827 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
830 ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
836 if (flags & CTDB_CTRL_FLAG_NOREPLY) {
845 /* async version of receive control reply */
846 int ctdb_control_recv(struct ctdb_context *ctdb,
847 struct ctdb_client_control_state *state,
849 TDB_DATA *outdata, int32_t *status, char **errormsg)
853 if (status != NULL) {
856 if (errormsg != NULL) {
864 /* prevent double free of state */
865 tmp_ctx = talloc_new(ctdb);
866 talloc_steal(tmp_ctx, state);
868 /* loop one event at a time until we either timeout or the control
871 while (state->state == CTDB_CONTROL_WAIT) {
872 event_loop_once(ctdb->ev);
875 if (state->state != CTDB_CONTROL_DONE) {
876 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
877 if (state->async.fn) {
878 state->async.fn(state);
880 talloc_free(tmp_ctx);
884 if (state->errormsg) {
885 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
887 (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
889 if (state->async.fn) {
890 state->async.fn(state);
892 talloc_free(tmp_ctx);
897 *outdata = state->outdata;
898 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
902 *status = state->status;
905 if (state->async.fn) {
906 state->async.fn(state);
909 talloc_free(tmp_ctx);
916 send a ctdb control message
917 timeout specifies how long we should wait for a reply.
918 if timeout is NULL we wait indefinitely
920 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid,
921 uint32_t opcode, uint32_t flags, TDB_DATA data,
922 TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
923 struct timeval *timeout,
926 struct ctdb_client_control_state *state;
928 state = ctdb_control_send(ctdb, destnode, srvid, opcode,
929 flags, data, mem_ctx,
931 return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status,
939 a process exists call. Returns 0 if process exists, -1 otherwise
941 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
947 data.dptr = (uint8_t*)&pid;
948 data.dsize = sizeof(pid);
950 ret = ctdb_control(ctdb, destnode, 0,
951 CTDB_CONTROL_PROCESS_EXISTS, 0, data,
952 NULL, NULL, &status, NULL, NULL);
954 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
962 get remote statistics
964 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
970 ret = ctdb_control(ctdb, destnode, 0,
971 CTDB_CONTROL_STATISTICS, 0, tdb_null,
972 ctdb, &data, &res, NULL, NULL);
973 if (ret != 0 || res != 0) {
974 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
978 if (data.dsize != sizeof(struct ctdb_statistics)) {
979 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
980 (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
984 *status = *(struct ctdb_statistics *)data.dptr;
985 talloc_free(data.dptr);
991 shutdown a remote ctdb node
993 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
995 struct ctdb_client_control_state *state;
997 state = ctdb_control_send(ctdb, destnode, 0,
998 CTDB_CONTROL_SHUTDOWN, 0, tdb_null,
999 NULL, &timeout, NULL);
1000 if (state == NULL) {
1001 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1009 get vnn map from a remote node
1011 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1016 struct ctdb_vnn_map_wire *map;
1018 ret = ctdb_control(ctdb, destnode, 0,
1019 CTDB_CONTROL_GETVNNMAP, 0, tdb_null,
1020 mem_ctx, &outdata, &res, &timeout, NULL);
1021 if (ret != 0 || res != 0) {
1022 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1026 map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1027 if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1028 outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1029 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1033 (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1034 CTDB_NO_MEMORY(ctdb, *vnnmap);
1035 (*vnnmap)->generation = map->generation;
1036 (*vnnmap)->size = map->size;
1037 (*vnnmap)->map = talloc_array(*vnnmap, uint32_t, map->size);
1039 CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1040 memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1041 talloc_free(outdata.dptr);
1048 get the recovery mode of a remote node
1050 struct ctdb_client_control_state *
1051 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1053 return ctdb_control_send(ctdb, destnode, 0,
1054 CTDB_CONTROL_GET_RECMODE, 0, tdb_null,
1055 mem_ctx, &timeout, NULL);
1058 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1063 ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1065 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1070 *recmode = (uint32_t)res;
1076 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1078 struct ctdb_client_control_state *state;
1080 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1081 return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1088 set the recovery mode of a remote node
1090 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1096 data.dsize = sizeof(uint32_t);
1097 data.dptr = (unsigned char *)&recmode;
1099 ret = ctdb_control(ctdb, destnode, 0,
1100 CTDB_CONTROL_SET_RECMODE, 0, data,
1101 NULL, NULL, &res, &timeout, NULL);
1102 if (ret != 0 || res != 0) {
1103 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1113 get the recovery master of a remote node
1115 struct ctdb_client_control_state *
1116 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
1117 struct timeval timeout, uint32_t destnode)
1119 return ctdb_control_send(ctdb, destnode, 0,
1120 CTDB_CONTROL_GET_RECMASTER, 0, tdb_null,
1121 mem_ctx, &timeout, NULL);
1124 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1129 ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1131 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1136 *recmaster = (uint32_t)res;
1142 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1144 struct ctdb_client_control_state *state;
1146 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1147 return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1152 set the recovery master of a remote node
1154 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1161 data.dsize = sizeof(uint32_t);
1162 data.dptr = (unsigned char *)&recmaster;
1164 ret = ctdb_control(ctdb, destnode, 0,
1165 CTDB_CONTROL_SET_RECMASTER, 0, data,
1166 NULL, NULL, &res, &timeout, NULL);
1167 if (ret != 0 || res != 0) {
1168 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1177 get a list of databases off a remote node
1179 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
1180 TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1186 ret = ctdb_control(ctdb, destnode, 0,
1187 CTDB_CONTROL_GET_DBMAP, 0, tdb_null,
1188 mem_ctx, &outdata, &res, &timeout, NULL);
1189 if (ret != 0 || res != 0) {
1190 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1194 *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1195 talloc_free(outdata.dptr);
1201 get a list of nodes (vnn and flags ) from a remote node
1203 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb,
1204 struct timeval timeout, uint32_t destnode,
1205 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1211 ret = ctdb_control(ctdb, destnode, 0,
1212 CTDB_CONTROL_GET_NODEMAP, 0, tdb_null,
1213 mem_ctx, &outdata, &res, &timeout, NULL);
1214 if (ret == 0 && res == -1 && outdata.dsize == 0) {
1215 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1216 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1218 if (ret != 0 || res != 0 || outdata.dsize == 0) {
1219 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1223 *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1224 talloc_free(outdata.dptr);
1230 old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1232 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb,
1233 struct timeval timeout, uint32_t destnode,
1234 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1238 struct ctdb_node_mapv4 *nodemapv4;
1241 ret = ctdb_control(ctdb, destnode, 0,
1242 CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null,
1243 mem_ctx, &outdata, &res, &timeout, NULL);
1244 if (ret != 0 || res != 0 || outdata.dsize == 0) {
1245 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1249 nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1251 len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1252 (*nodemap) = talloc_zero_size(mem_ctx, len);
1253 CTDB_NO_MEMORY(ctdb, (*nodemap));
1255 (*nodemap)->num = nodemapv4->num;
1256 for (i=0; i<nodemapv4->num; i++) {
1257 (*nodemap)->nodes[i].pnn = nodemapv4->nodes[i].pnn;
1258 (*nodemap)->nodes[i].flags = nodemapv4->nodes[i].flags;
1259 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1260 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1263 talloc_free(outdata.dptr);
1269 drop the transport, reload the nodes file and restart the transport
1271 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb,
1272 struct timeval timeout, uint32_t destnode)
1277 ret = ctdb_control(ctdb, destnode, 0,
1278 CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null,
1279 NULL, NULL, &res, &timeout, NULL);
1280 if (ret != 0 || res != 0) {
1281 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1290 set vnn map on a node
1292 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
1293 TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1298 struct ctdb_vnn_map_wire *map;
1301 len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1302 map = talloc_size(mem_ctx, len);
1303 CTDB_NO_MEMORY(ctdb, map);
1305 map->generation = vnnmap->generation;
1306 map->size = vnnmap->size;
1307 memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1310 data.dptr = (uint8_t *)map;
1312 ret = ctdb_control(ctdb, destnode, 0,
1313 CTDB_CONTROL_SETVNNMAP, 0, data,
1314 NULL, NULL, &res, &timeout, NULL);
1315 if (ret != 0 || res != 0) {
1316 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1327 async send for pull database
1329 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1330 struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1331 uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1334 struct ctdb_control_pulldb *pull;
1335 struct ctdb_client_control_state *state;
1337 pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1338 CTDB_NO_MEMORY_NULL(ctdb, pull);
1341 pull->lmaster = lmaster;
1343 indata.dsize = sizeof(struct ctdb_control_pulldb);
1344 indata.dptr = (unsigned char *)pull;
1346 state = ctdb_control_send(ctdb, destnode, 0,
1347 CTDB_CONTROL_PULL_DB, 0, indata,
1348 mem_ctx, &timeout, NULL);
1355 async recv for pull database
1357 int ctdb_ctrl_pulldb_recv(
1358 struct ctdb_context *ctdb,
1359 TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state,
1365 ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1366 if ( (ret != 0) || (res != 0) ){
1367 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1375 pull all keys and records for a specific database on a node
1377 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode,
1378 uint32_t dbid, uint32_t lmaster,
1379 TALLOC_CTX *mem_ctx, struct timeval timeout,
1382 struct ctdb_client_control_state *state;
1384 state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1387 return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1392 change dmaster for all keys in the database to the new value
1394 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
1395 TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1401 indata.dsize = 2*sizeof(uint32_t);
1402 indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1404 ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1405 ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1407 ret = ctdb_control(ctdb, destnode, 0,
1408 CTDB_CONTROL_SET_DMASTER, 0, indata,
1409 NULL, NULL, &res, &timeout, NULL);
1410 if (ret != 0 || res != 0) {
1411 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1419 ping a node, return number of clients connected
1421 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1426 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0,
1427 tdb_null, NULL, NULL, &res, NULL, NULL);
1435 find the real path to a ltdb
1437 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx,
1444 data.dptr = (uint8_t *)&dbid;
1445 data.dsize = sizeof(dbid);
1447 ret = ctdb_control(ctdb, destnode, 0,
1448 CTDB_CONTROL_GETDBPATH, 0, data,
1449 mem_ctx, &data, &res, &timeout, NULL);
1450 if (ret != 0 || res != 0) {
1454 (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1455 if ((*path) == NULL) {
1459 talloc_free(data.dptr);
1465 find the name of a db
1467 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx,
1474 data.dptr = (uint8_t *)&dbid;
1475 data.dsize = sizeof(dbid);
1477 ret = ctdb_control(ctdb, destnode, 0,
1478 CTDB_CONTROL_GET_DBNAME, 0, data,
1479 mem_ctx, &data, &res, &timeout, NULL);
1480 if (ret != 0 || res != 0) {
1484 (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1485 if ((*name) == NULL) {
1489 talloc_free(data.dptr);
1495 get the health status of a db
1497 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1498 struct timeval timeout,
1500 uint32_t dbid, TALLOC_CTX *mem_ctx,
1501 const char **reason)
1507 data.dptr = (uint8_t *)&dbid;
1508 data.dsize = sizeof(dbid);
1510 ret = ctdb_control(ctdb, destnode, 0,
1511 CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1512 mem_ctx, &data, &res, &timeout, NULL);
1513 if (ret != 0 || res != 0) {
1517 if (data.dsize == 0) {
1522 (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1523 if ((*reason) == NULL) {
1527 talloc_free(data.dptr);
1535 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
1536 TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1542 data.dptr = discard_const(name);
1543 data.dsize = strlen(name)+1;
1545 ret = ctdb_control(ctdb, destnode, 0,
1546 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1548 mem_ctx, &data, &res, &timeout, NULL);
1550 if (ret != 0 || res != 0) {
1558 get debug level on a node
1560 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1566 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null,
1567 ctdb, &data, &res, NULL, NULL);
1568 if (ret != 0 || res != 0) {
1571 if (data.dsize != sizeof(int32_t)) {
1572 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1573 (unsigned)data.dsize));
1576 *level = *(int32_t *)data.dptr;
1577 talloc_free(data.dptr);
1582 set debug level on a node
1584 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1590 data.dptr = (uint8_t *)&level;
1591 data.dsize = sizeof(level);
1593 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data,
1594 NULL, NULL, &res, NULL, NULL);
1595 if (ret != 0 || res != 0) {
1603 get a list of connected nodes
1605 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb,
1606 struct timeval timeout,
1607 TALLOC_CTX *mem_ctx,
1608 uint32_t *num_nodes)
1610 struct ctdb_node_map *map=NULL;
1616 ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1621 nodes = talloc_array(mem_ctx, uint32_t, map->num);
1622 if (nodes == NULL) {
1626 for (i=0;i<map->num;i++) {
1627 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1628 nodes[*num_nodes] = map->nodes[i].pnn;
1640 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1645 ret = ctdb_control(ctdb, destnode, 0,
1646 CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null,
1647 NULL, NULL, &res, NULL, NULL);
1648 if (ret != 0 || res != 0) {
1649 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1656 this is the dummy null procedure that all databases support
1658 static int ctdb_null_func(struct ctdb_call_info *call)
1664 this is a plain fetch procedure that all databases support
1666 static int ctdb_fetch_func(struct ctdb_call_info *call)
1668 call->reply_data = &call->record_data;
1673 this is a plain fetch procedure that all databases support
1674 this returns the full record including the ltdb header
1676 static int ctdb_fetch_with_header_func(struct ctdb_call_info *call)
1678 call->reply_data = talloc(call, TDB_DATA);
1679 if (call->reply_data == NULL) {
1682 call->reply_data->dsize = sizeof(struct ctdb_ltdb_header) + call->record_data.dsize;
1683 call->reply_data->dptr = talloc_size(call->reply_data, call->reply_data->dsize);
1684 if (call->reply_data->dptr == NULL) {
1687 memcpy(call->reply_data->dptr, call->header, sizeof(struct ctdb_ltdb_header));
1688 memcpy(&call->reply_data->dptr[sizeof(struct ctdb_ltdb_header)], call->record_data.dptr, call->record_data.dsize);
1694 attach to a specific database - client call
1696 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, bool persistent, uint32_t tdb_flags)
1698 struct ctdb_db_context *ctdb_db;
1703 ctdb_db = ctdb_db_handle(ctdb, name);
1708 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1709 CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1711 ctdb_db->ctdb = ctdb;
1712 ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1713 CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1715 data.dptr = discard_const(name);
1716 data.dsize = strlen(name)+1;
1718 /* tell ctdb daemon to attach */
1719 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags,
1720 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1721 0, data, ctdb_db, &data, &res, NULL, NULL);
1722 if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1723 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1724 talloc_free(ctdb_db);
1728 ctdb_db->db_id = *(uint32_t *)data.dptr;
1729 talloc_free(data.dptr);
1731 ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1733 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1734 talloc_free(ctdb_db);
1738 tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1739 if (ctdb->valgrinding) {
1740 tdb_flags |= TDB_NOMMAP;
1742 tdb_flags |= TDB_DISALLOW_NESTING;
1744 ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1745 if (ctdb_db->ltdb == NULL) {
1746 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1747 talloc_free(ctdb_db);
1751 ctdb_db->persistent = persistent;
1753 DLIST_ADD(ctdb->db_list, ctdb_db);
1755 /* add well known functions */
1756 ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1757 ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1758 ctdb_set_call(ctdb_db, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1765 setup a call for a database
1767 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1769 struct ctdb_registered_call *call;
1774 struct ctdb_control_set_call c;
1777 /* this is no longer valid with the separate daemon architecture */
1778 c.db_id = ctdb_db->db_id;
1782 data.dptr = (uint8_t *)&c;
1783 data.dsize = sizeof(c);
1785 ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1786 data, NULL, NULL, &status, NULL, NULL);
1787 if (ret != 0 || status != 0) {
1788 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
1793 /* also register locally */
1794 call = talloc(ctdb_db, struct ctdb_registered_call);
1798 DLIST_ADD(ctdb_db->calls, call);
1803 struct traverse_state {
1806 ctdb_traverse_func fn;
1811 called on each key during a ctdb_traverse
1813 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1815 struct traverse_state *state = (struct traverse_state *)p;
1816 struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
1819 if (data.dsize < sizeof(uint32_t) ||
1820 d->length != data.dsize) {
1821 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
1826 key.dsize = d->keylen;
1827 key.dptr = &d->data[0];
1828 data.dsize = d->datalen;
1829 data.dptr = &d->data[d->keylen];
1831 if (key.dsize == 0 && data.dsize == 0) {
1832 /* end of traverse */
1837 if (data.dsize == sizeof(struct ctdb_ltdb_header)) {
1838 /* empty records are deleted records in ctdb */
1842 if (state->fn(ctdb, key, data, state->private_data) != 0) {
1851 start a cluster wide traverse, calling the supplied fn on each record
1852 return the number of records traversed, or -1 on error
1854 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
1857 struct ctdb_traverse_start t;
1860 uint64_t srvid = (getpid() | 0xFLL<<60);
1861 struct traverse_state state;
1865 state.private_data = private_data;
1868 ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
1870 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
1874 t.db_id = ctdb_db->db_id;
1878 data.dptr = (uint8_t *)&t;
1879 data.dsize = sizeof(t);
1881 ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
1882 data, NULL, NULL, &status, NULL, NULL);
1883 if (ret != 0 || status != 0) {
1884 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
1885 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1889 while (!state.done) {
1890 event_loop_once(ctdb_db->ctdb->ev);
1893 ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1895 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
1902 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
1904 called on each key during a catdb
1906 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
1909 FILE *f = (FILE *)p;
1910 struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
1912 fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
1913 for (i=0;i<key.dsize;i++) {
1914 if (ISASCII(key.dptr[i])) {
1915 fprintf(f, "%c", key.dptr[i]);
1917 fprintf(f, "\\%02X", key.dptr[i]);
1922 fprintf(f, "dmaster: %u\n", h->dmaster);
1923 fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
1925 fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
1926 for (i=sizeof(*h);i<data.dsize;i++) {
1927 if (ISASCII(data.dptr[i])) {
1928 fprintf(f, "%c", data.dptr[i]);
1930 fprintf(f, "\\%02X", data.dptr[i]);
1941 convenience function to list all keys to stdout
1943 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
1945 return ctdb_traverse(ctdb_db, ctdb_dumpdb_record, f);
1949 get the pid of a ctdb daemon
1951 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
1956 ret = ctdb_control(ctdb, destnode, 0,
1957 CTDB_CONTROL_GET_PID, 0, tdb_null,
1958 NULL, NULL, &res, &timeout, NULL);
1960 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
1971 async freeze send control
1973 struct ctdb_client_control_state *
1974 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
1976 return ctdb_control_send(ctdb, destnode, priority,
1977 CTDB_CONTROL_FREEZE, 0, tdb_null,
1978 mem_ctx, &timeout, NULL);
1982 async freeze recv control
1984 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
1989 ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1990 if ( (ret != 0) || (res != 0) ){
1991 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
1999 freeze databases of a certain priority
2001 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2003 TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2004 struct ctdb_client_control_state *state;
2007 state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
2008 ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
2009 talloc_free(tmp_ctx);
2014 /* Freeze all databases */
2015 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2019 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2020 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2028 thaw databases of a certain priority
2030 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2035 ret = ctdb_control(ctdb, destnode, priority,
2036 CTDB_CONTROL_THAW, 0, tdb_null,
2037 NULL, NULL, &res, &timeout, NULL);
2038 if (ret != 0 || res != 0) {
2039 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2046 /* thaw all databases */
2047 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2049 return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2053 get pnn of a node, or -1
2055 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2060 ret = ctdb_control(ctdb, destnode, 0,
2061 CTDB_CONTROL_GET_PNN, 0, tdb_null,
2062 NULL, NULL, &res, &timeout, NULL);
2064 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2072 get the monitoring mode of a remote node
2074 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2079 ret = ctdb_control(ctdb, destnode, 0,
2080 CTDB_CONTROL_GET_MONMODE, 0, tdb_null,
2081 NULL, NULL, &res, &timeout, NULL);
2083 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2094 set the monitoring mode of a remote node to active
2096 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2101 ret = ctdb_control(ctdb, destnode, 0,
2102 CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null,
2103 NULL, NULL,NULL, &timeout, NULL);
2105 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2115 set the monitoring mode of a remote node to disable
2117 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2122 ret = ctdb_control(ctdb, destnode, 0,
2123 CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null,
2124 NULL, NULL, NULL, &timeout, NULL);
2126 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2138 sent to a node to make it take over an ip address
2140 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout,
2141 uint32_t destnode, struct ctdb_public_ip *ip)
2144 struct ctdb_public_ipv4 ipv4;
2148 if (ip->addr.sa.sa_family == AF_INET) {
2150 ipv4.sin = ip->addr.ip;
2152 data.dsize = sizeof(ipv4);
2153 data.dptr = (uint8_t *)&ipv4;
2155 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2156 NULL, &res, &timeout, NULL);
2158 data.dsize = sizeof(*ip);
2159 data.dptr = (uint8_t *)ip;
2161 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2162 NULL, &res, &timeout, NULL);
2165 if (ret != 0 || res != 0) {
2166 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2175 sent to a node to make it release an ip address
2177 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout,
2178 uint32_t destnode, struct ctdb_public_ip *ip)
2181 struct ctdb_public_ipv4 ipv4;
2185 if (ip->addr.sa.sa_family == AF_INET) {
2187 ipv4.sin = ip->addr.ip;
2189 data.dsize = sizeof(ipv4);
2190 data.dptr = (uint8_t *)&ipv4;
2192 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2193 NULL, &res, &timeout, NULL);
2195 data.dsize = sizeof(*ip);
2196 data.dptr = (uint8_t *)ip;
2198 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2199 NULL, &res, &timeout, NULL);
2202 if (ret != 0 || res != 0) {
2203 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2214 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb,
2215 struct timeval timeout,
2217 const char *name, uint32_t *value)
2219 struct ctdb_control_get_tunable *t;
2220 TDB_DATA data, outdata;
2224 data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2225 data.dptr = talloc_size(ctdb, data.dsize);
2226 CTDB_NO_MEMORY(ctdb, data.dptr);
2228 t = (struct ctdb_control_get_tunable *)data.dptr;
2229 t->length = strlen(name)+1;
2230 memcpy(t->name, name, t->length);
2232 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2233 &outdata, &res, &timeout, NULL);
2234 talloc_free(data.dptr);
2235 if (ret != 0 || res != 0) {
2236 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2240 if (outdata.dsize != sizeof(uint32_t)) {
2241 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2242 talloc_free(outdata.dptr);
2246 *value = *(uint32_t *)outdata.dptr;
2247 talloc_free(outdata.dptr);
2255 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb,
2256 struct timeval timeout,
2258 const char *name, uint32_t value)
2260 struct ctdb_control_set_tunable *t;
2265 data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2266 data.dptr = talloc_size(ctdb, data.dsize);
2267 CTDB_NO_MEMORY(ctdb, data.dptr);
2269 t = (struct ctdb_control_set_tunable *)data.dptr;
2270 t->length = strlen(name)+1;
2271 memcpy(t->name, name, t->length);
2274 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2275 NULL, &res, &timeout, NULL);
2276 talloc_free(data.dptr);
2277 if (ret != 0 || res != 0) {
2278 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2288 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb,
2289 struct timeval timeout,
2291 TALLOC_CTX *mem_ctx,
2292 const char ***list, uint32_t *count)
2297 struct ctdb_control_list_tunable *t;
2300 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null,
2301 mem_ctx, &outdata, &res, &timeout, NULL);
2302 if (ret != 0 || res != 0) {
2303 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2307 t = (struct ctdb_control_list_tunable *)outdata.dptr;
2308 if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2309 t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2310 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2311 talloc_free(outdata.dptr);
2315 p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2316 CTDB_NO_MEMORY(ctdb, p);
2318 talloc_free(outdata.dptr);
2323 for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2324 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2325 CTDB_NO_MEMORY(ctdb, *list);
2326 (*list)[*count] = talloc_strdup(*list, s);
2327 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2337 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2338 struct timeval timeout, uint32_t destnode,
2339 TALLOC_CTX *mem_ctx,
2341 struct ctdb_all_public_ips **ips)
2347 ret = ctdb_control(ctdb, destnode, 0,
2348 CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2349 mem_ctx, &outdata, &res, &timeout, NULL);
2350 if (ret == 0 && res == -1) {
2351 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2352 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2354 if (ret != 0 || res != 0) {
2355 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2359 *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2360 talloc_free(outdata.dptr);
2365 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2366 struct timeval timeout, uint32_t destnode,
2367 TALLOC_CTX *mem_ctx,
2368 struct ctdb_all_public_ips **ips)
2370 return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2375 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb,
2376 struct timeval timeout, uint32_t destnode,
2377 TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2382 struct ctdb_all_public_ipsv4 *ipsv4;
2384 ret = ctdb_control(ctdb, destnode, 0,
2385 CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null,
2386 mem_ctx, &outdata, &res, &timeout, NULL);
2387 if (ret != 0 || res != 0) {
2388 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2392 ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2393 len = offsetof(struct ctdb_all_public_ips, ips) +
2394 ipsv4->num*sizeof(struct ctdb_public_ip);
2395 *ips = talloc_zero_size(mem_ctx, len);
2396 CTDB_NO_MEMORY(ctdb, *ips);
2397 (*ips)->num = ipsv4->num;
2398 for (i=0; i<ipsv4->num; i++) {
2399 (*ips)->ips[i].pnn = ipsv4->ips[i].pnn;
2400 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2403 talloc_free(outdata.dptr);
2408 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2409 struct timeval timeout, uint32_t destnode,
2410 TALLOC_CTX *mem_ctx,
2411 const ctdb_sock_addr *addr,
2412 struct ctdb_control_public_ip_info **_info)
2418 struct ctdb_control_public_ip_info *info;
2422 indata.dptr = discard_const_p(uint8_t, addr);
2423 indata.dsize = sizeof(*addr);
2425 ret = ctdb_control(ctdb, destnode, 0,
2426 CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2427 mem_ctx, &outdata, &res, &timeout, NULL);
2428 if (ret != 0 || res != 0) {
2429 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2430 "failed ret:%d res:%d\n",
2435 len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2436 if (len > outdata.dsize) {
2437 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2438 "returned invalid data with size %u > %u\n",
2439 (unsigned int)outdata.dsize,
2440 (unsigned int)len));
2441 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2445 info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2446 len += info->num*sizeof(struct ctdb_control_iface_info);
2448 if (len > outdata.dsize) {
2449 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2450 "returned invalid data with size %u > %u\n",
2451 (unsigned int)outdata.dsize,
2452 (unsigned int)len));
2453 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2457 /* make sure we null terminate the returned strings */
2458 for (i=0; i < info->num; i++) {
2459 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2462 *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2465 talloc_free(outdata.dptr);
2466 if (*_info == NULL) {
2467 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2468 "talloc_memdup size %u failed\n",
2469 (unsigned int)outdata.dsize));
2476 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2477 struct timeval timeout, uint32_t destnode,
2478 TALLOC_CTX *mem_ctx,
2479 struct ctdb_control_get_ifaces **_ifaces)
2484 struct ctdb_control_get_ifaces *ifaces;
2488 ret = ctdb_control(ctdb, destnode, 0,
2489 CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2490 mem_ctx, &outdata, &res, &timeout, NULL);
2491 if (ret != 0 || res != 0) {
2492 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2493 "failed ret:%d res:%d\n",
2498 len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2499 if (len > outdata.dsize) {
2500 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2501 "returned invalid data with size %u > %u\n",
2502 (unsigned int)outdata.dsize,
2503 (unsigned int)len));
2504 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2508 ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2509 len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2511 if (len > outdata.dsize) {
2512 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2513 "returned invalid data with size %u > %u\n",
2514 (unsigned int)outdata.dsize,
2515 (unsigned int)len));
2516 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2520 /* make sure we null terminate the returned strings */
2521 for (i=0; i < ifaces->num; i++) {
2522 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2525 *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2528 talloc_free(outdata.dptr);
2529 if (*_ifaces == NULL) {
2530 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2531 "talloc_memdup size %u failed\n",
2532 (unsigned int)outdata.dsize));
2539 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2540 struct timeval timeout, uint32_t destnode,
2541 TALLOC_CTX *mem_ctx,
2542 const struct ctdb_control_iface_info *info)
2548 indata.dptr = discard_const_p(uint8_t, info);
2549 indata.dsize = sizeof(*info);
2551 ret = ctdb_control(ctdb, destnode, 0,
2552 CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2553 mem_ctx, NULL, &res, &timeout, NULL);
2554 if (ret != 0 || res != 0) {
2555 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2556 "failed ret:%d res:%d\n",
2565 set/clear the permanent disabled bit on a remote node
2567 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
2568 uint32_t set, uint32_t clear)
2572 struct ctdb_node_map *nodemap=NULL;
2573 struct ctdb_node_flag_change c;
2574 TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2579 /* find the recovery master */
2580 ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2582 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2583 talloc_free(tmp_ctx);
2588 /* read the node flags from the recmaster */
2589 ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2591 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2592 talloc_free(tmp_ctx);
2595 if (destnode >= nodemap->num) {
2596 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2597 talloc_free(tmp_ctx);
2602 c.old_flags = nodemap->nodes[destnode].flags;
2603 c.new_flags = c.old_flags;
2605 c.new_flags &= ~clear;
2607 data.dsize = sizeof(c);
2608 data.dptr = (unsigned char *)&c;
2610 /* send the flags update to all connected nodes */
2611 nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2613 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2615 timeout, false, data,
2618 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2620 talloc_free(tmp_ctx);
2624 talloc_free(tmp_ctx);
2632 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb,
2633 struct timeval timeout,
2635 struct ctdb_tunable *tunables)
2641 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2642 &outdata, &res, &timeout, NULL);
2643 if (ret != 0 || res != 0) {
2644 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2648 if (outdata.dsize != sizeof(*tunables)) {
2649 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2650 (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2654 *tunables = *(struct ctdb_tunable *)outdata.dptr;
2655 talloc_free(outdata.dptr);
2660 add a public address to a node
2662 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb,
2663 struct timeval timeout,
2665 struct ctdb_control_ip_iface *pub)
2671 data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2672 data.dptr = (unsigned char *)pub;
2674 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2675 NULL, &res, &timeout, NULL);
2676 if (ret != 0 || res != 0) {
2677 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2685 delete a public address from a node
2687 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb,
2688 struct timeval timeout,
2690 struct ctdb_control_ip_iface *pub)
2696 data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2697 data.dptr = (unsigned char *)pub;
2699 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2700 NULL, &res, &timeout, NULL);
2701 if (ret != 0 || res != 0) {
2702 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2710 kill a tcp connection
2712 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb,
2713 struct timeval timeout,
2715 struct ctdb_control_killtcp *killtcp)
2721 data.dsize = sizeof(struct ctdb_control_killtcp);
2722 data.dptr = (unsigned char *)killtcp;
2724 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2725 NULL, &res, &timeout, NULL);
2726 if (ret != 0 || res != 0) {
2727 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2737 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb,
2738 struct timeval timeout,
2740 ctdb_sock_addr *addr,
2746 struct ctdb_control_gratious_arp *gratious_arp;
2747 TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2750 len = strlen(ifname)+1;
2751 gratious_arp = talloc_size(tmp_ctx,
2752 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2753 CTDB_NO_MEMORY(ctdb, gratious_arp);
2755 gratious_arp->addr = *addr;
2756 gratious_arp->len = len;
2757 memcpy(&gratious_arp->iface[0], ifname, len);
2760 data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
2761 data.dptr = (unsigned char *)gratious_arp;
2763 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
2764 NULL, &res, &timeout, NULL);
2765 if (ret != 0 || res != 0) {
2766 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
2767 talloc_free(tmp_ctx);
2771 talloc_free(tmp_ctx);
2776 get a list of all tcp tickles that a node knows about for a particular vnn
2778 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb,
2779 struct timeval timeout, uint32_t destnode,
2780 TALLOC_CTX *mem_ctx,
2781 ctdb_sock_addr *addr,
2782 struct ctdb_control_tcp_tickle_list **list)
2785 TDB_DATA data, outdata;
2788 data.dptr = (uint8_t*)addr;
2789 data.dsize = sizeof(ctdb_sock_addr);
2791 ret = ctdb_control(ctdb, destnode, 0,
2792 CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data,
2793 mem_ctx, &outdata, &status, NULL, NULL);
2794 if (ret != 0 || status != 0) {
2795 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
2799 *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
2805 register a server id
2807 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb,
2808 struct timeval timeout,
2809 struct ctdb_server_id *id)
2815 data.dsize = sizeof(struct ctdb_server_id);
2816 data.dptr = (unsigned char *)id;
2818 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0,
2819 CTDB_CONTROL_REGISTER_SERVER_ID,
2821 NULL, &res, &timeout, NULL);
2822 if (ret != 0 || res != 0) {
2823 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
2831 unregister a server id
2833 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb,
2834 struct timeval timeout,
2835 struct ctdb_server_id *id)
2841 data.dsize = sizeof(struct ctdb_server_id);
2842 data.dptr = (unsigned char *)id;
2844 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0,
2845 CTDB_CONTROL_UNREGISTER_SERVER_ID,
2847 NULL, &res, &timeout, NULL);
2848 if (ret != 0 || res != 0) {
2849 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
2858 check if a server id exists
2860 if a server id does exist, return *status == 1, otherwise *status == 0
2862 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb,
2863 struct timeval timeout,
2865 struct ctdb_server_id *id,
2872 data.dsize = sizeof(struct ctdb_server_id);
2873 data.dptr = (unsigned char *)id;
2875 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID,
2877 NULL, &res, &timeout, NULL);
2879 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
2893 get the list of server ids that are registered on a node
2895 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
2896 TALLOC_CTX *mem_ctx,
2897 struct timeval timeout, uint32_t destnode,
2898 struct ctdb_server_id_list **svid_list)
2904 ret = ctdb_control(ctdb, destnode, 0,
2905 CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null,
2906 mem_ctx, &outdata, &res, &timeout, NULL);
2907 if (ret != 0 || res != 0) {
2908 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
2912 *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
2918 initialise the ctdb daemon for client applications
2920 NOTE: In current code the daemon does not fork. This is for testing purposes only
2921 and to simplify the code.
2923 struct ctdb_context *ctdb_init(struct event_context *ev)
2926 struct ctdb_context *ctdb;
2928 ctdb = talloc_zero(ev, struct ctdb_context);
2930 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
2934 ctdb->idr = idr_init(ctdb);
2935 /* Wrap early to exercise code. */
2936 ctdb->lastid = INT_MAX-200;
2937 CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
2939 ret = ctdb_set_socketname(ctdb, CTDB_PATH);
2941 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
2946 ctdb->statistics.statistics_start_time = timeval_current();
2955 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
2957 ctdb->flags |= flags;
2961 setup the local socket name
2963 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
2965 ctdb->daemon.name = talloc_strdup(ctdb, socketname);
2966 CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
2971 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
2973 return ctdb->daemon.name;
2977 return the pnn of this node
2979 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
2986 get the uptime of a remote node
2988 struct ctdb_client_control_state *
2989 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2991 return ctdb_control_send(ctdb, destnode, 0,
2992 CTDB_CONTROL_UPTIME, 0, tdb_null,
2993 mem_ctx, &timeout, NULL);
2996 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
3002 ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3003 if (ret != 0 || res != 0) {
3004 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
3008 *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
3013 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3015 struct ctdb_client_control_state *state;
3017 state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3018 return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3022 send a control to execute the "recovered" event script on a node
3024 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3029 ret = ctdb_control(ctdb, destnode, 0,
3030 CTDB_CONTROL_END_RECOVERY, 0, tdb_null,
3031 NULL, NULL, &status, &timeout, NULL);
3032 if (ret != 0 || status != 0) {
3033 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3041 callback for the async helpers used when sending the same control
3042 to multiple nodes in parallell.
3044 static void async_callback(struct ctdb_client_control_state *state)
3046 struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3047 struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3051 uint32_t destnode = state->c->hdr.destnode;
3053 /* one more node has responded with recmode data */
3056 /* if we failed to push the db, then return an error and let
3057 the main loop try again.
3059 if (state->state != CTDB_CONTROL_DONE) {
3060 if ( !data->dont_log_errors) {
3061 DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3064 if (data->fail_callback) {
3065 data->fail_callback(ctdb, destnode, res, outdata,
3066 data->callback_data);
3071 state->async.fn = NULL;
3073 ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3074 if ((ret != 0) || (res != 0)) {
3075 if ( !data->dont_log_errors) {
3076 DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3079 if (data->fail_callback) {
3080 data->fail_callback(ctdb, destnode, res, outdata,
3081 data->callback_data);
3084 if ((ret == 0) && (data->callback != NULL)) {
3085 data->callback(ctdb, destnode, res, outdata,
3086 data->callback_data);
3091 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3093 /* set up the callback functions */
3094 state->async.fn = async_callback;
3095 state->async.private_data = data;
3097 /* one more control to wait for to complete */
3102 /* wait for up to the maximum number of seconds allowed
3103 or until all nodes we expect a response from has replied
3105 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3107 while (data->count > 0) {
3108 event_loop_once(ctdb->ev);
3110 if (data->fail_count != 0) {
3111 if (!data->dont_log_errors) {
3112 DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n",
3122 perform a simple control on the listed nodes
3123 The control cannot return data
3125 int ctdb_client_async_control(struct ctdb_context *ctdb,
3126 enum ctdb_controls opcode,
3129 struct timeval timeout,
3130 bool dont_log_errors,
3132 client_async_callback client_callback,
3133 client_async_callback fail_callback,
3134 void *callback_data)
3136 struct client_async_data *async_data;
3137 struct ctdb_client_control_state *state;
3140 async_data = talloc_zero(ctdb, struct client_async_data);
3141 CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3142 async_data->dont_log_errors = dont_log_errors;
3143 async_data->callback = client_callback;
3144 async_data->fail_callback = fail_callback;
3145 async_data->callback_data = callback_data;
3146 async_data->opcode = opcode;
3148 num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3150 /* loop over all nodes and send an async control to each of them */
3151 for (j=0; j<num_nodes; j++) {
3152 uint32_t pnn = nodes[j];
3154 state = ctdb_control_send(ctdb, pnn, srvid, opcode,
3155 0, data, async_data, &timeout, NULL);
3156 if (state == NULL) {
3157 DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3158 talloc_free(async_data);
3162 ctdb_client_async_add(async_data, state);
3165 if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3166 talloc_free(async_data);
3170 talloc_free(async_data);
3174 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3175 struct ctdb_vnn_map *vnn_map,
3176 TALLOC_CTX *mem_ctx,
3179 int i, j, num_nodes;
3182 for (i=num_nodes=0;i<vnn_map->size;i++) {
3183 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3189 nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3190 CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3192 for (i=j=0;i<vnn_map->size;i++) {
3193 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3196 nodes[j++] = vnn_map->map[i];
3202 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3203 struct ctdb_node_map *node_map,
3204 TALLOC_CTX *mem_ctx,
3207 int i, j, num_nodes;
3210 for (i=num_nodes=0;i<node_map->num;i++) {
3211 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3214 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3220 nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3221 CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3223 for (i=j=0;i<node_map->num;i++) {
3224 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3227 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3230 nodes[j++] = node_map->nodes[i].pnn;
3236 uint32_t *list_of_active_nodes_except_pnn(struct ctdb_context *ctdb,
3237 struct ctdb_node_map *node_map,
3238 TALLOC_CTX *mem_ctx,
3241 int i, j, num_nodes;
3244 for (i=num_nodes=0;i<node_map->num;i++) {
3245 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3248 if (node_map->nodes[i].pnn == pnn) {
3254 nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3255 CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3257 for (i=j=0;i<node_map->num;i++) {
3258 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3261 if (node_map->nodes[i].pnn == pnn) {
3264 nodes[j++] = node_map->nodes[i].pnn;
3270 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3271 struct ctdb_node_map *node_map,
3272 TALLOC_CTX *mem_ctx,
3275 int i, j, num_nodes;
3278 for (i=num_nodes=0;i<node_map->num;i++) {
3279 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3282 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3288 nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3289 CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3291 for (i=j=0;i<node_map->num;i++) {
3292 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3295 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3298 nodes[j++] = node_map->nodes[i].pnn;
3305 this is used to test if a pnn lock exists and if it exists will return
3306 the number of connections that pnn has reported or -1 if that recovery
3307 daemon is not running.
3310 ctdb_read_pnn_lock(int fd, int32_t pnn)
3315 lock.l_type = F_WRLCK;
3316 lock.l_whence = SEEK_SET;
3321 if (fcntl(fd, F_GETLK, &lock) != 0) {
3322 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3326 if (lock.l_type == F_UNLCK) {
3330 if (pread(fd, &c, 1, pnn) == -1) {
3331 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3339 get capabilities of a remote node
3341 struct ctdb_client_control_state *
3342 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3344 return ctdb_control_send(ctdb, destnode, 0,
3345 CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null,
3346 mem_ctx, &timeout, NULL);
3349 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3355 ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3356 if ( (ret != 0) || (res != 0) ) {
3357 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3362 *capabilities = *((uint32_t *)outdata.dptr);
3368 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3370 struct ctdb_client_control_state *state;
3371 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3374 state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3375 ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3376 talloc_free(tmp_ctx);
3381 * check whether a transaction is active on a given db on a given node
3383 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
3391 indata.dptr = (uint8_t *)&db_id;
3392 indata.dsize = sizeof(db_id);
3394 ret = ctdb_control(ctdb, destnode, 0,
3395 CTDB_CONTROL_TRANS2_ACTIVE,
3396 0, indata, NULL, NULL, &status,
3400 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
3408 struct ctdb_transaction_handle {
3409 struct ctdb_db_context *ctdb_db;
3412 * we store the reads and writes done under a transaction:
3413 * - one list stores both reads and writes (m_all),
3414 * - the other just writes (m_write)
3416 struct ctdb_marshall_buffer *m_all;
3417 struct ctdb_marshall_buffer *m_write;
3420 /* start a transaction on a database */
3421 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3423 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3427 /* start a transaction on a database */
3428 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
3430 struct ctdb_record_handle *rh;
3433 struct ctdb_ltdb_header header;
3434 TALLOC_CTX *tmp_ctx;
3435 const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
3437 struct ctdb_db_context *ctdb_db = h->ctdb_db;
3441 key.dptr = discard_const(keyname);
3442 key.dsize = strlen(keyname);
3444 if (!ctdb_db->persistent) {
3445 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
3450 tmp_ctx = talloc_new(h);
3452 rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
3454 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
3455 talloc_free(tmp_ctx);
3459 status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
3463 unsigned long int usec = (1000 + random()) % 100000;
3464 DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
3465 "on db_id[0x%08x]. waiting for %lu "
3467 ctdb_db->db_id, usec));
3468 talloc_free(tmp_ctx);
3474 * store the pid in the database:
3475 * it is not enough that the node is dmaster...
3478 data.dptr = (unsigned char *)&pid;
3479 data.dsize = sizeof(pid_t);
3481 rh->header.dmaster = ctdb_db->ctdb->pnn;
3482 ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
3484 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
3485 "transaction record\n"));
3486 talloc_free(tmp_ctx);
3492 ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3494 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
3495 talloc_free(tmp_ctx);
3499 ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
3501 DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
3502 "lock record inside transaction\n"));
3503 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3504 talloc_free(tmp_ctx);
3508 if (header.dmaster != ctdb_db->ctdb->pnn) {
3509 DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
3510 "transaction lock record\n"));
3511 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3512 talloc_free(tmp_ctx);
3516 if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
3517 DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
3518 "the transaction lock record\n"));
3519 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3520 talloc_free(tmp_ctx);
3524 talloc_free(tmp_ctx);
3530 /* start a transaction on a database */
3531 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3532 TALLOC_CTX *mem_ctx)
3534 struct ctdb_transaction_handle *h;
3537 h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3539 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));
3543 h->ctdb_db = ctdb_db;
3545 ret = ctdb_transaction_fetch_start(h);
3551 talloc_set_destructor(h, ctdb_transaction_destructor);
3559 fetch a record inside a transaction
3561 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h,
3562 TALLOC_CTX *mem_ctx,
3563 TDB_DATA key, TDB_DATA *data)
3565 struct ctdb_ltdb_header header;
3568 ZERO_STRUCT(header);
3570 ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3571 if (ret == -1 && header.dmaster == (uint32_t)-1) {
3572 /* record doesn't exist yet */
3581 if (!h->in_replay) {
3582 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3583 if (h->m_all == NULL) {
3584 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3593 stores a record inside a transaction
3595 int ctdb_transaction_store(struct ctdb_transaction_handle *h,
3596 TDB_DATA key, TDB_DATA data)
3598 TALLOC_CTX *tmp_ctx = talloc_new(h);
3599 struct ctdb_ltdb_header header;
3603 ZERO_STRUCT(header);
3605 /* we need the header so we can update the RSN */
3606 ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3607 if (ret == -1 && header.dmaster == (uint32_t)-1) {
3608 /* the record doesn't exist - create one with us as dmaster.
3609 This is only safe because we are in a transaction and this
3610 is a persistent database */
3611 ZERO_STRUCT(header);
3612 } else if (ret != 0) {
3613 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3614 talloc_free(tmp_ctx);
3618 if (data.dsize == olddata.dsize &&
3619 memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3620 /* save writing the same data */
3621 talloc_free(tmp_ctx);
3625 header.dmaster = h->ctdb_db->ctdb->pnn;
3628 if (!h->in_replay) {
3629 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3630 if (h->m_all == NULL) {
3631 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3632 talloc_free(tmp_ctx);
3637 h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3638 if (h->m_write == NULL) {
3639 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3640 talloc_free(tmp_ctx);
3644 ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3646 talloc_free(tmp_ctx);
3652 replay a transaction
3654 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3657 struct ctdb_rec_data *rec = NULL;
3659 h->in_replay = true;
3660 talloc_free(h->m_write);
3663 ret = ctdb_transaction_fetch_start(h);
3668 for (i=0;i<h->m_all->count;i++) {
3671 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3673 DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3677 if (rec->reqid == 0) {
3679 if (ctdb_transaction_store(h, key, data) != 0) {
3684 TALLOC_CTX *tmp_ctx = talloc_new(h);
3686 if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3687 talloc_free(tmp_ctx);
3690 if (data2.dsize != data.dsize ||
3691 memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3692 /* the record has changed on us - we have to give up */
3693 talloc_free(tmp_ctx);
3696 talloc_free(tmp_ctx);
3703 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3709 commit a transaction
3711 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3715 struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3716 struct timeval timeout;
3717 enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3719 talloc_set_destructor(h, NULL);
3721 /* our commit strategy is quite complex.
3723 - we first try to commit the changes to all other nodes
3725 - if that works, then we commit locally and we are done
3727 - if a commit on another node fails, then we need to cancel
3728 the transaction, then restart the transaction (thus
3729 opening a window of time for a pending recovery to
3730 complete), then replay the transaction, checking all the
3731 reads and writes (checking that reads give the same data,
3732 and writes succeed). Then we retry the transaction to the
3737 if (h->m_write == NULL) {
3738 /* no changes were made */
3739 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3744 /* tell ctdbd to commit to the other nodes */
3745 timeout = timeval_current_ofs(1, 0);
3746 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
3747 retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0,
3748 ctdb_marshall_finish(h->m_write), NULL, NULL, &status,
3750 if (ret != 0 || status != 0) {
3751 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3752 DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
3753 ", retrying after 1 second...\n",
3754 (retries==0)?"":"retry "));
3758 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3760 /* work out what error code we will give if we
3761 have to fail the operation */
3762 switch ((enum ctdb_trans2_commit_error)status) {
3763 case CTDB_TRANS2_COMMIT_SUCCESS:
3764 case CTDB_TRANS2_COMMIT_SOMEFAIL:
3765 case CTDB_TRANS2_COMMIT_TIMEOUT:
3766 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3768 case CTDB_TRANS2_COMMIT_ALLFAIL:
3769 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
3774 if (++retries == 100) {
3775 DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n",
3776 h->ctdb_db->db_id, retries, (unsigned)failure_control));
3777 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
3778 failure_control, CTDB_CTRL_FLAG_NOREPLY,
3779 tdb_null, NULL, NULL, NULL, NULL, NULL);
3784 if (ctdb_replay_transaction(h) != 0) {
3785 DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
3786 "transaction on db 0x%08x, "
3787 "failure control =%u\n",
3789 (unsigned)failure_control));
3790 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
3791 failure_control, CTDB_CTRL_FLAG_NOREPLY,
3792 tdb_null, NULL, NULL, NULL, NULL, NULL);
3798 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3801 /* do the real commit locally */
3802 ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
3804 DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
3805 "on db id 0x%08x locally, "
3806 "failure_control=%u\n",
3808 (unsigned)failure_control));
3809 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
3810 failure_control, CTDB_CTRL_FLAG_NOREPLY,
3811 tdb_null, NULL, NULL, NULL, NULL, NULL);
3816 /* tell ctdbd that we are finished with our local commit */
3817 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
3818 CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY,
3819 tdb_null, NULL, NULL, NULL, NULL, NULL);
3825 recovery daemon ping to main daemon
3827 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
3832 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null,
3833 ctdb, NULL, &res, NULL, NULL);
3834 if (ret != 0 || res != 0) {
3835 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
3842 /* when forking the main daemon and the child process needs to connect back
3843 * to the daemon as a client process, this function can be used to change
3844 * the ctdb context from daemon into client mode
3846 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
3851 /* Add extra information so we can identify this in the logs */
3853 debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
3856 /* shutdown the transport */
3857 if (ctdb->methods) {
3858 ctdb->methods->shutdown(ctdb);
3861 /* get a new event context */
3862 talloc_free(ctdb->ev);
3863 ctdb->ev = event_context_init(ctdb);
3864 tevent_loop_allow_nesting(ctdb->ev);
3866 close(ctdb->daemon.sd);
3867 ctdb->daemon.sd = -1;
3869 /* the client does not need to be realtime */
3870 if (ctdb->do_setsched) {
3871 ctdb_restore_scheduler(ctdb);
3874 /* initialise ctdb */
3875 ret = ctdb_socket_connect(ctdb);
3877 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
3885 get the status of running the monitor eventscripts: NULL means never run.
3887 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb,
3888 struct timeval timeout, uint32_t destnode,
3889 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
3890 struct ctdb_scripts_wire **script_status)
3893 TDB_DATA outdata, indata;
3895 uint32_t uinttype = type;
3897 indata.dptr = (uint8_t *)&uinttype;
3898 indata.dsize = sizeof(uinttype);
3900 ret = ctdb_control(ctdb, destnode, 0,
3901 CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
3902 mem_ctx, &outdata, &res, &timeout, NULL);
3903 if (ret != 0 || res != 0) {
3904 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
3908 if (outdata.dsize == 0) {
3909 *script_status = NULL;
3911 *script_status = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
3912 talloc_free(outdata.dptr);
3919 tell the main daemon how long it took to lock the reclock file
3921 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
3927 data.dptr = (uint8_t *)&latency;
3928 data.dsize = sizeof(latency);
3930 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data,
3931 ctdb, NULL, &res, NULL, NULL);
3932 if (ret != 0 || res != 0) {
3933 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
3941 get the name of the reclock file
3943 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
3944 uint32_t destnode, TALLOC_CTX *mem_ctx,
3951 ret = ctdb_control(ctdb, destnode, 0,
3952 CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null,
3953 mem_ctx, &data, &res, &timeout, NULL);
3954 if (ret != 0 || res != 0) {
3958 if (data.dsize == 0) {
3961 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
3963 talloc_free(data.dptr);
3969 set the reclock filename for a node
3971 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
3977 if (reclock == NULL) {
3981 data.dsize = strlen(reclock) + 1;
3982 data.dptr = discard_const(reclock);
3985 ret = ctdb_control(ctdb, destnode, 0,
3986 CTDB_CONTROL_SET_RECLOCK_FILE, 0, data,
3987 NULL, NULL, &res, &timeout, NULL);
3988 if (ret != 0 || res != 0) {
3989 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
3999 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4004 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null,
4005 ctdb, NULL, &res, &timeout, NULL);
4006 if (ret != 0 || res != 0) {
4007 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
4017 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4021 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null,
4022 ctdb, NULL, NULL, &timeout, NULL);
4024 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4032 set the natgw state for a node
4034 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4040 data.dsize = sizeof(natgwstate);
4041 data.dptr = (uint8_t *)&natgwstate;
4043 ret = ctdb_control(ctdb, destnode, 0,
4044 CTDB_CONTROL_SET_NATGWSTATE, 0, data,
4045 NULL, NULL, &res, &timeout, NULL);
4046 if (ret != 0 || res != 0) {
4047 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4055 set the lmaster role for a node
4057 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4063 data.dsize = sizeof(lmasterrole);
4064 data.dptr = (uint8_t *)&lmasterrole;
4066 ret = ctdb_control(ctdb, destnode, 0,
4067 CTDB_CONTROL_SET_LMASTERROLE, 0, data,
4068 NULL, NULL, &res, &timeout, NULL);
4069 if (ret != 0 || res != 0) {
4070 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4078 set the recmaster role for a node
4080 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4086 data.dsize = sizeof(recmasterrole);
4087 data.dptr = (uint8_t *)&recmasterrole;
4089 ret = ctdb_control(ctdb, destnode, 0,
4090 CTDB_CONTROL_SET_RECMASTERROLE, 0, data,
4091 NULL, NULL, &res, &timeout, NULL);
4092 if (ret != 0 || res != 0) {
4093 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4100 /* enable an eventscript
4102 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4108 data.dsize = strlen(script) + 1;
4109 data.dptr = discard_const(script);
4111 ret = ctdb_control(ctdb, destnode, 0,
4112 CTDB_CONTROL_ENABLE_SCRIPT, 0, data,
4113 NULL, NULL, &res, &timeout, NULL);
4114 if (ret != 0 || res != 0) {
4115 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4122 /* disable an eventscript
4124 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4130 data.dsize = strlen(script) + 1;
4131 data.dptr = discard_const(script);
4133 ret = ctdb_control(ctdb, destnode, 0,
4134 CTDB_CONTROL_DISABLE_SCRIPT, 0, data,
4135 NULL, NULL, &res, &timeout, NULL);
4136 if (ret != 0 || res != 0) {
4137 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
4145 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
4151 data.dsize = sizeof(*bantime);
4152 data.dptr = (uint8_t *)bantime;
4154 ret = ctdb_control(ctdb, destnode, 0,
4155 CTDB_CONTROL_SET_BAN_STATE, 0, data,
4156 NULL, NULL, &res, &timeout, NULL);
4157 if (ret != 0 || res != 0) {
4158 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4166 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4171 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4173 ret = ctdb_control(ctdb, destnode, 0,
4174 CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4175 tmp_ctx, &outdata, &res, &timeout, NULL);
4176 if (ret != 0 || res != 0) {
4177 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4178 talloc_free(tmp_ctx);
4182 *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4183 talloc_free(tmp_ctx);
4189 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4194 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4196 data.dptr = (uint8_t*)db_prio;
4197 data.dsize = sizeof(*db_prio);
4199 ret = ctdb_control(ctdb, destnode, 0,
4200 CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4201 tmp_ctx, NULL, &res, &timeout, NULL);
4202 if (ret != 0 || res != 0) {
4203 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4204 talloc_free(tmp_ctx);
4208 talloc_free(tmp_ctx);
4213 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4218 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4220 data.dptr = (uint8_t*)&db_id;
4221 data.dsize = sizeof(db_id);
4223 ret = ctdb_control(ctdb, destnode, 0,
4224 CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4225 tmp_ctx, NULL, &res, &timeout, NULL);
4226 if (ret != 0 || res < 0) {
4227 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4228 talloc_free(tmp_ctx);
4236 talloc_free(tmp_ctx);
4241 int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_statistics_wire **stats)
4247 ret = ctdb_control(ctdb, destnode, 0,
4248 CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null,
4249 mem_ctx, &outdata, &res, &timeout, NULL);
4250 if (ret != 0 || res != 0 || outdata.dsize == 0) {
4251 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
4255 *stats = (struct ctdb_statistics_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4256 talloc_free(outdata.dptr);
4261 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)