4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/tevent/tevent.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
30 #include "../include/ctdb_private.h"
31 #include "lib/util/dlinklist.h"
36 allocate a packet for use in client<->daemon communication
/*
 * Allocate and initialise a packet header for client<->daemon traffic.
 *
 * The packet is a talloc child of mem_ctx and is named after `type`.
 * Its size is MAX(length, slength) rounded up with the CTDB_DS_ALIGNMENT
 * mask; only the first `slength` bytes are zeroed. The operation, magic,
 * version, source node (ctdb->pnn) and generation (ctdb->vnn_map) fields
 * of the header are then filled in.
 */
38 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
40 enum ctdb_operation operation,
41 size_t length, size_t slength,
45 struct ctdb_req_header *hdr;
/* never allocate less than the fixed part of the structure */
47 length = MAX(length, slength);
/* round up to a multiple of CTDB_DS_ALIGNMENT (assumes a power of two -- TODO confirm) */
48 size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
50 hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
/* diagnostic emitted when the allocation fails */
52 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
53 operation, (unsigned)length));
/* label the chunk with the caller supplied type name */
56 talloc_set_name_const(hdr, type);
57 memset(hdr, 0, slength);
59 hdr->operation = operation;
60 hdr->ctdb_magic = CTDB_MAGIC;
61 hdr->ctdb_version = CTDB_VERSION;
62 hdr->srcnode = ctdb->pnn;
64 hdr->generation = ctdb->vnn_map->generation;
71 local version of ctdb_call
/*
 * Run a registered call function locally against one record.
 *
 * A ctdb_call_info is built from `call` and a private copy of `data`,
 * the function registered in ctdb_db->calls under call->call_id is
 * invoked on it, record data is written back with ctdb_ltdb_store(), and
 * any reply data plus the status are handed back through `call`.
 * Failures are reported through ctdb_set_error().
 */
