4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/events/events.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
29 #include "../include/ctdb_private.h"
30 #include "lib/util/dlinklist.h"
33 allocate a packet for use in client<->daemon communication
/*
  Allocate and initialise a packet header for client<->daemon traffic.

  Visible behaviour: the requested length is raised to at least slength,
  rounded up with the (CTDB_DS_ALIGNMENT-1) mask, and that many bytes are
  obtained from talloc_size() on mem_ctx.  The chunk is named after
  'type', its first slength bytes are zeroed, and the operation, magic,
  version, source node (ctdb->pnn) and generation
  (ctdb->vnn_map->generation) fields are filled in.

  NOTE(review): only slength bytes are cleared, so any bytes beyond that
  are left exactly as talloc_size() returned them.
  NOTE(review): the mask-based rounding assumes CTDB_DS_ALIGNMENT is a
  power of two - TODO confirm.
  NOTE(review): this listing has gaps in its embedded line numbers; the
  declarations of mem_ctx and type, the allocation-failure check and the
  return statement are not shown.
*/
35 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
37 enum ctdb_operation operation,
38 size_t length, size_t slength,
42 struct ctdb_req_header *hdr;
44 length = MAX(length, slength);
45 size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
47 hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
49 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
50 operation, (unsigned)length));
53 talloc_set_name_const(hdr, type);
54 memset(hdr, 0, slength);
56 hdr->operation = operation;
57 hdr->ctdb_magic = CTDB_MAGIC;
58 hdr->ctdb_version = CTDB_VERSION;
59 hdr->srcnode = ctdb->pnn;
61 hdr->generation = ctdb->vnn_map->generation;
68 local version of ctdb_call
/*
  Run a registered call function against a record on this node.

  Visible steps:
   - a struct ctdb_call_info is allocated on ctdb and given a private
     talloc_memdup() copy of the record data plus a pointer to the
     caller's call_data
   - ctdb_db->calls is searched for the entry whose id equals
     call->call_id; "Unknown call id" and "ctdb_call ... failed" are the
     error strings set for the failure cases
   - header->laccessor is updated to 'caller' when it differs
   - a write of the unchanged record data is forced when no new data was
     produced and the last accessor is not this node
   - the record is written back with ctdb_ltdb_store()
   - reply data is copied into call->reply_data and its buffer
     re-parented onto 'call', or dptr/dsize are cleared
   - call->status is taken from c->status

  NOTE(review): the conditions guarding several of these steps, the
  invocation of the call function itself, the release of 'c' and the
  return statements are on lines missing from this listing - confirm
  against the full file.
*/
70 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
71 struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
72 TDB_DATA *data, uint32_t caller)
74 struct ctdb_call_info *c;
75 struct ctdb_registered_call *fn;
76 struct ctdb_context *ctdb = ctdb_db->ctdb;
78 c = talloc(ctdb, struct ctdb_call_info);
79 CTDB_NO_MEMORY(ctdb, c);
82 c->call_data = &call->call_data;
83 c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
84 c->record_data.dsize = data->dsize;
85 CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
90 for (fn=ctdb_db->calls;fn;fn=fn->next) {
91 if (fn->id == call->call_id) break;
94 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
100 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
105 if (header->laccessor != caller) {
108 header->laccessor = caller;
111 /* we need to force the record to be written out if this was a remote access,
112 so that the lacount is updated */
113 if (c->new_data == NULL && header->laccessor != ctdb->pnn) {
114 c->new_data = &c->record_data;
118 /* XXX check that we always have the lock here? */
119 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
120 ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
127 call->reply_data = *c->reply_data;
129 talloc_steal(call, call->reply_data.dptr);
130 talloc_set_name_const(call->reply_data.dptr, __location__);
132 call->reply_data.dptr = NULL;
133 call->reply_data.dsize = 0;
135 call->status = c->status;
144 queue a packet for sending from client to daemon
/*
  Queue a packet for sending from the client to the daemon.
  Hands hdr->length bytes starting at hdr to ctdb_queue_send() on
  ctdb->daemon.queue and returns that function's result.
*/
146 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
148 return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
153 called when a CTDB_REPLY_CALL packet comes in in the client
155 This packet comes in response to a CTDB_REQ_CALL request packet. It
156 contains any reply data from the call
/*
  Handle a CTDB_REPLY_CALL packet in the client.

  The pending call state is looked up from hdr->reqid with
  ctdb_reqid_find(); a lookup miss or a reqid mismatch is logged.
  Otherwise the reply data pointer, length and status from the packet
  are recorded in state->call, and the packet is re-parented onto state
  with talloc_steal() because reply_data.dptr points into the packet.
  The state is then marked CTDB_CALL_DONE and state->async.fn, if set,
  is invoked.

  NOTE(review): the early exits for the two error cases are on lines
  missing from this listing.
*/
158 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
160 struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
161 struct ctdb_client_call_state *state;
163 state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
165 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
169 if (hdr->reqid != state->reqid) {
170 /* we found a record but it was the wrong one */
171 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
175 state->call->reply_data.dptr = c->data;
176 state->call->reply_data.dsize = c->datalen;
177 state->call->status = c->status;
179 talloc_steal(state, c);
181 state->state = CTDB_CALL_DONE;
183 if (state->async.fn) {
184 state->async.fn(state);
/* forward declaration: ctdb_client_read_cb() calls this before its definition appears */
188 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
191 this is called in the client, when data comes in from the daemon
/*
  Read callback for data arriving from the daemon on the client socket.

  'args' is the ctdb context; 'data' and 'cnt' describe the received
  packet, which is interpreted as a struct ctdb_req_header.  The packet
  is parented to a temporary talloc context that is freed at the end,
  so a handler that wants to keep the packet has to talloc_steal() it
  elsewhere.

  Visible validation: cnt must be at least sizeof(*hdr), must equal
  hdr->length, and the header must carry CTDB_MAGIC and CTDB_VERSION.
  Dispatch is on hdr->operation: CTDB_REPLY_CALL, CTDB_REQ_MESSAGE and
  CTDB_REPLY_CONTROL have handlers; anything else is logged as a bogus
  operation code.

  NOTE(review): in the visible order hdr is handed to talloc_steal()
  before the "Daemon has exited" message and the size checks; the
  condition guarding that message and the exits taken on each failure
  are on lines missing from this listing.
*/
193 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
195 struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
196 struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
199 /* place the packet as a child of a tmp_ctx. We then use
200 talloc_free() below to free it. If any of the calls want
201 to keep it, then they will steal it somewhere else, and the
202 talloc_free() will be a no-op */
203 tmp_ctx = talloc_new(ctdb);
204 talloc_steal(tmp_ctx, hdr);
207 DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n"));
211 if (cnt < sizeof(*hdr)) {
212 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
215 if (cnt != hdr->length) {
216 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n",
217 (unsigned)hdr->length, (unsigned)cnt);
221 if (hdr->ctdb_magic != CTDB_MAGIC) {
222 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
226 if (hdr->ctdb_version != CTDB_VERSION) {
227 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
231 switch (hdr->operation) {
232 case CTDB_REPLY_CALL:
233 ctdb_client_reply_call(ctdb, hdr);
236 case CTDB_REQ_MESSAGE:
237 ctdb_request_message(ctdb, hdr);
240 case CTDB_REPLY_CONTROL:
241 ctdb_client_reply_control(ctdb, hdr);
245 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
249 talloc_free(tmp_ctx);
253 connect to a unix domain socket
/*
  Connect the client to the daemon's unix domain socket.

  Builds a sockaddr_un from ctdb->daemon.name, creates an AF_UNIX
  SOCK_STREAM socket in ctdb->daemon.sd, marks it non-blocking and
  close-on-exec, connects, and sets up ctdb->daemon.queue with
  ctdb_client_read_cb as the read callback.  When connect() fails the
  descriptor is closed and ctdb->daemon.sd is reset to -1.

  NOTE(review): strncpy() is given the full sizeof(addr.sun_path), so a
  daemon.name of that length or longer leaves sun_path without a NUL
  terminator; the preceding memset only helps for shorter names.
  NOTE(review): the return statements and one ctdb_queue_setup()
  argument are on lines missing from this listing.
*/
255 int ctdb_socket_connect(struct ctdb_context *ctdb)
257 struct sockaddr_un addr;
259 memset(&addr, 0, sizeof(addr));
260 addr.sun_family = AF_UNIX;
261 strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
263 ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
264 if (ctdb->daemon.sd == -1) {
268 set_nonblocking(ctdb->daemon.sd);
269 set_close_on_exec(ctdb->daemon.sd);
271 if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
272 close(ctdb->daemon.sd);
273 ctdb->daemon.sd = -1;
277 ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd,
279 ctdb_client_read_cb, ctdb);
/*
  Handle for a record obtained with ctdb_fetch_lock(): the database the
  record belongs to and the record's ltdb header.
  NOTE(review): only two members are visible in this listing; other code
  in this file also uses h->key, so at least that member is declared on
  a line not shown here.
*/
284 struct ctdb_record_handle {
285 struct ctdb_db_context *ctdb_db;
288 struct ctdb_ltdb_header header;
293 make a recv call to the local ctdb daemon - called from client context
295 This is called when the program wants to wait for a ctdb_call to complete and get the
296 results. This call will block unless the call has already completed.
/*
  Wait for a call started with ctdb_call_send() and collect its result.

  Runs event_loop_once() for as long as state->state is below
  CTDB_CALL_DONE; any final state other than CTDB_CALL_DONE is logged as
  a failure.  Reply data, when present, is duplicated with
  talloc_memdup() onto state->ctdb_db before being handed back in
  call->reply_data; otherwise dptr/dsize are cleared.  call->status is
  copied from the stored call.

  NOTE(review): the reply copy is parented to the database context, not
  to 'call' or 'state', so the caller has to free or re-parent it.
  NOTE(review): any NULL check on 'state', the release of 'state' and
  the return values are on lines missing from this listing.
*/
298 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
304 while (state->state < CTDB_CALL_DONE) {
305 event_loop_once(state->ctdb_db->ctdb->ev);
307 if (state->state != CTDB_CALL_DONE) {
308 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
313 if (state->call->reply_data.dsize) {
314 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
315 state->call->reply_data.dptr,
316 state->call->reply_data.dsize);
317 call->reply_data.dsize = state->call->reply_data.dsize;
319 call->reply_data.dptr = NULL;
320 call->reply_data.dsize = 0;
322 call->status = state->call->status;
332 destroy a ctdb_call in client
/*
  talloc destructor for a client call state (installed by
  ctdb_call_send()): releases the request id registered for the state.
*/
334 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)
336 ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
341 construct an event driven local ctdb_call
343 this is used so that locally processed ctdb_call requests are processed
344 in an event driven manner
/*
  Build an already-completed call state for a call that can be served
  on this node.

  A zeroed state and state->call are allocated (the state on ctdb_db),
  data->dptr is re-parented onto the state, the state is marked
  CTDB_CALL_DONE, the caller's call is copied by value and
  ctdb_call_local() is run with this node's pnn as the caller.

  NOTE(review): the 'data' parameter declaration, the handling of 'ret'
  and the return statement are on lines missing from this listing.
*/
346 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db,
347 struct ctdb_call *call,
348 struct ctdb_ltdb_header *header,
351 struct ctdb_client_call_state *state;
352 struct ctdb_context *ctdb = ctdb_db->ctdb;
355 state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
356 CTDB_NO_MEMORY_NULL(ctdb, state);
357 state->call = talloc_zero(state, struct ctdb_call);
358 CTDB_NO_MEMORY_NULL(ctdb, state->call);
360 talloc_steal(state, data->dptr);
362 state->state = CTDB_CALL_DONE;
363 *(state->call) = *call;
364 state->ctdb_db = ctdb_db;
366 ret = ctdb_call_local(ctdb_db, state->call, header, state, data, ctdb->pnn);
372 make a ctdb call to the local daemon - async send. Called from client context.
374 This constructs a ctdb_call request and queues it for processing.
375 This call never blocks.
/*
  Start a ctdb call from client context without waiting for the result.

  Visible flow:
   - connect to the daemon first if ctdb->daemon.sd is -1
   - take the chain lock for call->key and fetch the record
   - if the fetch succeeded and this node is the dmaster, serve the call
     through ctdb_client_call_local_send() and drop the lock
   - otherwise drop the lock, allocate a state plus a CTDB_REQ_CALL
     packet carrying the key followed by the call data, register a
     reqid, install ctdb_client_call_destructor, mark the state
     CTDB_CALL_WAIT and queue the packet to the daemon
  state->call is a by-value copy of *call whose key and call_data
  pointers are redirected into the packet buffer.

  NOTE(review): the results of ctdb_socket_connect() and
  ctdb_client_queue_pkt() are not used on the visible lines.
  NOTE(review): the conditions before each error log and all return
  statements are on lines missing from this listing.
*/
377 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db,
378 struct ctdb_call *call)
380 struct ctdb_client_call_state *state;
381 struct ctdb_context *ctdb = ctdb_db->ctdb;
382 struct ctdb_ltdb_header header;
386 struct ctdb_req_call *c;
388 /* if the domain socket is not yet open, open it */
389 if (ctdb->daemon.sd==-1) {
390 ctdb_socket_connect(ctdb);
393 ret = ctdb_ltdb_lock(ctdb_db, call->key);
395 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
399 ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
401 if (ret == 0 && header.dmaster == ctdb->pnn) {
402 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
403 talloc_free(data.dptr);
404 ctdb_ltdb_unlock(ctdb_db, call->key);
408 ctdb_ltdb_unlock(ctdb_db, call->key);
409 talloc_free(data.dptr);
411 state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
413 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
416 state->call = talloc_zero(state, struct ctdb_call);
417 if (state->call == NULL) {
418 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
422 len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
423 c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
425 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
429 state->reqid = ctdb_reqid_new(ctdb, state);
430 state->ctdb_db = ctdb_db;
431 talloc_set_destructor(state, ctdb_client_call_destructor);
433 c->hdr.reqid = state->reqid;
434 c->flags = call->flags;
435 c->db_id = ctdb_db->db_id;
436 c->callid = call->call_id;
438 c->keylen = call->key.dsize;
439 c->calldatalen = call->call_data.dsize;
440 memcpy(&c->data[0], call->key.dptr, call->key.dsize);
441 memcpy(&c->data[call->key.dsize],
442 call->call_data.dptr, call->call_data.dsize);
443 *(state->call) = *call;
444 state->call->call_data.dptr = &c->data[call->key.dsize];
445 state->call->key.dptr = &c->data[0];
447 state->state = CTDB_CALL_WAIT;
450 ctdb_client_queue_pkt(ctdb, &c->hdr);
457 full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
/*
  Synchronous call: ctdb_call_send() followed by ctdb_call_recv().
  NOTE(review): 'state' is passed on without a NULL check here; whether
  ctdb_call_recv() tolerates a NULL state is not visible in this
  listing.
*/
459 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
461 struct ctdb_client_call_state *state;
463 state = ctdb_call_send(ctdb_db, call);
464 return ctdb_call_recv(state, call);
469 tell the daemon what messaging srvid we will use, and register the message
470 handler function in the client
/*
  Register a message handler for 'srvid'.
  First sends CTDB_CONTROL_REGISTER_SRVID to the current node; a
  non-zero result or status is logged as a failure.  The handler is then
  registered on the local ctdb structure and that call's result is
  returned.
  NOTE(review): the private_data parameter declaration and the failure
  return are on lines missing from this listing.
*/
472 int ctdb_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
473 ctdb_message_fn_t handler,
480 res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0,
481 tdb_null, NULL, NULL, &status, NULL, NULL);
482 if (res != 0 || status != 0) {
483 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
487 /* also need to register the handler with our own ctdb structure */
488 return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
492 tell the daemon we no longer want a srvid
/*
  Deregister the message handler for 'srvid'.
  Sends CTDB_CONTROL_DEREGISTER_SRVID to the current node (a non-zero
  result or status is logged as a failure) and removes the handler from
  the local ctdb structure.
  NOTE(review): the return statements are on lines missing from this
  listing.
*/
494 int ctdb_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
499 res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0,
500 tdb_null, NULL, NULL, &status, NULL, NULL);
501 if (res != 0 || status != 0) {
502 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
506 /* also need to deregister the handler from our own ctdb structure */
507 ctdb_deregister_message_handler(ctdb, srvid, private_data);
513 send a message - from client context
/*
  Send a message to node 'pnn' from client context.
  Allocates a CTDB_REQ_MESSAGE packet on ctdb sized for the payload,
  fills in the destination node and data length, copies the payload and
  queues the packet to the daemon.
  NOTE(review): the use of 'srvid', the release of the packet after
  queuing and the return are on lines missing from this listing - TODO
  confirm the packet is freed, since it is parented to ctdb itself.
*/
515 int ctdb_send_message(struct ctdb_context *ctdb, uint32_t pnn,
516 uint64_t srvid, TDB_DATA data)
518 struct ctdb_req_message *r;
521 len = offsetof(struct ctdb_req_message, data) + data.dsize;
522 r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE,
523 len, struct ctdb_req_message);
524 CTDB_NO_MEMORY(ctdb, r);
526 r->hdr.destnode = pnn;
528 r->datalen = data.dsize;
529 memcpy(&r->data[0], data.dptr, data.dsize);
531 res = ctdb_client_queue_pkt(ctdb, &r->hdr);
542 cancel a ctdb_fetch_lock operation, releasing the lock
/*
  talloc destructor for a record handle (installed by ctdb_fetch_lock()):
  drops the chain lock held for h->key.
*/
544 static int fetch_lock_destructor(struct ctdb_record_handle *h)
546 ctdb_ltdb_unlock(h->ctdb_db, h->key);
551 force the migration of a record to this node
/*
  Ask for a record to be migrated to this node by issuing a
  CTDB_NULL_FUNC call with the CTDB_IMMEDIATE_MIGRATION flag; returns
  the result of ctdb_call().
  NOTE(review): 'call' is a stack variable with no visible initialiser;
  the assignment of call.key (and of any other member) is on lines
  missing from this listing.
*/
553 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
555 struct ctdb_call call;
557 call.call_id = CTDB_NULL_FUNC;
559 call.flags = CTDB_IMMEDIATE_MIGRATION;
560 return ctdb_call(ctdb_db, &call);
564 get a lock on a record, and return the record's data. Blocks until it gets the lock
/*
  Lock a record and return its data once this node is its dmaster.

  A handle is allocated on mem_ctx holding the database and a private
  copy of the key.  The chain lock is taken, fetch_lock_destructor is
  installed so that freeing the handle releases the lock, and the record
  is fetched into *data.  If the fetch failed or another node is
  dmaster, the lock is dropped and ctdb_client_force_migration() is
  called; the numbered procedure text in the body says the sequence
  then restarts from step 1.  With CTDB_FLAG_TORTURE set, dmaster may be
  overwritten with -1 so that the remote path is exercised.

  NOTE(review): the "key=%*.*s" debug line prints the raw key bytes as
  text.
  NOTE(review): the second half of the torture condition, the retry
  structure, the error exits and the final return are on lines missing
  from this listing.
*/
566 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx,
567 TDB_DATA key, TDB_DATA *data)
570 struct ctdb_record_handle *h;
573 procedure is as follows:
575 1) get the chain lock.
576 2) check if we are dmaster
577 3) if we are the dmaster then return handle
578 4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
580 5) when we get the reply, goto (1)
583 h = talloc_zero(mem_ctx, struct ctdb_record_handle);
588 h->ctdb_db = ctdb_db;
590 h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
591 if (h->key.dptr == NULL) {
597 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize,
598 (const char *)key.dptr));
601 /* step 1 - get the chain lock */
602 ret = ctdb_ltdb_lock(ctdb_db, key);
604 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
609 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
611 talloc_set_destructor(h, fetch_lock_destructor);
613 ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
615 /* when torturing, ensure we test the remote path */
616 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
618 h->header.dmaster = (uint32_t)-1;
622 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
624 if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
625 ctdb_ltdb_unlock(ctdb_db, key);
626 ret = ctdb_client_force_migration(ctdb_db, key);
628 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
635 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
640 store some data to the record that was locked with ctdb_fetch_lock()
/*
  Store data to a record previously locked with ctdb_fetch_lock().

  The record is written locally with ctdb_ltdb_store().  For persistent
  databases the record is additionally marshalled with
  ctdb_marshall_record() and sent to the current node via
  CTDB_CONTROL_PERSISTENT_STORE; a non-zero result or status is logged
  as a failure.

  NOTE(review): the body of the first persistent check, the exit taken
  for non-persistent databases, the release of 'rec' and all return
  statements are on lines missing from this listing.
*/
642 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
646 struct ctdb_rec_data *rec;
649 if (h->ctdb_db->persistent) {
653 ret = ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
658 /* don't need the persistent_store control for non-persistent databases */
659 if (!h->ctdb_db->persistent) {
663 rec = ctdb_marshall_record(h, h->ctdb_db->db_id, h->key, &h->header, data);
665 DEBUG(DEBUG_ERR,("Unable to marshall record in ctdb_record_store\n"));
669 recdata.dptr = (uint8_t *)rec;
670 recdata.dsize = rec->length;
672 ret = ctdb_control(h->ctdb_db->ctdb, CTDB_CURRENT_NODE, 0,
673 CTDB_CONTROL_PERSISTENT_STORE, 0,
674 recdata, NULL, NULL, &status, NULL, NULL);
678 if (ret != 0 || status != 0) {
679 DEBUG(DEBUG_ERR,("Failed persistent store in ctdb_record_store\n"));
687 non-locking fetch of a record
/*
  Fetch a record without taking a lock, by issuing a CTDB_FETCH_FUNC
  call with empty call data.  The reply buffer is handed to the caller
  in *data and re-parented onto mem_ctx.
  NOTE(review): the assignment of call.key, the check of 'ret' and the
  return are on lines missing from this listing.
*/
689 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx,
690 TDB_DATA key, TDB_DATA *data)
692 struct ctdb_call call;
695 call.call_id = CTDB_FETCH_FUNC;
696 call.call_data.dptr = NULL;
697 call.call_data.dsize = 0;
699 ret = ctdb_call(ctdb_db, &call);
702 *data = call.reply_data;
703 talloc_steal(mem_ctx, data->dptr);
712 called when a control completes or times out to invoke the callback
713 function the user provided
/*
  Timed-event handler that completes an async control.
  private_data is the control state; it is moved onto a temporary
  talloc context, ctdb_control_recv() is run on it, and the temporary
  context is freed afterwards.
  NOTE(review): the remaining ctdb_control_recv() arguments and the use
  of 'ret' are on lines missing from this listing.
*/
715 static void invoke_control_callback(struct event_context *ev, struct timed_event *te,
716 struct timeval t, void *private_data)
718 struct ctdb_client_control_state *state;
719 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
722 state = talloc_get_type(private_data, struct ctdb_client_control_state);
723 talloc_steal(tmp_ctx, state);
725 ret = ctdb_control_recv(state->ctdb, state, state,
730 talloc_free(tmp_ctx);
734 called when a CTDB_REPLY_CONTROL packet comes in in the client
736 This packet comes in response to a CTDB_REQ_CONTROL request packet. It
737 contains any reply data from the control
/*
  Handle a CTDB_REPLY_CONTROL packet in the client.

  The pending control state is looked up from hdr->reqid; a miss or a
  reqid mismatch is logged.  Otherwise outdata is pointed at the data in
  the packet, the status is recorded, an error message is duplicated
  from the bytes that follow the data, and the packet is re-parented
  onto state so that outdata stays valid.  The state is marked
  CTDB_CONTROL_DONE and, when an async callback is registered, a
  zero-delay timed event is scheduled to run invoke_control_callback.

  NOTE(review): the length argument of talloc_strndup(), any condition
  guarding it and the early exits are on lines missing from this
  listing.
*/
739 static void ctdb_client_reply_control(struct ctdb_context *ctdb,
740 struct ctdb_req_header *hdr)
742 struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
743 struct ctdb_client_control_state *state;
745 state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
747 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
751 if (hdr->reqid != state->reqid) {
752 /* we found a record but it was the wrong one */
753 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
757 state->outdata.dptr = c->data;
758 state->outdata.dsize = c->datalen;
759 state->status = c->status;
761 state->errormsg = talloc_strndup(state,
762 (char *)&c->data[c->datalen],
766 /* state->outdata now uses resources from c so we dont want c
767 to just dissappear from under us while state is still alive
769 talloc_steal(state, c);
771 state->state = CTDB_CONTROL_DONE;
773 /* if we had a callback registered for this control, pull the response
774 and call the callback.
776 if (state->async.fn) {
777 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
783 destroy a ctdb_control in client
/*
  talloc destructor for a client control state (installed by
  ctdb_control_send()): releases the request id registered for the
  state.
*/
785 static int ctdb_control_destructor(struct ctdb_client_control_state *state)
787 ctdb_reqid_remove(state->ctdb, state->reqid);
792 /* time out handler for ctdb_control */
/*
  Timed-event handler fired when a control's timeout expires.
  Logs reqid, opcode and destination node, marks the state
  CTDB_CONTROL_TIMEOUT and, when an async callback is registered,
  schedules invoke_control_callback with a zero delay.
  NOTE(review): the log format uses %d for state->reqid while other
  messages in this file print reqid with %u - TODO confirm the field
  types.
*/
793 static void control_timeout_func(struct event_context *ev, struct timed_event *te,
794 struct timeval t, void *private_data)
796 struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
798 DEBUG(DEBUG_ERR,("control timed out. reqid:%d opcode:%d dstnode:%d\n", state->reqid, state->c->opcode, state->c->hdr.destnode));
800 state->state = CTDB_CONTROL_TIMEOUT;
802 /* if we had a callback registered for this control, pull the response
803 and call the callback.
805 if (state->async.fn) {
806 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
810 /* async version of send control request */
/*
  Start a control request without waiting for the reply.

  Visible flow: connect to the daemon if ctdb->daemon.sd is -1; allocate
  a zeroed state on mem_ctx, give it a new reqid, mark it
  CTDB_CONTROL_WAIT and install ctdb_control_destructor; allocate a
  CTDB_REQ_CONTROL packet on the state sized for the payload, fill in
  reqid, destination node and data length and copy the payload; arm
  control_timeout_func when a non-zero timeout is supplied; queue the
  packet to the daemon.  CTDB_CTRL_FLAG_NOREPLY is tested after queuing.

  NOTE(review): c->hdr.reqid is assigned twice with the same value.
  NOTE(review): the result of ctdb_socket_connect() is not used on the
  visible line.
  NOTE(review): the mem_ctx parameter, the parameter after 'timeout',
  the remaining packet fields, the handling of 'ret' and the return
  values are on lines missing from this listing.
*/
811 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb,
812 uint32_t destnode, uint64_t srvid,
813 uint32_t opcode, uint32_t flags, TDB_DATA data,
815 struct timeval *timeout,
818 struct ctdb_client_control_state *state;
820 struct ctdb_req_control *c;
827 /* if the domain socket is not yet open, open it */
828 if (ctdb->daemon.sd==-1) {
829 ctdb_socket_connect(ctdb);
832 state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
833 CTDB_NO_MEMORY_NULL(ctdb, state);
836 state->reqid = ctdb_reqid_new(ctdb, state);
837 state->state = CTDB_CONTROL_WAIT;
838 state->errormsg = NULL;
840 talloc_set_destructor(state, ctdb_control_destructor);
842 len = offsetof(struct ctdb_req_control, data) + data.dsize;
843 c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL,
844 len, struct ctdb_req_control);
846 CTDB_NO_MEMORY_NULL(ctdb, c);
847 c->hdr.reqid = state->reqid;
848 c->hdr.destnode = destnode;
849 c->hdr.reqid = state->reqid;
854 c->datalen = data.dsize;
856 memcpy(&c->data[0], data.dptr, data.dsize);
860 if (timeout && !timeval_is_zero(timeout)) {
861 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
864 ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
870 if (flags & CTDB_CTRL_FLAG_NOREPLY) {
879 /* async version of receive control reply */
/*
  Wait for a control started with ctdb_control_send() and collect its
  reply.

  'state' is moved onto a temporary talloc context so that it is
  released when that context is freed.  event_loop_once() is run while
  the state is CTDB_CONTROL_WAIT.  A final state other than
  CTDB_CONTROL_DONE is logged as a failure.  When the reply carries an
  error message it is logged and moved to *errormsg on mem_ctx.  On the
  remaining path outdata receives a talloc_memdup() copy of the reply
  data on mem_ctx and *status receives the reply status.  On each of
  these three paths state->async.fn, when set, is called before the
  temporary context is freed.

  NOTE(review): the mem_ctx parameter declaration, the NULL handling
  for state, outdata, status and errormsg, the initial values written
  through status and errormsg, and all return values are on lines
  missing from this listing.
*/
880 int ctdb_control_recv(struct ctdb_context *ctdb,
881 struct ctdb_client_control_state *state,
883 TDB_DATA *outdata, int32_t *status, char **errormsg)
887 if (status != NULL) {
890 if (errormsg != NULL) {
898 /* prevent double free of state */
899 tmp_ctx = talloc_new(ctdb);
900 talloc_steal(tmp_ctx, state);
902 /* loop one event at a time until we either timeout or the control
905 while (state->state == CTDB_CONTROL_WAIT) {
906 event_loop_once(ctdb->ev);
909 if (state->state != CTDB_CONTROL_DONE) {
910 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
911 if (state->async.fn) {
912 state->async.fn(state);
914 talloc_free(tmp_ctx);
918 if (state->errormsg) {
919 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
921 (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
923 if (state->async.fn) {
924 state->async.fn(state);
926 talloc_free(tmp_ctx);
931 *outdata = state->outdata;
932 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
936 *status = state->status;
939 if (state->async.fn) {
940 state->async.fn(state);
943 talloc_free(tmp_ctx);
950 send a ctdb control message
951 timeout specifies how long we should wait for a reply.
952 if timeout is NULL we wait indefinitely
/*
  Synchronous control: ctdb_control_send() followed by
  ctdb_control_recv() on the returned state, with reply data, status
  and allocations directed at the caller's mem_ctx.
  NOTE(review): the last parameter and the trailing arguments of both
  calls are on lines missing from this listing.
*/
954 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid,
955 uint32_t opcode, uint32_t flags, TDB_DATA data,
956 TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
957 struct timeval *timeout,
960 struct ctdb_client_control_state *state;
962 state = ctdb_control_send(ctdb, destnode, srvid, opcode,
963 flags, data, mem_ctx,
965 return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status,
973 a process exists call. Returns 0 if process exists, -1 otherwise
/*
  Ask 'destnode' whether process 'pid' exists.  The pid is sent as the
  raw bytes of a pid_t via CTDB_CONTROL_PROCESS_EXISTS, with no timeout.
  NOTE(review): the check that triggers the error log and the returned
  value are on lines missing from this listing.
*/
975 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
981 data.dptr = (uint8_t*)&pid;
982 data.dsize = sizeof(pid);
984 ret = ctdb_control(ctdb, destnode, 0,
985 CTDB_CONTROL_PROCESS_EXISTS, 0, data,
986 NULL, NULL, &status, NULL, NULL);
988 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
996 get remote statistics
/*
  Fetch the statistics block from 'destnode' into *status.
  The reply is allocated on ctdb, must be exactly
  sizeof(struct ctdb_statistics) bytes, is copied by value and then
  freed.
  NOTE(review): no talloc_free() of data.dptr is visible on the
  wrong-size path; the return statements are on lines missing from this
  listing.
*/
998 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
1004 ret = ctdb_control(ctdb, destnode, 0,
1005 CTDB_CONTROL_STATISTICS, 0, tdb_null,
1006 ctdb, &data, &res, NULL, NULL);
1007 if (ret != 0 || res != 0) {
1008 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
1012 if (data.dsize != sizeof(struct ctdb_statistics)) {
1013 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1014 (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1018 *status = *(struct ctdb_statistics *)data.dptr;
1019 talloc_free(data.dptr);
1025 shutdown a remote ctdb node
/*
  Send CTDB_CONTROL_SHUTDOWN to 'destnode' using only the async send; no
  matching ctdb_control_recv() call is visible here.  A NULL state from
  ctdb_control_send() is logged as a failure.
  NOTE(review): the state is allocated with a NULL talloc context, and
  what happens to it on success is on lines missing from this listing.
*/
1027 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1029 struct ctdb_client_control_state *state;
1031 state = ctdb_control_send(ctdb, destnode, 0,
1032 CTDB_CONTROL_SHUTDOWN, 0, tdb_null,
1033 NULL, &timeout, NULL);
1034 if (state == NULL) {
1035 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1043 get vnn map from a remote node
/*
  Fetch the vnn map of 'destnode'.
  The wire reply must be at least as large as the fixed part of
  struct ctdb_vnn_map_wire and exactly match the size implied by
  map->size.  A struct ctdb_vnn_map is then allocated on mem_ctx, the
  generation, size and map array are copied in, and the wire buffer is
  freed.
  NOTE(review): no talloc_free() of the wire buffer is visible on the
  bad-size or out-of-memory paths; the return statements are on lines
  missing from this listing.
*/
1045 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1050 struct ctdb_vnn_map_wire *map;
1052 ret = ctdb_control(ctdb, destnode, 0,
1053 CTDB_CONTROL_GETVNNMAP, 0, tdb_null,
1054 mem_ctx, &outdata, &res, &timeout, NULL);
1055 if (ret != 0 || res != 0) {
1056 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1060 map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1061 if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1062 outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1063 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1067 (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1068 CTDB_NO_MEMORY(ctdb, *vnnmap);
1069 (*vnnmap)->generation = map->generation;
1070 (*vnnmap)->size = map->size;
1071 (*vnnmap)->map = talloc_array(*vnnmap, uint32_t, map->size);
1073 CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1074 memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1075 talloc_free(outdata.dptr);
1082 get the recovery mode of a remote node
/*
  Async send half of "get recovery mode": issues
  CTDB_CONTROL_GET_RECMODE to 'destnode' with no payload and returns the
  control state produced by ctdb_control_send().
*/
1084 struct ctdb_client_control_state *
1085 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1087 return ctdb_control_send(ctdb, destnode, 0,
1088 CTDB_CONTROL_GET_RECMODE, 0, tdb_null,
1089 mem_ctx, &timeout, NULL);
/*
  Async receive half of "get recovery mode": the control's status value
  is the recovery mode and is stored in *recmode as a uint32_t.
  NOTE(review): the check guarding the error log, any NULL check on
  recmode and the return values are on lines missing from this listing.
*/
1092 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1097 ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1099 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1104 *recmode = (uint32_t)res;
/*
  Synchronous "get recovery mode": ctdb_ctrl_getrecmode_send() followed
  by ctdb_ctrl_getrecmode_recv() on the returned state.
*/
1110 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1112 struct ctdb_client_control_state *state;
1114 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1115 return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1122 set the recovery mode of a remote node
/*
  Set the recovery mode of 'destnode'.  The new mode is sent as a
  sizeof(uint32_t) payload via CTDB_CONTROL_SET_RECMODE; a non-zero
  result or status is logged as a failure.
  NOTE(review): the return statements are on lines missing from this
  listing.
*/
1124 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1130 data.dsize = sizeof(uint32_t);
1131 data.dptr = (unsigned char *)&recmode;
1133 ret = ctdb_control(ctdb, destnode, 0,
1134 CTDB_CONTROL_SET_RECMODE, 0, data,
1135 NULL, NULL, &res, &timeout, NULL);
1136 if (ret != 0 || res != 0) {
1137 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1147 get the recovery master of a remote node
/*
  Async send half of "get recovery master": issues
  CTDB_CONTROL_GET_RECMASTER to 'destnode' with no payload and returns
  the control state produced by ctdb_control_send().
*/
1149 struct ctdb_client_control_state *
1150 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
1151 struct timeval timeout, uint32_t destnode)
1153 return ctdb_control_send(ctdb, destnode, 0,
1154 CTDB_CONTROL_GET_RECMASTER, 0, tdb_null,
1155 mem_ctx, &timeout, NULL);
/*
  Async receive half of "get recovery master": the control's status
  value is the recovery master and is stored in *recmaster as a
  uint32_t.
  NOTE(review): the check guarding the error log, any NULL check on
  recmaster and the return values are on lines missing from this
  listing.
*/
1158 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1163 ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1165 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1170 *recmaster = (uint32_t)res;
/*
  Synchronous "get recovery master": ctdb_ctrl_getrecmaster_send()
  followed by ctdb_ctrl_getrecmaster_recv() on the returned state.
*/
1176 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1178 struct ctdb_client_control_state *state;
1180 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1181 return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1186 set the recovery master of a remote node
/*
  Set the recovery master of 'destnode'.  The new value is sent as a
  sizeof(uint32_t) payload via CTDB_CONTROL_SET_RECMASTER; a non-zero
  result or status is logged as a failure.
  NOTE(review): the return statements are on lines missing from this
  listing.
*/
1188 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1195 data.dsize = sizeof(uint32_t);
1196 data.dptr = (unsigned char *)&recmaster;
1198 ret = ctdb_control(ctdb, destnode, 0,
1199 CTDB_CONTROL_SET_RECMASTER, 0, data,
1200 NULL, NULL, &res, &timeout, NULL);
1201 if (ret != 0 || res != 0) {
1202 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1211 get a list of databases off a remote node
/*
  Fetch the list of databases from 'destnode'.  The reply is duplicated
  onto mem_ctx as *dbmap and the original reply buffer is freed.
  NOTE(review): the reply length is not validated on the visible lines
  and the talloc_memdup() result is not visibly checked; the return
  statements are on lines missing from this listing.
*/
1213 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
1214 TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1220 ret = ctdb_control(ctdb, destnode, 0,
1221 CTDB_CONTROL_GET_DBMAP, 0, tdb_null,
1222 mem_ctx, &outdata, &res, &timeout, NULL);
1223 if (ret != 0 || res != 0) {
1224 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed\n"));
1228 *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1229 talloc_free(outdata.dptr);
1235 get a list of nodes (vnn and flags ) from a remote node
/*
  Fetch the node list from 'destnode'.  A failed control, a non-zero
  status or an empty reply is logged as a failure.  Otherwise the reply
  is duplicated onto mem_ctx as *nodemap and the original reply buffer
  is freed.
  NOTE(review): the talloc_memdup() result is not visibly checked; the
  return statements are on lines missing from this listing.
*/
1237 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb,
1238 struct timeval timeout, uint32_t destnode,
1239 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1245 ret = ctdb_control(ctdb, destnode, 0,
1246 CTDB_CONTROL_GET_NODEMAP, 0, tdb_null,
1247 mem_ctx, &outdata, &res, &timeout, NULL);
1248 if (ret != 0 || res != 0 || outdata.dsize == 0) {
1249 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed\n"));
1253 *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1254 talloc_free(outdata.dptr);
1260 drop the transport, reload the nodes file and restart the transport
/*
  Send CTDB_CONTROL_RELOAD_NODES_FILE to 'destnode' with no payload; a
  non-zero result or status is logged as a failure.
  NOTE(review): the return statements are on lines missing from this
  listing.
*/
1262 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb,
1263 struct timeval timeout, uint32_t destnode)
1268 ret = ctdb_control(ctdb, destnode, 0,
1269 CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null,
1270 NULL, NULL, &res, &timeout, NULL);
1271 if (ret != 0 || res != 0) {
1272 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1281 set vnn map on a node
/*
  Push a vnn map to 'destnode'.  The map is serialised into a
  struct ctdb_vnn_map_wire allocated on mem_ctx (generation, size, then
  the uint32_t entries) and sent via CTDB_CONTROL_SETVNNMAP; a non-zero
  result or status is logged as a failure.
  NOTE(review): the data.dsize assignment, the release of 'map' and the
  return statements are on lines missing from this listing.
*/
1283 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
1284 TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1289 struct ctdb_vnn_map_wire *map;
1292 len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1293 map = talloc_size(mem_ctx, len);
1294 CTDB_NO_MEMORY(ctdb, map);
1296 map->generation = vnnmap->generation;
1297 map->size = vnnmap->size;
1298 memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1301 data.dptr = (uint8_t *)map;
1303 ret = ctdb_control(ctdb, destnode, 0,
1304 CTDB_CONTROL_SETVNNMAP, 0, data,
1305 NULL, NULL, &res, &timeout, NULL);
1306 if (ret != 0 || res != 0) {
1307 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1318 async send for pull database
/*
  Async send half of "pull database": builds a
  struct ctdb_control_pulldb on mem_ctx and sends it to 'destnode' via
  CTDB_CONTROL_PULL_DB.
  NOTE(review): the use of 'dbid', the release of 'pull' and the return
  are on lines missing from this listing.
*/
1320 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1321 struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1322 uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1325 struct ctdb_control_pulldb *pull;
1326 struct ctdb_client_control_state *state;
1328 pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1329 CTDB_NO_MEMORY_NULL(ctdb, pull);
1332 pull->lmaster = lmaster;
1334 indata.dsize = sizeof(struct ctdb_control_pulldb);
1335 indata.dptr = (unsigned char *)pull;
1337 state = ctdb_control_send(ctdb, destnode, 0,
1338 CTDB_CONTROL_PULL_DB, 0, indata,
1339 mem_ctx, &timeout, NULL);
1346 async recv for pull database
/*
  Async receive half of "pull database": collects the reply into
  outdata on mem_ctx; a non-zero result or status is logged as a
  failure.
  NOTE(review): the outdata parameter declaration and the return
  statements are on lines missing from this listing.
*/
1348 int ctdb_ctrl_pulldb_recv(
1349 struct ctdb_context *ctdb,
1350 TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state,
1356 ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1357 if ( (ret != 0) || (res != 0) ){
1358 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1366 pull all keys and records for a specific database on a node
/*
  Synchronous "pull database": ctdb_ctrl_pulldb_send() followed by
  ctdb_ctrl_pulldb_recv() on the returned state.
  NOTE(review): the outdata parameter declaration and the last argument
  of the send call are on lines missing from this listing.
*/
1368 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode,
1369 uint32_t dbid, uint32_t lmaster,
1370 TALLOC_CTX *mem_ctx, struct timeval timeout,
1373 struct ctdb_client_control_state *state;
1375 state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1378 return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1383 change dmaster for all keys in the database to the new value
/*
  Send CTDB_CONTROL_SET_DMASTER to 'destnode'.  The payload is two
  uint32_t values (dbid, then dmaster) allocated on mem_ctx; a non-zero
  result or status is logged as a failure.
  NOTE(review): the talloc_array() result is written through without a
  visible NULL check, and no talloc_free() of the payload is visible.
  The return statements are on lines missing from this listing.
*/
1385 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
1386 TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1392 indata.dsize = 2*sizeof(uint32_t);
1393 indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1395 ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1396 ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1398 ret = ctdb_control(ctdb, destnode, 0,
1399 CTDB_CONTROL_SET_DMASTER, 0, indata,
1400 NULL, NULL, &res, &timeout, NULL);
1401 if (ret != 0 || res != 0) {
1402 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1410 ping a node, return number of clients connected
/*
  Ping 'destnode' with CTDB_CONTROL_PING, with no payload and no
  timeout.
  NOTE(review): how 'ret' and 'res' are turned into the return value is
  on lines missing from this listing.
*/
1412 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1417 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0,
1418 tdb_null, NULL, NULL, &res, NULL, NULL);
1426 find the real path to a ltdb
1428 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx,
1435 data.dptr = (uint8_t *)&dbid;
1436 data.dsize = sizeof(dbid);
1438 ret = ctdb_control(ctdb, destnode, 0,
1439 CTDB_CONTROL_GETDBPATH, 0, data,
1440 mem_ctx, &data, &res, &timeout, NULL);
1441 if (ret != 0 || res != 0) {
1445 (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1446 if ((*path) == NULL) {
1450 talloc_free(data.dptr);
1456 find the name of a db
1458 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx,
1465 data.dptr = (uint8_t *)&dbid;
1466 data.dsize = sizeof(dbid);
1468 ret = ctdb_control(ctdb, destnode, 0,
1469 CTDB_CONTROL_GET_DBNAME, 0, data,
1470 mem_ctx, &data, &res, &timeout, NULL);
1471 if (ret != 0 || res != 0) {
1475 (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1476 if ((*name) == NULL) {
1480 talloc_free(data.dptr);
1488 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
1489 TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1495 data.dptr = discard_const(name);
1496 data.dsize = strlen(name)+1;
1498 ret = ctdb_control(ctdb, destnode, 0,
1499 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1501 mem_ctx, &data, &res, &timeout, NULL);
1503 if (ret != 0 || res != 0) {
1511 get debug level on a node
1513 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1519 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null,
1520 ctdb, &data, &res, NULL, NULL);
1521 if (ret != 0 || res != 0) {
1524 if (data.dsize != sizeof(int32_t)) {
1525 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1526 (unsigned)data.dsize));
1529 *level = *(int32_t *)data.dptr;
1530 talloc_free(data.dptr);
1535 set debug level on a node
1537 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1543 data.dptr = (uint8_t *)&level;
1544 data.dsize = sizeof(level);
1546 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data,
1547 NULL, NULL, &res, NULL, NULL);
1548 if (ret != 0 || res != 0) {
1556 get a list of connected nodes
1558 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb,
1559 struct timeval timeout,
1560 TALLOC_CTX *mem_ctx,
1561 uint32_t *num_nodes)
1563 struct ctdb_node_map *map=NULL;
1569 ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1574 nodes = talloc_array(mem_ctx, uint32_t, map->num);
1575 if (nodes == NULL) {
1579 for (i=0;i<map->num;i++) {
1580 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1581 nodes[*num_nodes] = map->nodes[i].pnn;
1593 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1598 ret = ctdb_control(ctdb, destnode, 0,
1599 CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null,
1600 NULL, NULL, &res, NULL, NULL);
1601 if (ret != 0 || res != 0) {
1602 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
/*
  this is the dummy null procedure that all databases support

  It ignores the call info and always reports success.
 */
static int ctdb_null_func(struct ctdb_call_info *call)
{
	(void)call;
	return 0;
}
1617 this is a plain fetch procedure that all databases support
1619 static int ctdb_fetch_func(struct ctdb_call_info *call)
1621 call->reply_data = &call->record_data;
1626 attach to a specific database - client call
1628 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, bool persistent, uint32_t tdb_flags)
1630 struct ctdb_db_context *ctdb_db;
1635 ctdb_db = ctdb_db_handle(ctdb, name);
1640 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1641 CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1643 ctdb_db->ctdb = ctdb;
1644 ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1645 CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1647 data.dptr = discard_const(name);
1648 data.dsize = strlen(name)+1;
1650 /* tell ctdb daemon to attach */
1651 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags,
1652 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1653 0, data, ctdb_db, &data, &res, NULL, NULL);
1654 if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1655 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1656 talloc_free(ctdb_db);
1660 ctdb_db->db_id = *(uint32_t *)data.dptr;
1661 talloc_free(data.dptr);
1663 ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1665 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1666 talloc_free(ctdb_db);
1670 tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1671 if (!ctdb->do_setsched) {
1672 tdb_flags |= TDB_NOMMAP;
1675 ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1676 if (ctdb_db->ltdb == NULL) {
1677 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1678 talloc_free(ctdb_db);
1682 ctdb_db->persistent = persistent;
1684 DLIST_ADD(ctdb->db_list, ctdb_db);
1686 /* add well known functions */
1687 ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1688 ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1695 setup a call for a database
1697 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1699 struct ctdb_registered_call *call;
1704 struct ctdb_control_set_call c;
1707 /* this is no longer valid with the separate daemon architecture */
1708 c.db_id = ctdb_db->db_id;
1712 data.dptr = (uint8_t *)&c;
1713 data.dsize = sizeof(c);
1715 ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1716 data, NULL, NULL, &status, NULL, NULL);
1717 if (ret != 0 || status != 0) {
1718 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
1723 /* also register locally */
1724 call = talloc(ctdb_db, struct ctdb_registered_call);
1728 DLIST_ADD(ctdb_db->calls, call);
1733 struct traverse_state {
1736 ctdb_traverse_func fn;
1741 called on each key during a ctdb_traverse
1743 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1745 struct traverse_state *state = (struct traverse_state *)p;
1746 struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
1749 if (data.dsize < sizeof(uint32_t) ||
1750 d->length != data.dsize) {
1751 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
1756 key.dsize = d->keylen;
1757 key.dptr = &d->data[0];
1758 data.dsize = d->datalen;
1759 data.dptr = &d->data[d->keylen];
1761 if (key.dsize == 0 && data.dsize == 0) {
1762 /* end of traverse */
1767 if (data.dsize == sizeof(struct ctdb_ltdb_header)) {
1768 /* empty records are deleted records in ctdb */
1772 if (state->fn(ctdb, key, data, state->private_data) != 0) {
1781 start a cluster wide traverse, calling the supplied fn on each record
1782 return the number of records traversed, or -1 on error
1784 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
1787 struct ctdb_traverse_start t;
1790 uint64_t srvid = (getpid() | 0xFLL<<60);
1791 struct traverse_state state;
1795 state.private_data = private_data;
1798 ret = ctdb_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
1800 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
1804 t.db_id = ctdb_db->db_id;
1808 data.dptr = (uint8_t *)&t;
1809 data.dsize = sizeof(t);
1811 ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
1812 data, NULL, NULL, &status, NULL, NULL);
1813 if (ret != 0 || status != 0) {
1814 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
1815 ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1819 while (!state.done) {
1820 event_loop_once(ctdb_db->ctdb->ev);
1823 ret = ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1825 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
/* true for byte values 32..127; the argument is parenthesised so that
   expressions such as (a | b) are compared as a whole */
#define ISASCII(x) (((x)>31)&&((x)<128))
1834 called on each key during a catdb
1836 static int dumpdb_fn(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
1839 FILE *f = (FILE *)p;
1840 struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
1842 fprintf(f, "dmaster: %u\n", h->dmaster);
1843 fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
1845 fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
1846 for (i=0;i<key.dsize;i++) {
1847 if (ISASCII(key.dptr[i])) {
1848 fprintf(f, "%c", key.dptr[i]);
1850 fprintf(f, "\\%02X", key.dptr[i]);
1855 fprintf(f, "data(%u) = \"", (unsigned)data.dsize);
1856 for (i=sizeof(*h);i<data.dsize;i++) {
1857 if (ISASCII(data.dptr[i])) {
1858 fprintf(f, "%c", data.dptr[i]);
1860 fprintf(f, "\\%02X", data.dptr[i]);
1869 convenience function to list all keys to stdout
1871 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
1873 return ctdb_traverse(ctdb_db, dumpdb_fn, f);
1877 get the pid of a ctdb daemon
1879 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
1884 ret = ctdb_control(ctdb, destnode, 0,
1885 CTDB_CONTROL_GET_PID, 0, tdb_null,
1886 NULL, NULL, &res, &timeout, NULL);
1888 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
1899 async freeze send control
1901 struct ctdb_client_control_state *
1902 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1904 return ctdb_control_send(ctdb, destnode, 0,
1905 CTDB_CONTROL_FREEZE, 0, tdb_null,
1906 mem_ctx, &timeout, NULL);
1910 async freeze recv control
1912 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
1917 ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1918 if ( (ret != 0) || (res != 0) ){
1919 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
1929 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1931 TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1932 struct ctdb_client_control_state *state;
1935 state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode);
1936 ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
1937 talloc_free(tmp_ctx);
1945 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1950 ret = ctdb_control(ctdb, destnode, 0,
1951 CTDB_CONTROL_THAW, 0, tdb_null,
1952 NULL, NULL, &res, &timeout, NULL);
1953 if (ret != 0 || res != 0) {
1954 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
1962 get pnn of a node, or -1
1964 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1969 ret = ctdb_control(ctdb, destnode, 0,
1970 CTDB_CONTROL_GET_PNN, 0, tdb_null,
1971 NULL, NULL, &res, &timeout, NULL);
1973 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
1981 get the monitoring mode of a remote node
1983 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
1988 ret = ctdb_control(ctdb, destnode, 0,
1989 CTDB_CONTROL_GET_MONMODE, 0, tdb_null,
1990 NULL, NULL, &res, &timeout, NULL);
1992 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2003 set the monitoring mode of a remote node to active
2005 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2010 ret = ctdb_control(ctdb, destnode, 0,
2011 CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null,
2012 NULL, NULL,NULL, &timeout, NULL);
2014 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2024 set the monitoring mode of a remote node to disable
2026 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2031 ret = ctdb_control(ctdb, destnode, 0,
2032 CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null,
2033 NULL, NULL, NULL, &timeout, NULL);
2035 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2047 sent to a node to make it take over an ip address
2049 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout,
2050 uint32_t destnode, struct ctdb_public_ip *ip)
2056 data.dsize = sizeof(*ip);
2057 data.dptr = (uint8_t *)ip;
2059 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2060 NULL, &res, &timeout, NULL);
2062 if (ret != 0 || res != 0) {
2063 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2072 sent to a node to make it release an ip address
2074 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout,
2075 uint32_t destnode, struct ctdb_public_ip *ip)
2081 data.dsize = sizeof(*ip);
2082 data.dptr = (uint8_t *)ip;
2084 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2085 NULL, &res, &timeout, NULL);
2087 if (ret != 0 || res != 0) {
2088 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2099 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb,
2100 struct timeval timeout,
2102 const char *name, uint32_t *value)
2104 struct ctdb_control_get_tunable *t;
2105 TDB_DATA data, outdata;
2109 data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2110 data.dptr = talloc_size(ctdb, data.dsize);
2111 CTDB_NO_MEMORY(ctdb, data.dptr);
2113 t = (struct ctdb_control_get_tunable *)data.dptr;
2114 t->length = strlen(name)+1;
2115 memcpy(t->name, name, t->length);
2117 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2118 &outdata, &res, &timeout, NULL);
2119 talloc_free(data.dptr);
2120 if (ret != 0 || res != 0) {
2121 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2125 if (outdata.dsize != sizeof(uint32_t)) {
2126 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2127 talloc_free(outdata.dptr);
2131 *value = *(uint32_t *)outdata.dptr;
2132 talloc_free(outdata.dptr);
2140 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb,
2141 struct timeval timeout,
2143 const char *name, uint32_t value)
2145 struct ctdb_control_set_tunable *t;
2150 data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2151 data.dptr = talloc_size(ctdb, data.dsize);
2152 CTDB_NO_MEMORY(ctdb, data.dptr);
2154 t = (struct ctdb_control_set_tunable *)data.dptr;
2155 t->length = strlen(name)+1;
2156 memcpy(t->name, name, t->length);
2159 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2160 NULL, &res, &timeout, NULL);
2161 talloc_free(data.dptr);
2162 if (ret != 0 || res != 0) {
2163 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2173 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb,
2174 struct timeval timeout,
2176 TALLOC_CTX *mem_ctx,
2177 const char ***list, uint32_t *count)
2182 struct ctdb_control_list_tunable *t;
2185 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null,
2186 mem_ctx, &outdata, &res, &timeout, NULL);
2187 if (ret != 0 || res != 0) {
2188 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2192 t = (struct ctdb_control_list_tunable *)outdata.dptr;
2193 if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2194 t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2195 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2196 talloc_free(outdata.dptr);
2200 p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2201 CTDB_NO_MEMORY(ctdb, p);
2203 talloc_free(outdata.dptr);
2208 for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2209 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2210 CTDB_NO_MEMORY(ctdb, *list);
2211 (*list)[*count] = talloc_strdup(*list, s);
2212 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2222 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2223 struct timeval timeout, uint32_t destnode,
2224 TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2230 ret = ctdb_control(ctdb, destnode, 0,
2231 CTDB_CONTROL_GET_PUBLIC_IPS, 0, tdb_null,
2232 mem_ctx, &outdata, &res, &timeout, NULL);
2233 if (ret != 0 || res != 0) {
2234 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2238 *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2239 talloc_free(outdata.dptr);
2245 set/clear the permanent disabled bit on a remote node
2247 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
2248 uint32_t set, uint32_t clear)
2252 struct ctdb_node_modflags m;
2258 data.dsize = sizeof(m);
2259 data.dptr = (unsigned char *)&m;
2261 ret = ctdb_control(ctdb, destnode, 0,
2262 CTDB_CONTROL_MODIFY_FLAGS, 0, data,
2263 NULL, NULL, &res, &timeout, NULL);
2264 if (ret != 0 || res != 0) {
2265 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for modflags failed\n"));
2276 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb,
2277 struct timeval timeout,
2279 struct ctdb_tunable *tunables)
2285 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2286 &outdata, &res, &timeout, NULL);
2287 if (ret != 0 || res != 0) {
2288 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2292 if (outdata.dsize != sizeof(*tunables)) {
2293 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2294 (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2298 *tunables = *(struct ctdb_tunable *)outdata.dptr;
2299 talloc_free(outdata.dptr);
2304 add a public address to a node
2306 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb,
2307 struct timeval timeout,
2309 struct ctdb_control_ip_iface *pub)
2315 data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2316 data.dptr = (unsigned char *)pub;
2318 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2319 NULL, &res, &timeout, NULL);
2320 if (ret != 0 || res != 0) {
2321 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2329 delete a public address from a node
2331 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb,
2332 struct timeval timeout,
2334 struct ctdb_control_ip_iface *pub)
2340 data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2341 data.dptr = (unsigned char *)pub;
2343 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2344 NULL, &res, &timeout, NULL);
2345 if (ret != 0 || res != 0) {
2346 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2354 kill a tcp connection
2356 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb,
2357 struct timeval timeout,
2359 struct ctdb_control_killtcp *killtcp)
2365 data.dsize = sizeof(struct ctdb_control_killtcp);
2366 data.dptr = (unsigned char *)killtcp;
2368 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2369 NULL, &res, &timeout, NULL);
2370 if (ret != 0 || res != 0) {
2371 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2381 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb,
2382 struct timeval timeout,
2384 ctdb_sock_addr *addr,
2390 struct ctdb_control_gratious_arp *gratious_arp;
2391 TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2394 len = strlen(ifname)+1;
2395 gratious_arp = talloc_size(tmp_ctx,
2396 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2397 CTDB_NO_MEMORY(ctdb, gratious_arp);
2399 gratious_arp->addr = *addr;
2400 gratious_arp->len = len;
2401 memcpy(&gratious_arp->iface[0], ifname, len);
2404 data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
2405 data.dptr = (unsigned char *)gratious_arp;
2407 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
2408 NULL, &res, &timeout, NULL);
2409 if (ret != 0 || res != 0) {
2410 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
2411 talloc_free(tmp_ctx);
2415 talloc_free(tmp_ctx);
2420 get a list of all tcp tickles that a node knows about for a particular vnn
2422 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb,
2423 struct timeval timeout, uint32_t destnode,
2424 TALLOC_CTX *mem_ctx,
2425 struct sockaddr_in *ip,
2426 struct ctdb_control_tcp_tickle_list **list)
2429 TDB_DATA data, outdata;
2432 data.dptr = (uint8_t*)ip;
2433 data.dsize = sizeof(struct sockaddr_in);
2435 ret = ctdb_control(ctdb, destnode, 0,
2436 CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data,
2437 mem_ctx, &outdata, &status, NULL, NULL);
2439 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
2443 *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
2449 register a server id
2451 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb,
2452 struct timeval timeout,
2453 struct ctdb_server_id *id)
2459 data.dsize = sizeof(struct ctdb_server_id);
2460 data.dptr = (unsigned char *)id;
2462 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0,
2463 CTDB_CONTROL_REGISTER_SERVER_ID,
2465 NULL, &res, &timeout, NULL);
2466 if (ret != 0 || res != 0) {
2467 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
2475 unregister a server id
2477 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb,
2478 struct timeval timeout,
2479 struct ctdb_server_id *id)
2485 data.dsize = sizeof(struct ctdb_server_id);
2486 data.dptr = (unsigned char *)id;
2488 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0,
2489 CTDB_CONTROL_UNREGISTER_SERVER_ID,
2491 NULL, &res, &timeout, NULL);
2492 if (ret != 0 || res != 0) {
2493 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
2502 check if a server id exists
2504 if a server id does exist, return *status == 1, otherwise *status == 0
2506 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb,
2507 struct timeval timeout,
2509 struct ctdb_server_id *id,
2516 data.dsize = sizeof(struct ctdb_server_id);
2517 data.dptr = (unsigned char *)id;
2519 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID,
2521 NULL, &res, &timeout, NULL);
2523 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
2537 get the list of server ids that are registered on a node
2539 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
2540 TALLOC_CTX *mem_ctx,
2541 struct timeval timeout, uint32_t destnode,
2542 struct ctdb_server_id_list **svid_list)
2548 ret = ctdb_control(ctdb, destnode, 0,
2549 CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null,
2550 mem_ctx, &outdata, &res, &timeout, NULL);
2551 if (ret != 0 || res != 0) {
2552 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
2556 *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
2562 initialise the ctdb daemon for client applications
2564 NOTE: In current code the daemon does not fork. This is for testing purposes only
2565 and to simplify the code.
2567 struct ctdb_context *ctdb_init(struct event_context *ev)
2569 struct ctdb_context *ctdb;
2571 ctdb = talloc_zero(ev, struct ctdb_context);
2573 ctdb->idr = idr_init(ctdb);
2574 CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
2576 ctdb_set_socketname(ctdb, CTDB_PATH);
2585 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
2587 ctdb->flags |= flags;
2591 setup the local socket name
2593 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
2595 ctdb->daemon.name = talloc_strdup(ctdb, socketname);
2600 return the pnn of this node
2602 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
2609 get the uptime of a remote node
2611 struct ctdb_client_control_state *
2612 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2614 return ctdb_control_send(ctdb, destnode, 0,
2615 CTDB_CONTROL_UPTIME, 0, tdb_null,
2616 mem_ctx, &timeout, NULL);
2619 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
2625 ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
2626 if (ret != 0 || res != 0) {
2627 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
2631 *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
2636 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
2638 struct ctdb_client_control_state *state;
2640 state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
2641 return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
2645 send a control to execute the "recovered" event script on a node
2647 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2652 ret = ctdb_control(ctdb, destnode, 0,
2653 CTDB_CONTROL_END_RECOVERY, 0, tdb_null,
2654 NULL, NULL, &status, &timeout, NULL);
2655 if (ret != 0 || status != 0) {
2656 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
2664 callback for the async helpers used when sending the same control
2665 to multiple nodes in parallell.
2667 static void async_callback(struct ctdb_client_control_state *state)
2669 struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
2670 struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
2674 uint32_t destnode = state->c->hdr.destnode;
2676 /* one more node has responded with recmode data */
2679 /* if we failed to push the db, then return an error and let
2680 the main loop try again.
2682 if (state->state != CTDB_CONTROL_DONE) {
2683 if ( !data->dont_log_errors) {
2684 DEBUG(DEBUG_ERR,("Async operation failed with state %d\n opcode:%u", state->state, data->opcode));
2687 if (data->fail_callback) {
2688 data->fail_callback(ctdb, destnode, res, outdata,
2689 data->callback_data);
2694 state->async.fn = NULL;
2696 ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
2697 if ((ret != 0) || (res != 0)) {
2698 if ( !data->dont_log_errors) {
2699 DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
2702 if (data->fail_callback) {
2703 data->fail_callback(ctdb, destnode, res, outdata,
2704 data->callback_data);
2707 if ((ret == 0) && (data->callback != NULL)) {
2708 data->callback(ctdb, destnode, res, outdata,
2709 data->callback_data);
2714 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
2716 /* set up the callback functions */
2717 state->async.fn = async_callback;
2718 state->async.private_data = data;
2720 /* one more control to wait for to complete */
2725 /* wait for up to the maximum number of seconds allowed
2726 or until all nodes we expect a response from has replied
2728 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
2730 while (data->count > 0) {
2731 event_loop_once(ctdb->ev);
2733 if (data->fail_count != 0) {
2734 if (!data->dont_log_errors) {
2735 DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n",
2745 perform a simple control on the listed nodes
2746 The control cannot return data
2748 int ctdb_client_async_control(struct ctdb_context *ctdb,
2749 enum ctdb_controls opcode,
2751 struct timeval timeout,
2752 bool dont_log_errors,
2754 client_async_callback client_callback,
2755 client_async_callback fail_callback,
2756 void *callback_data)
2758 struct client_async_data *async_data;
2759 struct ctdb_client_control_state *state;
2762 async_data = talloc_zero(ctdb, struct client_async_data);
2763 CTDB_NO_MEMORY_FATAL(ctdb, async_data);
2764 async_data->dont_log_errors = dont_log_errors;
2765 async_data->callback = client_callback;
2766 async_data->fail_callback = fail_callback;
2767 async_data->callback_data = callback_data;
2768 async_data->opcode = opcode;
2770 num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
2772 /* loop over all nodes and send an async control to each of them */
2773 for (j=0; j<num_nodes; j++) {
2774 uint32_t pnn = nodes[j];
2776 state = ctdb_control_send(ctdb, pnn, 0, opcode,
2777 0, data, async_data, &timeout, NULL);
2778 if (state == NULL) {
2779 DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
2780 talloc_free(async_data);
2784 ctdb_client_async_add(async_data, state);
2787 if (ctdb_client_async_wait(ctdb, async_data) != 0) {
2788 talloc_free(async_data);
2792 talloc_free(async_data);
2796 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
2797 struct ctdb_vnn_map *vnn_map,
2798 TALLOC_CTX *mem_ctx,
2801 int i, j, num_nodes;
2804 for (i=num_nodes=0;i<vnn_map->size;i++) {
2805 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2811 nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2812 CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2814 for (i=j=0;i<vnn_map->size;i++) {
2815 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2818 nodes[j++] = vnn_map->map[i];
2824 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
2825 struct ctdb_node_map *node_map,
2826 TALLOC_CTX *mem_ctx,
2829 int i, j, num_nodes;
2832 for (i=num_nodes=0;i<node_map->num;i++) {
2833 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2836 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2842 nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2843 CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2845 for (i=j=0;i<node_map->num;i++) {
2846 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2849 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2852 nodes[j++] = node_map->nodes[i].pnn;
2859 this is used to test if a pnn lock exists and if it exists will return
2860 the number of connections that pnn has reported or -1 if that recovery
2861 daemon is not running.
2864 ctdb_read_pnn_lock(int fd, int32_t pnn)
2869 lock.l_type = F_WRLCK;
2870 lock.l_whence = SEEK_SET;
2875 if (fcntl(fd, F_GETLK, &lock) != 0) {
2876 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
2880 if (lock.l_type == F_UNLCK) {
2884 if (pread(fd, &c, 1, pnn) == -1) {
2885 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
2893 get capabilities of a remote node
2895 struct ctdb_client_control_state *
2896 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2898 return ctdb_control_send(ctdb, destnode, 0,
2899 CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null,
2900 mem_ctx, &timeout, NULL);
2903 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
2909 ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
2910 if ( (ret != 0) || (res != 0) ) {
2911 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
2916 *capabilities = *((uint32_t *)outdata.dptr);
2922 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
2924 struct ctdb_client_control_state *state;
2925 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2928 state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
2929 ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
2930 talloc_free(tmp_ctx);
/*
  state of one client side transaction on a database
 */
struct ctdb_transaction_handle {
	/* database the transaction runs against */
	struct ctdb_db_context *ctdb_db;
	/* while set, fetches are not added to m_all */
	bool in_replay;
	/* we store the reads and writes done under a transaction one
	   list stores both reads and writes, the other just writes
	*/
	struct ctdb_marshall_buffer *m_all;
	struct ctdb_marshall_buffer *m_write;
};
2944 /* start a transaction on a database */
2945 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
2947 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
2951 /* start a transaction on a database: fetch-lock the record named by CTDB_TRANSACTION_LOCK_KEY, start a tdb transaction, then check that this node is the dmaster of that record */
2952 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
2954 struct ctdb_record_handle *rh;
2956 struct ctdb_ltdb_header header;
2957 TALLOC_CTX *tmp_ctx;
2958 const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
2960 struct ctdb_db_context *ctdb_db = h->ctdb_db;
/* the key is the lock-key string without its terminating NUL (strlen) */
2962 key.dptr = discard_const(keyname);
2963 key.dsize = strlen(keyname);
/* databases whose persistent flag is not set are rejected with an error log */
2965 if (!ctdb_db->persistent) {
2966 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
/* scratch context, child of the handle; released on every path shown below */
2971 tmp_ctx = talloc_new(h);
/* take the fetch lock on the transaction lock record */
2973 rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
2975 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
2976 talloc_free(tmp_ctx);
2981 ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
2983 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
2984 talloc_free(tmp_ctx);
/* re-read only the header of the lock record (data pointer is NULL); if the
   read fails or this node (ctdb->pnn) is not the dmaster, the tdb
   transaction just started is cancelled again */
2988 ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, NULL);
2989 if (ret != 0 || header.dmaster != ctdb_db->ctdb->pnn) {
2990 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
2991 talloc_free(tmp_ctx);
2995 talloc_free(tmp_ctx);
3001 /* start a transaction on a database: allocates a handle on mem_ctx, starts the transaction with ctdb_transaction_fetch_start() and returns the handle */
3002 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3003 TALLOC_CTX *mem_ctx)
3005 struct ctdb_transaction_handle *h;
/* zero-initialised, so the marshall buffers m_all and m_write start as NULL */
3008 h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3010 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));
3014 h->ctdb_db = ctdb_db;
3016 ret = ctdb_transaction_fetch_start(h);
/* from here on, freeing the handle cancels the tdb transaction */
3022 talloc_set_destructor(h, ctdb_transaction_destructor);
3030 fetch a record inside a transaction
/*
  Read the record stored under key into *data (allocated via mem_ctx by
  ctdb_ltdb_fetch()) and, unless a replay is in progress, remember the
  read in h->m_all.
 */
3032 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h,
3033 TALLOC_CTX *mem_ctx,
3034 TDB_DATA key, TDB_DATA *data)
3036 struct ctdb_ltdb_header header;
3039 ZERO_STRUCT(header);
3041 ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
/* presumably ctdb_ltdb_fetch() sets header.dmaster to (uint32_t)-1 when the
   record is absent - confirm against its implementation */
3042 if (ret == -1 && header.dmaster == (uint32_t)-1) {
3043 /* record doesn't exist yet */
/* reads are appended with 1 as the fourth argument (stores pass 0);
   ctdb_replay_transaction() tells the two apart via rec->reqid, so this
   argument is presumably the reqid - confirm */
3052 if (!h->in_replay) {
3053 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3054 if (h->m_all == NULL) {
3055 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3064 stores a record inside a transaction
/*
  Write data under key as part of the transaction held by h.
  The current record is fetched first to obtain its header; the write is
  then appended to the marshall buffers and stored with ctdb_ltdb_store().
 */
3066 int ctdb_transaction_store(struct ctdb_transaction_handle *h,
3067 TDB_DATA key, TDB_DATA data)
/* scratch context for the old record contents; freed on every path shown below */
3069 TALLOC_CTX *tmp_ctx = talloc_new(h);
3070 struct ctdb_ltdb_header header;
3074 ZERO_STRUCT(header);
3076 /* we need the header so we can update the RSN */
3077 ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3078 if (ret == -1 && header.dmaster == (uint32_t)-1) {
3079 /* the record doesn't exist - create one with us as dmaster.
3080 This is only safe because we are in a transaction and this
3081 is a persistent database */
3082 header.dmaster = h->ctdb_db->ctdb->pnn;
3084 } else if (ret != 0) {
3085 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3086 talloc_free(tmp_ctx);
/* identical size and bytes: nothing to write */
3090 if (data.dsize == olddata.dsize &&
3091 memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3092 /* save writing the same data */
3093 talloc_free(tmp_ctx);
/* outside a replay the write is logged in m_all (no header, fourth
   argument 0) so that it can be replayed later */
3099 if (!h->in_replay) {
3100 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3101 if (h->m_all == NULL) {
3102 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3103 talloc_free(tmp_ctx);
/* m_write receives the record together with its header; this is the buffer
   ctdb_transaction_commit() hands to ctdbd */
3108 h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3109 if (h->m_write == NULL) {
3110 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3111 talloc_free(tmp_ctx);
/* store header and data for key in the handle's database */
3115 ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3117 talloc_free(tmp_ctx);
3123 replay a transaction
/*
  Re-run every operation recorded in h->m_all inside a freshly started
  transaction: recorded stores are stored again, recorded reads are fetched
  again and compared with the data seen the first time.
 */
3125 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3128 struct ctdb_rec_data *rec = NULL;
/* in_replay stops fetch/store from appending to m_all again; the old write
   list is dropped and ctdb_transaction_store() adds to m_write again as the
   stores are replayed */
3130 h->in_replay = true;
3131 talloc_free(h->m_write);
3134 ret = ctdb_transaction_fetch_start(h);
3139 for (i=0;i<h->m_all->count;i++) {
/* rec is the iteration cursor: the previous record is passed in and the
   next one returned, with its key and data filled in */
3142 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3144 DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
/* reqid 0 marks a recorded store; anything else is handled as a recorded read */
3148 if (rec->reqid == 0) {
3150 if (ctdb_transaction_store(h, key, data) != 0) {
3155 TALLOC_CTX *tmp_ctx = talloc_new(h);
3157 if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3158 talloc_free(tmp_ctx);
/* a read must return exactly the bytes it returned the first time */
3161 if (data2.dsize != data.dsize ||
3162 memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3163 /* the record has changed on us - we have to give up */
3164 talloc_free(tmp_ctx);
3167 talloc_free(tmp_ctx);
/* NOTE(review): presumably the shared failure exit - confirm which paths reach this cancel */
3174 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3180 commit a transaction
/*
  Commit the transaction held by h.
  The writes collected in h->m_write are sent to ctdbd with a TRANS2_COMMIT
  control. When that fails the tdb transaction is cancelled and the
  transaction is replayed with ctdb_replay_transaction() before another
  attempt. Once the control succeeds the tdb transaction is committed
  locally and ctdbd is notified with TRANS2_FINISHED.
 */
3182 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3186 struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3187 struct timeval timeout;
/* control sent to ctdbd when the commit has to be abandoned */
3188 enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
/* remove the handle destructor, which would otherwise cancel the tdb
   transaction when h is freed */
3190 talloc_set_destructor(h, NULL);
3192 /* our commit strategy is quite complex.
3194 - we first try to commit the changes to all other nodes
3196 - if that works, then we commit locally and we are done
3198 - if a commit on another node fails, then we need to cancel
3199 the transaction, then restart the transaction (thus
3200 opening a window of time for a pending recovery to
3201 complete), then replay the transaction, checking all the
3202 reads and writes (checking that reads give the same data,
3203 and writes succeed). Then we retry the transaction to the
3208 if (h->m_write == NULL) {
3209 /* no changes were made */
3210 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3215 /* tell ctdbd to commit to the other nodes */
/* presumably a timeout one second from now - confirm timeval_current_ofs() semantics */
3216 timeout = timeval_current_ofs(1, 0);
/* the first attempt uses TRANS2_COMMIT, every later attempt TRANS2_COMMIT_RETRY */
3217 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
3218 retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0,
3219 ctdb_marshall_finish(h->m_write), NULL, NULL, &status,
/* either a failed control call or a non-zero status counts as a failed attempt */
3221 if (ret != 0 || status != 0) {
3222 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3226 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3228 /* work out what error code we will give if we
3229 have to fail the operation */
/* ALLFAIL selects TRANS2_FINISHED, the other listed statuses select TRANS2_ERROR */
3230 switch ((enum ctdb_trans2_commit_error)status) {
3231 case CTDB_TRANS2_COMMIT_SUCCESS:
3232 case CTDB_TRANS2_COMMIT_SOMEFAIL:
3233 case CTDB_TRANS2_COMMIT_TIMEOUT:
3234 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3236 case CTDB_TRANS2_COMMIT_ALLFAIL:
3237 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
/* on the tenth failed attempt log the failure and send failure_control to
   ctdbd without asking for a reply (CTDB_CTRL_FLAG_NOREPLY) */
3242 if (++retries == 10) {
3243 DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n",
3244 h->ctdb_db->db_id, retries, (unsigned)failure_control));
3245 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
3246 failure_control, CTDB_CTRL_FLAG_NOREPLY,
3247 tdb_null, NULL, NULL, NULL, NULL, NULL);
/* restart the transaction and re-run the recorded reads and writes; if that
   fails, ctdbd is sent failure_control as well */
3252 if (ctdb_replay_transaction(h) != 0) {
3253 DEBUG(DEBUG_ERR,(__location__ " Failed to replay transaction\n"));
3254 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
3255 failure_control, CTDB_CTRL_FLAG_NOREPLY,
3256 tdb_null, NULL, NULL, NULL, NULL, NULL);
3262 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3265 /* do the real commit locally */
3266 ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
3268 DEBUG(DEBUG_ERR,(__location__ " Failed to commit transaction\n"));
3269 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
3270 failure_control, CTDB_CTRL_FLAG_NOREPLY,
3271 tdb_null, NULL, NULL, NULL, NULL, NULL);
3276 /* tell ctdbd that we are finished with our local commit */
3277 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
3278 CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY,
3279 tdb_null, NULL, NULL, NULL, NULL, NULL);