4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/tevent/tevent.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
30 #include "../include/ctdb_private.h"
31 #include "lib/util/dlinklist.h"
36 allocate a packet for use in client<->daemon communication
38 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
40 enum ctdb_operation operation,
41 size_t length, size_t slength,
45 struct ctdb_req_header *hdr;
47 length = MAX(length, slength);
48 size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
50 hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
52 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
53 operation, (unsigned)length));
56 talloc_set_name_const(hdr, type);
57 memset(hdr, 0, slength);
59 hdr->operation = operation;
60 hdr->ctdb_magic = CTDB_MAGIC;
61 hdr->ctdb_version = CTDB_VERSION;
62 hdr->srcnode = ctdb->pnn;
64 hdr->generation = ctdb->vnn_map->generation;
71 local version of ctdb_call
73 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
74 struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
77 struct ctdb_call_info *c;
78 struct ctdb_registered_call *fn;
79 struct ctdb_context *ctdb = ctdb_db->ctdb;
81 c = talloc(ctdb, struct ctdb_call_info);
82 CTDB_NO_MEMORY(ctdb, c);
85 c->call_data = &call->call_data;
86 c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
87 c->record_data.dsize = data->dsize;
88 CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
93 for (fn=ctdb_db->calls;fn;fn=fn->next) {
94 if (fn->id == call->call_id) break;
97 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
102 if (fn->fn(c) != 0) {
103 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
108 /* we need to force the record to be written out if this was a remote access */
109 if (c->new_data == NULL) {
110 c->new_data = &c->record_data;
114 /* XXX check that we always have the lock here? */
115 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
116 ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
123 call->reply_data = *c->reply_data;
125 talloc_steal(call, call->reply_data.dptr);
126 talloc_set_name_const(call->reply_data.dptr, __location__);
128 call->reply_data.dptr = NULL;
129 call->reply_data.dsize = 0;
131 call->status = c->status;
140 queue a packet for sending from client to daemon
142 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
144 return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
149 called when a CTDB_REPLY_CALL packet comes in in the client
151 This packet comes in response to a CTDB_REQ_CALL request packet. It
152 contains any reply data from the call
154 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
156 struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
157 struct ctdb_client_call_state *state;
159 state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
161 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
165 if (hdr->reqid != state->reqid) {
166 /* we found a record but it was the wrong one */
167 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
171 state->call->reply_data.dptr = c->data;
172 state->call->reply_data.dsize = c->datalen;
173 state->call->status = c->status;
175 talloc_steal(state, c);
177 state->state = CTDB_CALL_DONE;
179 if (state->async.fn) {
180 state->async.fn(state);
184 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
187 this is called in the client, when data comes in from the daemon
189 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
191 struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
192 struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
195 /* place the packet as a child of a tmp_ctx. We then use
196 talloc_free() below to free it. If any of the calls want
197 to keep it, then they will steal it somewhere else, and the
198 talloc_free() will be a no-op */
199 tmp_ctx = talloc_new(ctdb);
200 talloc_steal(tmp_ctx, hdr);
203 DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n"));
207 if (cnt < sizeof(*hdr)) {
208 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
211 if (cnt != hdr->length) {
212 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n",
213 (unsigned)hdr->length, (unsigned)cnt);
217 if (hdr->ctdb_magic != CTDB_MAGIC) {
218 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
222 if (hdr->ctdb_version != CTDB_VERSION) {
223 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
227 switch (hdr->operation) {
228 case CTDB_REPLY_CALL:
229 ctdb_client_reply_call(ctdb, hdr);
232 case CTDB_REQ_MESSAGE:
233 ctdb_request_message(ctdb, hdr);
236 case CTDB_REPLY_CONTROL:
237 ctdb_client_reply_control(ctdb, hdr);
241 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
245 talloc_free(tmp_ctx);
249 connect with exponential backoff, thanks Stevens
251 #define CONNECT_MAXSLEEP 64
252 static int ctdb_connect_retry(struct ctdb_context *ctdb)
254 struct sockaddr_un addr;
258 memset(&addr, 0, sizeof(addr));
259 addr.sun_family = AF_UNIX;
260 strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
262 for (secs = 1; secs <= CONNECT_MAXSLEEP; secs *= 2) {
263 ret = connect(ctdb->daemon.sd, (struct sockaddr *)&addr,
265 if ((ret == 0) || (errno != EAGAIN)) {
269 if (secs <= (CONNECT_MAXSLEEP / 2)) {
270 DEBUG(DEBUG_ERR,("connect failed: %s, retry in %d second(s)\n",
271 strerror(errno), secs));
280 connect to a unix domain socket
282 int ctdb_socket_connect(struct ctdb_context *ctdb)
284 ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
285 if (ctdb->daemon.sd == -1) {
286 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
290 set_nonblocking(ctdb->daemon.sd);
291 set_close_on_exec(ctdb->daemon.sd);
293 if (ctdb_connect_retry(ctdb) == -1) {
294 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
295 close(ctdb->daemon.sd);
296 ctdb->daemon.sd = -1;
300 ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd,
302 ctdb_client_read_cb, ctdb, "to-ctdbd");
307 struct ctdb_record_handle {
308 struct ctdb_db_context *ctdb_db;
311 struct ctdb_ltdb_header header;
316 make a recv call to the local ctdb daemon - called from client context
318 This is called when the program wants to wait for a ctdb_call to complete and get the
319 results. This call will block unless the call has already completed.
321 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
327 while (state->state < CTDB_CALL_DONE) {
328 event_loop_once(state->ctdb_db->ctdb->ev);
330 if (state->state != CTDB_CALL_DONE) {
331 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
336 if (state->call->reply_data.dsize) {
337 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
338 state->call->reply_data.dptr,
339 state->call->reply_data.dsize);
340 call->reply_data.dsize = state->call->reply_data.dsize;
342 call->reply_data.dptr = NULL;
343 call->reply_data.dsize = 0;
345 call->status = state->call->status;
355 destroy a ctdb_call in client
357 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)
359 ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
364 construct an event driven local ctdb_call
366 this is used so that locally processed ctdb_call requests are processed
367 in an event driven manner
369 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db,
370 struct ctdb_call *call,
371 struct ctdb_ltdb_header *header,
374 struct ctdb_client_call_state *state;
375 struct ctdb_context *ctdb = ctdb_db->ctdb;
378 state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
379 CTDB_NO_MEMORY_NULL(ctdb, state);
380 state->call = talloc_zero(state, struct ctdb_call);
381 CTDB_NO_MEMORY_NULL(ctdb, state->call);
383 talloc_steal(state, data->dptr);
385 state->state = CTDB_CALL_DONE;
386 *(state->call) = *call;
387 state->ctdb_db = ctdb_db;
389 ret = ctdb_call_local(ctdb_db, state->call, header, state, data);
395 make a ctdb call to the local daemon - async send. Called from client context.
397 This constructs a ctdb_call request and queues it for processing.
398 This call never blocks.
400 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db,
401 struct ctdb_call *call)
403 struct ctdb_client_call_state *state;
404 struct ctdb_context *ctdb = ctdb_db->ctdb;
405 struct ctdb_ltdb_header header;
409 struct ctdb_req_call *c;
411 /* if the domain socket is not yet open, open it */
412 if (ctdb->daemon.sd==-1) {
413 ctdb_socket_connect(ctdb);
416 ret = ctdb_ltdb_lock(ctdb_db, call->key);
418 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
422 ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
424 if (ret == 0 && header.dmaster == ctdb->pnn) {
425 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
426 talloc_free(data.dptr);
427 ctdb_ltdb_unlock(ctdb_db, call->key);
431 ctdb_ltdb_unlock(ctdb_db, call->key);
432 talloc_free(data.dptr);
434 state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
436 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
439 state->call = talloc_zero(state, struct ctdb_call);
440 if (state->call == NULL) {
441 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
445 len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
446 c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
448 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
452 state->reqid = ctdb_reqid_new(ctdb, state);
453 state->ctdb_db = ctdb_db;
454 talloc_set_destructor(state, ctdb_client_call_destructor);
456 c->hdr.reqid = state->reqid;
457 c->flags = call->flags;
458 c->db_id = ctdb_db->db_id;
459 c->callid = call->call_id;
461 c->keylen = call->key.dsize;
462 c->calldatalen = call->call_data.dsize;
463 memcpy(&c->data[0], call->key.dptr, call->key.dsize);
464 memcpy(&c->data[call->key.dsize],
465 call->call_data.dptr, call->call_data.dsize);
466 *(state->call) = *call;
467 state->call->call_data.dptr = &c->data[call->key.dsize];
468 state->call->key.dptr = &c->data[0];
470 state->state = CTDB_CALL_WAIT;
473 ctdb_client_queue_pkt(ctdb, &c->hdr);
480 full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
482 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
484 struct ctdb_client_call_state *state;
486 state = ctdb_call_send(ctdb_db, call);
487 return ctdb_call_recv(state, call);
492 tell the daemon what messaging srvid we will use, and register the message
493 handler function in the client
495 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
496 ctdb_msg_fn_t handler,
503 res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0,
504 tdb_null, NULL, NULL, &status, NULL, NULL);
505 if (res != 0 || status != 0) {
506 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
510 /* also need to register the handler with our own ctdb structure */
511 return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
515 tell the daemon we no longer want a srvid
517 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
522 res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0,
523 tdb_null, NULL, NULL, &status, NULL, NULL);
524 if (res != 0 || status != 0) {
525 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
529 /* also need to register the handler with our own ctdb structure */
530 ctdb_deregister_message_handler(ctdb, srvid, private_data);
536 send a message - from client context
538 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
539 uint64_t srvid, TDB_DATA data)
541 struct ctdb_req_message *r;
544 len = offsetof(struct ctdb_req_message, data) + data.dsize;
545 r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE,
546 len, struct ctdb_req_message);
547 CTDB_NO_MEMORY(ctdb, r);
549 r->hdr.destnode = pnn;
551 r->datalen = data.dsize;
552 memcpy(&r->data[0], data.dptr, data.dsize);
554 res = ctdb_client_queue_pkt(ctdb, &r->hdr);
565 cancel a ctdb_fetch_lock operation, releasing the lock
567 static int fetch_lock_destructor(struct ctdb_record_handle *h)
569 ctdb_ltdb_unlock(h->ctdb_db, h->key);
574 force the migration of a record to this node
576 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
578 struct ctdb_call call;
580 call.call_id = CTDB_NULL_FUNC;
582 call.flags = CTDB_IMMEDIATE_MIGRATION;
583 return ctdb_call(ctdb_db, &call);
587 get a lock on a record, and return the records data. Blocks until it gets the lock
589 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx,
590 TDB_DATA key, TDB_DATA *data)
593 struct ctdb_record_handle *h;
596 procedure is as follows:
598 1) get the chain lock.
599 2) check if we are dmaster
600 3) if we are the dmaster then return handle
601 4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
603 5) when we get the reply, goto (1)
606 h = talloc_zero(mem_ctx, struct ctdb_record_handle);
611 h->ctdb_db = ctdb_db;
613 h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
614 if (h->key.dptr == NULL) {
620 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize,
621 (const char *)key.dptr));
624 /* step 1 - get the chain lock */
625 ret = ctdb_ltdb_lock(ctdb_db, key);
627 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
632 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
634 talloc_set_destructor(h, fetch_lock_destructor);
636 ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
638 /* when torturing, ensure we test the remote path */
639 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
641 h->header.dmaster = (uint32_t)-1;
645 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
647 if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
648 ctdb_ltdb_unlock(ctdb_db, key);
649 ret = ctdb_client_force_migration(ctdb_db, key);
651 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
658 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
663 store some data to the record that was locked with ctdb_fetch_lock()
665 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
667 if (h->ctdb_db->persistent) {
668 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
672 return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
676 non-locking fetch of a record
678 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx,
679 TDB_DATA key, TDB_DATA *data)
681 struct ctdb_call call;
684 call.call_id = CTDB_FETCH_FUNC;
685 call.call_data.dptr = NULL;
686 call.call_data.dsize = 0;
688 ret = ctdb_call(ctdb_db, &call);
691 *data = call.reply_data;
692 talloc_steal(mem_ctx, data->dptr);
701 called when a control completes or timesout to invoke the callback
702 function the user provided
704 static void invoke_control_callback(struct event_context *ev, struct timed_event *te,
705 struct timeval t, void *private_data)
707 struct ctdb_client_control_state *state;
708 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
711 state = talloc_get_type(private_data, struct ctdb_client_control_state);
712 talloc_steal(tmp_ctx, state);
714 ret = ctdb_control_recv(state->ctdb, state, state,
719 talloc_free(tmp_ctx);
723 called when a CTDB_REPLY_CONTROL packet comes in in the client
725 This packet comes in response to a CTDB_REQ_CONTROL request packet. It
726 contains any reply data from the control
728 static void ctdb_client_reply_control(struct ctdb_context *ctdb,
729 struct ctdb_req_header *hdr)
731 struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
732 struct ctdb_client_control_state *state;
734 state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
736 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
740 if (hdr->reqid != state->reqid) {
741 /* we found a record but it was the wrong one */
742 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
746 state->outdata.dptr = c->data;
747 state->outdata.dsize = c->datalen;
748 state->status = c->status;
750 state->errormsg = talloc_strndup(state,
751 (char *)&c->data[c->datalen],
755 /* state->outdata now uses resources from c so we dont want c
756 to just dissappear from under us while state is still alive
758 talloc_steal(state, c);
760 state->state = CTDB_CONTROL_DONE;
762 /* if we had a callback registered for this control, pull the response
763 and call the callback.
765 if (state->async.fn) {
766 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
772 destroy a ctdb_control in client
774 static int ctdb_control_destructor(struct ctdb_client_control_state *state)
776 ctdb_reqid_remove(state->ctdb, state->reqid);
781 /* time out handler for ctdb_control */
782 static void control_timeout_func(struct event_context *ev, struct timed_event *te,
783 struct timeval t, void *private_data)
785 struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
787 DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
788 "dstnode:%u\n", state->reqid, state->c->opcode,
789 state->c->hdr.destnode));
791 state->state = CTDB_CONTROL_TIMEOUT;
793 /* if we had a callback registered for this control, pull the response
794 and call the callback.
796 if (state->async.fn) {
797 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
801 /* async version of send control request */
802 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb,
803 uint32_t destnode, uint64_t srvid,
804 uint32_t opcode, uint32_t flags, TDB_DATA data,
806 struct timeval *timeout,
809 struct ctdb_client_control_state *state;
811 struct ctdb_req_control *c;
818 /* if the domain socket is not yet open, open it */
819 if (ctdb->daemon.sd==-1) {
820 ctdb_socket_connect(ctdb);
823 state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
824 CTDB_NO_MEMORY_NULL(ctdb, state);
827 state->reqid = ctdb_reqid_new(ctdb, state);
828 state->state = CTDB_CONTROL_WAIT;
829 state->errormsg = NULL;
831 talloc_set_destructor(state, ctdb_control_destructor);
833 len = offsetof(struct ctdb_req_control, data) + data.dsize;
834 c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL,
835 len, struct ctdb_req_control);
837 CTDB_NO_MEMORY_NULL(ctdb, c);
838 c->hdr.reqid = state->reqid;
839 c->hdr.destnode = destnode;
844 c->datalen = data.dsize;
846 memcpy(&c->data[0], data.dptr, data.dsize);
850 if (timeout && !timeval_is_zero(timeout)) {
851 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
854 ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
860 if (flags & CTDB_CTRL_FLAG_NOREPLY) {
869 /* async version of receive control reply */
870 int ctdb_control_recv(struct ctdb_context *ctdb,
871 struct ctdb_client_control_state *state,
873 TDB_DATA *outdata, int32_t *status, char **errormsg)
877 if (status != NULL) {
880 if (errormsg != NULL) {
888 /* prevent double free of state */
889 tmp_ctx = talloc_new(ctdb);
890 talloc_steal(tmp_ctx, state);
892 /* loop one event at a time until we either timeout or the control
895 while (state->state == CTDB_CONTROL_WAIT) {
896 event_loop_once(ctdb->ev);
899 if (state->state != CTDB_CONTROL_DONE) {
900 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
901 if (state->async.fn) {
902 state->async.fn(state);
904 talloc_free(tmp_ctx);
908 if (state->errormsg) {
909 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
911 (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
913 if (state->async.fn) {
914 state->async.fn(state);
916 talloc_free(tmp_ctx);
921 *outdata = state->outdata;
922 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
926 *status = state->status;
929 if (state->async.fn) {
930 state->async.fn(state);
933 talloc_free(tmp_ctx);
940 send a ctdb control message
941 timeout specifies how long we should wait for a reply.
942 if timeout is NULL we wait indefinitely
944 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid,
945 uint32_t opcode, uint32_t flags, TDB_DATA data,
946 TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
947 struct timeval *timeout,
950 struct ctdb_client_control_state *state;
952 state = ctdb_control_send(ctdb, destnode, srvid, opcode,
953 flags, data, mem_ctx,
955 return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status,
963 a process exists call. Returns 0 if process exists, -1 otherwise
965 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
971 data.dptr = (uint8_t*)&pid;
972 data.dsize = sizeof(pid);
974 ret = ctdb_control(ctdb, destnode, 0,
975 CTDB_CONTROL_PROCESS_EXISTS, 0, data,
976 NULL, NULL, &status, NULL, NULL);
978 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
986 get remote statistics
988 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
994 ret = ctdb_control(ctdb, destnode, 0,
995 CTDB_CONTROL_STATISTICS, 0, tdb_null,
996 ctdb, &data, &res, NULL, NULL);
997 if (ret != 0 || res != 0) {
998 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
1002 if (data.dsize != sizeof(struct ctdb_statistics)) {
1003 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1004 (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1008 *status = *(struct ctdb_statistics *)data.dptr;
1009 talloc_free(data.dptr);
1015 shutdown a remote ctdb node
1017 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1019 struct ctdb_client_control_state *state;
1021 state = ctdb_control_send(ctdb, destnode, 0,
1022 CTDB_CONTROL_SHUTDOWN, 0, tdb_null,
1023 NULL, &timeout, NULL);
1024 if (state == NULL) {
1025 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1033 get vnn map from a remote node
1035 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1040 struct ctdb_vnn_map_wire *map;
1042 ret = ctdb_control(ctdb, destnode, 0,
1043 CTDB_CONTROL_GETVNNMAP, 0, tdb_null,
1044 mem_ctx, &outdata, &res, &timeout, NULL);
1045 if (ret != 0 || res != 0) {
1046 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1050 map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1051 if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1052 outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1053 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1057 (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1058 CTDB_NO_MEMORY(ctdb, *vnnmap);
1059 (*vnnmap)->generation = map->generation;
1060 (*vnnmap)->size = map->size;
1061 (*vnnmap)->map = talloc_array(*vnnmap, uint32_t, map->size);
1063 CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1064 memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1065 talloc_free(outdata.dptr);
1072 get the recovery mode of a remote node
1074 struct ctdb_client_control_state *
1075 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1077 return ctdb_control_send(ctdb, destnode, 0,
1078 CTDB_CONTROL_GET_RECMODE, 0, tdb_null,
1079 mem_ctx, &timeout, NULL);
1082 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1087 ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1089 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1094 *recmode = (uint32_t)res;
1100 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1102 struct ctdb_client_control_state *state;
1104 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1105 return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1112 set the recovery mode of a remote node
1114 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1120 data.dsize = sizeof(uint32_t);
1121 data.dptr = (unsigned char *)&recmode;
1123 ret = ctdb_control(ctdb, destnode, 0,
1124 CTDB_CONTROL_SET_RECMODE, 0, data,
1125 NULL, NULL, &res, &timeout, NULL);
1126 if (ret != 0 || res != 0) {
1127 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1137 get the recovery master of a remote node
1139 struct ctdb_client_control_state *
1140 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
1141 struct timeval timeout, uint32_t destnode)
1143 return ctdb_control_send(ctdb, destnode, 0,
1144 CTDB_CONTROL_GET_RECMASTER, 0, tdb_null,
1145 mem_ctx, &timeout, NULL);
1148 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1153 ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1155 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1160 *recmaster = (uint32_t)res;
1166 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1168 struct ctdb_client_control_state *state;
1170 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1171 return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1176 set the recovery master of a remote node
1178 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1185 data.dsize = sizeof(uint32_t);
1186 data.dptr = (unsigned char *)&recmaster;
1188 ret = ctdb_control(ctdb, destnode, 0,
1189 CTDB_CONTROL_SET_RECMASTER, 0, data,
1190 NULL, NULL, &res, &timeout, NULL);
1191 if (ret != 0 || res != 0) {
1192 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1201 get a list of databases off a remote node
1203 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
1204 TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1210 ret = ctdb_control(ctdb, destnode, 0,
1211 CTDB_CONTROL_GET_DBMAP, 0, tdb_null,
1212 mem_ctx, &outdata, &res, &timeout, NULL);
1213 if (ret != 0 || res != 0) {
1214 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1218 *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1219 talloc_free(outdata.dptr);
1225 get a list of nodes (vnn and flags ) from a remote node
1227 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb,
1228 struct timeval timeout, uint32_t destnode,
1229 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1235 ret = ctdb_control(ctdb, destnode, 0,
1236 CTDB_CONTROL_GET_NODEMAP, 0, tdb_null,
1237 mem_ctx, &outdata, &res, &timeout, NULL);
1238 if (ret == 0 && res == -1 && outdata.dsize == 0) {
1239 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1240 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1242 if (ret != 0 || res != 0 || outdata.dsize == 0) {
1243 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1247 *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1248 talloc_free(outdata.dptr);
1254 old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1256 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb,
1257 struct timeval timeout, uint32_t destnode,
1258 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1262 struct ctdb_node_mapv4 *nodemapv4;
1265 ret = ctdb_control(ctdb, destnode, 0,
1266 CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null,
1267 mem_ctx, &outdata, &res, &timeout, NULL);
1268 if (ret != 0 || res != 0 || outdata.dsize == 0) {
1269 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1273 nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1275 len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1276 (*nodemap) = talloc_zero_size(mem_ctx, len);
1277 CTDB_NO_MEMORY(ctdb, (*nodemap));
1279 (*nodemap)->num = nodemapv4->num;
1280 for (i=0; i<nodemapv4->num; i++) {
1281 (*nodemap)->nodes[i].pnn = nodemapv4->nodes[i].pnn;
1282 (*nodemap)->nodes[i].flags = nodemapv4->nodes[i].flags;
1283 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1284 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1287 talloc_free(outdata.dptr);
1293 drop the transport, reload the nodes file and restart the transport
1295 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb,
1296 struct timeval timeout, uint32_t destnode)
1301 ret = ctdb_control(ctdb, destnode, 0,
1302 CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null,
1303 NULL, NULL, &res, &timeout, NULL);
1304 if (ret != 0 || res != 0) {
1305 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1314 set vnn map on a node
1316 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
1317 TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1322 struct ctdb_vnn_map_wire *map;
1325 len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1326 map = talloc_size(mem_ctx, len);
1327 CTDB_NO_MEMORY(ctdb, map);
1329 map->generation = vnnmap->generation;
1330 map->size = vnnmap->size;
1331 memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1334 data.dptr = (uint8_t *)map;
1336 ret = ctdb_control(ctdb, destnode, 0,
1337 CTDB_CONTROL_SETVNNMAP, 0, data,
1338 NULL, NULL, &res, &timeout, NULL);
1339 if (ret != 0 || res != 0) {
1340 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1351 async send for pull database
1353 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1354 struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1355 uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1358 struct ctdb_control_pulldb *pull;
1359 struct ctdb_client_control_state *state;
1361 pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1362 CTDB_NO_MEMORY_NULL(ctdb, pull);
1365 pull->lmaster = lmaster;
1367 indata.dsize = sizeof(struct ctdb_control_pulldb);
1368 indata.dptr = (unsigned char *)pull;
1370 state = ctdb_control_send(ctdb, destnode, 0,
1371 CTDB_CONTROL_PULL_DB, 0, indata,
1372 mem_ctx, &timeout, NULL);
1379 async recv for pull database
1381 int ctdb_ctrl_pulldb_recv(
1382 struct ctdb_context *ctdb,
1383 TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state,
1389 ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1390 if ( (ret != 0) || (res != 0) ){
1391 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1399 pull all keys and records for a specific database on a node
1401 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode,
1402 uint32_t dbid, uint32_t lmaster,
1403 TALLOC_CTX *mem_ctx, struct timeval timeout,
1406 struct ctdb_client_control_state *state;
1408 state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1411 return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1416 change dmaster for all keys in the database to the new value
1418 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
1419 TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1425 indata.dsize = 2*sizeof(uint32_t);
1426 indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1428 ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1429 ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1431 ret = ctdb_control(ctdb, destnode, 0,
1432 CTDB_CONTROL_SET_DMASTER, 0, indata,
1433 NULL, NULL, &res, &timeout, NULL);
1434 if (ret != 0 || res != 0) {
1435 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1443 ping a node, return number of clients connected
1445 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1450 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0,
1451 tdb_null, NULL, NULL, &res, NULL, NULL);
1459 find the real path to a ltdb
1461 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx,
1468 data.dptr = (uint8_t *)&dbid;
1469 data.dsize = sizeof(dbid);
1471 ret = ctdb_control(ctdb, destnode, 0,
1472 CTDB_CONTROL_GETDBPATH, 0, data,
1473 mem_ctx, &data, &res, &timeout, NULL);
1474 if (ret != 0 || res != 0) {
1478 (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1479 if ((*path) == NULL) {
1483 talloc_free(data.dptr);
1489 find the name of a db
1491 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx,
1498 data.dptr = (uint8_t *)&dbid;
1499 data.dsize = sizeof(dbid);
1501 ret = ctdb_control(ctdb, destnode, 0,
1502 CTDB_CONTROL_GET_DBNAME, 0, data,
1503 mem_ctx, &data, &res, &timeout, NULL);
1504 if (ret != 0 || res != 0) {
1508 (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1509 if ((*name) == NULL) {
1513 talloc_free(data.dptr);
1519 get the health status of a db
1521 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1522 struct timeval timeout,
1524 uint32_t dbid, TALLOC_CTX *mem_ctx,
1525 const char **reason)
1531 data.dptr = (uint8_t *)&dbid;
1532 data.dsize = sizeof(dbid);
1534 ret = ctdb_control(ctdb, destnode, 0,
1535 CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1536 mem_ctx, &data, &res, &timeout, NULL);
1537 if (ret != 0 || res != 0) {
1541 if (data.dsize == 0) {
1546 (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1547 if ((*reason) == NULL) {
1551 talloc_free(data.dptr);
1559 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
1560 TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1566 data.dptr = discard_const(name);
1567 data.dsize = strlen(name)+1;
1569 ret = ctdb_control(ctdb, destnode, 0,
1570 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1572 mem_ctx, &data, &res, &timeout, NULL);
1574 if (ret != 0 || res != 0) {
1582 get debug level on a node
1584 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1590 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null,
1591 ctdb, &data, &res, NULL, NULL);
1592 if (ret != 0 || res != 0) {
1595 if (data.dsize != sizeof(int32_t)) {
1596 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1597 (unsigned)data.dsize));
1600 *level = *(int32_t *)data.dptr;
1601 talloc_free(data.dptr);
1606 set debug level on a node
1608 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1614 data.dptr = (uint8_t *)&level;
1615 data.dsize = sizeof(level);
1617 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data,
1618 NULL, NULL, &res, NULL, NULL);
1619 if (ret != 0 || res != 0) {
1627 get a list of connected nodes
1629 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb,
1630 struct timeval timeout,
1631 TALLOC_CTX *mem_ctx,
1632 uint32_t *num_nodes)
1634 struct ctdb_node_map *map=NULL;
1640 ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1645 nodes = talloc_array(mem_ctx, uint32_t, map->num);
1646 if (nodes == NULL) {
1650 for (i=0;i<map->num;i++) {
1651 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1652 nodes[*num_nodes] = map->nodes[i].pnn;
1664 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1669 ret = ctdb_control(ctdb, destnode, 0,
1670 CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null,
1671 NULL, NULL, &res, NULL, NULL);
1672 if (ret != 0 || res != 0) {
1673 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1680 this is the dummy null procedure that all databases support
1682 static int ctdb_null_func(struct ctdb_call_info *call)
1688 this is a plain fetch procedure that all databases support
1690 static int ctdb_fetch_func(struct ctdb_call_info *call)
1692 call->reply_data = &call->record_data;
1697 attach to a specific database - client call
1699 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, bool persistent, uint32_t tdb_flags)
1701 struct ctdb_db_context *ctdb_db;
1706 ctdb_db = ctdb_db_handle(ctdb, name);
1711 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1712 CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1714 ctdb_db->ctdb = ctdb;
1715 ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1716 CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1718 data.dptr = discard_const(name);
1719 data.dsize = strlen(name)+1;
1721 /* tell ctdb daemon to attach */
1722 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags,
1723 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1724 0, data, ctdb_db, &data, &res, NULL, NULL);
1725 if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1726 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1727 talloc_free(ctdb_db);
1731 ctdb_db->db_id = *(uint32_t *)data.dptr;
1732 talloc_free(data.dptr);
1734 ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1736 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1737 talloc_free(ctdb_db);
1741 tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1742 if (ctdb->valgrinding) {
1743 tdb_flags |= TDB_NOMMAP;
1745 tdb_flags |= TDB_DISALLOW_NESTING;
1747 ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1748 if (ctdb_db->ltdb == NULL) {
1749 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1750 talloc_free(ctdb_db);
1754 ctdb_db->persistent = persistent;
1756 DLIST_ADD(ctdb->db_list, ctdb_db);
1758 /* add well known functions */
1759 ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1760 ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1767 setup a call for a database
1769 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1771 struct ctdb_registered_call *call;
1776 struct ctdb_control_set_call c;
1779 /* this is no longer valid with the separate daemon architecture */
1780 c.db_id = ctdb_db->db_id;
1784 data.dptr = (uint8_t *)&c;
1785 data.dsize = sizeof(c);
1787 ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1788 data, NULL, NULL, &status, NULL, NULL);
1789 if (ret != 0 || status != 0) {
1790 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
1795 /* also register locally */
1796 call = talloc(ctdb_db, struct ctdb_registered_call);
1800 DLIST_ADD(ctdb_db->calls, call);
1805 struct traverse_state {
1808 ctdb_traverse_func fn;
1813 called on each key during a ctdb_traverse
1815 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1817 struct traverse_state *state = (struct traverse_state *)p;
1818 struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
1821 if (data.dsize < sizeof(uint32_t) ||
1822 d->length != data.dsize) {
1823 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
1828 key.dsize = d->keylen;
1829 key.dptr = &d->data[0];
1830 data.dsize = d->datalen;
1831 data.dptr = &d->data[d->keylen];
1833 if (key.dsize == 0 && data.dsize == 0) {
1834 /* end of traverse */
1839 if (data.dsize == sizeof(struct ctdb_ltdb_header)) {
1840 /* empty records are deleted records in ctdb */
1844 if (state->fn(ctdb, key, data, state->private_data) != 0) {
1853 start a cluster wide traverse, calling the supplied fn on each record
1854 return the number of records traversed, or -1 on error
1856 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
1859 struct ctdb_traverse_start t;
1862 uint64_t srvid = (getpid() | 0xFLL<<60);
1863 struct traverse_state state;
1867 state.private_data = private_data;
1870 ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
1872 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
1876 t.db_id = ctdb_db->db_id;
1880 data.dptr = (uint8_t *)&t;
1881 data.dsize = sizeof(t);
1883 ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
1884 data, NULL, NULL, &status, NULL, NULL);
1885 if (ret != 0 || status != 0) {
1886 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
1887 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1891 while (!state.done) {
1892 event_loop_once(ctdb_db->ctdb->ev);
1895 ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1897 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
1904 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
1906 called on each key during a catdb
1908 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
1911 FILE *f = (FILE *)p;
1912 struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
1914 fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
1915 for (i=0;i<key.dsize;i++) {
1916 if (ISASCII(key.dptr[i])) {
1917 fprintf(f, "%c", key.dptr[i]);
1919 fprintf(f, "\\%02X", key.dptr[i]);
1924 fprintf(f, "dmaster: %u\n", h->dmaster);
1925 fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
1927 fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
1928 for (i=sizeof(*h);i<data.dsize;i++) {
1929 if (ISASCII(data.dptr[i])) {
1930 fprintf(f, "%c", data.dptr[i]);
1932 fprintf(f, "\\%02X", data.dptr[i]);
1943 convenience function to list all keys to stdout
1945 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
1947 return ctdb_traverse(ctdb_db, ctdb_dumpdb_record, f);
1951 get the pid of a ctdb daemon
1953 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
1958 ret = ctdb_control(ctdb, destnode, 0,
1959 CTDB_CONTROL_GET_PID, 0, tdb_null,
1960 NULL, NULL, &res, &timeout, NULL);
1962 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
1973 async freeze send control
1975 struct ctdb_client_control_state *
1976 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
1978 return ctdb_control_send(ctdb, destnode, priority,
1979 CTDB_CONTROL_FREEZE, 0, tdb_null,
1980 mem_ctx, &timeout, NULL);
1984 async freeze recv control
1986 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
1991 ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1992 if ( (ret != 0) || (res != 0) ){
1993 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
2001 freeze databases of a certain priority
2003 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2005 TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2006 struct ctdb_client_control_state *state;
2009 state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
2010 ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
2011 talloc_free(tmp_ctx);
2016 /* Freeze all databases */
2017 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2021 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2022 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2030 thaw databases of a certain priority
2032 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2037 ret = ctdb_control(ctdb, destnode, priority,
2038 CTDB_CONTROL_THAW, 0, tdb_null,
2039 NULL, NULL, &res, &timeout, NULL);
2040 if (ret != 0 || res != 0) {
2041 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2048 /* thaw all databases */
2049 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2051 return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2055 get pnn of a node, or -1
2057 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2062 ret = ctdb_control(ctdb, destnode, 0,
2063 CTDB_CONTROL_GET_PNN, 0, tdb_null,
2064 NULL, NULL, &res, &timeout, NULL);
2066 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2074 get the monitoring mode of a remote node
2076 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2081 ret = ctdb_control(ctdb, destnode, 0,
2082 CTDB_CONTROL_GET_MONMODE, 0, tdb_null,
2083 NULL, NULL, &res, &timeout, NULL);
2085 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2096 set the monitoring mode of a remote node to active
2098 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2103 ret = ctdb_control(ctdb, destnode, 0,
2104 CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null,
2105 NULL, NULL,NULL, &timeout, NULL);
2107 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2117 set the monitoring mode of a remote node to disable
2119 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2124 ret = ctdb_control(ctdb, destnode, 0,
2125 CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null,
2126 NULL, NULL, NULL, &timeout, NULL);
2128 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2140 sent to a node to make it take over an ip address
2142 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout,
2143 uint32_t destnode, struct ctdb_public_ip *ip)
2146 struct ctdb_public_ipv4 ipv4;
2150 if (ip->addr.sa.sa_family == AF_INET) {
2152 ipv4.sin = ip->addr.ip;
2154 data.dsize = sizeof(ipv4);
2155 data.dptr = (uint8_t *)&ipv4;
2157 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2158 NULL, &res, &timeout, NULL);
2160 data.dsize = sizeof(*ip);
2161 data.dptr = (uint8_t *)ip;
2163 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2164 NULL, &res, &timeout, NULL);
2167 if (ret != 0 || res != 0) {
2168 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2177 sent to a node to make it release an ip address
2179 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout,
2180 uint32_t destnode, struct ctdb_public_ip *ip)
2183 struct ctdb_public_ipv4 ipv4;
2187 if (ip->addr.sa.sa_family == AF_INET) {
2189 ipv4.sin = ip->addr.ip;
2191 data.dsize = sizeof(ipv4);
2192 data.dptr = (uint8_t *)&ipv4;
2194 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2195 NULL, &res, &timeout, NULL);
2197 data.dsize = sizeof(*ip);
2198 data.dptr = (uint8_t *)ip;
2200 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2201 NULL, &res, &timeout, NULL);
2204 if (ret != 0 || res != 0) {
2205 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2216 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb,
2217 struct timeval timeout,
2219 const char *name, uint32_t *value)
2221 struct ctdb_control_get_tunable *t;
2222 TDB_DATA data, outdata;
2226 data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2227 data.dptr = talloc_size(ctdb, data.dsize);
2228 CTDB_NO_MEMORY(ctdb, data.dptr);
2230 t = (struct ctdb_control_get_tunable *)data.dptr;
2231 t->length = strlen(name)+1;
2232 memcpy(t->name, name, t->length);
2234 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2235 &outdata, &res, &timeout, NULL);
2236 talloc_free(data.dptr);
2237 if (ret != 0 || res != 0) {
2238 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2242 if (outdata.dsize != sizeof(uint32_t)) {
2243 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2244 talloc_free(outdata.dptr);
2248 *value = *(uint32_t *)outdata.dptr;
2249 talloc_free(outdata.dptr);
2257 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb,
2258 struct timeval timeout,
2260 const char *name, uint32_t value)
2262 struct ctdb_control_set_tunable *t;
2267 data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2268 data.dptr = talloc_size(ctdb, data.dsize);
2269 CTDB_NO_MEMORY(ctdb, data.dptr);
2271 t = (struct ctdb_control_set_tunable *)data.dptr;
2272 t->length = strlen(name)+1;
2273 memcpy(t->name, name, t->length);
2276 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2277 NULL, &res, &timeout, NULL);
2278 talloc_free(data.dptr);
2279 if (ret != 0 || res != 0) {
2280 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2290 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb,
2291 struct timeval timeout,
2293 TALLOC_CTX *mem_ctx,
2294 const char ***list, uint32_t *count)
2299 struct ctdb_control_list_tunable *t;
2302 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null,
2303 mem_ctx, &outdata, &res, &timeout, NULL);
2304 if (ret != 0 || res != 0) {
2305 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2309 t = (struct ctdb_control_list_tunable *)outdata.dptr;
2310 if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2311 t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2312 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2313 talloc_free(outdata.dptr);
2317 p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2318 CTDB_NO_MEMORY(ctdb, p);
2320 talloc_free(outdata.dptr);
2325 for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2326 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2327 CTDB_NO_MEMORY(ctdb, *list);
2328 (*list)[*count] = talloc_strdup(*list, s);
2329 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2339 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2340 struct timeval timeout, uint32_t destnode,
2341 TALLOC_CTX *mem_ctx,
2343 struct ctdb_all_public_ips **ips)
2349 ret = ctdb_control(ctdb, destnode, 0,
2350 CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2351 mem_ctx, &outdata, &res, &timeout, NULL);
2352 if (ret == 0 && res == -1) {
2353 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2354 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2356 if (ret != 0 || res != 0) {
2357 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2361 *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2362 talloc_free(outdata.dptr);
2367 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2368 struct timeval timeout, uint32_t destnode,
2369 TALLOC_CTX *mem_ctx,
2370 struct ctdb_all_public_ips **ips)
2372 return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2377 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb,
2378 struct timeval timeout, uint32_t destnode,
2379 TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2384 struct ctdb_all_public_ipsv4 *ipsv4;
2386 ret = ctdb_control(ctdb, destnode, 0,
2387 CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null,
2388 mem_ctx, &outdata, &res, &timeout, NULL);
2389 if (ret != 0 || res != 0) {
2390 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2394 ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2395 len = offsetof(struct ctdb_all_public_ips, ips) +
2396 ipsv4->num*sizeof(struct ctdb_public_ip);
2397 *ips = talloc_zero_size(mem_ctx, len);
2398 CTDB_NO_MEMORY(ctdb, *ips);
2399 (*ips)->num = ipsv4->num;
2400 for (i=0; i<ipsv4->num; i++) {
2401 (*ips)->ips[i].pnn = ipsv4->ips[i].pnn;
2402 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2405 talloc_free(outdata.dptr);
2410 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2411 struct timeval timeout, uint32_t destnode,
2412 TALLOC_CTX *mem_ctx,
2413 const ctdb_sock_addr *addr,
2414 struct ctdb_control_public_ip_info **_info)
2420 struct ctdb_control_public_ip_info *info;
2424 indata.dptr = discard_const_p(uint8_t, addr);
2425 indata.dsize = sizeof(*addr);
2427 ret = ctdb_control(ctdb, destnode, 0,
2428 CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2429 mem_ctx, &outdata, &res, &timeout, NULL);
2430 if (ret != 0 || res != 0) {
2431 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2432 "failed ret:%d res:%d\n",
2437 len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2438 if (len > outdata.dsize) {
2439 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2440 "returned invalid data with size %u > %u\n",
2441 (unsigned int)outdata.dsize,
2442 (unsigned int)len));
2443 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2447 info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2448 len += info->num*sizeof(struct ctdb_control_iface_info);
2450 if (len > outdata.dsize) {
2451 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2452 "returned invalid data with size %u > %u\n",
2453 (unsigned int)outdata.dsize,
2454 (unsigned int)len));
2455 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2459 /* make sure we null terminate the returned strings */
2460 for (i=0; i < info->num; i++) {
2461 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2464 *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2467 talloc_free(outdata.dptr);
2468 if (*_info == NULL) {
2469 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2470 "talloc_memdup size %u failed\n",
2471 (unsigned int)outdata.dsize));
2478 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2479 struct timeval timeout, uint32_t destnode,
2480 TALLOC_CTX *mem_ctx,
2481 struct ctdb_control_get_ifaces **_ifaces)
2486 struct ctdb_control_get_ifaces *ifaces;
2490 ret = ctdb_control(ctdb, destnode, 0,
2491 CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2492 mem_ctx, &outdata, &res, &timeout, NULL);
2493 if (ret != 0 || res != 0) {
2494 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2495 "failed ret:%d res:%d\n",
2500 len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2501 if (len > outdata.dsize) {
2502 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2503 "returned invalid data with size %u > %u\n",
2504 (unsigned int)outdata.dsize,
2505 (unsigned int)len));
2506 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2510 ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2511 len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2513 if (len > outdata.dsize) {
2514 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2515 "returned invalid data with size %u > %u\n",
2516 (unsigned int)outdata.dsize,
2517 (unsigned int)len));
2518 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2522 /* make sure we null terminate the returned strings */
2523 for (i=0; i < ifaces->num; i++) {
2524 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2527 *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2530 talloc_free(outdata.dptr);
2531 if (*_ifaces == NULL) {
2532 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2533 "talloc_memdup size %u failed\n",
2534 (unsigned int)outdata.dsize));
2541 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2542 struct timeval timeout, uint32_t destnode,
2543 TALLOC_CTX *mem_ctx,
2544 const struct ctdb_control_iface_info *info)
2550 indata.dptr = discard_const_p(uint8_t, info);
2551 indata.dsize = sizeof(*info);
2553 ret = ctdb_control(ctdb, destnode, 0,
2554 CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2555 mem_ctx, NULL, &res, &timeout, NULL);
2556 if (ret != 0 || res != 0) {
2557 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2558 "failed ret:%d res:%d\n",
2567 set/clear the permanent disabled bit on a remote node
2569 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
2570 uint32_t set, uint32_t clear)
2574 struct ctdb_node_map *nodemap=NULL;
2575 struct ctdb_node_flag_change c;
2576 TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2581 /* find the recovery master */
2582 ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2584 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2585 talloc_free(tmp_ctx);
2590 /* read the node flags from the recmaster */
2591 ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2593 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2594 talloc_free(tmp_ctx);
2597 if (destnode >= nodemap->num) {
2598 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2599 talloc_free(tmp_ctx);
2604 c.old_flags = nodemap->nodes[destnode].flags;
2605 c.new_flags = c.old_flags;
2607 c.new_flags &= ~clear;
2609 data.dsize = sizeof(c);
2610 data.dptr = (unsigned char *)&c;
2612 /* send the flags update to all connected nodes */
2613 nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2615 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2617 timeout, false, data,
2620 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2622 talloc_free(tmp_ctx);
2626 talloc_free(tmp_ctx);
2634 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb,
2635 struct timeval timeout,
2637 struct ctdb_tunable *tunables)
2643 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2644 &outdata, &res, &timeout, NULL);
2645 if (ret != 0 || res != 0) {
2646 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2650 if (outdata.dsize != sizeof(*tunables)) {
2651 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2652 (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2656 *tunables = *(struct ctdb_tunable *)outdata.dptr;
2657 talloc_free(outdata.dptr);
2662 add a public address to a node
2664 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb,
2665 struct timeval timeout,
2667 struct ctdb_control_ip_iface *pub)
2673 data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2674 data.dptr = (unsigned char *)pub;
2676 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2677 NULL, &res, &timeout, NULL);
2678 if (ret != 0 || res != 0) {
2679 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2687 delete a public address from a node
2689 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb,
2690 struct timeval timeout,
2692 struct ctdb_control_ip_iface *pub)
2698 data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2699 data.dptr = (unsigned char *)pub;
2701 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2702 NULL, &res, &timeout, NULL);
2703 if (ret != 0 || res != 0) {
2704 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2712 kill a tcp connection
2714 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb,
2715 struct timeval timeout,
2717 struct ctdb_control_killtcp *killtcp)
2723 data.dsize = sizeof(struct ctdb_control_killtcp);
2724 data.dptr = (unsigned char *)killtcp;
2726 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2727 NULL, &res, &timeout, NULL);
2728 if (ret != 0 || res != 0) {
2729 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2739 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb,
2740 struct timeval timeout,
2742 ctdb_sock_addr *addr,
2748 struct ctdb_control_gratious_arp *gratious_arp;
2749 TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2752 len = strlen(ifname)+1;
2753 gratious_arp = talloc_size(tmp_ctx,
2754 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2755 CTDB_NO_MEMORY(ctdb, gratious_arp);
2757 gratious_arp->addr = *addr;
2758 gratious_arp->len = len;
2759 memcpy(&gratious_arp->iface[0], ifname, len);
2762 data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
2763 data.dptr = (unsigned char *)gratious_arp;
2765 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
2766 NULL, &res, &timeout, NULL);
2767 if (ret != 0 || res != 0) {
2768 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
2769 talloc_free(tmp_ctx);
2773 talloc_free(tmp_ctx);
2778 get a list of all tcp tickles that a node knows about for a particular vnn
2780 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb,
2781 struct timeval timeout, uint32_t destnode,
2782 TALLOC_CTX *mem_ctx,
2783 ctdb_sock_addr *addr,
2784 struct ctdb_control_tcp_tickle_list **list)
2787 TDB_DATA data, outdata;
2790 data.dptr = (uint8_t*)addr;
2791 data.dsize = sizeof(ctdb_sock_addr);
2793 ret = ctdb_control(ctdb, destnode, 0,
2794 CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data,
2795 mem_ctx, &outdata, &status, NULL, NULL);
2796 if (ret != 0 || status != 0) {
2797 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
2801 *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
2807 register a server id
2809 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb,
2810 struct timeval timeout,
2811 struct ctdb_server_id *id)
2817 data.dsize = sizeof(struct ctdb_server_id);
2818 data.dptr = (unsigned char *)id;
2820 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0,
2821 CTDB_CONTROL_REGISTER_SERVER_ID,
2823 NULL, &res, &timeout, NULL);
2824 if (ret != 0 || res != 0) {
2825 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
2833 unregister a server id
2835 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb,
2836 struct timeval timeout,
2837 struct ctdb_server_id *id)
2843 data.dsize = sizeof(struct ctdb_server_id);
2844 data.dptr = (unsigned char *)id;
2846 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0,
2847 CTDB_CONTROL_UNREGISTER_SERVER_ID,
2849 NULL, &res, &timeout, NULL);
2850 if (ret != 0 || res != 0) {
2851 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
2860 check if a server id exists
2862 if a server id does exist, return *status == 1, otherwise *status == 0
2864 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb,
2865 struct timeval timeout,
2867 struct ctdb_server_id *id,
2874 data.dsize = sizeof(struct ctdb_server_id);
2875 data.dptr = (unsigned char *)id;
2877 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID,
2879 NULL, &res, &timeout, NULL);
2881 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
2895 get the list of server ids that are registered on a node
2897 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
2898 TALLOC_CTX *mem_ctx,
2899 struct timeval timeout, uint32_t destnode,
2900 struct ctdb_server_id_list **svid_list)
2906 ret = ctdb_control(ctdb, destnode, 0,
2907 CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null,
2908 mem_ctx, &outdata, &res, &timeout, NULL);
2909 if (ret != 0 || res != 0) {
2910 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
2914 *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
2920 initialise the ctdb daemon for client applications
2922 NOTE: In current code the daemon does not fork. This is for testing purposes only
2923 and to simplify the code.
2925 struct ctdb_context *ctdb_init(struct event_context *ev)
2928 struct ctdb_context *ctdb;
2930 ctdb = talloc_zero(ev, struct ctdb_context);
2932 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
2936 ctdb->idr = idr_init(ctdb);
2937 /* Wrap early to exercise code. */
2938 ctdb->lastid = INT_MAX-200;
2939 CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
2941 ret = ctdb_set_socketname(ctdb, CTDB_PATH);
2943 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
2948 ctdb->statistics.statistics_start_time = timeval_current();
2957 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
2959 ctdb->flags |= flags;
2963 setup the local socket name
2965 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
2967 ctdb->daemon.name = talloc_strdup(ctdb, socketname);
2968 CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
2973 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
2975 return ctdb->daemon.name;
2979 return the pnn of this node
2981 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
2988 get the uptime of a remote node
2990 struct ctdb_client_control_state *
2991 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2993 return ctdb_control_send(ctdb, destnode, 0,
2994 CTDB_CONTROL_UPTIME, 0, tdb_null,
2995 mem_ctx, &timeout, NULL);
2998 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
3004 ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3005 if (ret != 0 || res != 0) {
3006 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
3010 *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
3015 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3017 struct ctdb_client_control_state *state;
3019 state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3020 return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3024 send a control to execute the "recovered" event script on a node
3026 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3031 ret = ctdb_control(ctdb, destnode, 0,
3032 CTDB_CONTROL_END_RECOVERY, 0, tdb_null,
3033 NULL, NULL, &status, &timeout, NULL);
3034 if (ret != 0 || status != 0) {
3035 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3043 callback for the async helpers used when sending the same control
3044 to multiple nodes in parallell.
3046 static void async_callback(struct ctdb_client_control_state *state)
3048 struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3049 struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3053 uint32_t destnode = state->c->hdr.destnode;
3055 /* one more node has responded with recmode data */
3058 /* if we failed to push the db, then return an error and let
3059 the main loop try again.
3061 if (state->state != CTDB_CONTROL_DONE) {
3062 if ( !data->dont_log_errors) {
3063 DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3066 if (data->fail_callback) {
3067 data->fail_callback(ctdb, destnode, res, outdata,
3068 data->callback_data);
3073 state->async.fn = NULL;
3075 ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3076 if ((ret != 0) || (res != 0)) {
3077 if ( !data->dont_log_errors) {
3078 DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3081 if (data->fail_callback) {
3082 data->fail_callback(ctdb, destnode, res, outdata,
3083 data->callback_data);
3086 if ((ret == 0) && (data->callback != NULL)) {
3087 data->callback(ctdb, destnode, res, outdata,
3088 data->callback_data);
3093 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3095 /* set up the callback functions */
3096 state->async.fn = async_callback;
3097 state->async.private_data = data;
3099 /* one more control to wait for to complete */
3104 /* wait for up to the maximum number of seconds allowed
3105 or until all nodes we expect a response from has replied
3107 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3109 while (data->count > 0) {
3110 event_loop_once(ctdb->ev);
3112 if (data->fail_count != 0) {
3113 if (!data->dont_log_errors) {
3114 DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n",
3124 perform a simple control on the listed nodes
3125 The control cannot return data
3127 int ctdb_client_async_control(struct ctdb_context *ctdb,
3128 enum ctdb_controls opcode,
3131 struct timeval timeout,
3132 bool dont_log_errors,
3134 client_async_callback client_callback,
3135 client_async_callback fail_callback,
3136 void *callback_data)
3138 struct client_async_data *async_data;
3139 struct ctdb_client_control_state *state;
3142 async_data = talloc_zero(ctdb, struct client_async_data);
3143 CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3144 async_data->dont_log_errors = dont_log_errors;
3145 async_data->callback = client_callback;
3146 async_data->fail_callback = fail_callback;
3147 async_data->callback_data = callback_data;
3148 async_data->opcode = opcode;
3150 num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3152 /* loop over all nodes and send an async control to each of them */
3153 for (j=0; j<num_nodes; j++) {
3154 uint32_t pnn = nodes[j];
3156 state = ctdb_control_send(ctdb, pnn, srvid, opcode,
3157 0, data, async_data, &timeout, NULL);
3158 if (state == NULL) {
3159 DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3160 talloc_free(async_data);
3164 ctdb_client_async_add(async_data, state);
3167 if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3168 talloc_free(async_data);
3172 talloc_free(async_data);
3176 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3177 struct ctdb_vnn_map *vnn_map,
3178 TALLOC_CTX *mem_ctx,
3181 int i, j, num_nodes;
3184 for (i=num_nodes=0;i<vnn_map->size;i++) {
3185 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3191 nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3192 CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3194 for (i=j=0;i<vnn_map->size;i++) {
3195 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3198 nodes[j++] = vnn_map->map[i];
3204 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3205 struct ctdb_node_map *node_map,
3206 TALLOC_CTX *mem_ctx,
3209 int i, j, num_nodes;
3212 for (i=num_nodes=0;i<node_map->num;i++) {
3213 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3216 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3222 nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3223 CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3225 for (i=j=0;i<node_map->num;i++) {
3226 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3229 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3232 nodes[j++] = node_map->nodes[i].pnn;
3238 uint32_t *list_of_active_nodes_except_pnn(struct ctdb_context *ctdb,
3239 struct ctdb_node_map *node_map,
3240 TALLOC_CTX *mem_ctx,
3243 int i, j, num_nodes;
3246 for (i=num_nodes=0;i<node_map->num;i++) {
3247 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3250 if (node_map->nodes[i].pnn == pnn) {
3256 nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3257 CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3259 for (i=j=0;i<node_map->num;i++) {
3260 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3263 if (node_map->nodes[i].pnn == pnn) {
3266 nodes[j++] = node_map->nodes[i].pnn;
3272 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3273 struct ctdb_node_map *node_map,
3274 TALLOC_CTX *mem_ctx,
3277 int i, j, num_nodes;
3280 for (i=num_nodes=0;i<node_map->num;i++) {
3281 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3284 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3290 nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3291 CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3293 for (i=j=0;i<node_map->num;i++) {
3294 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3297 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3300 nodes[j++] = node_map->nodes[i].pnn;
3307 this is used to test if a pnn lock exists and if it exists will return
3308 the number of connections that pnn has reported or -1 if that recovery
3309 daemon is not running.
3312 ctdb_read_pnn_lock(int fd, int32_t pnn)
3317 lock.l_type = F_WRLCK;
3318 lock.l_whence = SEEK_SET;
3323 if (fcntl(fd, F_GETLK, &lock) != 0) {
3324 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3328 if (lock.l_type == F_UNLCK) {
3332 if (pread(fd, &c, 1, pnn) == -1) {
3333 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3341 get capabilities of a remote node
3343 struct ctdb_client_control_state *
3344 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3346 return ctdb_control_send(ctdb, destnode, 0,
3347 CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null,
3348 mem_ctx, &timeout, NULL);
3351 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3357 ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3358 if ( (ret != 0) || (res != 0) ) {
3359 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3364 *capabilities = *((uint32_t *)outdata.dptr);
3370 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3372 struct ctdb_client_control_state *state;
3373 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3376 state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3377 ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3378 talloc_free(tmp_ctx);
3383 * check whether a transaction is active on a given db on a given node
3385 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
3393 indata.dptr = (uint8_t *)&db_id;
3394 indata.dsize = sizeof(db_id);
3396 ret = ctdb_control(ctdb, destnode, 0,
3397 CTDB_CONTROL_TRANS2_ACTIVE,
3398 0, indata, NULL, NULL, &status,
3402 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
3410 struct ctdb_transaction_handle {
3411 struct ctdb_db_context *ctdb_db;
3414 * we store the reads and writes done under a transaction:
3415 * - one list stores both reads and writes (m_all),
3416 * - the other just writes (m_write)
3418 struct ctdb_marshall_buffer *m_all;
3419 struct ctdb_marshall_buffer *m_write;
3422 /* start a transaction on a database */
3423 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3425 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3429 /* start a transaction on a database */
3430 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
3432 struct ctdb_record_handle *rh;
3435 struct ctdb_ltdb_header header;
3436 TALLOC_CTX *tmp_ctx;
3437 const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
3439 struct ctdb_db_context *ctdb_db = h->ctdb_db;
3443 key.dptr = discard_const(keyname);
3444 key.dsize = strlen(keyname);
3446 if (!ctdb_db->persistent) {
3447 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
3452 tmp_ctx = talloc_new(h);
3454 rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
3456 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
3457 talloc_free(tmp_ctx);
3461 status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
3465 unsigned long int usec = (1000 + random()) % 100000;
3466 DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
3467 "on db_id[0x%08x]. waiting for %lu "
3469 ctdb_db->db_id, usec));
3470 talloc_free(tmp_ctx);
3476 * store the pid in the database:
3477 * it is not enough that the node is dmaster...
3480 data.dptr = (unsigned char *)&pid;
3481 data.dsize = sizeof(pid_t);
3483 rh->header.dmaster = ctdb_db->ctdb->pnn;
3484 ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
3486 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
3487 "transaction record\n"));
3488 talloc_free(tmp_ctx);
3494 ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3496 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
3497 talloc_free(tmp_ctx);
3501 ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
3503 DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
3504 "lock record inside transaction\n"));
3505 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3506 talloc_free(tmp_ctx);
3510 if (header.dmaster != ctdb_db->ctdb->pnn) {
3511 DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
3512 "transaction lock record\n"));
3513 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3514 talloc_free(tmp_ctx);
3518 if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
3519 DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
3520 "the transaction lock record\n"));
3521 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3522 talloc_free(tmp_ctx);
3526 talloc_free(tmp_ctx);
3532 /* start a transaction on a database */
3533 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3534 TALLOC_CTX *mem_ctx)
3536 struct ctdb_transaction_handle *h;
3539 h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3541 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));
3545 h->ctdb_db = ctdb_db;
3547 ret = ctdb_transaction_fetch_start(h);
3553 talloc_set_destructor(h, ctdb_transaction_destructor);
3561 fetch a record inside a transaction
3563 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h,
3564 TALLOC_CTX *mem_ctx,
3565 TDB_DATA key, TDB_DATA *data)
3567 struct ctdb_ltdb_header header;
3570 ZERO_STRUCT(header);
3572 ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3573 if (ret == -1 && header.dmaster == (uint32_t)-1) {
3574 /* record doesn't exist yet */
3583 if (!h->in_replay) {
3584 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3585 if (h->m_all == NULL) {
3586 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3595 stores a record inside a transaction
3597 int ctdb_transaction_store(struct ctdb_transaction_handle *h,
3598 TDB_DATA key, TDB_DATA data)
3600 TALLOC_CTX *tmp_ctx = talloc_new(h);
3601 struct ctdb_ltdb_header header;
3605 ZERO_STRUCT(header);
3607 /* we need the header so we can update the RSN */
3608 ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3609 if (ret == -1 && header.dmaster == (uint32_t)-1) {
3610 /* the record doesn't exist - create one with us as dmaster.
3611 This is only safe because we are in a transaction and this
3612 is a persistent database */
3613 ZERO_STRUCT(header);
3614 } else if (ret != 0) {
3615 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3616 talloc_free(tmp_ctx);
3620 if (data.dsize == olddata.dsize &&
3621 memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3622 /* save writing the same data */
3623 talloc_free(tmp_ctx);
3627 header.dmaster = h->ctdb_db->ctdb->pnn;
3630 if (!h->in_replay) {
3631 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3632 if (h->m_all == NULL) {
3633 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3634 talloc_free(tmp_ctx);
3639 h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3640 if (h->m_write == NULL) {
3641 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3642 talloc_free(tmp_ctx);
3646 ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3648 talloc_free(tmp_ctx);
3654 replay a transaction
3656 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3659 struct ctdb_rec_data *rec = NULL;
3661 h->in_replay = true;
3662 talloc_free(h->m_write);
3665 ret = ctdb_transaction_fetch_start(h);
3670 for (i=0;i<h->m_all->count;i++) {
3673 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3675 DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3679 if (rec->reqid == 0) {
3681 if (ctdb_transaction_store(h, key, data) != 0) {
3686 TALLOC_CTX *tmp_ctx = talloc_new(h);
3688 if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3689 talloc_free(tmp_ctx);
3692 if (data2.dsize != data.dsize ||
3693 memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3694 /* the record has changed on us - we have to give up */
3695 talloc_free(tmp_ctx);
3698 talloc_free(tmp_ctx);
3705 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3711 commit a transaction
3713 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3717 struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3718 struct timeval timeout;
3719 enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3721 talloc_set_destructor(h, NULL);
3723 /* our commit strategy is quite complex.
3725 - we first try to commit the changes to all other nodes
3727 - if that works, then we commit locally and we are done
3729 - if a commit on another node fails, then we need to cancel
3730 the transaction, then restart the transaction (thus
3731 opening a window of time for a pending recovery to
3732 complete), then replay the transaction, checking all the
3733 reads and writes (checking that reads give the same data,
3734 and writes succeed). Then we retry the transaction to the
3739 if (h->m_write == NULL) {
3740 /* no changes were made */
3741 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3746 /* tell ctdbd to commit to the other nodes */
3747 timeout = timeval_current_ofs(1, 0);
3748 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
3749 retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0,
3750 ctdb_marshall_finish(h->m_write), NULL, NULL, &status,
3752 if (ret != 0 || status != 0) {
3753 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3754 DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
3755 ", retrying after 1 second...\n",
3756 (retries==0)?"":"retry "));
3760 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3762 /* work out what error code we will give if we
3763 have to fail the operation */
3764 switch ((enum ctdb_trans2_commit_error)status) {
3765 case CTDB_TRANS2_COMMIT_SUCCESS:
3766 case CTDB_TRANS2_COMMIT_SOMEFAIL:
3767 case CTDB_TRANS2_COMMIT_TIMEOUT:
3768 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3770 case CTDB_TRANS2_COMMIT_ALLFAIL:
3771 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
3776 if (++retries == 100) {
3777 DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n",
3778 h->ctdb_db->db_id, retries, (unsigned)failure_control));
3779 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
3780 failure_control, CTDB_CTRL_FLAG_NOREPLY,
3781 tdb_null, NULL, NULL, NULL, NULL, NULL);
3786 if (ctdb_replay_transaction(h) != 0) {
3787 DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
3788 "transaction on db 0x%08x, "
3789 "failure control =%u\n",
3791 (unsigned)failure_control));
3792 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
3793 failure_control, CTDB_CTRL_FLAG_NOREPLY,
3794 tdb_null, NULL, NULL, NULL, NULL, NULL);
3800 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3803 /* do the real commit locally */
3804 ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
3806 DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
3807 "on db id 0x%08x locally, "
3808 "failure_control=%u\n",
3810 (unsigned)failure_control));
3811 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
3812 failure_control, CTDB_CTRL_FLAG_NOREPLY,
3813 tdb_null, NULL, NULL, NULL, NULL, NULL);
3818 /* tell ctdbd that we are finished with our local commit */
3819 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
3820 CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY,
3821 tdb_null, NULL, NULL, NULL, NULL, NULL);
3827 recovery daemon ping to main daemon
3829 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
3834 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null,
3835 ctdb, NULL, &res, NULL, NULL);
3836 if (ret != 0 || res != 0) {
3837 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
3844 /* when forking the main daemon and the child process needs to connect back
3845 * to the daemon as a client process, this function can be used to change
3846 * the ctdb context from daemon into client mode
3848 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
3853 /* Add extra information so we can identify this in the logs */
3855 debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
3858 /* shutdown the transport */
3859 if (ctdb->methods) {
3860 ctdb->methods->shutdown(ctdb);
3863 /* get a new event context */
3864 talloc_free(ctdb->ev);
3865 ctdb->ev = event_context_init(ctdb);
3866 tevent_loop_allow_nesting(ctdb->ev);
3868 close(ctdb->daemon.sd);
3869 ctdb->daemon.sd = -1;
3871 /* the client does not need to be realtime */
3872 if (ctdb->do_setsched) {
3873 ctdb_restore_scheduler(ctdb);
3876 /* initialise ctdb */
3877 ret = ctdb_socket_connect(ctdb);
3879 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
3887 get the status of running the monitor eventscripts: NULL means never run.
3889 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb,
3890 struct timeval timeout, uint32_t destnode,
3891 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
3892 struct ctdb_scripts_wire **script_status)
3895 TDB_DATA outdata, indata;
3897 uint32_t uinttype = type;
3899 indata.dptr = (uint8_t *)&uinttype;
3900 indata.dsize = sizeof(uinttype);
3902 ret = ctdb_control(ctdb, destnode, 0,
3903 CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
3904 mem_ctx, &outdata, &res, &timeout, NULL);
3905 if (ret != 0 || res != 0) {
3906 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
3910 if (outdata.dsize == 0) {
3911 *script_status = NULL;
3913 *script_status = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
3914 talloc_free(outdata.dptr);
3921 tell the main daemon how long it took to lock the reclock file
3923 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
3929 data.dptr = (uint8_t *)&latency;
3930 data.dsize = sizeof(latency);
3932 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data,
3933 ctdb, NULL, &res, NULL, NULL);
3934 if (ret != 0 || res != 0) {
3935 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
3943 get the name of the reclock file
3945 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
3946 uint32_t destnode, TALLOC_CTX *mem_ctx,
3953 ret = ctdb_control(ctdb, destnode, 0,
3954 CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null,
3955 mem_ctx, &data, &res, &timeout, NULL);
3956 if (ret != 0 || res != 0) {
3960 if (data.dsize == 0) {
3963 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
3965 talloc_free(data.dptr);
3971 set the reclock filename for a node
3973 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
3979 if (reclock == NULL) {
3983 data.dsize = strlen(reclock) + 1;
3984 data.dptr = discard_const(reclock);
3987 ret = ctdb_control(ctdb, destnode, 0,
3988 CTDB_CONTROL_SET_RECLOCK_FILE, 0, data,
3989 NULL, NULL, &res, &timeout, NULL);
3990 if (ret != 0 || res != 0) {
3991 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
4001 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4006 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null,
4007 ctdb, NULL, &res, &timeout, NULL);
4008 if (ret != 0 || res != 0) {
4009 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
4019 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4023 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null,
4024 ctdb, NULL, NULL, &timeout, NULL);
4026 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4034 set the natgw state for a node
4036 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4042 data.dsize = sizeof(natgwstate);
4043 data.dptr = (uint8_t *)&natgwstate;
4045 ret = ctdb_control(ctdb, destnode, 0,
4046 CTDB_CONTROL_SET_NATGWSTATE, 0, data,
4047 NULL, NULL, &res, &timeout, NULL);
4048 if (ret != 0 || res != 0) {
4049 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4057 set the lmaster role for a node
4059 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4065 data.dsize = sizeof(lmasterrole);
4066 data.dptr = (uint8_t *)&lmasterrole;
4068 ret = ctdb_control(ctdb, destnode, 0,
4069 CTDB_CONTROL_SET_LMASTERROLE, 0, data,
4070 NULL, NULL, &res, &timeout, NULL);
4071 if (ret != 0 || res != 0) {
4072 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4080 set the recmaster role for a node
4082 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4088 data.dsize = sizeof(recmasterrole);
4089 data.dptr = (uint8_t *)&recmasterrole;
4091 ret = ctdb_control(ctdb, destnode, 0,
4092 CTDB_CONTROL_SET_RECMASTERROLE, 0, data,
4093 NULL, NULL, &res, &timeout, NULL);
4094 if (ret != 0 || res != 0) {
4095 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4102 /* enable an eventscript
4104 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4110 data.dsize = strlen(script) + 1;
4111 data.dptr = discard_const(script);
4113 ret = ctdb_control(ctdb, destnode, 0,
4114 CTDB_CONTROL_ENABLE_SCRIPT, 0, data,
4115 NULL, NULL, &res, &timeout, NULL);
4116 if (ret != 0 || res != 0) {
4117 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4124 /* disable an eventscript
4126 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4132 data.dsize = strlen(script) + 1;
4133 data.dptr = discard_const(script);
4135 ret = ctdb_control(ctdb, destnode, 0,
4136 CTDB_CONTROL_DISABLE_SCRIPT, 0, data,
4137 NULL, NULL, &res, &timeout, NULL);
4138 if (ret != 0 || res != 0) {
4139 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
4147 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
4153 data.dsize = sizeof(*bantime);
4154 data.dptr = (uint8_t *)bantime;
4156 ret = ctdb_control(ctdb, destnode, 0,
4157 CTDB_CONTROL_SET_BAN_STATE, 0, data,
4158 NULL, NULL, &res, &timeout, NULL);
4159 if (ret != 0 || res != 0) {
4160 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4168 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4173 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4175 ret = ctdb_control(ctdb, destnode, 0,
4176 CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4177 tmp_ctx, &outdata, &res, &timeout, NULL);
4178 if (ret != 0 || res != 0) {
4179 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4180 talloc_free(tmp_ctx);
4184 *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4185 talloc_free(tmp_ctx);
4191 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4196 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4198 data.dptr = (uint8_t*)db_prio;
4199 data.dsize = sizeof(*db_prio);
4201 ret = ctdb_control(ctdb, destnode, 0,
4202 CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4203 tmp_ctx, NULL, &res, &timeout, NULL);
4204 if (ret != 0 || res != 0) {
4205 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4206 talloc_free(tmp_ctx);
4210 talloc_free(tmp_ctx);
4215 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4220 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4222 data.dptr = (uint8_t*)&db_id;
4223 data.dsize = sizeof(db_id);
4225 ret = ctdb_control(ctdb, destnode, 0,
4226 CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4227 tmp_ctx, NULL, &res, &timeout, NULL);
4228 if (ret != 0 || res < 0) {
4229 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4230 talloc_free(tmp_ctx);
4238 talloc_free(tmp_ctx);
4243 int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_statistics_wire **stats)
4249 ret = ctdb_control(ctdb, destnode, 0,
4250 CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null,
4251 mem_ctx, &outdata, &res, &timeout, NULL);
4252 if (ret != 0 || res != 0 || outdata.dsize == 0) {
4253 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
4257 *stats = (struct ctdb_statistics_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4258 talloc_free(outdata.dptr);
4263 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)