73 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
74 struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
77 struct ctdb_call_info *c;
78 struct ctdb_registered_call *fn;
79 struct ctdb_context *ctdb = ctdb_db->ctdb;
81 c = talloc(ctdb, struct ctdb_call_info);
82 CTDB_NO_MEMORY(ctdb, c);
85 c->call_data = &call->call_data;
/* the call function works on its own copy of the record, owned by c */
86 c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
87 c->record_data.dsize = data->dsize;
88 CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
/* look up the handler registered for this call id */
93 for (fn=ctdb_db->calls;fn;fn=fn->next) {
94 if (fn->id == call->call_id) break;
97 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
/* a non-zero return from the handler is treated as failure */
102 if (fn->fn(c) != 0) {
103 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
108 /* we need to force the record to be written out if this was a remote access */
109 if (c->new_data == NULL) {
110 c->new_data = &c->record_data;
114 /* XXX check that we always have the lock here? */
115 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
116 ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
/* reply data produced by the handler is re-parented onto `call` */
123 call->reply_data = *c->reply_data;
125 talloc_steal(call, call->reply_data.dptr);
126 talloc_set_name_const(call->reply_data.dptr, __location__);
/* no reply data case */
128 call->reply_data.dptr = NULL;
129 call->reply_data.dsize = 0;
131 call->status = c->status;
140 queue a packet for sending from client to daemon
142 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
144 return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
149 called when a CTDB_REPLY_CALL packet comes in in the client
151 This packet comes in response to a CTDB_REQ_CALL request packet. It
152 contains any reply data from the call
/*
 * Handle a CTDB_REPLY_CALL packet in the client.
 *
 * The pending call state is looked up by hdr->reqid. The reply data
 * pointer/length and the status are recorded in state->call, the packet
 * is re-parented onto the state, the state is marked CTDB_CALL_DONE and
 * the async callback (if any) is invoked.
 */
154 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
156 struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
157 struct ctdb_client_call_state *state;
159 state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
161 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
165 if (hdr->reqid != state->reqid) {
166 /* we found a record but it was the wrong one */
167 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
/* reply_data points into the packet itself, no copy is made here */
171 state->call->reply_data.dptr = c->data;
172 state->call->reply_data.dsize = c->datalen;
173 state->call->status = c->status;
/* keep the packet alive for as long as the state that references it */
175 talloc_steal(state, c);
177 state->state = CTDB_CALL_DONE;
179 if (state->async.fn) {
180 state->async.fn(state);
/* forward declaration: ctdb_client_read_cb() dispatches to this handler before it is defined */
184 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
187 this is called in the client, when data comes in from the daemon
/*
 * Read callback of the client: called with each packet received from the
 * daemon. `args` is the ctdb_context, `data`/`cnt` the raw packet.
 *
 * The packet is sanity checked (minimum size, length field, magic and
 * version) and then dispatched on hdr->operation to the reply-call,
 * message or reply-control handler. Unknown operations are logged.
 */
189 void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
191 struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
192 struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
195 /* place the packet as a child of a tmp_ctx. We then use
196 talloc_free() below to free it. If any of the calls want
197 to keep it, then they will steal it somewhere else, and the
198 talloc_free() will be a no-op */
199 tmp_ctx = talloc_new(ctdb);
200 talloc_steal(tmp_ctx, hdr);
/* NOTE(review): the condition guarding this message is not visible here -- confirm */
203 DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n"));
/* the packet must at least contain a complete header */
207 if (cnt < sizeof(*hdr)) {
208 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
/* the length recorded in the header must match what was received */
211 if (cnt != hdr->length) {
212 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n",
213 (unsigned)hdr->length, (unsigned)cnt);
217 if (hdr->ctdb_magic != CTDB_MAGIC) {
218 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
222 if (hdr->ctdb_version != CTDB_VERSION) {
223 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
/* dispatch on the packet type */
227 switch (hdr->operation) {
228 case CTDB_REPLY_CALL:
229 ctdb_client_reply_call(ctdb, hdr);
232 case CTDB_REQ_MESSAGE:
233 ctdb_request_message(ctdb, hdr);
236 case CTDB_REPLY_CONTROL:
237 ctdb_client_reply_control(ctdb, hdr);
241 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
/* frees the packet unless a handler stole it */
245 talloc_free(tmp_ctx);
249 connect with exponential backoff, thanks Stevens
251 #define CONNECT_MAXSLEEP 64
252 static int ctdb_connect_retry(struct ctdb_context *ctdb)
254 struct sockaddr_un addr;
258 memset(&addr, 0, sizeof(addr));
259 addr.sun_family = AF_UNIX;
260 strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
262 for (secs = 1; secs <= CONNECT_MAXSLEEP; secs *= 2) {
263 ret = connect(ctdb->daemon.sd, (struct sockaddr *)&addr,
265 if ((ret == 0) || (errno != EAGAIN)) {
269 if (secs <= (CONNECT_MAXSLEEP / 2)) {
270 DEBUG(DEBUG_ERR,("connect failed: %s, retry in %d second(s)\n",
271 strerror(errno), secs));
280 connect to a unix domain socket
/*
 * Open the client's unix domain socket to the daemon.
 *
 * Creates an AF_UNIX stream socket in ctdb->daemon.sd, makes it
 * non-blocking and close-on-exec, connects it with
 * ctdb_connect_retry(), and finally sets up ctdb->daemon.queue on it with
 * ctdb_client_read_cb() as the read callback. If the connect fails the
 * socket is closed and ctdb->daemon.sd is reset to -1.
 */
282 int ctdb_socket_connect(struct ctdb_context *ctdb)
284 ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
285 if (ctdb->daemon.sd == -1) {
286 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
290 set_nonblocking(ctdb->daemon.sd);
291 set_close_on_exec(ctdb->daemon.sd);
293 if (ctdb_connect_retry(ctdb) == -1) {
294 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
/* leave the context in the "not connected" state */
295 close(ctdb->daemon.sd);
296 ctdb->daemon.sd = -1;
/* incoming packets are delivered to ctdb_client_read_cb() with ctdb as its argument */
300 ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd,
302 ctdb_client_read_cb, ctdb, "to-ctdbd");
/*
 * Handle for a record locked with ctdb_fetch_lock(): remembers the
 * database the record lives in and the ltdb header that was fetched
 * for it.
 */
307 struct ctdb_record_handle {
/* database the locked record belongs to */
308 struct ctdb_db_context *ctdb_db;
/* header fetched together with the record */
311 struct ctdb_ltdb_header header;
316 make a recv call to the local ctdb daemon - called from client context
318 This is called when the program wants to wait for a ctdb_call to complete and get the
319 results. This call will block unless the call has already completed.
/*
 * Wait for a call started with ctdb_call_send() to finish and collect
 * its result.
 *
 * Runs the event loop until the state has reached at least
 * CTDB_CALL_DONE. On success the reply data is duplicated as a talloc
 * child of state->ctdb_db into call->reply_data (or set to NULL/0 when
 * the reply is empty) and the status is copied to call->status.
 */
321 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
/* block in the event loop until the reply handler changes the state */
327 while (state->state < CTDB_CALL_DONE) {
328 event_loop_once(state->ctdb_db->ctdb->ev);
/* anything other than DONE is a failure */
330 if (state->state != CTDB_CALL_DONE) {
331 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
/* hand the caller its own copy of the reply data */
336 if (state->call->reply_data.dsize) {
337 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
338 state->call->reply_data.dptr,
339 state->call->reply_data.dsize);
340 call->reply_data.dsize = state->call->reply_data.dsize;
342 call->reply_data.dptr = NULL;
343 call->reply_data.dsize = 0;
345 call->status = state->call->status;
355 destroy a ctdb_call in client
/*
 * talloc destructor for a client call state: drops the state's request
 * id from the context's reqid table.
 */
357 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)
359 ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
364 construct an event driven local ctdb_call
366 this is used so that locally processed ctdb_call requests are processed
367 in an event driven manner
/*
 * Build a call state for a call that is processed locally.
 *
 * A state (talloc child of ctdb_db) with its own copy of `call` is
 * created, data->dptr is re-parented onto the state, the state is
 * marked CTDB_CALL_DONE straight away and ctdb_call_local() is run on
 * it. Returns NULL if an allocation fails.
 */
369 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db,
370 struct ctdb_call *call,
371 struct ctdb_ltdb_header *header,
374 struct ctdb_client_call_state *state;
375 struct ctdb_context *ctdb = ctdb_db->ctdb;
378 state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
379 CTDB_NO_MEMORY_NULL(ctdb, state);
380 state->call = talloc_zero(state, struct ctdb_call);
381 CTDB_NO_MEMORY_NULL(ctdb, state->call);
/* the record data now belongs to the state */
383 talloc_steal(state, data->dptr);
/* the local call completes synchronously below, so there is nothing to wait for */
385 state->state = CTDB_CALL_DONE;
386 *(state->call) = *call;
387 state->ctdb_db = ctdb_db;
389 ret = ctdb_call_local(ctdb_db, state->call, header, state, data);
395 make a ctdb call to the local daemon - async send. Called from client context.
397 This constructs a ctdb_call request and queues it for processing.
398 This call never blocks.
/*
 * Start a ctdb call from the client (asynchronous send).
 *
 * The record is fetched under the chain lock. If this node is the
 * record's dmaster the call is run locally through
 * ctdb_client_call_local_send(). Otherwise a CTDB_REQ_CALL packet
 * carrying the key followed by the call data is built and queued to the
 * daemon, and a state in CTDB_CALL_WAIT is set up whose reqid ties the
 * reply back to it.
 */
400 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db,
401 struct ctdb_call *call)
403 struct ctdb_client_call_state *state;
404 struct ctdb_context *ctdb = ctdb_db->ctdb;
405 struct ctdb_ltdb_header header;
409 struct ctdb_req_call *c;
411 /* if the domain socket is not yet open, open it */
412 if (ctdb->daemon.sd==-1) {
/* NOTE(review): the result of the connect attempt is not checked here */
413 ctdb_socket_connect(ctdb);
416 ret = ctdb_ltdb_lock(ctdb_db, call->key);
418 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
422 ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
/* fast path: we are the dmaster, so the call can be run locally */
424 if (ret == 0 && header.dmaster == ctdb->pnn) {
425 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
426 talloc_free(data.dptr);
427 ctdb_ltdb_unlock(ctdb_db, call->key);
/* remote path: the local copy of the record is not needed */
431 ctdb_ltdb_unlock(ctdb_db, call->key);
432 talloc_free(data.dptr);
434 state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
436 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
439 state->call = talloc_zero(state, struct ctdb_call);
440 if (state->call == NULL) {
441 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
/* fixed part of the request plus key bytes plus call data bytes */
445 len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
446 c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
448 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
/* the destructor releases the reqid again when the state is freed */
452 state->reqid = ctdb_reqid_new(ctdb, state);
453 state->ctdb_db = ctdb_db;
454 talloc_set_destructor(state, ctdb_client_call_destructor);
456 c->hdr.reqid = state->reqid;
457 c->flags = call->flags;
458 c->db_id = ctdb_db->db_id;
459 c->callid = call->call_id;
461 c->keylen = call->key.dsize;
462 c->calldatalen = call->call_data.dsize;
/* payload layout: key first, call data directly after it */
463 memcpy(&c->data[0], call->key.dptr, call->key.dsize);
464 memcpy(&c->data[call->key.dsize],
465 call->call_data.dptr, call->call_data.dsize);
/* the state's copy of the call refers to the packet's own copies of key and call data */
466 *(state->call) = *call;
467 state->call->call_data.dptr = &c->data[call->key.dsize];
468 state->call->key.dptr = &c->data[0];
470 state->state = CTDB_CALL_WAIT;
473 ctdb_client_queue_pkt(ctdb, &c->hdr);
480 full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
/*
 * Synchronous ctdb call: issues the request with ctdb_call_send() and
 * immediately waits for its completion with ctdb_call_recv(), whose
 * result is returned.
 */
int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
{
	return ctdb_call_recv(ctdb_call_send(ctdb_db, call), call);
}
492 tell the daemon what messaging srvid we will use, and register the message
493 handler function in the client
/*
 * Register a message handler for `srvid`.
 *
 * The daemon on the current node is first told about the srvid with a
 * CTDB_CONTROL_REGISTER_SRVID control; if that succeeds the handler is
 * registered in the client's own ctdb structure and the result of that
 * registration is returned.
 */
495 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
496 ctdb_msg_fn_t handler,
503 res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0,
504 tdb_null, NULL, NULL, &status, NULL, NULL);
/* both the transport result and the control's own status must be zero */
505 if (res != 0 || status != 0) {
506 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
510 /* also need to register the handler with our own ctdb structure */
511 return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
515 tell the daemon we no longer want a srvid
/*
 * Stop listening for messages on `srvid`.
 *
 * The daemon on the current node is told with a
 * CTDB_CONTROL_DEREGISTER_SRVID control, then the handler identified by
 * srvid/private_data is removed from the client's own ctdb structure.
 */
517 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
522 res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0,
523 tdb_null, NULL, NULL, &status, NULL, NULL);
/* both the transport result and the control's own status must be zero */
524 if (res != 0 || status != 0) {
525 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
529 /* also need to deregister the handler from our own ctdb structure */
530 ctdb_deregister_message_handler(ctdb, srvid, private_data);
536 send a message - from client context
/*
 * Send a message from the client to node `pnn`.
 *
 * A CTDB_REQ_MESSAGE packet with `data` as payload is allocated and
 * queued to the daemon.
 */
538 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
539 uint64_t srvid, TDB_DATA data)
541 struct ctdb_req_message *r;
/* fixed part of the request plus the payload bytes */
544 len = offsetof(struct ctdb_req_message, data) + data.dsize;
545 r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE,
546 len, struct ctdb_req_message);
547 CTDB_NO_MEMORY(ctdb, r);
549 r->hdr.destnode = pnn;
551 r->datalen = data.dsize;
552 memcpy(&r->data[0], data.dptr, data.dsize);
554 res = ctdb_client_queue_pkt(ctdb, &r->hdr);
565 cancel a ctdb_fetch_lock operation, releasing the lock
/*
 * talloc destructor for a record handle: releases the ltdb lock on the
 * handle's key.
 */
567 static int fetch_lock_destructor(struct ctdb_record_handle *h)
569 ctdb_ltdb_unlock(h->ctdb_db, h->key);
574 force the migration of a record to this node
/*
 * Ask for a record to be migrated to this node by issuing a
 * CTDB_NULL_FUNC call with the CTDB_IMMEDIATE_MIGRATION flag set.
 * Returns the result of ctdb_call().
 */
576 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
578 struct ctdb_call call;
580 call.call_id = CTDB_NULL_FUNC;
582 call.flags = CTDB_IMMEDIATE_MIGRATION;
583 return ctdb_call(ctdb_db, &call);
587 get a lock on a record, and return the record's data. Blocks until it gets the lock
/*
 * Lock a record and fetch its data.
 *
 * A handle (talloc child of mem_ctx) holding a private copy of the key
 * is created. The chain lock is taken and the record fetched; if the
 * fetch fails or this node is not the record's dmaster, the lock is
 * dropped and a migration of the record to this node is requested
 * before trying again. The handle's destructor releases the lock.
 */
589 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx,
590 TDB_DATA key, TDB_DATA *data)
593 struct ctdb_record_handle *h;
596 procedure is as follows:
598 1) get the chain lock.
599 2) check if we are dmaster
600 3) if we are the dmaster then return handle
601 4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
603 5) when we get the reply, goto (1)
606 h = talloc_zero(mem_ctx, struct ctdb_record_handle);
611 h->ctdb_db = ctdb_db;
/* the handle keeps its own copy of the key */
613 h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
614 if (h->key.dptr == NULL) {
620 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize,
621 (const char *)key.dptr));
624 /* step 1 - get the chain lock */
625 ret = ctdb_ltdb_lock(ctdb_db, key);
627 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
632 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
/* from here on freeing the handle releases the chain lock */
634 talloc_set_destructor(h, fetch_lock_destructor);
636 ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
638 /* when torturing, ensure we test the remote path */
639 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
641 h->header.dmaster = (uint32_t)-1;
645 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
/* not usable locally: drop the lock and ask for the record to be migrated here */
647 if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
648 ctdb_ltdb_unlock(ctdb_db, key);
649 ret = ctdb_client_force_migration(ctdb_db, key);
651 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
658 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
663 store some data to the record that was locked with ctdb_fetch_lock()
/*
 * Store `data` into the record behind handle `h` using the handle's
 * database, key and header. Refused (with an error message) for
 * persistent databases.
 */
665 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
667 if (h->ctdb_db->persistent) {
668 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
672 return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
676 non-locking fetch of a record
/*
 * Fetch a record without taking a lock, by issuing a CTDB_FETCH_FUNC
 * call with no call data. The reply data is returned through `data` and
 * re-parented onto mem_ctx.
 */
678 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx,
679 TDB_DATA key, TDB_DATA *data)
681 struct ctdb_call call;
684 call.call_id = CTDB_FETCH_FUNC;
685 call.call_data.dptr = NULL;
686 call.call_data.dsize = 0;
688 ret = ctdb_call(ctdb_db, &call);
/* the reply buffer becomes the caller's, owned by mem_ctx */
691 *data = call.reply_data;
692 talloc_steal(mem_ctx, data->dptr);
701 called when a control completes or times out, to invoke the callback
702 function the user provided
/*
 * Timed-event handler that completes an asynchronous control.
 *
 * private_data is the ctdb_client_control_state. The state is moved
 * onto a temporary context, ctdb_control_recv() is run on it, and the
 * temporary context is freed afterwards.
 */
704 static void invoke_control_callback(struct event_context *ev, struct timed_event *te,
705 struct timeval t, void *private_data)
707 struct ctdb_client_control_state *state;
708 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
711 state = talloc_get_type(private_data, struct ctdb_client_control_state);
/* tie the state's lifetime to tmp_ctx, which is freed at the end */
712 talloc_steal(tmp_ctx, state);
/* results are allocated on the state itself */
714 ret = ctdb_control_recv(state->ctdb, state, state,
719 talloc_free(tmp_ctx);
723 called when a CTDB_REPLY_CONTROL packet comes in in the client
725 This packet comes in response to a CTDB_REQ_CONTROL request packet. It
726 contains any reply data from the control
/*
 * Handle a CTDB_REPLY_CONTROL packet in the client.
 *
 * The pending control state is looked up by hdr->reqid. The reply's
 * data pointer/length and status are recorded in the state, an error
 * message is duplicated from the bytes that follow the reply data, the
 * packet is re-parented onto the state and the state is marked
 * CTDB_CONTROL_DONE. If an async callback is registered, a zero-delay
 * timed event is scheduled to run invoke_control_callback().
 */
728 static void ctdb_client_reply_control(struct ctdb_context *ctdb,
729 struct ctdb_req_header *hdr)
731 struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
732 struct ctdb_client_control_state *state;
734 state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
736 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
740 if (hdr->reqid != state->reqid) {
741 /* we found a record but it was the wrong one */
742 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
/* outdata points into the packet itself, no copy is made here */
746 state->outdata.dptr = c->data;
747 state->outdata.dsize = c->datalen;
748 state->status = c->status;
/* the error text starts right behind the reply data */
750 state->errormsg = talloc_strndup(state,
751 (char *)&c->data[c->datalen],
755 /* state->outdata now uses resources from c so we dont want c
756 to just dissappear from under us while state is still alive
758 talloc_steal(state, c);
760 state->state = CTDB_CONTROL_DONE;
762 /* if we had a callback registered for this control, pull the response
763 and call the callback.
765 if (state->async.fn) {
766 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
772 destroy a ctdb_control in client
/*
 * talloc destructor for a client control state: drops the state's
 * request id from the context's reqid table.
 */
774 static int ctdb_control_destructor(struct ctdb_client_control_state *state)
776 ctdb_reqid_remove(state->ctdb, state->reqid);
/*
 * time out handler for ctdb_control
 *
 * private_data is the ctdb_client_control_state of the control that
 * timed out. The timeout is logged, the state is marked
 * CTDB_CONTROL_TIMEOUT and, when an async callback is registered, a
 * zero-delay timed event is scheduled to run invoke_control_callback().
 */
782 static void control_timeout_func(struct event_context *ev, struct timed_event *te,
783 struct timeval t, void *private_data)
785 struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
787 DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
788 "dstnode:%u\n", state->reqid, state->c->opcode,
789 state->c->hdr.destnode));
/* lets ctdb_control_recv() leave its wait loop with a failure */
791 state->state = CTDB_CONTROL_TIMEOUT;
793 /* if we had a callback registered for this control, pull the response
794 and call the callback.
796 if (state->async.fn) {
797 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
/*
 * async version of send control request
 *
 * Connects to the daemon if necessary, allocates a control state on
 * mem_ctx (state CTDB_CONTROL_WAIT, with a fresh reqid that the
 * destructor releases again), builds a CTDB_REQ_CONTROL packet carrying
 * `data` and queues it to the daemon. When a non-zero timeout is given,
 * control_timeout_func() is scheduled for it. Returns NULL if an
 * allocation fails.
 */
802 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb,
803 uint32_t destnode, uint64_t srvid,
804 uint32_t opcode, uint32_t flags, TDB_DATA data,
806 struct timeval *timeout,
809 struct ctdb_client_control_state *state;
811 struct ctdb_req_control *c;
818 /* if the domain socket is not yet open, open it */
819 if (ctdb->daemon.sd==-1) {
/* NOTE(review): the result of the connect attempt is not checked here */
820 ctdb_socket_connect(ctdb);
823 state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
824 CTDB_NO_MEMORY_NULL(ctdb, state);
827 state->reqid = ctdb_reqid_new(ctdb, state);
828 state->state = CTDB_CONTROL_WAIT;
829 state->errormsg = NULL;
831 talloc_set_destructor(state, ctdb_control_destructor);
/* fixed part of the request plus the payload bytes; the packet is owned by the state */
833 len = offsetof(struct ctdb_req_control, data) + data.dsize;
834 c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL,
835 len, struct ctdb_req_control);
837 CTDB_NO_MEMORY_NULL(ctdb, c);
838 c->hdr.reqid = state->reqid;
839 c->hdr.destnode = destnode;
844 c->datalen = data.dsize;
/* the payload is copied, so the caller's buffer is not referenced afterwards */
846 memcpy(&c->data[0], data.dptr, data.dsize);
/* arm the timeout only when a non-zero one was requested */
850 if (timeout && !timeval_is_zero(timeout)) {
851 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
854 ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
860 if (flags & CTDB_CTRL_FLAG_NOREPLY) {
/*
 * async version of receive control reply
 *
 * Runs the event loop while the state is CTDB_CONTROL_WAIT. If the
 * control did not end in CTDB_CONTROL_DONE the failure is logged. On
 * completion the error message (if any) is moved onto mem_ctx, the
 * reply data is duplicated onto mem_ctx into *outdata and the control
 * status is stored in *status. The state's async callback, when set, is
 * invoked, and the state is released together with the temporary
 * context it was moved onto.
 */
870 int ctdb_control_recv(struct ctdb_context *ctdb,
871 struct ctdb_client_control_state *state,
873 TDB_DATA *outdata, int32_t *status, char **errormsg)
877 if (status != NULL) {
880 if (errormsg != NULL) {
888 /* prevent double free of state */
889 tmp_ctx = talloc_new(ctdb);
890 talloc_steal(tmp_ctx, state);
892 /* loop one event at a time until we either timeout or the control
895 while (state->state == CTDB_CONTROL_WAIT) {
896 event_loop_once(ctdb->ev);
899 if (state->state != CTDB_CONTROL_DONE) {
900 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
901 if (state->async.fn) {
902 state->async.fn(state);
904 talloc_free(tmp_ctx);
908 if (state->errormsg) {
909 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
911 (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
913 if (state->async.fn) {
914 state->async.fn(state);
916 talloc_free(tmp_ctx);
921 *outdata = state->outdata;
922 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
926 *status = state->status;
929 if (state->async.fn) {
930 state->async.fn(state);
933 talloc_free(tmp_ctx);
940 send a ctdb control message
941 timeout specifies how long we should wait for a reply.
942 if timeout is NULL we wait indefinitely
/*
 * Synchronous control: sends the control with ctdb_control_send() and
 * returns the result of ctdb_control_recv() for it. Reply data, status
 * and results are delivered through outdata/status as described for
 * ctdb_control_recv(), with mem_ctx as the allocation context.
 */
944 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid,
945 uint32_t opcode, uint32_t flags, TDB_DATA data,
946 TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
947 struct timeval *timeout,
950 struct ctdb_client_control_state *state;
952 state = ctdb_control_send(ctdb, destnode, srvid, opcode,
953 flags, data, mem_ctx,
955 return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status,
963 a process exists call. Returns 0 if process exists, -1 otherwise
/*
 * Ask node `destnode` whether process `pid` exists, using the
 * CTDB_CONTROL_PROCESS_EXISTS control with the raw pid_t as payload.
 * No timeout is passed to ctdb_control().
 */
965 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
/* the pid is sent as-is, in host representation */
971 data.dptr = (uint8_t*)&pid;
972 data.dsize = sizeof(pid);
974 ret = ctdb_control(ctdb, destnode, 0,
975 CTDB_CONTROL_PROCESS_EXISTS, 0, data,
976 NULL, NULL, &status, NULL, NULL);
978 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
986 get remote statistics
/*
 * Fetch the statistics of node `destnode` into *status using the
 * CTDB_CONTROL_STATISTICS control. The reply must be exactly
 * sizeof(struct ctdb_statistics) bytes; it is copied out and the reply
 * buffer is freed.
 */
988 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
994 ret = ctdb_control(ctdb, destnode, 0,
995 CTDB_CONTROL_STATISTICS, 0, tdb_null,
996 ctdb, &data, &res, NULL, NULL);
997 if (ret != 0 || res != 0) {
998 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
/* reject replies that do not have the expected structure size */
1002 if (data.dsize != sizeof(struct ctdb_statistics)) {
1003 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1004 (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1008 *status = *(struct ctdb_statistics *)data.dptr;
1009 talloc_free(data.dptr);
1015 shutdown a remote ctdb node
/*
 * Send a CTDB_CONTROL_SHUTDOWN control to node `destnode`.
 *
 * Only the send step is performed here (ctdb_control_send() with a NULL
 * memory context); failure to create the request is reported.
 */
1017 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1019 struct ctdb_client_control_state *state;
1021 state = ctdb_control_send(ctdb, destnode, 0,
1022 CTDB_CONTROL_SHUTDOWN, 0, tdb_null,
1023 NULL, &timeout, NULL);
1024 if (state == NULL) {
1025 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1033 get vnn map from a remote node
/*
 * Fetch the vnn map of node `destnode`.
 *
 * The wire format reply of CTDB_CONTROL_GETVNNMAP is size checked and
 * converted into a struct ctdb_vnn_map allocated on mem_ctx (the map
 * array being a child of the structure), returned through *vnnmap. The
 * reply buffer is freed after the conversion.
 */
1035 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1040 struct ctdb_vnn_map_wire *map;
1042 ret = ctdb_control(ctdb, destnode, 0,
1043 CTDB_CONTROL_GETVNNMAP, 0, tdb_null,
1044 mem_ctx, &outdata, &res, &timeout, NULL);
1045 if (ret != 0 || res != 0) {
1046 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1050 map = (struct ctdb_vnn_map_wire *)outdata.dptr;
/* the reply must hold the fixed part and exactly map->size entries;
   map->size is only read once the fixed part is known to be present */
1051 if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1052 outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1053 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1057 (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1058 CTDB_NO_MEMORY(ctdb, *vnnmap);
1059 (*vnnmap)->generation = map->generation;
1060 (*vnnmap)->size = map->size;
1061 (*vnnmap)->map = talloc_array(*vnnmap, uint32_t, map->size);
1063 CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1064 memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1065 talloc_free(outdata.dptr);
1072 get the recovery mode of a remote node
1074 struct ctdb_client_control_state *
1075 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1077 return ctdb_control_send(ctdb, destnode, 0,
1078 CTDB_CONTROL_GET_RECMODE, 0, tdb_null,
1079 mem_ctx, &timeout, NULL);
/*
 * Async receive half of "get recovery mode": waits for the control with
 * ctdb_control_recv() and returns the control's status value, converted
 * to uint32_t, through *recmode.
 */
1082 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
/* no reply data or error message is requested, only the status */
1087 ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1089 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
/* the recovery mode is carried in the status field of the reply */
1094 *recmode = (uint32_t)res;
1100 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1102 struct ctdb_client_control_state *state;
1104 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1105 return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1112 set the recovery mode of a remote node
/*
 * Set the recovery mode of node `destnode` with the
 * CTDB_CONTROL_SET_RECMODE control; the new mode is sent as a raw
 * uint32_t payload. Both the transport result and the control status
 * must be zero for success.
 */
1114 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
/* the payload points at the by-value parameter, which outlives the call below */
1120 data.dsize = sizeof(uint32_t);
1121 data.dptr = (unsigned char *)&recmode;
1123 ret = ctdb_control(ctdb, destnode, 0,
1124 CTDB_CONTROL_SET_RECMODE, 0, data,
1125 NULL, NULL, &res, &timeout, NULL);
1126 if (ret != 0 || res != 0) {
1127 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1137 get the recovery master of a remote node
1139 struct ctdb_client_control_state *
1140 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
1141 struct timeval timeout, uint32_t destnode)
1143 return ctdb_control_send(ctdb, destnode, 0,
1144 CTDB_CONTROL_GET_RECMASTER, 0, tdb_null,
1145 mem_ctx, &timeout, NULL);
/*
 * Async receive half of "get recovery master": waits for the control
 * with ctdb_control_recv() and returns the control's status value,
 * converted to uint32_t, through *recmaster.
 */
1148 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
/* no reply data or error message is requested, only the status */
1153 ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1155 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
/* the recovery master is carried in the status field of the reply */
1160 *recmaster = (uint32_t)res;
1166 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1168 struct ctdb_client_control_state *state;
1170 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1171 return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1176 set the recovery master of a remote node
/*
 * Set the recovery master of node `destnode` with the
 * CTDB_CONTROL_SET_RECMASTER control; the new value is sent as a raw
 * uint32_t payload. Both the transport result and the control status
 * must be zero for success.
 */
1178 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
/* the payload points at the by-value parameter, which outlives the call below */
1185 data.dsize = sizeof(uint32_t);
1186 data.dptr = (unsigned char *)&recmaster;
1188 ret = ctdb_control(ctdb, destnode, 0,
1189 CTDB_CONTROL_SET_RECMASTER, 0, data,
1190 NULL, NULL, &res, &timeout, NULL);
1191 if (ret != 0 || res != 0) {
1192 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1201 get a list of databases off a remote node
1203 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
1204 TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1210 ret = ctdb_control(ctdb, destnode, 0,
1211 CTDB_CONTROL_GET_DBMAP, 0, tdb_null,
1212 mem_ctx, &outdata, &res, &timeout, NULL);
1213 if (ret != 0 || res != 0) {
1214 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1218 *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1219 talloc_free(outdata.dptr);
1225 get a list of nodes (vnn and flags ) from a remote node
1227 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb,
1228 struct timeval timeout, uint32_t destnode,
1229 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1235 ret = ctdb_control(ctdb, destnode, 0,
1236 CTDB_CONTROL_GET_NODEMAP, 0, tdb_null,
1237 mem_ctx, &outdata, &res, &timeout, NULL);
1238 if (ret == 0 && res == -1 && outdata.dsize == 0) {
1239 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1240 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1242 if (ret != 0 || res != 0 || outdata.dsize == 0) {
1243 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1247 *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1248 talloc_free(outdata.dptr);
1254 old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1256 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb,
1257 struct timeval timeout, uint32_t destnode,
1258 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1262 struct ctdb_node_mapv4 *nodemapv4;
1265 ret = ctdb_control(ctdb, destnode, 0,
1266 CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null,
1267 mem_ctx, &outdata, &res, &timeout, NULL);
1268 if (ret != 0 || res != 0 || outdata.dsize == 0) {
1269 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1273 nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1275 len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1276 (*nodemap) = talloc_zero_size(mem_ctx, len);
1277 CTDB_NO_MEMORY(ctdb, (*nodemap));
1279 (*nodemap)->num = nodemapv4->num;
1280 for (i=0; i<nodemapv4->num; i++) {
1281 (*nodemap)->nodes[i].pnn = nodemapv4->nodes[i].pnn;
1282 (*nodemap)->nodes[i].flags = nodemapv4->nodes[i].flags;
1283 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1284 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1287 talloc_free(outdata.dptr);
1293 drop the transport, reload the nodes file and restart the transport
/*
 * Send the CTDB_CONTROL_RELOAD_NODES_FILE control (no payload) to node
 * `destnode`. Both the transport result and the control status must be
 * zero for success.
 */
1295 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb,
1296 struct timeval timeout, uint32_t destnode)
1301 ret = ctdb_control(ctdb, destnode, 0,
1302 CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null,
1303 NULL, NULL, &res, &timeout, NULL);
1304 if (ret != 0 || res != 0) {
1305 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1314 set vnn map on a node
/*
 * Push a vnn map to node `destnode`.
 *
 * The map is marshalled into its wire format (generation, size and the
 * array of uint32_t entries) in a buffer allocated on mem_ctx and sent
 * with the CTDB_CONTROL_SETVNNMAP control.
 */
1316 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
1317 TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1322 struct ctdb_vnn_map_wire *map;
/* fixed part of the wire structure plus one uint32_t per map entry */
1325 len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1326 map = talloc_size(mem_ctx, len);
1327 CTDB_NO_MEMORY(ctdb, map);
1329 map->generation = vnnmap->generation;
1330 map->size = vnnmap->size;
1331 memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1334 data.dptr = (uint8_t *)map;
1336 ret = ctdb_control(ctdb, destnode, 0,
1337 CTDB_CONTROL_SETVNNMAP, 0, data,
1338 NULL, NULL, &res, &timeout, NULL);
1339 if (ret != 0 || res != 0) {
1340 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1351 async send for pull database
/*
 * Async send half of "pull database": builds a struct
 * ctdb_control_pulldb request on mem_ctx and sends it to `destnode`
 * with the CTDB_CONTROL_PULL_DB control. Returns NULL if the request
 * structure cannot be allocated.
 */
1353 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1354 struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1355 uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1358 struct ctdb_control_pulldb *pull;
1359 struct ctdb_client_control_state *state;
1361 pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1362 CTDB_NO_MEMORY_NULL(ctdb, pull);
1365 pull->lmaster = lmaster;
/* the request structure itself is the control payload */
1367 indata.dsize = sizeof(struct ctdb_control_pulldb);
1368 indata.dptr = (unsigned char *)pull;
1370 state = ctdb_control_send(ctdb, destnode, 0,
1371 CTDB_CONTROL_PULL_DB, 0, indata,
1372 mem_ctx, &timeout, NULL);
1379 async recv for pull database
/*
 * Async receive half of "pull database": waits for the control with
 * ctdb_control_recv(), which delivers the reply data through `outdata`
 * (allocated on mem_ctx). Both the transport result and the control
 * status must be zero for success.
 */
1381 int ctdb_ctrl_pulldb_recv(
1382 struct ctdb_context *ctdb,
1383 TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state,
1389 ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1390 if ( (ret != 0) || (res != 0) ){
1391 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1399 pull all keys and records for a specific database on a node
/*
 * Synchronous pull of a database from destnode: issues
 * ctdb_ctrl_pulldb_send() and returns the result of ctdb_ctrl_pulldb_recv()
 * on the resulting state, with the reply delivered through outdata.
 * NOTE(review): the remaining arguments of the send call and the outdata
 * parameter declaration fall on lines not visible in this listing.
 */
1401 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode,
1402 uint32_t dbid, uint32_t lmaster,
1403 TALLOC_CTX *mem_ctx, struct timeval timeout,
1406 struct ctdb_client_control_state *state;
1408 state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1411 return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1416 change dmaster for all keys in the database to the new value
/*
 * Send CTDB_CONTROL_SET_DMASTER to destnode. The payload is two uint32_t
 * values, dbid followed by dmaster, stored in an array allocated on mem_ctx.
 * Failure of the control or a non-zero res is logged.
 * NOTE(review): no NULL check on the talloc_array() result and no free of the
 * payload are visible in this listing - confirm.
 */
1418 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
1419 TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1425 indata.dsize = 2*sizeof(uint32_t);
1426 indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
/* payload layout: [0] = database id, [1] = new dmaster */
1428 ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1429 ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1431 ret = ctdb_control(ctdb, destnode, 0,
1432 CTDB_CONTROL_SET_DMASTER, 0, indata,
1433 NULL, NULL, &res, &timeout, NULL);
1434 if (ret != 0 || res != 0) {
1435 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1443 ping a node, return number of clients connected
/*
 * Send CTDB_CONTROL_PING to destnode with an empty payload. The timeout
 * pointer passed to ctdb_control() is NULL, and the status is collected in
 * res.
 * NOTE(review): how ret/res are turned into the return value is not visible
 * in this listing.
 */
1445 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1450 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0,
1451 tdb_null, NULL, NULL, &res, NULL, NULL);
1459 find the real path to a ltdb
/*
 * Query destnode for the path of database dbid via CTDB_CONTROL_GETDBPATH.
 * The request payload is the raw dbid; the reply is written back into the
 * same `data` variable (allocated with mem_ctx as passed to ctdb_control()).
 * On success the reply is copied into *path with talloc_strndup() on mem_ctx,
 * bounded by the reply size, and the reply buffer is freed.
 * NOTE(review): the `path` parameter declaration and the bodies of the error
 * branches are not visible in this listing.
 */
1461 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx,
1468 data.dptr = (uint8_t *)&dbid;
1469 data.dsize = sizeof(dbid);
1471 ret = ctdb_control(ctdb, destnode, 0,
1472 CTDB_CONTROL_GETDBPATH, 0, data,
1473 mem_ctx, &data, &res, &timeout, NULL);
1474 if (ret != 0 || res != 0) {
1478 (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1479 if ((*path) == NULL) {
1483 talloc_free(data.dptr);
1489 find the name of a db
/*
 * Query destnode for the name of database dbid via CTDB_CONTROL_GET_DBNAME.
 * The request payload is the raw dbid; the reply overwrites `data`. On
 * success the reply is copied into *name with talloc_strndup() on mem_ctx,
 * bounded by the reply size, and the reply buffer is freed.
 * NOTE(review): the `name` parameter declaration and the bodies of the error
 * branches are not visible in this listing.
 */
1491 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx,
1498 data.dptr = (uint8_t *)&dbid;
1499 data.dsize = sizeof(dbid);
1501 ret = ctdb_control(ctdb, destnode, 0,
1502 CTDB_CONTROL_GET_DBNAME, 0, data,
1503 mem_ctx, &data, &res, &timeout, NULL);
1504 if (ret != 0 || res != 0) {
1508 (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1509 if ((*name) == NULL) {
1513 talloc_free(data.dptr);
1519 get the health status of a db
/*
 * Query destnode for the health of database dbid via
 * CTDB_CONTROL_DB_GET_HEALTH. The request payload is the raw dbid and the
 * reply overwrites `data`. An empty reply (dsize == 0) is handled by its own
 * branch; otherwise the reply is copied into *reason with talloc_strndup() on
 * mem_ctx and the reply buffer is freed.
 * NOTE(review): the body of the empty-reply branch is not visible here -
 * presumably it reports "no problem" by setting *reason to NULL; confirm.
 */
1521 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1522 struct timeval timeout,
1524 uint32_t dbid, TALLOC_CTX *mem_ctx,
1525 const char **reason)
1531 data.dptr = (uint8_t *)&dbid;
1532 data.dsize = sizeof(dbid);
1534 ret = ctdb_control(ctdb, destnode, 0,
1535 CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1536 mem_ctx, &data, &res, &timeout, NULL);
1537 if (ret != 0 || res != 0) {
1541 if (data.dsize == 0) {
1546 (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1547 if ((*reason) == NULL) {
1551 talloc_free(data.dptr);
/*
 * Ask destnode to attach (create) a database by name. The payload is the
 * name including its terminating NUL. The opcode is
 * CTDB_CONTROL_DB_ATTACH_PERSISTENT when `persistent` is true and
 * CTDB_CONTROL_DB_ATTACH otherwise. The reply is received into `data` using
 * mem_ctx.
 * NOTE(review): what happens to the reply buffer afterwards is not visible in
 * this listing.
 */
1559 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
1560 TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1566 data.dptr = discard_const(name);
1567 data.dsize = strlen(name)+1;
1569 ret = ctdb_control(ctdb, destnode, 0,
1570 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1572 mem_ctx, &data, &res, &timeout, NULL);
1574 if (ret != 0 || res != 0) {
1582 get debug level on a node
/*
 * Read the debug level of destnode via CTDB_CONTROL_GET_DEBUG (empty payload,
 * NULL timeout pointer). The reply is allocated against `ctdb`; its size must
 * be exactly sizeof(int32_t) or an error is logged. On success the value is
 * stored in *level and the reply buffer is freed.
 * NOTE(review): no free of data.dptr is visible on the bad-size path.
 */
1584 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1590 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null,
1591 ctdb, &data, &res, NULL, NULL);
1592 if (ret != 0 || res != 0) {
1595 if (data.dsize != sizeof(int32_t)) {
1596 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1597 (unsigned)data.dsize));
1600 *level = *(int32_t *)data.dptr;
1601 talloc_free(data.dptr);
1606 set debug level on a node
/*
 * Set the debug level of destnode via CTDB_CONTROL_SET_DEBUG. The payload is
 * the raw int32_t `level`; no reply data is requested and the timeout pointer
 * is NULL. A non-zero ret or res takes the failure branch (body not visible
 * in this listing).
 */
1608 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1614 data.dptr = (uint8_t *)&level;
1615 data.dsize = sizeof(level);
1617 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data,
1618 NULL, NULL, &res, NULL, NULL);
1619 if (ret != 0 || res != 0) {
1627 get a list of connected nodes
/*
 * Build an array of the pnn values of all nodes that are not flagged
 * NODE_FLAGS_DISCONNECTED. The node map is fetched from CTDB_CURRENT_NODE,
 * the result array is allocated on mem_ctx with room for map->num entries,
 * and *num_nodes is used as the write index into it.
 * NOTE(review): initialisation and increment of *num_nodes, release of `map`
 * and the return statements are not visible in this listing.
 */
1629 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb,
1630 struct timeval timeout,
1631 TALLOC_CTX *mem_ctx,
1632 uint32_t *num_nodes)
1634 struct ctdb_node_map *map=NULL;
1640 ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1645 nodes = talloc_array(mem_ctx, uint32_t, map->num);
1646 if (nodes == NULL) {
1650 for (i=0;i<map->num;i++) {
1651 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1652 nodes[*num_nodes] = map->nodes[i].pnn;
/*
 * Send CTDB_CONTROL_STATISTICS_RESET to destnode with an empty payload and a
 * NULL timeout pointer. An error is logged when ret or res is non-zero.
 */
1664 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1669 ret = ctdb_control(ctdb, destnode, 0,
1670 CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null,
1671 NULL, NULL, &res, NULL, NULL);
1672 if (ret != 0 || res != 0) {
1673 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1680 this is the dummy null procedure that all databases support
/*
 * Call handler registered under CTDB_NULL_FUNC by ctdb_attach().
 * NOTE(review): the body is not visible in this listing; presumably it does
 * nothing and reports success - confirm.
 */
1682 static int ctdb_null_func(struct ctdb_call_info *call)
1688 this is a plain fetch procedure that all databases support
/*
 * Call handler registered under CTDB_FETCH_FUNC by ctdb_attach(). It points
 * call->reply_data at the call's own record_data, so the reply aliases the
 * record rather than copying it.
 */
1690 static int ctdb_fetch_func(struct ctdb_call_info *call)
1692 call->reply_data = &call->record_data;
1697 attach to a specific database - client call
/*
 * Client-side attach to a named database. Visible steps, in order:
 *   1. look for an existing handle with ctdb_db_handle();
 *   2. allocate a zeroed ctdb_db_context on `ctdb` and duplicate the name;
 *   3. ask the local daemon to attach (DB_ATTACH or DB_ATTACH_PERSISTENT) and
 *      require the reply to be exactly one uint32_t, which becomes db_id;
 *   4. fetch the database path with ctdb_ctrl_getdbpath();
 *   5. open the tdb file locally with tdb_wrap_open();
 *   6. link the context into ctdb->db_list and register the two well-known
 *      calls.
 * On the visible failure paths after allocation the context is freed.
 * NOTE(review): the `name`, `persistent` and `tdb_flags` parameter
 * declarations, the early-return for an existing handle and all return
 * statements are not visible in this listing.
 */
1699 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
1700 struct timeval timeout,
1705 struct ctdb_db_context *ctdb_db;
1710 ctdb_db = ctdb_db_handle(ctdb, name);
1715 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1716 CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1718 ctdb_db->ctdb = ctdb;
1719 ctdb_db->db_name = talloc_strdup(ctdb_db, name);
/* NOTE(review): if this macro returns early, ctdb_db stays allocated on ctdb - confirm */
1720 CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1722 data.dptr = discard_const(name);
1723 data.dsize = strlen(name)+1;
1725 /* tell ctdb daemon to attach */
/* the caller's tdb_flags value travels in the third ctdb_control() argument */
1726 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags,
1727 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1728 0, data, ctdb_db, &data, &res, NULL, NULL);
1729 if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1730 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1731 talloc_free(ctdb_db);
1735 ctdb_db->db_id = *(uint32_t *)data.dptr;
1736 talloc_free(data.dptr);
1738 ret = ctdb_ctrl_getdbpath(ctdb, timeout, CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1740 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1741 talloc_free(ctdb_db);
/* from here tdb_flags is reused for the local open; the caller's value is discarded */
1745 tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1746 if (ctdb->valgrinding) {
1747 tdb_flags |= TDB_NOMMAP;
1749 tdb_flags |= TDB_DISALLOW_NESTING;
1751 ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1752 if (ctdb_db->ltdb == NULL) {
1753 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1754 talloc_free(ctdb_db);
1758 ctdb_db->persistent = persistent;
1760 DLIST_ADD(ctdb->db_list, ctdb_db);
1762 /* add well known functions */
1763 ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1764 ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1771 setup a call for a database
/*
 * Register call handler `fn` under `id` for a database. The visible code
 * contains two parts: a CTDB_CONTROL_SET_CALL control sent to the local
 * daemon carrying a ctdb_control_set_call structure, and a local registration
 * that allocates a ctdb_registered_call on ctdb_db and links it into
 * ctdb_db->calls.
 * NOTE(review): the existing comment says the daemon-side part is no longer
 * valid; it may be compiled out by a preprocessor guard that is not visible
 * in this listing - confirm. The lines filling in the remaining fields of `c`
 * and of `call`, and the NULL check on `call`, are also not visible.
 */
1773 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1775 struct ctdb_registered_call *call;
1780 struct ctdb_control_set_call c;
1783 /* this is no longer valid with the separate daemon architecture */
1784 c.db_id = ctdb_db->db_id;
1788 data.dptr = (uint8_t *)&c;
1789 data.dsize = sizeof(c);
1791 ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1792 data, NULL, NULL, &status, NULL, NULL);
1793 if (ret != 0 || status != 0) {
1794 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
1799 /* also register locally */
1800 call = talloc(ctdb_db, struct ctdb_registered_call);
1804 DLIST_ADD(ctdb_db->calls, call);
/*
 * State shared between ctdb_traverse() and traverse_handler(). Besides the
 * callback `fn` shown here, code in this file also uses members named `done`
 * and `private_data`; their declarations are not visible in this listing.
 */
1809 struct traverse_state {
1812 ctdb_traverse_func fn;
1817 called on each key during a ctdb_traverse
/*
 * Message handler invoked for each record delivered during a traverse.
 * `p` is the traverse_state and the message payload is interpreted as a
 * ctdb_rec_data. The payload is rejected (with a log message) when it is
 * shorter than a uint32_t or when d->length differs from the payload size.
 * The key and data are then located inside d->data using keylen and datalen.
 * A record with empty key and empty data marks the end of the traverse; a
 * record whose data is exactly one ctdb_ltdb_header long is treated as
 * deleted. Otherwise the user callback is invoked.
 * NOTE(review): keylen and datalen are used without a visible check that
 * they fit inside the payload - confirm. The bodies of the end-of-traverse,
 * deleted-record and callback-failure branches are not visible in this
 * listing.
 */
1819 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1821 struct traverse_state *state = (struct traverse_state *)p;
1822 struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
1825 if (data.dsize < sizeof(uint32_t) ||
1826 d->length != data.dsize) {
1827 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
/* key occupies the first keylen bytes of d->data, the record data follows it */
1832 key.dsize = d->keylen;
1833 key.dptr = &d->data[0];
1834 data.dsize = d->datalen;
1835 data.dptr = &d->data[d->keylen];
1837 if (key.dsize == 0 && data.dsize == 0) {
1838 /* end of traverse */
1843 if (data.dsize == sizeof(struct ctdb_ltdb_header)) {
1844 /* empty records are deleted records in ctdb */
1848 if (state->fn(ctdb, key, data, state->private_data) != 0) {
1857 start a cluster wide traverse, calling the supplied fn on each record
1858 return the number of records traversed, or -1 on error
/*
 * Run a traverse over ctdb_db, calling `fn` for each record.
 * A message handler (traverse_handler) is registered under an srvid built
 * from the process id with the top four bits set, then
 * CTDB_CONTROL_TRAVERSE_START is sent to the local daemon with a
 * ctdb_traverse_start payload. The function then runs the event loop until
 * state.done becomes true and finally removes the message handler.
 * If the start control fails, the handler is removed before leaving.
 * NOTE(review): initialisation of state.done/state.fn, the remaining fields
 * of `t`, and the return statements are not visible in this listing.
 */
1860 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
1863 struct ctdb_traverse_start t;
/* srvid = pid with bits 60..63 set */
1866 uint64_t srvid = (getpid() | 0xFLL<<60);
1867 struct traverse_state state;
1871 state.private_data = private_data;
1874 ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
1876 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
1880 t.db_id = ctdb_db->db_id;
1884 data.dptr = (uint8_t *)&t;
1885 data.dsize = sizeof(t);
1887 ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
1888 data, NULL, NULL, &status, NULL, NULL);
1889 if (ret != 0 || status != 0) {
1890 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
1891 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
/* block in the event loop until the handler flags completion */
1895 while (!state.done) {
1896 event_loop_once(ctdb_db->ctdb->ev);
1899 ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1901 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
/*
 * True when x is printable and is neither a double quote nor a backslash,
 * i.e. it can be written verbatim inside a quoted string.
 * The argument is evaluated twice, so it must have no side effects.
 */
1908 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
1910 called on each key during a catdb
/*
 * Traverse callback that prints one record to the FILE passed as `p`.
 * The key is printed as a quoted string in which bytes that fail ISASCII are
 * written as a backslash followed by two hex digits. The start of `data` is
 * interpreted as a ctdb_ltdb_header, whose dmaster and rsn are printed, and
 * the bytes after the header are printed with the same escaping as the key.
 * NOTE(review): no check that data.dsize >= sizeof(*h) is visible; if a
 * shorter record arrived, h would be read out of bounds and the unsigned
 * subtraction in the "data(%u)" line would wrap - confirm callers guarantee
 * the minimum size.
 */
1912 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
1915 FILE *f = (FILE *)p;
1916 struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
1918 fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
1919 for (i=0;i<key.dsize;i++) {
1920 if (ISASCII(key.dptr[i])) {
1921 fprintf(f, "%c", key.dptr[i]);
1923 fprintf(f, "\\%02X", key.dptr[i]);
1928 fprintf(f, "dmaster: %u\n", h->dmaster);
1929 fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
/* payload starts right after the ltdb header */
1931 fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
1932 for (i=sizeof(*h);i<data.dsize;i++) {
1933 if (ISASCII(data.dptr[i])) {
1934 fprintf(f, "%c", data.dptr[i]);
1936 fprintf(f, "\\%02X", data.dptr[i]);
1947 convenience function to list all keys to stdout
/*
 * Dump every record of ctdb_db to stream `f` by running ctdb_traverse() with
 * ctdb_dumpdb_record as the callback; returns whatever ctdb_traverse()
 * returns.
 */
1949 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
1951 return ctdb_traverse(ctdb_db, ctdb_dumpdb_record, f);
1955 get the pid of a ctdb daemon
/*
 * Send CTDB_CONTROL_GET_PID to destnode with an empty payload. No reply data
 * is requested; only the status slot `res` is filled by ctdb_control().
 * NOTE(review): the failure condition and the assignment to *pid are not
 * visible in this listing - presumably the pid is delivered through res;
 * confirm.
 */
1957 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
1962 ret = ctdb_control(ctdb, destnode, 0,
1963 CTDB_CONTROL_GET_PID, 0, tdb_null,
1964 NULL, NULL, &res, &timeout, NULL);
1966 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
1977 async freeze send control
/*
 * Start an asynchronous CTDB_CONTROL_FREEZE request to destnode. `priority`
 * is passed as the third argument of ctdb_control_send(); the payload is
 * empty and mem_ctx and the caller's timeout are forwarded. Returns the
 * value returned by ctdb_control_send().
 */
1979 struct ctdb_client_control_state *
1980 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
1982 return ctdb_control_send(ctdb, destnode, priority,
1983 CTDB_CONTROL_FREEZE, 0, tdb_null,
1984 mem_ctx, &timeout, NULL);
1988 async freeze recv control
/*
 * Complete a freeze request: calls ctdb_control_recv() on `state` without
 * requesting reply data and logs an error when ret or res is non-zero.
 * NOTE(review): return statements are not visible in this listing.
 */
1990 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
1995 ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1996 if ( (ret != 0) || (res != 0) ){
1997 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
2005 freeze databases of a certain priority
/*
 * Synchronously freeze the databases of one priority on destnode. A temporary
 * talloc context is created under `ctdb`, used for the send/recv pair, and
 * freed afterwards; `ret` holds the result of the recv call.
 * NOTE(review): `state` is passed to the recv call without a visible NULL
 * check - confirm ctdb_control_recv() tolerates a failed send.
 */
2007 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2009 TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2010 struct ctdb_client_control_state *state;
2013 state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
2014 ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
2015 talloc_free(tmp_ctx);
2020 /* Freeze all databases */
/*
 * Calls ctdb_ctrl_freeze_priority() for every priority from 1 up to and
 * including NUM_DB_PRIORITIES, testing each result for failure.
 * NOTE(review): the body of the failure branch and the final return are not
 * visible in this listing; presumably the loop stops at the first failure.
 */
2021 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2025 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2026 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2034 thaw databases of a certain priority
/*
 * Send CTDB_CONTROL_THAW to destnode with `priority` as the third
 * ctdb_control() argument and an empty payload. An error is logged when ret
 * or res is non-zero.
 */
2036 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2041 ret = ctdb_control(ctdb, destnode, priority,
2042 CTDB_CONTROL_THAW, 0, tdb_null,
2043 NULL, NULL, &res, &timeout, NULL);
2044 if (ret != 0 || res != 0) {
2045 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2052 /* thaw all databases */
/*
 * Delegates to ctdb_ctrl_thaw_priority() with priority 0.
 * NOTE(review): presumably the daemon treats priority 0 as "every priority" -
 * confirm against the server-side handler.
 */
2053 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2055 return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2059 get pnn of a node, or -1
/*
 * Send CTDB_CONTROL_GET_PNN to destnode with an empty payload. No reply data
 * is requested; only the status slot `res` is filled.
 * NOTE(review): the failure condition and the return value are not visible in
 * this listing - presumably the pnn is delivered through res; confirm.
 */
2061 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2066 ret = ctdb_control(ctdb, destnode, 0,
2067 CTDB_CONTROL_GET_PNN, 0, tdb_null,
2068 NULL, NULL, &res, &timeout, NULL);
2070 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2078 get the monitoring mode of a remote node
/*
 * Send CTDB_CONTROL_GET_MONMODE to destnode with an empty payload. No reply
 * data is requested; only the status slot `res` is filled.
 * NOTE(review): the failure condition and the assignment to *monmode are not
 * visible in this listing - presumably the mode is delivered through res;
 * confirm.
 */
2080 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2085 ret = ctdb_control(ctdb, destnode, 0,
2086 CTDB_CONTROL_GET_MONMODE, 0, tdb_null,
2087 NULL, NULL, &res, &timeout, NULL);
2089 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2100 set the monitoring mode of a remote node to active
/*
 * Send CTDB_CONTROL_ENABLE_MONITOR to destnode with an empty payload.
 * The status pointer passed to ctdb_control() is NULL, so only the
 * ctdb_control() return value is available for the failure check.
 * NOTE(review): the failure condition itself is not visible in this listing.
 */
2102 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2107 ret = ctdb_control(ctdb, destnode, 0,
2108 CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null,
2109 NULL, NULL,NULL, &timeout, NULL);
2111 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2121 set the monitoring mode of a remote node to disable
/*
 * Send CTDB_CONTROL_DISABLE_MONITOR to destnode with an empty payload.
 * The status pointer passed to ctdb_control() is NULL, so only the
 * ctdb_control() return value is available for the failure check.
 * NOTE(review): the failure condition itself is not visible in this listing.
 */
2123 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2128 ret = ctdb_control(ctdb, destnode, 0,
2129 CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null,
2130 NULL, NULL, NULL, &timeout, NULL);
2132 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2144 sent to a node to make it take over an ip address
/*
 * Tell destnode to take over a public address. For an AF_INET address the
 * request is converted to the ctdb_public_ipv4 layout and sent with
 * CTDB_CONTROL_TAKEOVER_IPv4; for any other family the caller's
 * ctdb_public_ip is sent unchanged with CTDB_CONTROL_TAKEOVER_IP.
 * An error is logged when ret or res is non-zero.
 * NOTE(review): only the `sin` member of ipv4 is filled on the visible lines;
 * the remaining members are presumably set on a line not visible in this
 * listing - confirm.
 */
2146 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout,
2147 uint32_t destnode, struct ctdb_public_ip *ip)
2150 struct ctdb_public_ipv4 ipv4;
2154 if (ip->addr.sa.sa_family == AF_INET) {
2156 ipv4.sin = ip->addr.ip;
2158 data.dsize = sizeof(ipv4);
2159 data.dptr = (uint8_t *)&ipv4;
2161 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2162 NULL, &res, &timeout, NULL);
2164 data.dsize = sizeof(*ip);
2165 data.dptr = (uint8_t *)ip;
2167 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2168 NULL, &res, &timeout, NULL);
2171 if (ret != 0 || res != 0) {
2172 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2181 sent to a node to make it release an ip address
/*
 * Tell destnode to release a public address. For an AF_INET address the
 * request is converted to the ctdb_public_ipv4 layout and sent with
 * CTDB_CONTROL_RELEASE_IPv4; for any other family the caller's
 * ctdb_public_ip is sent unchanged with CTDB_CONTROL_RELEASE_IP.
 * An error is logged when ret or res is non-zero.
 * NOTE(review): only the `sin` member of ipv4 is filled on the visible lines;
 * the remaining members are presumably set on a line not visible in this
 * listing - confirm.
 */
2183 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout,
2184 uint32_t destnode, struct ctdb_public_ip *ip)
2187 struct ctdb_public_ipv4 ipv4;
2191 if (ip->addr.sa.sa_family == AF_INET) {
2193 ipv4.sin = ip->addr.ip;
2195 data.dsize = sizeof(ipv4);
2196 data.dptr = (uint8_t *)&ipv4;
2198 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2199 NULL, &res, &timeout, NULL);
2201 data.dsize = sizeof(*ip);
2202 data.dptr = (uint8_t *)ip;
2204 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2205 NULL, &res, &timeout, NULL);
2208 if (ret != 0 || res != 0) {
2209 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
/*
 * Read one tunable by name from destnode via CTDB_CONTROL_GET_TUNABLE.
 * The request is a ctdb_control_get_tunable header followed by the name
 * including its NUL terminator, allocated on `ctdb` and freed immediately
 * after the control returns. The reply (allocated against `ctdb`) must be
 * exactly sizeof(uint32_t); on mismatch it is freed and an error is logged,
 * otherwise the value is copied to *value and the reply is freed.
 */
2220 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb,
2221 struct timeval timeout,
2223 const char *name, uint32_t *value)
2225 struct ctdb_control_get_tunable *t;
2226 TDB_DATA data, outdata;
2230 data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2231 data.dptr = talloc_size(ctdb, data.dsize);
2232 CTDB_NO_MEMORY(ctdb, data.dptr);
2234 t = (struct ctdb_control_get_tunable *)data.dptr;
2235 t->length = strlen(name)+1;
2236 memcpy(t->name, name, t->length);
2238 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2239 &outdata, &res, &timeout, NULL);
/* the request buffer is no longer needed regardless of the outcome */
2240 talloc_free(data.dptr);
2241 if (ret != 0 || res != 0) {
2242 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2246 if (outdata.dsize != sizeof(uint32_t)) {
2247 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2248 talloc_free(outdata.dptr);
2252 *value = *(uint32_t *)outdata.dptr;
2253 talloc_free(outdata.dptr);
/*
 * Set one tunable by name on destnode via CTDB_CONTROL_SET_TUNABLE.
 * The request is a ctdb_control_set_tunable header followed by the name
 * including its NUL terminator, allocated on `ctdb` and freed right after the
 * control returns. An error is logged when ret or res is non-zero.
 * NOTE(review): the line storing `value` into the request is not visible in
 * this listing.
 */
2261 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb,
2262 struct timeval timeout,
2264 const char *name, uint32_t value)
2266 struct ctdb_control_set_tunable *t;
2271 data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2272 data.dptr = talloc_size(ctdb, data.dsize);
2273 CTDB_NO_MEMORY(ctdb, data.dptr);
2275 t = (struct ctdb_control_set_tunable *)data.dptr;
2276 t->length = strlen(name)+1;
2277 memcpy(t->name, name, t->length);
2280 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2281 NULL, &res, &timeout, NULL);
2282 talloc_free(data.dptr);
2283 if (ret != 0 || res != 0) {
2284 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
/*
 * Fetch the names of all tunables from destnode via
 * CTDB_CONTROL_LIST_TUNABLES. The reply is validated in two steps: it must be
 * at least as large as the fixed header of ctdb_control_list_tunable, and the
 * embedded length must not exceed the bytes that follow the header. The name
 * string is then copied onto mem_ctx, the reply is freed, and the copy is
 * split on ':' with strtok_r(). Each token is duplicated into *list, which is
 * grown by one slot per token.
 * NOTE(review): initialisation of *list and *count, the increment of *count,
 * and any release of `p` are not visible in this listing. If CTDB_NO_MEMORY
 * returns early, outdata.dptr (first use) and `p` (later uses) would remain
 * allocated - confirm.
 */
2294 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb,
2295 struct timeval timeout,
2297 TALLOC_CTX *mem_ctx,
2298 const char ***list, uint32_t *count)
2303 struct ctdb_control_list_tunable *t;
2306 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null,
2307 mem_ctx, &outdata, &res, &timeout, NULL);
2308 if (ret != 0 || res != 0) {
2309 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2313 t = (struct ctdb_control_list_tunable *)outdata.dptr;
2314 if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2315 t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2316 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2317 talloc_free(outdata.dptr);
/* work on a private NUL-terminated copy because strtok_r() modifies its input */
2321 p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2322 CTDB_NO_MEMORY(ctdb, p);
2324 talloc_free(outdata.dptr);
2329 for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2330 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2331 CTDB_NO_MEMORY(ctdb, *list);
2332 (*list)[*count] = talloc_strdup(*list, s);
2333 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
/*
 * Fetch the public address list from destnode via
 * CTDB_CONTROL_GET_PUBLIC_IPS, passing `flags` as the control flags.
 * When the control itself succeeds but the status is -1, the function falls
 * back to ctdb_ctrl_get_public_ipsv4(). Any other failure is logged with both
 * ret and res. On success the reply is duplicated onto mem_ctx into *ips and
 * the reply buffer is freed.
 * NOTE(review): no NULL check of the talloc_memdup() result and no validation
 * of the reply size are visible in this listing - confirm.
 */
2343 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2344 struct timeval timeout, uint32_t destnode,
2345 TALLOC_CTX *mem_ctx,
2347 struct ctdb_all_public_ips **ips)
2353 ret = ctdb_control(ctdb, destnode, 0,
2354 CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2355 mem_ctx, &outdata, &res, &timeout, NULL);
2356 if (ret == 0 && res == -1) {
2357 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2358 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2360 if (ret != 0 || res != 0) {
2361 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2365 *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2366 talloc_free(outdata.dptr);
/*
 * Convenience wrapper that returns the result of
 * ctdb_ctrl_get_public_ips_flags().
 * NOTE(review): the remaining arguments of the call (including the flags
 * value) are on lines not visible in this listing.
 */
2371 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2372 struct timeval timeout, uint32_t destnode,
2373 TALLOC_CTX *mem_ctx,
2374 struct ctdb_all_public_ips **ips)
2376 return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
/*
 * Fetch the public address list with the IPv4-only control
 * CTDB_CONTROL_GET_PUBLIC_IPSv4 and convert it to the generic
 * ctdb_all_public_ips layout. The result is a zeroed buffer on mem_ctx sized
 * for ipsv4->num entries; pnn and the IPv4 socket address of each entry are
 * copied across, then the reply buffer is freed.
 * NOTE(review): ipsv4->num is used without a visible check against
 * outdata.dsize, and if CTDB_NO_MEMORY returns early outdata.dptr would stay
 * allocated - confirm.
 */
2381 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb,
2382 struct timeval timeout, uint32_t destnode,
2383 TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2388 struct ctdb_all_public_ipsv4 *ipsv4;
2390 ret = ctdb_control(ctdb, destnode, 0,
2391 CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null,
2392 mem_ctx, &outdata, &res, &timeout, NULL);
2393 if (ret != 0 || res != 0) {
2394 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2398 ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2399 len = offsetof(struct ctdb_all_public_ips, ips) +
2400 ipsv4->num*sizeof(struct ctdb_public_ip);
2401 *ips = talloc_zero_size(mem_ctx, len);
2402 CTDB_NO_MEMORY(ctdb, *ips);
2403 (*ips)->num = ipsv4->num;
2404 for (i=0; i<ipsv4->num; i++) {
2405 (*ips)->ips[i].pnn = ipsv4->ips[i].pnn;
2406 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2409 talloc_free(outdata.dptr);
/*
 * Fetch information about one public address from destnode via
 * CTDB_CONTROL_GET_PUBLIC_IP_INFO; the request payload is the raw
 * ctdb_sock_addr. The reply is validated twice: first that it covers the
 * fixed header of ctdb_control_public_ip_info, then that it covers info->num
 * interface entries. Each interface name is then forcibly NUL-terminated at
 * index CTDB_IFACE_SIZE, the reply is duplicated onto mem_ctx into *_info,
 * and the original reply buffer is freed.
 * NOTE(review): both "invalid data" messages print outdata.dsize before len
 * around a '>' although the failing condition is len > outdata.dsize.
 * NOTE(review): writing name[CTDB_IFACE_SIZE] assumes the name array has more
 * than CTDB_IFACE_SIZE elements - confirm against the struct definition.
 * The arguments of talloc_memdup() and the return statements are on lines not
 * visible in this listing.
 */
2414 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2415 struct timeval timeout, uint32_t destnode,
2416 TALLOC_CTX *mem_ctx,
2417 const ctdb_sock_addr *addr,
2418 struct ctdb_control_public_ip_info **_info)
2424 struct ctdb_control_public_ip_info *info;
2428 indata.dptr = discard_const_p(uint8_t, addr);
2429 indata.dsize = sizeof(*addr);
2431 ret = ctdb_control(ctdb, destnode, 0,
2432 CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2433 mem_ctx, &outdata, &res, &timeout, NULL);
2434 if (ret != 0 || res != 0) {
2435 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2436 "failed ret:%d res:%d\n",
/* first check: the fixed-size header must be present */
2441 len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2442 if (len > outdata.dsize) {
2443 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2444 "returned invalid data with size %u > %u\n",
2445 (unsigned int)outdata.dsize,
2446 (unsigned int)len));
2447 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
/* second check: the advertised number of interface entries must be present */
2451 info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2452 len += info->num*sizeof(struct ctdb_control_iface_info);
2454 if (len > outdata.dsize) {
2455 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2456 "returned invalid data with size %u > %u\n",
2457 (unsigned int)outdata.dsize,
2458 (unsigned int)len));
2459 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2463 /* make sure we null terminate the returned strings */
2464 for (i=0; i < info->num; i++) {
2465 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2468 *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2471 talloc_free(outdata.dptr);
2472 if (*_info == NULL) {
2473 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2474 "talloc_memdup size %u failed\n",
2475 (unsigned int)outdata.dsize));
/*
 * Fetch the interface list from destnode via CTDB_CONTROL_GET_IFACES (empty
 * payload). The reply is validated twice: first that it covers the fixed
 * header of ctdb_control_get_ifaces, then that it covers ifaces->num entries.
 * Each interface name is then forcibly NUL-terminated at index
 * CTDB_IFACE_SIZE, the reply is duplicated onto mem_ctx into *_ifaces, and
 * the original reply buffer is freed.
 * NOTE(review): both "invalid data" messages print outdata.dsize before len
 * around a '>' although the failing condition is len > outdata.dsize.
 * NOTE(review): writing name[CTDB_IFACE_SIZE] assumes the name array has more
 * than CTDB_IFACE_SIZE elements - confirm against the struct definition.
 * The arguments of talloc_memdup() and the return statements are on lines not
 * visible in this listing.
 */
2482 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2483 struct timeval timeout, uint32_t destnode,
2484 TALLOC_CTX *mem_ctx,
2485 struct ctdb_control_get_ifaces **_ifaces)
2490 struct ctdb_control_get_ifaces *ifaces;
2494 ret = ctdb_control(ctdb, destnode, 0,
2495 CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2496 mem_ctx, &outdata, &res, &timeout, NULL);
2497 if (ret != 0 || res != 0) {
2498 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2499 "failed ret:%d res:%d\n",
/* first check: the fixed-size header must be present */
2504 len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2505 if (len > outdata.dsize) {
2506 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2507 "returned invalid data with size %u > %u\n",
2508 (unsigned int)outdata.dsize,
2509 (unsigned int)len));
2510 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
/* second check: the advertised number of interface entries must be present */
2514 ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2515 len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2517 if (len > outdata.dsize) {
2518 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2519 "returned invalid data with size %u > %u\n",
2520 (unsigned int)outdata.dsize,
2521 (unsigned int)len));
2522 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2526 /* make sure we null terminate the returned strings */
2527 for (i=0; i < ifaces->num; i++) {
2528 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2531 *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2534 talloc_free(outdata.dptr);
2535 if (*_ifaces == NULL) {
2536 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2537 "talloc_memdup size %u failed\n",
2538 (unsigned int)outdata.dsize));
/*
 * Send CTDB_CONTROL_SET_IFACE_LINK_STATE to destnode. The payload is the
 * caller's ctdb_control_iface_info, passed by pointer with its const
 * qualifier discarded. No reply data is requested. Failure is logged with
 * ret and res.
 */
2545 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2546 struct timeval timeout, uint32_t destnode,
2547 TALLOC_CTX *mem_ctx,
2548 const struct ctdb_control_iface_info *info)
2554 indata.dptr = discard_const_p(uint8_t, info);
2555 indata.dsize = sizeof(*info);
2557 ret = ctdb_control(ctdb, destnode, 0,
2558 CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2559 mem_ctx, NULL, &res, &timeout, NULL);
2560 if (ret != 0 || res != 0) {
2561 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2562 "failed ret:%d res:%d\n",
2571 set/clear the permanent disabled bit on a remote node
/*
 * Change node flags of destnode cluster-wide. Visible steps:
 *   1. ask the local node who the recovery master is;
 *   2. read the node map from the recovery master;
 *   3. verify destnode is a valid index into that map;
 *   4. build a ctdb_node_flag_change from the node's current flags, clearing
 *      the bits in `clear` (the line applying `set` is not visible in this
 *      listing);
 *   5. send CTDB_CONTROL_MODIFY_FLAGS to the list returned by
 *      list_of_connected_nodes() using ctdb_client_async_control().
 * All temporary allocations hang off tmp_ctx, which is freed on every
 * visible exit path.
 * NOTE(review): the nodemap failure message prints destnode although the
 * query was sent to recmaster; the bounds-check message formats the uint32_t
 * destnode with %d.
 */
2573 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
2574 uint32_t set, uint32_t clear)
2578 struct ctdb_node_map *nodemap=NULL;
2579 struct ctdb_node_flag_change c;
2580 TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2585 /* find the recovery master */
2586 ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2588 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2589 talloc_free(tmp_ctx);
2594 /* read the node flags from the recmaster */
2595 ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2597 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2598 talloc_free(tmp_ctx);
2601 if (destnode >= nodemap->num) {
2602 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2603 talloc_free(tmp_ctx);
2608 c.old_flags = nodemap->nodes[destnode].flags;
2609 c.new_flags = c.old_flags;
2611 c.new_flags &= ~clear;
2613 data.dsize = sizeof(c);
2614 data.dptr = (unsigned char *)&c;
2616 /* send the flags update to all connected nodes */
2617 nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2619 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2621 timeout, false, data,
2624 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2626 talloc_free(tmp_ctx);
2630 talloc_free(tmp_ctx);
/*
 * Fetch the complete tunables structure from destnode via
 * CTDB_CONTROL_GET_ALL_TUNABLES. The reply is allocated against `ctdb` and
 * must be exactly sizeof(struct ctdb_tunable); on success it is copied by
 * value into *tunables and the reply buffer is freed.
 * NOTE(review): no free of outdata.dptr is visible on the size-mismatch
 * path - confirm whether the reply leaks there.
 */
2638 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb,
2639 struct timeval timeout,
2641 struct ctdb_tunable *tunables)
2647 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2648 &outdata, &res, &timeout, NULL);
2649 if (ret != 0 || res != 0) {
2650 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2654 if (outdata.dsize != sizeof(*tunables)) {
2655 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2656 (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2660 *tunables = *(struct ctdb_tunable *)outdata.dptr;
2661 talloc_free(outdata.dptr);
2666 add a public address to a node
/*
 * Send CTDB_CONTROL_ADD_PUBLIC_IP to destnode. The payload is the caller's
 * ctdb_control_ip_iface; its size is the fixed part up to `iface` plus
 * pub->len bytes. An error is logged when ret or res is non-zero.
 */
2668 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb,
2669 struct timeval timeout,
2671 struct ctdb_control_ip_iface *pub)
2677 data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2678 data.dptr = (unsigned char *)pub;
2680 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2681 NULL, &res, &timeout, NULL);
2682 if (ret != 0 || res != 0) {
2683 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2691 delete a public address from a node
/*
 * Send CTDB_CONTROL_DEL_PUBLIC_IP to destnode. The payload is the caller's
 * ctdb_control_ip_iface; its size is the fixed part up to `iface` plus
 * pub->len bytes. An error is logged when ret or res is non-zero.
 */
2693 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb,
2694 struct timeval timeout,
2696 struct ctdb_control_ip_iface *pub)
2702 data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2703 data.dptr = (unsigned char *)pub;
2705 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2706 NULL, &res, &timeout, NULL);
2707 if (ret != 0 || res != 0) {
2708 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2716 kill a tcp connection
/*
 * Send CTDB_CONTROL_KILL_TCP to destnode with the caller's
 * ctdb_control_killtcp structure as the payload. An error is logged when ret
 * or res is non-zero.
 */
2718 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb,
2719 struct timeval timeout,
2721 struct ctdb_control_killtcp *killtcp)
2727 data.dsize = sizeof(struct ctdb_control_killtcp);
2728 data.dptr = (unsigned char *)killtcp;
2730 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2731 NULL, &res, &timeout, NULL);
2732 if (ret != 0 || res != 0) {
2733 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
/*
 * Send CTDB_CONTROL_SEND_GRATIOUS_ARP to destnode for address `addr` on
 * interface `ifname`. The request (address, name length, then the interface
 * name including its NUL) is built in a buffer on a temporary talloc context
 * that is freed on both visible exit paths.
 * NOTE(review): if CTDB_NO_MEMORY returns early when the request allocation
 * fails, tmp_ctx would not be freed - confirm. The `ifname` parameter
 * declaration is on a line not visible in this listing.
 */
2743 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb,
2744 struct timeval timeout,
2746 ctdb_sock_addr *addr,
2752 struct ctdb_control_gratious_arp *gratious_arp;
2753 TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
/* len counts the terminating NUL so the name is sent as a C string */
2756 len = strlen(ifname)+1;
2757 gratious_arp = talloc_size(tmp_ctx,
2758 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2759 CTDB_NO_MEMORY(ctdb, gratious_arp);
2761 gratious_arp->addr = *addr;
2762 gratious_arp->len = len;
2763 memcpy(&gratious_arp->iface[0], ifname, len);
2766 data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
2767 data.dptr = (unsigned char *)gratious_arp;
2769 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
2770 NULL, &res, &timeout, NULL);
2771 if (ret != 0 || res != 0) {
2772 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
2773 talloc_free(tmp_ctx);
2777 talloc_free(tmp_ctx);
2782 get a list of all tcp tickles that a node knows about for a particular vnn
/*
 * Fetch the TCP tickle list for `addr` from destnode via
 * CTDB_CONTROL_GET_TCP_TICKLE_LIST. The payload is the raw ctdb_sock_addr.
 * On success *list is set to point directly at the reply buffer (no copy),
 * which was requested with mem_ctx as its allocation context.
 * NOTE(review): the `timeout` parameter is not forwarded - the timeout
 * pointer passed to ctdb_control() is NULL. No validation of the reply size
 * is visible before it is handed to the caller.
 */
2784 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb,
2785 struct timeval timeout, uint32_t destnode,
2786 TALLOC_CTX *mem_ctx,
2787 ctdb_sock_addr *addr,
2788 struct ctdb_control_tcp_tickle_list **list)
2791 TDB_DATA data, outdata;
2794 data.dptr = (uint8_t*)addr;
2795 data.dsize = sizeof(ctdb_sock_addr);
2797 ret = ctdb_control(ctdb, destnode, 0,
2798 CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data,
2799 mem_ctx, &outdata, &status, NULL, NULL);
2800 if (ret != 0 || status != 0) {
2801 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
2805 *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
2811 register a server id
/*
 * Send CTDB_CONTROL_REGISTER_SERVER_ID to the local node
 * (CTDB_CURRENT_NODE) with the caller's ctdb_server_id as the payload.
 * An error is logged when ret or res is non-zero.
 */
2813 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb,
2814 struct timeval timeout,
2815 struct ctdb_server_id *id)
2821 data.dsize = sizeof(struct ctdb_server_id);
2822 data.dptr = (unsigned char *)id;
2824 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0,
2825 CTDB_CONTROL_REGISTER_SERVER_ID,
2827 NULL, &res, &timeout, NULL);
2828 if (ret != 0 || res != 0) {
2829 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
2837 unregister a server id
/*
 * Send CTDB_CONTROL_UNREGISTER_SERVER_ID to the local node
 * (CTDB_CURRENT_NODE) with the caller's ctdb_server_id as the payload.
 * An error is logged when ret or res is non-zero.
 */
2839 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb,
2840 struct timeval timeout,
2841 struct ctdb_server_id *id)
2847 data.dsize = sizeof(struct ctdb_server_id);
2848 data.dptr = (unsigned char *)id;
2850 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0,
2851 CTDB_CONTROL_UNREGISTER_SERVER_ID,
2853 NULL, &res, &timeout, NULL);
2854 if (ret != 0 || res != 0) {
2855 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
2864 check if a server id exists
2866 if a server id does exist, return *status == 1, otherwise *status == 0
/*
 * Send CTDB_CONTROL_CHECK_SERVER_ID to destnode with the caller's
 * ctdb_server_id as the payload; the status is collected in res.
 * NOTE(review): the output parameter declaration, the failure condition and
 * the mapping from res to the reported status are on lines not visible in
 * this listing.
 */
2868 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb,
2869 struct timeval timeout,
2871 struct ctdb_server_id *id,
2878 data.dsize = sizeof(struct ctdb_server_id);
2879 data.dptr = (unsigned char *)id;
2881 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID,
2883 NULL, &res, &timeout, NULL);
2885 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
2899 get the list of server ids that are registered on a node
/*
 * Fetch the list of registered server ids from destnode via
 * CTDB_CONTROL_GET_SERVER_ID_LIST (empty payload). On success the reply
 * buffer itself is handed to the caller through *svid_list after a
 * talloc_steal() onto mem_ctx; no copy is made.
 * NOTE(review): no validation of the reply size is visible before it is cast
 * to ctdb_server_id_list.
 */
2901 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
2902 TALLOC_CTX *mem_ctx,
2903 struct timeval timeout, uint32_t destnode,
2904 struct ctdb_server_id_list **svid_list)
2910 ret = ctdb_control(ctdb, destnode, 0,
2911 CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null,
2912 mem_ctx, &outdata, &res, &timeout, NULL);
2913 if (ret != 0 || res != 0) {
2914 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
2918 *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
2924 initialise the ctdb daemon for client applications
2926 NOTE: In current code the daemon does not fork. This is for testing purposes only
2927 and to simplify the code.
/*
 * Create and initialise a ctdb_context for a client application.
 *
 * Steps visible here:
 *  - allocate a zeroed struct ctdb_context as a talloc child of ev and log
 *    if that fails;
 *  - create the request-id tree with idr_init() (checked afterwards with
 *    CTDB_NO_MEMORY_NULL);
 *  - start lastid just below INT_MAX so that id wrap-around is exercised
 *    early;
 *  - set the daemon socket name to CTDB_PATH, logging on failure;
 *  - record the statistics start time.
 *
 * NOTE(review): the failure-path cleanup and the final return are not
 * visible in this listing.
 */
2929 struct ctdb_context *ctdb_init(struct event_context *ev)
2932 struct ctdb_context *ctdb;
2934 ctdb = talloc_zero(ev, struct ctdb_context);
2936 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
2940 ctdb->idr = idr_init(ctdb);
2941 /* Wrap early to exercise code. */
2942 ctdb->lastid = INT_MAX-200;
2943 CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
2945 ret = ctdb_set_socketname(ctdb, CTDB_PATH);
2947 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
2952 ctdb->statistics.statistics_start_time = timeval_current();
/*
 * Turn on the given flag bits in ctdb->flags.
 * Bits are OR-ed in, so flags that are already set are never cleared here.
 */
2961 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
2963 ctdb->flags |= flags;
2967 setup the local socket name
/*
 * Store a private copy of socketname in ctdb->daemon.name.
 *
 * The copy is made with talloc_strdup() as a child of ctdb, and the
 * allocation is checked with CTDB_NO_MEMORY.
 *
 * NOTE(review): a previously stored daemon.name is not released in the
 * visible lines; a repeated call would keep the old string allocated until
 * ctdb itself is freed - confirm this is acceptable.
 */
2969 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
2971 ctdb->daemon.name = talloc_strdup(ctdb, socketname);
2972 CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
/*
 * Return the socket name stored in ctdb->daemon.name.
 * The pointer refers to the string held by the context; it is not copied.
 */
2977 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
2979 return ctdb->daemon.name;
2983 return the pnn of this node
/*
 * Accessor returning a uint32_t for the given context.
 * NOTE(review): the body is not visible in this listing; presumably it
 * returns ctdb->pnn - confirm.
 */
2985 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
2992 get the uptime of a remote node
/*
 * Asynchronous half of the uptime query: start CTDB_CONTROL_UPTIME on
 * destnode with no payload (tdb_null) and return the control state produced
 * by ctdb_control_send(). mem_ctx and the timeout are passed straight
 * through; the result must later be collected by the matching recv call.
 */
2994 struct ctdb_client_control_state *
2995 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2997 return ctdb_control_send(ctdb, destnode, 0,
2998 CTDB_CONTROL_UPTIME, 0, tdb_null,
2999 mem_ctx, &timeout, NULL);
/*
 * Collect the reply of an uptime control started with the send function.
 *
 * ctdb_control_recv() fills outdata and res; a non-zero call result or
 * status is logged. Otherwise the reply buffer is moved onto mem_ctx with
 * talloc_steal() and returned through *uptime.
 *
 * NOTE(review): the size of the reply is not compared with
 * sizeof(struct ctdb_uptime) in the visible lines.
 */
3002 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
3008 ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3009 if (ret != 0 || res != 0) {
3010 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
3014 *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
/*
 * Synchronous uptime query: send the control and immediately wait for its
 * reply, returning whatever the recv half returns.
 *
 * NOTE(review): state is passed to the recv half without a NULL check here;
 * presumably ctdb_control_recv() copes with a failed send - confirm.
 */
3019 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3021 struct ctdb_client_control_state *state;
3023 state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3024 return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3028 send a control to execute the "recovered" event script on a node
/*
 * Send CTDB_CONTROL_END_RECOVERY to destnode.
 *
 * The control carries no payload (tdb_null) and no reply data is requested
 * (mem_ctx and outdata are NULL); only the status is collected. A non-zero
 * call result or status is logged.
 *
 * NOTE(review): return statements are not visible in this listing.
 */
3030 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3035 ret = ctdb_control(ctdb, destnode, 0,
3036 CTDB_CONTROL_END_RECOVERY, 0, tdb_null,
3037 NULL, NULL, &status, &timeout, NULL);
3038 if (ret != 0 || status != 0) {
3039 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3047 callback for the async helpers used when sending the same control
3048 to multiple nodes in parallel.
/*
 * Completion handler installed on every control state that is tracked by a
 * struct client_async_data (see state->async.private_data).
 *
 * Two outcomes are handled:
 *  - the control did not reach CTDB_CONTROL_DONE: the failure is logged
 *    unless dont_log_errors is set, and fail_callback (if any) is invoked;
 *  - the control completed: async.fn is cleared, the reply is collected
 *    with ctdb_control_recv() using data as the talloc parent, and then
 *    either fail_callback (ret or res non-zero) or callback (ret == 0) is
 *    invoked with the destination node, status and reply data.
 *
 * NOTE(review): when ret == 0 but res != 0 the visible conditions allow
 * both fail_callback and callback to run - confirm against the lines
 * missing from this listing.
 * NOTE(review): in the not-DONE path res and outdata are handed to
 * fail_callback before ctdb_control_recv() has filled them; their
 * declarations are not visible here - confirm they are initialised.
 * The counter updates (count / fail_count) are also not visible.
 */
3050 static void async_callback(struct ctdb_client_control_state *state)
3052 struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3053 struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3057 uint32_t destnode = state->c->hdr.destnode;
3059 /* one more node has responded with recmode data */
3062 /* if we failed to push the db, then return an error and let
3063 the main loop try again.
3065 if (state->state != CTDB_CONTROL_DONE) {
3066 if ( !data->dont_log_errors) {
3067 DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3070 if (data->fail_callback) {
3071 data->fail_callback(ctdb, destnode, res, outdata,
3072 data->callback_data);
3077 state->async.fn = NULL;
3079 ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3080 if ((ret != 0) || (res != 0)) {
3081 if ( !data->dont_log_errors) {
3082 DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3085 if (data->fail_callback) {
3086 data->fail_callback(ctdb, destnode, res, outdata,
3087 data->callback_data);
3090 if ((ret == 0) && (data->callback != NULL)) {
3091 data->callback(ctdb, destnode, res, outdata,
3092 data->callback_data);
/*
 * Attach a pending control state to an async tracking structure.
 *
 * The state's completion hook is pointed at async_callback and data is
 * stored as its private pointer, so the callback can find the shared
 * bookkeeping when the control finishes.
 *
 * NOTE(review): the statement that bumps the outstanding-control counter is
 * not visible in this listing.
 */
3097 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3099 /* set up the callback functions */
3100 state->async.fn = async_callback;
3101 state->async.private_data = data;
3103 /* one more control to wait for to complete */
3108 /* wait for up to the maximum number of seconds allowed
3109 or until all nodes we expect a response from have replied
/*
 * Block until every control tracked by data has completed.
 *
 * The event loop is run one iteration at a time while data->count is above
 * zero. Afterwards a non-zero data->fail_count is reported (unless
 * dont_log_errors is set).
 *
 * NOTE(review): nothing in the visible loop bounds the wait by time; it
 * ends only when data->count reaches zero, so it depends on each control
 * completing or timing out on its own - confirm.
 */
3111 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3113 while (data->count > 0) {
3114 event_loop_once(ctdb->ev);
3116 if (data->fail_count != 0) {
3117 if (!data->dont_log_errors) {
3118 DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n",
3128 perform a simple control on the listed nodes
3129 The control cannot return data
/*
 * Send the same control to a set of nodes in parallel and wait for all of
 * them to finish.
 *
 * A client_async_data is allocated on ctdb (allocation failure is fatal via
 * CTDB_NO_MEMORY_FATAL) and filled with the logging preference, the success
 * and failure callbacks, their shared callback_data and the opcode.
 *
 * The number of target nodes is derived from the talloc size of the nodes
 * array, so nodes must be a talloc allocation of uint32_t entries. One
 * control is sent per node with async_data in the mem_ctx position of
 * ctdb_control_send(); each resulting state is registered with
 * ctdb_client_async_add(). If a send fails the error is logged and
 * async_data is freed. After all sends, ctdb_client_async_wait() is called
 * and async_data is freed on both its failure and success paths.
 *
 * NOTE(review): the nodes, srvid and data parameters and the return
 * statements are not visible in this listing.
 */
3131 int ctdb_client_async_control(struct ctdb_context *ctdb,
3132 enum ctdb_controls opcode,
3135 struct timeval timeout,
3136 bool dont_log_errors,
3138 client_async_callback client_callback,
3139 client_async_callback fail_callback,
3140 void *callback_data)
3142 struct client_async_data *async_data;
3143 struct ctdb_client_control_state *state;
3146 async_data = talloc_zero(ctdb, struct client_async_data);
3147 CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3148 async_data->dont_log_errors = dont_log_errors;
3149 async_data->callback = client_callback;
3150 async_data->fail_callback = fail_callback;
3151 async_data->callback_data = callback_data;
3152 async_data->opcode = opcode;
3154 num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3156 /* loop over all nodes and send an async control to each of them */
3157 for (j=0; j<num_nodes; j++) {
3158 uint32_t pnn = nodes[j];
3160 state = ctdb_control_send(ctdb, pnn, srvid, opcode,
3161 0, data, async_data, &timeout, NULL);
3162 if (state == NULL) {
3163 DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3164 talloc_free(async_data);
3168 ctdb_client_async_add(async_data, state);
3171 if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3172 talloc_free(async_data);
3176 talloc_free(async_data);
/*
 * Build a talloc array (on mem_ctx) of the node numbers found in vnn_map.
 *
 * Two passes over vnn_map->map: the first sizes the array, the second
 * fills it. In both passes an entry equal to ctdb->pnn is singled out when
 * include_self is false. Allocation failure is fatal
 * (CTDB_NO_MEMORY_FATAL).
 *
 * NOTE(review): the loop bodies that skip and count entries and the return
 * statement are not visible in this listing.
 */
3180 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3181 struct ctdb_vnn_map *vnn_map,
3182 TALLOC_CTX *mem_ctx,
3185 int i, j, num_nodes;
3188 for (i=num_nodes=0;i<vnn_map->size;i++) {
3189 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3195 nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3196 CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3198 for (i=j=0;i<vnn_map->size;i++) {
3199 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3202 nodes[j++] = vnn_map->map[i];
/*
 * Build a talloc array (on mem_ctx) of the pnns of active nodes in
 * node_map.
 *
 * Two passes over node_map->nodes: the first sizes the array, the second
 * fills it. Both passes test each entry for NODE_FLAGS_INACTIVE and, when
 * include_self is false, for a pnn equal to ctdb->pnn. Allocation failure
 * is fatal (CTDB_NO_MEMORY_FATAL).
 *
 * NOTE(review): the skip/count statements inside the tests and the return
 * statement are not visible in this listing.
 */
3208 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3209 struct ctdb_node_map *node_map,
3210 TALLOC_CTX *mem_ctx,
3213 int i, j, num_nodes;
3216 for (i=num_nodes=0;i<node_map->num;i++) {
3217 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3220 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3226 nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3227 CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3229 for (i=j=0;i<node_map->num;i++) {
3230 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3233 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3236 nodes[j++] = node_map->nodes[i].pnn;
/*
 * Build a talloc array (on mem_ctx) of the pnns of active nodes in
 * node_map, treating the node whose pnn equals the pnn argument specially.
 *
 * Same two-pass count-then-fill scheme as the other list builders: each
 * entry is tested for NODE_FLAGS_INACTIVE and for a pnn equal to the
 * argument. Allocation failure is fatal (CTDB_NO_MEMORY_FATAL).
 *
 * NOTE(review): the skip/count statements, the pnn parameter declaration
 * and the return statement are not visible in this listing.
 */
3242 uint32_t *list_of_active_nodes_except_pnn(struct ctdb_context *ctdb,
3243 struct ctdb_node_map *node_map,
3244 TALLOC_CTX *mem_ctx,
3247 int i, j, num_nodes;
3250 for (i=num_nodes=0;i<node_map->num;i++) {
3251 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3254 if (node_map->nodes[i].pnn == pnn) {
3260 nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3261 CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3263 for (i=j=0;i<node_map->num;i++) {
3264 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3267 if (node_map->nodes[i].pnn == pnn) {
3270 nodes[j++] = node_map->nodes[i].pnn;
/*
 * Build a talloc array (on mem_ctx) of the pnns of connected nodes in
 * node_map.
 *
 * Same two-pass count-then-fill scheme as list_of_active_nodes, but the
 * flag tested is NODE_FLAGS_DISCONNECTED rather than NODE_FLAGS_INACTIVE.
 * The local node (ctdb->pnn) is singled out when include_self is false.
 * Allocation failure is fatal (CTDB_NO_MEMORY_FATAL).
 *
 * NOTE(review): the skip/count statements and the return statement are not
 * visible in this listing.
 */
3276 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3277 struct ctdb_node_map *node_map,
3278 TALLOC_CTX *mem_ctx,
3281 int i, j, num_nodes;
3284 for (i=num_nodes=0;i<node_map->num;i++) {
3285 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3288 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3294 nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3295 CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3297 for (i=j=0;i<node_map->num;i++) {
3298 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3301 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3304 nodes[j++] = node_map->nodes[i].pnn;
3311 this is used to test if a pnn lock exists and if it exists will return
3312 the number of connections that pnn has reported or -1 if that recovery
3313 daemon is not running.
/*
 * Probe the per-pnn byte-range lock in the file behind fd and read the
 * byte stored for that pnn.
 *
 * fcntl(F_GETLK) is used with a write-lock request relative to SEEK_SET to
 * find out whether someone holds a conflicting lock; a failing fcntl is
 * logged with strerror(errno). The result l_type == F_UNLCK (nobody holds
 * the lock) is handled separately. Otherwise one byte is read with pread()
 * at file offset pnn; a pread() error is logged at DEBUG_CRIT.
 *
 * NOTE(review): l_start/l_len setup, the return type and the return
 * statements are not visible in this listing.
 * NOTE(review): only a pread() result of -1 is treated as failure; a
 * 0-byte read (offset beyond EOF) would leave c unchanged - confirm c is
 * initialised.
 */
3316 ctdb_read_pnn_lock(int fd, int32_t pnn)
3321 lock.l_type = F_WRLCK;
3322 lock.l_whence = SEEK_SET;
3327 if (fcntl(fd, F_GETLK, &lock) != 0) {
3328 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3332 if (lock.l_type == F_UNLCK) {
3336 if (pread(fd, &c, 1, pnn) == -1) {
3337 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3345 get capabilities of a remote node
/*
 * Asynchronous half of the capabilities query: start
 * CTDB_CONTROL_GET_CAPABILITIES on destnode with no payload (tdb_null) and
 * return the control state from ctdb_control_send(). mem_ctx and the
 * timeout are passed straight through.
 */
3347 struct ctdb_client_control_state *
3348 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3350 return ctdb_control_send(ctdb, destnode, 0,
3351 CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null,
3352 mem_ctx, &timeout, NULL);
/*
 * Collect the reply of a capabilities control.
 *
 * ctdb_control_recv() fills outdata and res; a non-zero call result or
 * status is logged. Otherwise the first uint32_t of the reply is copied
 * into *capabilities.
 *
 * NOTE(review): outdata.dsize is not compared with sizeof(uint32_t) in the
 * visible lines before outdata.dptr is dereferenced, and the reply buffer
 * is read through a cast (alignment assumed) - confirm.
 */
3355 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3361 ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3362 if ( (ret != 0) || (res != 0) ) {
3363 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3368 *capabilities = *((uint32_t *)outdata.dptr);
/*
 * Synchronous capabilities query.
 *
 * A temporary talloc context holds the control state and reply for the
 * duration of the call; it is freed once the recv half has copied the value
 * into *capabilities, so nothing allocated here outlives the call.
 *
 * NOTE(review): the result of talloc_new() is not checked in the visible
 * lines.
 */
3374 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3376 struct ctdb_client_control_state *state;
3377 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3380 state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3381 ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3382 talloc_free(tmp_ctx);
3387 * check whether a transaction is active on a given db on a given node
/*
 * Query destnode with CTDB_CONTROL_TRANS2_ACTIVE for database db_id.
 *
 * The 32-bit database id is sent as the control payload; no reply data is
 * requested, only the status. A failure is logged.
 *
 * NOTE(review): the remaining parameters, the failure test and the value
 * returned (presumably the control status) are not visible in this
 * listing - confirm.
 */
3389 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
3397 indata.dptr = (uint8_t *)&db_id;
3398 indata.dsize = sizeof(db_id);
3400 ret = ctdb_control(ctdb, destnode, 0,
3401 CTDB_CONTROL_TRANS2_ACTIVE,
3402 0, indata, NULL, NULL, &status,
3406 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
/*
 * Client-side state of one open transaction on a persistent database.
 *
 * ctdb_db  - the database the transaction runs on.
 * m_all    - marshalled log of every read and write done under the
 *            transaction.
 * m_write  - marshalled log of the writes only.
 *
 * NOTE(review): other code in this file also uses an in_replay member; its
 * declaration is not visible in this listing.
 */
3414 struct ctdb_transaction_handle {
3415 struct ctdb_db_context *ctdb_db;
3418 * we store the reads and writes done under a transaction:
3419 * - one list stores both reads and writes (m_all),
3420 * - the other just writes (m_write)
3422 struct ctdb_marshall_buffer *m_all;
3423 struct ctdb_marshall_buffer *m_write;
3426 /* talloc destructor for a transaction handle: cancel the local tdb transaction */
3427 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3429 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3433 /* start a transaction on a database */
/*
 * Acquire the cluster-wide transaction lock record and open a local tdb
 * transaction for handle h.
 *
 * Visible sequence:
 *  1. refuse non-persistent databases;
 *  2. fetch-lock the record named CTDB_TRANSACTION_LOCK_KEY;
 *  3. ask via ctdb_ctrl_transaction_active() whether a transaction is
 *     already running and, if so, back off for a random time;
 *  4. store our pid in the lock record with ourselves as dmaster;
 *  5. start the tdb transaction;
 *  6. re-read the lock record inside the transaction and verify that we
 *     are still dmaster and that the stored pid is ours; otherwise cancel
 *     the tdb transaction.
 * Every failure path frees tmp_ctx, which is a talloc child of h.
 *
 * NOTE(review): the retry jumps, the sleep call, the release of the record
 * lock and the return statements are not visible in this listing.
 * NOTE(review): the result of talloc_new(h) is not checked in the visible
 * lines.
 */
3434 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
3436 struct ctdb_record_handle *rh;
3439 struct ctdb_ltdb_header header;
3440 TALLOC_CTX *tmp_ctx;
3441 const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
3443 struct ctdb_db_context *ctdb_db = h->ctdb_db;
/* the key is the lock name without its terminating NUL */
3447 key.dptr = discard_const(keyname);
3448 key.dsize = strlen(keyname);
3450 if (!ctdb_db->persistent) {
3451 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
3456 tmp_ctx = talloc_new(h);
3458 rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
3460 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
3461 talloc_free(tmp_ctx);
3465 status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
/*
 * NOTE(review): (1000 + random()) % 100000 yields a value in 0..99999, so
 * the added 1000 does not enforce a minimum wait of 1000 usec - confirm
 * the intended range.
 */
3469 unsigned long int usec = (1000 + random()) % 100000;
3470 DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
3471 "on db_id[0x%08x]. waiting for %lu "
3473 ctdb_db->db_id, usec));
3474 talloc_free(tmp_ctx);
3480 * store the pid in the database:
3481 * it is not enough that the node is dmaster...
3484 data.dptr = (unsigned char *)&pid;
3485 data.dsize = sizeof(pid_t);
3487 rh->header.dmaster = ctdb_db->ctdb->pnn;
3488 ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
3490 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
3491 "transaction record\n"));
3492 talloc_free(tmp_ctx);
3498 ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3500 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
3501 talloc_free(tmp_ctx);
/* re-validate the lock record now that we are inside the tdb transaction */
3505 ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
3507 DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
3508 "lock record inside transaction\n"));
3509 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3510 talloc_free(tmp_ctx);
3514 if (header.dmaster != ctdb_db->ctdb->pnn) {
3515 DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
3516 "transaction lock record\n"));
3517 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3518 talloc_free(tmp_ctx);
3522 if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
3523 DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
3524 "the transaction lock record\n"));
3525 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3526 talloc_free(tmp_ctx);
3530 talloc_free(tmp_ctx);
3536 /* start a transaction on a database */
/*
 * Public entry point: allocate a zeroed transaction handle on mem_ctx, bind
 * it to ctdb_db and take the transaction lock through
 * ctdb_transaction_fetch_start(). Once that has been called, a talloc
 * destructor is installed so that freeing the handle cancels the local tdb
 * transaction.
 *
 * NOTE(review): the handling of a failed fetch_start and the return
 * statements are not visible in this listing.
 */
3537 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3538 TALLOC_CTX *mem_ctx)
3540 struct ctdb_transaction_handle *h;
3543 h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3545 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));
3549 h->ctdb_db = ctdb_db;
3551 ret = ctdb_transaction_fetch_start(h);
3557 talloc_set_destructor(h, ctdb_transaction_destructor);
3565 fetch a record inside a transaction
/*
 * Read the record for key from the local database under transaction h.
 *
 * The header is zeroed before ctdb_ltdb_fetch() is called. A result of -1
 * together with header.dmaster == (uint32_t)-1 is taken to mean that the
 * record does not exist yet. Outside of a replay the read is appended to
 * h->m_all with marker value 1, so that a later replay can verify that the
 * same data is still seen.
 *
 * NOTE(review): what is stored in *data for a missing record, the generic
 * error handling and the return statements are not visible in this
 * listing.
 */
3567 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h,
3568 TALLOC_CTX *mem_ctx,
3569 TDB_DATA key, TDB_DATA *data)
3571 struct ctdb_ltdb_header header;
3574 ZERO_STRUCT(header);
3576 ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3577 if (ret == -1 && header.dmaster == (uint32_t)-1) {
3578 /* record doesn't exist yet */
3587 if (!h->in_replay) {
3588 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3589 if (h->m_all == NULL) {
3590 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3599 stores a record inside a transaction
/*
 * Write data under key as part of transaction h.
 *
 * The current record is fetched first so its header can be reused; a
 * missing record (ret == -1 with dmaster == (uint32_t)-1) gets a fresh
 * zeroed header. If the new data is byte-identical to the old data nothing
 * is written. Otherwise this node becomes dmaster, the write is logged in
 * h->m_all (marker value 0, skipped during replay) and in h->m_write (with
 * the header), and the record is stored in the local tdb. tmp_ctx, a
 * talloc child of h, holds the old data and is freed on every visible
 * path.
 *
 * NOTE(review): the RSN update mentioned in the comment below and the
 * return statements are not visible in this listing.
 * NOTE(review): in the record-does-not-exist path olddata is still
 * compared against data; this relies on ctdb_ltdb_fetch() having filled
 * olddata in that case - confirm.
 */
3601 int ctdb_transaction_store(struct ctdb_transaction_handle *h,
3602 TDB_DATA key, TDB_DATA data)
3604 TALLOC_CTX *tmp_ctx = talloc_new(h);
3605 struct ctdb_ltdb_header header;
3609 ZERO_STRUCT(header);
3611 /* we need the header so we can update the RSN */
3612 ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3613 if (ret == -1 && header.dmaster == (uint32_t)-1) {
3614 /* the record doesn't exist - create one with us as dmaster.
3615 This is only safe because we are in a transaction and this
3616 is a persistent database */
3617 ZERO_STRUCT(header);
3618 } else if (ret != 0) {
3619 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3620 talloc_free(tmp_ctx);
3624 if (data.dsize == olddata.dsize &&
3625 memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3626 /* save writing the same data */
3627 talloc_free(tmp_ctx);
3631 header.dmaster = h->ctdb_db->ctdb->pnn;
3634 if (!h->in_replay) {
3635 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3636 if (h->m_all == NULL) {
3637 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3638 talloc_free(tmp_ctx);
3643 h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3644 if (h->m_write == NULL) {
3645 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3646 talloc_free(tmp_ctx);
3650 ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3652 talloc_free(tmp_ctx);
3658 replay a transaction
/*
 * Re-run every operation recorded in h->m_all after a failed commit.
 *
 * The handle is switched into replay mode, the old write log is discarded,
 * and the transaction lock is re-acquired with
 * ctdb_transaction_fetch_start(). The recorded operations are then walked
 * in order: entries with reqid == 0 are writes and are re-applied through
 * ctdb_transaction_store(); the others are reads, which are re-fetched and
 * compared byte-for-byte with the recorded data - a mismatch means the
 * record changed underneath us and the replay gives up. The tdb
 * transaction is cancelled on the way out of the failure path.
 *
 * NOTE(review): the resetting of h->m_write after the free, the jump
 * targets and the return statements are not visible in this listing.
 */
3660 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3663 struct ctdb_rec_data *rec = NULL;
3665 h->in_replay = true;
3666 talloc_free(h->m_write);
3669 ret = ctdb_transaction_fetch_start(h);
3674 for (i=0;i<h->m_all->count;i++) {
3677 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3679 DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3683 if (rec->reqid == 0) {
3685 if (ctdb_transaction_store(h, key, data) != 0) {
3690 TALLOC_CTX *tmp_ctx = talloc_new(h);
3692 if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3693 talloc_free(tmp_ctx);
3696 if (data2.dsize != data.dsize ||
3697 memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3698 /* the record has changed on us - we have to give up */
3699 talloc_free(tmp_ctx);
3702 talloc_free(tmp_ctx);
3709 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3715 commit a transaction
/*
 * Commit transaction h cluster-wide and then locally.
 *
 * The handle's destructor is removed first, so from here on this function
 * is responsible for cancelling or committing the tdb transaction. With no
 * recorded writes the tdb transaction is simply cancelled. Otherwise the
 * marshalled write log is sent to the local daemon with
 * CTDB_CONTROL_TRANS2_COMMIT (CTDB_CONTROL_TRANS2_COMMIT_RETRY on later
 * attempts) and a one second timeout. On failure the tdb transaction is
 * cancelled, the status is mapped to the control that will be sent if the
 * operation has to be abandoned, and the transaction is replayed; after
 * 100 attempts, or if the replay fails, that failure control is sent
 * without waiting for a reply. On success the tdb transaction is committed
 * locally and CTDB_CONTROL_TRANS2_FINISHED is sent.
 *
 * NOTE(review): the retry jump, the freeing of h and the return statements
 * are not visible in this listing.
 * NOTE(review): the log format " transaction commit%s failed" combined
 * with the argument "retry " renders as "commitretry  failed" - the space
 * looks misplaced.
 */
3717 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3721 struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3722 struct timeval timeout;
3723 enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3725 talloc_set_destructor(h, NULL);
3727 /* our commit strategy is quite complex.
3729 - we first try to commit the changes to all other nodes
3731 - if that works, then we commit locally and we are done
3733 - if a commit on another node fails, then we need to cancel
3734 the transaction, then restart the transaction (thus
3735 opening a window of time for a pending recovery to
3736 complete), then replay the transaction, checking all the
3737 reads and writes (checking that reads give the same data,
3738 and writes succeed). Then we retry the transaction to the
3743 if (h->m_write == NULL) {
3744 /* no changes were made */
3745 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3750 /* tell ctdbd to commit to the other nodes */
3751 timeout = timeval_current_ofs(1, 0);
3752 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
3753 retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0,
3754 ctdb_marshall_finish(h->m_write), NULL, NULL, &status,
3756 if (ret != 0 || status != 0) {
3757 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3758 DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
3759 ", retrying after 1 second...\n",
3760 (retries==0)?"":"retry "));
3764 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3766 /* work out what error code we will give if we
3767 have to fail the operation */
3768 switch ((enum ctdb_trans2_commit_error)status) {
3769 case CTDB_TRANS2_COMMIT_SUCCESS:
3770 case CTDB_TRANS2_COMMIT_SOMEFAIL:
3771 case CTDB_TRANS2_COMMIT_TIMEOUT:
3772 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3774 case CTDB_TRANS2_COMMIT_ALLFAIL:
3775 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
/* give up after 100 attempts and tell the daemon how the commit ended */
3780 if (++retries == 100) {
3781 DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n",
3782 h->ctdb_db->db_id, retries, (unsigned)failure_control));
3783 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
3784 failure_control, CTDB_CTRL_FLAG_NOREPLY,
3785 tdb_null, NULL, NULL, NULL, NULL, NULL);
3790 if (ctdb_replay_transaction(h) != 0) {
3791 DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
3792 "transaction on db 0x%08x, "
3793 "failure control =%u\n",
3795 (unsigned)failure_control));
3796 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
3797 failure_control, CTDB_CTRL_FLAG_NOREPLY,
3798 tdb_null, NULL, NULL, NULL, NULL, NULL);
3804 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3807 /* do the real commit locally */
3808 ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
3810 DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
3811 "on db id 0x%08x locally, "
3812 "failure_control=%u\n",
3814 (unsigned)failure_control));
3815 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
3816 failure_control, CTDB_CTRL_FLAG_NOREPLY,
3817 tdb_null, NULL, NULL, NULL, NULL, NULL);
3822 /* tell ctdbd that we are finished with our local commit */
3823 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
3824 CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY,
3825 tdb_null, NULL, NULL, NULL, NULL, NULL);
3831 recovery daemon ping to main daemon
/*
 * Send CTDB_CONTROL_RECD_PING to the local daemon (CTDB_CURRENT_NODE).
 *
 * No payload is sent, no reply data is requested and no timeout is given
 * (the timeout argument is NULL). A non-zero call result or status is
 * logged.
 *
 * NOTE(review): return statements are not visible in this listing.
 */
3833 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
3838 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null,
3839 ctdb, NULL, &res, NULL, NULL);
3840 if (ret != 0 || res != 0) {
3841 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
3848 /* when forking the main daemon and the child process needs to connect back
3849 * to the daemon as a client process, this function can be used to change
3850 * the ctdb context from daemon into client mode
/*
 * Convert a context that was set up in the daemon into one that talks to
 * the daemon as a client.
 *
 * Visible steps: build the debug_extra log prefix from the printf-style
 * arguments followed by ":", shut down the transport if one is configured,
 * replace the event context with a fresh one that allows nested loops,
 * close the daemon socket descriptor and mark it invalid, restore the
 * normal scheduler when do_setsched is set, and finally connect to the
 * daemon socket with ctdb_socket_connect(), logging at DEBUG_ALERT on
 * failure.
 *
 * NOTE(review): the result of talloc_vasprintf() is passed straight into
 * talloc_strdup_append() and the new event context is used without a NULL
 * check in the visible lines - confirm allocation failure is acceptable
 * here.
 * NOTE(review): va_start/va_end and the return statements are not visible
 * in this listing.
 */
3852 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
3857 /* Add extra information so we can identify this in the logs */
3859 debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
3862 /* shutdown the transport */
3863 if (ctdb->methods) {
3864 ctdb->methods->shutdown(ctdb);
3867 /* get a new event context */
3868 talloc_free(ctdb->ev);
3869 ctdb->ev = event_context_init(ctdb);
3870 tevent_loop_allow_nesting(ctdb->ev);
3872 close(ctdb->daemon.sd);
3873 ctdb->daemon.sd = -1;
3875 /* the client does not need to be realtime */
3876 if (ctdb->do_setsched) {
3877 ctdb_restore_scheduler(ctdb);
3880 /* initialise ctdb */
3881 ret = ctdb_socket_connect(ctdb);
3883 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
3891 get the status of running the monitor eventscripts: NULL means never run.
/*
 * Fetch the recorded event-script status for event type from destnode.
 *
 * The enum value is widened into a uint32_t and sent as the payload of
 * CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS. A non-zero call result or status is
 * logged together with both values. An empty reply sets *script_status to
 * NULL; the visible non-empty handling duplicates the reply onto mem_ctx
 * with talloc_memdup() and then frees the original reply buffer.
 *
 * NOTE(review): the talloc_memdup() result is not checked in the visible
 * lines, and the return statements are not visible in this listing.
 */
3893 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb,
3894 struct timeval timeout, uint32_t destnode,
3895 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
3896 struct ctdb_scripts_wire **script_status)
3899 TDB_DATA outdata, indata;
3901 uint32_t uinttype = type;
3903 indata.dptr = (uint8_t *)&uinttype;
3904 indata.dsize = sizeof(uinttype);
3906 ret = ctdb_control(ctdb, destnode, 0,
3907 CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
3908 mem_ctx, &outdata, &res, &timeout, NULL);
3909 if (ret != 0 || res != 0) {
3910 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
3914 if (outdata.dsize == 0) {
3915 *script_status = NULL;
3917 *script_status = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
3918 talloc_free(outdata.dptr);
3925 tell the main daemon how long it took to lock the reclock file
/*
 * Report a latency value to the local daemon.
 *
 * The double is sent in its in-memory representation as the payload of
 * CTDB_CONTROL_RECD_RECLOCK_LATENCY to CTDB_CURRENT_NODE. A non-zero call
 * result or status is logged.
 *
 * NOTE(review): the timeout parameter is not used by the visible call -
 * NULL is passed in the timeout position of ctdb_control().
 * NOTE(review): return statements are not visible in this listing.
 */
3927 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
3933 data.dptr = (uint8_t *)&latency;
3934 data.dsize = sizeof(latency);
3936 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data,
3937 ctdb, NULL, &res, NULL, NULL);
3938 if (ret != 0 || res != 0) {
3939 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
3947 get the name of the reclock file
/*
 * Ask destnode for its reclock file name.
 *
 * Issues CTDB_CONTROL_GET_RECLOCK_FILE with no payload. An empty reply is
 * handled separately from a non-empty one; a non-empty reply is copied
 * onto mem_ctx with talloc_strdup() and stored in *name, after which the
 * reply buffer is freed.
 *
 * NOTE(review): talloc_strdup() assumes the reply is NUL-terminated; no
 * check of that is visible - confirm.
 * NOTE(review): the name parameter declaration, what *name is set to for
 * an empty reply, and the return statements are not visible in this
 * listing.
 */
3949 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
3950 uint32_t destnode, TALLOC_CTX *mem_ctx,
3957 ret = ctdb_control(ctdb, destnode, 0,
3958 CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null,
3959 mem_ctx, &data, &res, &timeout, NULL);
3960 if (ret != 0 || res != 0) {
3964 if (data.dsize == 0) {
3967 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
3969 talloc_free(data.dptr);
3975 set the reclock filename for a node
/*
 * Set the reclock file name on destnode.
 *
 * A NULL reclock is handled as a special case; otherwise the string is
 * sent including its terminating NUL (strlen + 1 bytes) as the payload of
 * CTDB_CONTROL_SET_RECLOCK_FILE. A non-zero call result or status is
 * logged.
 *
 * NOTE(review): the payload used for the NULL case and the return
 * statements are not visible in this listing.
 */
3977 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
3983 if (reclock == NULL) {
3987 data.dsize = strlen(reclock) + 1;
3988 data.dptr = discard_const(reclock);
3991 ret = ctdb_control(ctdb, destnode, 0,
3992 CTDB_CONTROL_SET_RECLOCK_FILE, 0, data,
3993 NULL, NULL, &res, &timeout, NULL);
3994 if (ret != 0 || res != 0) {
3995 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
/*
 * Send CTDB_CONTROL_STOP_NODE to destnode with no payload.
 * A non-zero call result or control status is logged.
 *
 * NOTE(review): return statements are not visible in this listing.
 */
4005 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4010 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null,
4011 ctdb, NULL, &res, &timeout, NULL);
4012 if (ret != 0 || res != 0) {
4013 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
/*
 * Send CTDB_CONTROL_CONTINUE_NODE to destnode with no payload.
 *
 * NOTE(review): unlike the stop-node call, no status pointer is passed to
 * ctdb_control() here (NULL in the status position), so the remote status
 * cannot be examined - confirm this is intended.
 * NOTE(review): the failure test and the return statements are not visible
 * in this listing.
 */
4023 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4027 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null,
4028 ctdb, NULL, NULL, &timeout, NULL);
4030 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4038 set the natgw state for a node
/*
 * Send the 32-bit natgwstate value to destnode as the payload of
 * CTDB_CONTROL_SET_NATGWSTATE. No reply data is requested; a non-zero call
 * result or status is logged.
 *
 * NOTE(review): return statements are not visible in this listing.
 */
4040 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4046 data.dsize = sizeof(natgwstate);
4047 data.dptr = (uint8_t *)&natgwstate;
4049 ret = ctdb_control(ctdb, destnode, 0,
4050 CTDB_CONTROL_SET_NATGWSTATE, 0, data,
4051 NULL, NULL, &res, &timeout, NULL);
4052 if (ret != 0 || res != 0) {
4053 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4061 set the lmaster role for a node
/*
 * Send the 32-bit lmasterrole value to destnode as the payload of
 * CTDB_CONTROL_SET_LMASTERROLE. No reply data is requested; a non-zero
 * call result or status is logged.
 *
 * NOTE(review): return statements are not visible in this listing.
 */
4063 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4069 data.dsize = sizeof(lmasterrole);
4070 data.dptr = (uint8_t *)&lmasterrole;
4072 ret = ctdb_control(ctdb, destnode, 0,
4073 CTDB_CONTROL_SET_LMASTERROLE, 0, data,
4074 NULL, NULL, &res, &timeout, NULL);
4075 if (ret != 0 || res != 0) {
4076 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4084 set the recmaster role for a node
/*
 * Send the 32-bit recmasterrole value to destnode as the payload of
 * CTDB_CONTROL_SET_RECMASTERROLE. No reply data is requested; a non-zero
 * call result or status is logged.
 *
 * NOTE(review): return statements are not visible in this listing.
 */
4086 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4092 data.dsize = sizeof(recmasterrole);
4093 data.dptr = (uint8_t *)&recmasterrole;
4095 ret = ctdb_control(ctdb, destnode, 0,
4096 CTDB_CONTROL_SET_RECMASTERROLE, 0, data,
4097 NULL, NULL, &res, &timeout, NULL);
4098 if (ret != 0 || res != 0) {
4099 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4106 /* enable an eventscript
/*
 * Ask destnode to enable the event script named script.
 *
 * The name is sent including its terminating NUL (strlen + 1 bytes) as the
 * payload of CTDB_CONTROL_ENABLE_SCRIPT. A non-zero call result or status
 * is logged.
 *
 * NOTE(review): script is passed to strlen() without a NULL guard, unlike
 * the reclock setter in this file - callers must pass a valid string.
 * NOTE(review): return statements are not visible in this listing.
 */
4108 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4114 data.dsize = strlen(script) + 1;
4115 data.dptr = discard_const(script);
4117 ret = ctdb_control(ctdb, destnode, 0,
4118 CTDB_CONTROL_ENABLE_SCRIPT, 0, data,
4119 NULL, NULL, &res, &timeout, NULL);
4120 if (ret != 0 || res != 0) {
4121 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4128 /* disable an eventscript
/*
 * Ask destnode to disable the event script named script.
 *
 * The name is sent including its terminating NUL (strlen + 1 bytes) as the
 * payload of CTDB_CONTROL_DISABLE_SCRIPT. A non-zero call result or status
 * is logged.
 *
 * NOTE(review): script is passed to strlen() without a NULL guard -
 * callers must pass a valid string.
 * NOTE(review): return statements are not visible in this listing.
 */
4130 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4136 data.dsize = strlen(script) + 1;
4137 data.dptr = discard_const(script);
4139 ret = ctdb_control(ctdb, destnode, 0,
4140 CTDB_CONTROL_DISABLE_SCRIPT, 0, data,
4141 NULL, NULL, &res, &timeout, NULL);
4142 if (ret != 0 || res != 0) {
4143 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
/*
 * Send the ban description in *bantime to destnode.
 *
 * The structure is sent in its in-memory form (sizeof(*bantime) bytes) as
 * the payload of CTDB_CONTROL_SET_BAN_STATE. A non-zero call result or
 * status is logged.
 *
 * NOTE(review): return statements are not visible in this listing.
 */
4151 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
4157 data.dsize = sizeof(*bantime);
4158 data.dptr = (uint8_t *)bantime;
4160 ret = ctdb_control(ctdb, destnode, 0,
4161 CTDB_CONTROL_SET_BAN_STATE, 0, data,
4162 NULL, NULL, &res, &timeout, NULL);
4163 if (ret != 0 || res != 0) {
4164 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
/*
 * Fetch the ban state of destnode.
 *
 * CTDB_CONTROL_GET_BAN_STATE is issued with no payload; the reply is
 * received on a temporary talloc context. On success the reply buffer is
 * moved onto mem_ctx with talloc_steal() and returned through *bantime;
 * the temporary context is freed on both the failure and success paths.
 *
 * NOTE(review): the error message below says "set ban state" although this
 * is the get path - looks like a copy/paste slip in the log text.
 * NOTE(review): the reply size is not compared with
 * sizeof(struct ctdb_ban_time) in the visible lines.
 */
4172 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4177 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4179 ret = ctdb_control(ctdb, destnode, 0,
4180 CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4181 tmp_ctx, &outdata, &res, &timeout, NULL);
4182 if (ret != 0 || res != 0) {
4183 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4184 talloc_free(tmp_ctx);
4188 *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4189 talloc_free(tmp_ctx);
/*
 * Send the priority description in *db_prio to destnode.
 *
 * The structure is sent in its in-memory form as the payload of
 * CTDB_CONTROL_SET_DB_PRIORITY. A temporary talloc context is passed as
 * mem_ctx although no reply data is requested (outdata is NULL); it is
 * freed on both the failure and success paths. A non-zero call result or
 * status is logged.
 *
 * NOTE(review): return statements are not visible in this listing.
 */
4195 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4200 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4202 data.dptr = (uint8_t*)db_prio;
4203 data.dsize = sizeof(*db_prio);
4205 ret = ctdb_control(ctdb, destnode, 0,
4206 CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4207 tmp_ctx, NULL, &res, &timeout, NULL);
4208 if (ret != 0 || res != 0) {
4209 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4210 talloc_free(tmp_ctx);
4214 talloc_free(tmp_ctx);
/*
 * Query destnode for the priority of database db_id.
 *
 * The 32-bit database id is sent as the payload of
 * CTDB_CONTROL_GET_DB_PRIORITY. Only a negative status counts as failure
 * here (res < 0), so a non-negative status is a valid answer - presumably
 * it carries the priority that ends up in *priority; the assignment is not
 * visible in this listing, confirm. The temporary talloc context is freed
 * on both paths.
 *
 * NOTE(review): the error message below says "set_db_priority" although
 * this is the get path - looks like a copy/paste slip in the log text.
 */
4219 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4224 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4226 data.dptr = (uint8_t*)&db_id;
4227 data.dsize = sizeof(db_id);
4229 ret = ctdb_control(ctdb, destnode, 0,
4230 CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4231 tmp_ctx, NULL, &res, &timeout, NULL);
4232 if (ret != 0 || res < 0) {
4233 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4234 talloc_free(tmp_ctx);
4242 talloc_free(tmp_ctx);
/*
 * Fetch the statistics history from destnode.
 *
 * CTDB_CONTROL_GET_STAT_HISTORY is issued with no payload. A non-zero call
 * result, a non-zero status or an empty reply are all treated as failure
 * and logged with ret and res. Otherwise the reply is duplicated onto
 * mem_ctx with talloc_memdup(), returned through *stats, and the original
 * reply buffer is freed.
 *
 * NOTE(review): the talloc_memdup() result is not checked in the visible
 * lines, and the return statements are not visible in this listing.
 */
4247 int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_statistics_wire **stats)
4253 ret = ctdb_control(ctdb, destnode, 0,
4254 CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null,
4255 mem_ctx, &outdata, &res, &timeout, NULL);
4256 if (ret != 0 || res != 0 || outdata.dsize == 0) {
4257 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
4261 *stats = (struct ctdb_statistics_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4262 talloc_free(outdata.dptr);
4267 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)