4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/events/events.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
30 #include "../include/ctdb_private.h"
31 #include "lib/util/dlinklist.h"
34 allocate a packet for use in client<->daemon communication
36 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
38 enum ctdb_operation operation,
39 size_t length, size_t slength,
43 struct ctdb_req_header *hdr;
45 length = MAX(length, slength);
46 size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
48 hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
50 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
51 operation, (unsigned)length));
54 talloc_set_name_const(hdr, type);
55 memset(hdr, 0, slength);
57 hdr->operation = operation;
58 hdr->ctdb_magic = CTDB_MAGIC;
59 hdr->ctdb_version = CTDB_VERSION;
60 hdr->srcnode = ctdb->pnn;
62 hdr->generation = ctdb->vnn_map->generation;
69 local version of ctdb_call
71 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
72 struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
73 TDB_DATA *data, uint32_t caller)
75 struct ctdb_call_info *c;
76 struct ctdb_registered_call *fn;
77 struct ctdb_context *ctdb = ctdb_db->ctdb;
79 c = talloc(ctdb, struct ctdb_call_info);
80 CTDB_NO_MEMORY(ctdb, c);
83 c->call_data = &call->call_data;
84 c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
85 c->record_data.dsize = data->dsize;
86 CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
91 for (fn=ctdb_db->calls;fn;fn=fn->next) {
92 if (fn->id == call->call_id) break;
95 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
100 if (fn->fn(c) != 0) {
101 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
106 if (header->laccessor != caller) {
109 header->laccessor = caller;
112 /* we need to force the record to be written out if this was a remote access,
113 so that the lacount is updated */
114 if (c->new_data == NULL && header->laccessor != ctdb->pnn) {
115 c->new_data = &c->record_data;
119 /* XXX check that we always have the lock here? */
120 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
121 ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
128 call->reply_data = *c->reply_data;
130 talloc_steal(call, call->reply_data.dptr);
131 talloc_set_name_const(call->reply_data.dptr, __location__);
133 call->reply_data.dptr = NULL;
134 call->reply_data.dsize = 0;
136 call->status = c->status;
145 queue a packet for sending from client to daemon
147 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
149 return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
154 called in the client when a CTDB_REPLY_CALL packet arrives
156 This packet comes in response to a CTDB_REQ_CALL request packet. It
157 contains any reply data from the call
159 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
161 struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
162 struct ctdb_client_call_state *state;
164 state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
166 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
170 if (hdr->reqid != state->reqid) {
171 /* we found a record but it was the wrong one */
172 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
176 state->call->reply_data.dptr = c->data;
177 state->call->reply_data.dsize = c->datalen;
178 state->call->status = c->status;
180 talloc_steal(state, c);
182 state->state = CTDB_CALL_DONE;
184 if (state->async.fn) {
185 state->async.fn(state);
189 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
192 this is called in the client, when data comes in from the daemon
194 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
196 struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
197 struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
200 /* place the packet as a child of a tmp_ctx. We then use
201 talloc_free() below to free it. If any of the calls want
202 to keep it, then they will steal it somewhere else, and the
203 talloc_free() will be a no-op */
204 tmp_ctx = talloc_new(ctdb);
205 talloc_steal(tmp_ctx, hdr);
208 DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n"));
212 if (cnt < sizeof(*hdr)) {
213 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
216 if (cnt != hdr->length) {
217 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n",
218 (unsigned)hdr->length, (unsigned)cnt);
222 if (hdr->ctdb_magic != CTDB_MAGIC) {
223 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
227 if (hdr->ctdb_version != CTDB_VERSION) {
228 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
232 switch (hdr->operation) {
233 case CTDB_REPLY_CALL:
234 ctdb_client_reply_call(ctdb, hdr);
237 case CTDB_REQ_MESSAGE:
238 ctdb_request_message(ctdb, hdr);
241 case CTDB_REPLY_CONTROL:
242 ctdb_client_reply_control(ctdb, hdr);
246 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
250 talloc_free(tmp_ctx);
254 connect to a unix domain socket
256 int ctdb_socket_connect(struct ctdb_context *ctdb)
258 struct sockaddr_un addr;
260 memset(&addr, 0, sizeof(addr));
261 addr.sun_family = AF_UNIX;
262 strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
264 ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
265 if (ctdb->daemon.sd == -1) {
269 set_nonblocking(ctdb->daemon.sd);
270 set_close_on_exec(ctdb->daemon.sd);
272 if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
273 close(ctdb->daemon.sd);
274 ctdb->daemon.sd = -1;
278 ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd,
280 ctdb_client_read_cb, ctdb);
285 struct ctdb_record_handle {
286 struct ctdb_db_context *ctdb_db;
289 struct ctdb_ltdb_header header;
294 make a recv call to the local ctdb daemon - called from client context
296 This is called when the program wants to wait for a ctdb_call to complete and get the
297 results. This call will block unless the call has already completed.
299 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
305 while (state->state < CTDB_CALL_DONE) {
306 event_loop_once(state->ctdb_db->ctdb->ev);
308 if (state->state != CTDB_CALL_DONE) {
309 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
314 if (state->call->reply_data.dsize) {
315 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
316 state->call->reply_data.dptr,
317 state->call->reply_data.dsize);
318 call->reply_data.dsize = state->call->reply_data.dsize;
320 call->reply_data.dptr = NULL;
321 call->reply_data.dsize = 0;
323 call->status = state->call->status;
333 destroy a ctdb_call in client
335 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)
337 ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
342 construct an event driven local ctdb_call
344 this is used so that locally processed ctdb_call requests are processed
345 in an event driven manner
347 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db,
348 struct ctdb_call *call,
349 struct ctdb_ltdb_header *header,
352 struct ctdb_client_call_state *state;
353 struct ctdb_context *ctdb = ctdb_db->ctdb;
356 state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
357 CTDB_NO_MEMORY_NULL(ctdb, state);
358 state->call = talloc_zero(state, struct ctdb_call);
359 CTDB_NO_MEMORY_NULL(ctdb, state->call);
361 talloc_steal(state, data->dptr);
363 state->state = CTDB_CALL_DONE;
364 *(state->call) = *call;
365 state->ctdb_db = ctdb_db;
367 ret = ctdb_call_local(ctdb_db, state->call, header, state, data, ctdb->pnn);
373 make a ctdb call to the local daemon - async send. Called from client context.
375 This constructs a ctdb_call request and queues it for processing.
376 This call never blocks.
378 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db,
379 struct ctdb_call *call)
381 struct ctdb_client_call_state *state;
382 struct ctdb_context *ctdb = ctdb_db->ctdb;
383 struct ctdb_ltdb_header header;
387 struct ctdb_req_call *c;
389 /* if the domain socket is not yet open, open it */
390 if (ctdb->daemon.sd==-1) {
391 ctdb_socket_connect(ctdb);
394 ret = ctdb_ltdb_lock(ctdb_db, call->key);
396 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
400 ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
402 if (ret == 0 && header.dmaster == ctdb->pnn) {
403 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
404 talloc_free(data.dptr);
405 ctdb_ltdb_unlock(ctdb_db, call->key);
409 ctdb_ltdb_unlock(ctdb_db, call->key);
410 talloc_free(data.dptr);
412 state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
414 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
417 state->call = talloc_zero(state, struct ctdb_call);
418 if (state->call == NULL) {
419 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
423 len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
424 c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
426 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
430 state->reqid = ctdb_reqid_new(ctdb, state);
431 state->ctdb_db = ctdb_db;
432 talloc_set_destructor(state, ctdb_client_call_destructor);
434 c->hdr.reqid = state->reqid;
435 c->flags = call->flags;
436 c->db_id = ctdb_db->db_id;
437 c->callid = call->call_id;
439 c->keylen = call->key.dsize;
440 c->calldatalen = call->call_data.dsize;
441 memcpy(&c->data[0], call->key.dptr, call->key.dsize);
442 memcpy(&c->data[call->key.dsize],
443 call->call_data.dptr, call->call_data.dsize);
444 *(state->call) = *call;
445 state->call->call_data.dptr = &c->data[call->key.dsize];
446 state->call->key.dptr = &c->data[0];
448 state->state = CTDB_CALL_WAIT;
451 ctdb_client_queue_pkt(ctdb, &c->hdr);
458 full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
460 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
462 struct ctdb_client_call_state *state;
464 state = ctdb_call_send(ctdb_db, call);
465 return ctdb_call_recv(state, call);
470 tell the daemon what messaging srvid we will use, and register the message
471 handler function in the client
473 int ctdb_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
474 ctdb_message_fn_t handler,
481 res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0,
482 tdb_null, NULL, NULL, &status, NULL, NULL);
483 if (res != 0 || status != 0) {
484 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
488 /* also need to register the handler with our own ctdb structure */
489 return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
493 tell the daemon we no longer want a srvid
495 int ctdb_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
500 res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0,
501 tdb_null, NULL, NULL, &status, NULL, NULL);
502 if (res != 0 || status != 0) {
503 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
507 /* also need to deregister the handler from our own ctdb structure */
508 ctdb_deregister_message_handler(ctdb, srvid, private_data);
514 send a message - from client context
516 int ctdb_send_message(struct ctdb_context *ctdb, uint32_t pnn,
517 uint64_t srvid, TDB_DATA data)
519 struct ctdb_req_message *r;
522 len = offsetof(struct ctdb_req_message, data) + data.dsize;
523 r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE,
524 len, struct ctdb_req_message);
525 CTDB_NO_MEMORY(ctdb, r);
527 r->hdr.destnode = pnn;
529 r->datalen = data.dsize;
530 memcpy(&r->data[0], data.dptr, data.dsize);
532 res = ctdb_client_queue_pkt(ctdb, &r->hdr);
543 cancel a ctdb_fetch_lock operation, releasing the lock
545 static int fetch_lock_destructor(struct ctdb_record_handle *h)
547 ctdb_ltdb_unlock(h->ctdb_db, h->key);
552 force the migration of a record to this node
554 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
556 struct ctdb_call call;
558 call.call_id = CTDB_NULL_FUNC;
560 call.flags = CTDB_IMMEDIATE_MIGRATION;
561 return ctdb_call(ctdb_db, &call);
565 get a lock on a record, and return the record's data. Blocks until it gets the lock
567 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx,
568 TDB_DATA key, TDB_DATA *data)
571 struct ctdb_record_handle *h;
574 procedure is as follows:
576 1) get the chain lock.
577 2) check if we are dmaster
578 3) if we are the dmaster then return handle
579 4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
581 5) when we get the reply, goto (1)
584 h = talloc_zero(mem_ctx, struct ctdb_record_handle);
589 h->ctdb_db = ctdb_db;
591 h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
592 if (h->key.dptr == NULL) {
598 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize,
599 (const char *)key.dptr));
602 /* step 1 - get the chain lock */
603 ret = ctdb_ltdb_lock(ctdb_db, key);
605 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
610 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
612 talloc_set_destructor(h, fetch_lock_destructor);
614 ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
616 /* when torturing, ensure we test the remote path */
617 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
619 h->header.dmaster = (uint32_t)-1;
623 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
625 if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
626 ctdb_ltdb_unlock(ctdb_db, key);
627 ret = ctdb_client_force_migration(ctdb_db, key);
629 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
636 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
641 store some data to the record that was locked with ctdb_fetch_lock()
643 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
647 struct ctdb_rec_data *rec;
650 if (h->ctdb_db->persistent) {
654 ret = ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
659 /* don't need the persistent_store control for non-persistent databases */
660 if (!h->ctdb_db->persistent) {
664 rec = ctdb_marshall_record(h, h->ctdb_db->db_id, h->key, &h->header, data);
666 DEBUG(DEBUG_ERR,("Unable to marshall record in ctdb_record_store\n"));
670 recdata.dptr = (uint8_t *)rec;
671 recdata.dsize = rec->length;
673 ret = ctdb_control(h->ctdb_db->ctdb, CTDB_CURRENT_NODE, 0,
674 CTDB_CONTROL_PERSISTENT_STORE, 0,
675 recdata, NULL, NULL, &status, NULL, NULL);
679 if (ret != 0 || status != 0) {
680 DEBUG(DEBUG_ERR,("Failed persistent store in ctdb_record_store\n"));
688 non-locking fetch of a record
690 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx,
691 TDB_DATA key, TDB_DATA *data)
693 struct ctdb_call call;
696 call.call_id = CTDB_FETCH_FUNC;
697 call.call_data.dptr = NULL;
698 call.call_data.dsize = 0;
700 ret = ctdb_call(ctdb_db, &call);
703 *data = call.reply_data;
704 talloc_steal(mem_ctx, data->dptr);
713 called when a control completes or times out, to invoke the callback
714 function the user provided
716 static void invoke_control_callback(struct event_context *ev, struct timed_event *te,
717 struct timeval t, void *private_data)
719 struct ctdb_client_control_state *state;
720 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
723 state = talloc_get_type(private_data, struct ctdb_client_control_state);
724 talloc_steal(tmp_ctx, state);
726 ret = ctdb_control_recv(state->ctdb, state, state,
731 talloc_free(tmp_ctx);
735 called in the client when a CTDB_REPLY_CONTROL packet arrives
737 This packet comes in response to a CTDB_REQ_CONTROL request packet. It
738 contains any reply data from the control
740 static void ctdb_client_reply_control(struct ctdb_context *ctdb,
741 struct ctdb_req_header *hdr)
743 struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
744 struct ctdb_client_control_state *state;
746 state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
748 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
752 if (hdr->reqid != state->reqid) {
753 /* we found a record but it was the wrong one */
754 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
758 state->outdata.dptr = c->data;
759 state->outdata.dsize = c->datalen;
760 state->status = c->status;
762 state->errormsg = talloc_strndup(state,
763 (char *)&c->data[c->datalen],
767 /* state->outdata now uses resources from c, so we don't want c
768 to just disappear from under us while state is still alive
770 talloc_steal(state, c);
772 state->state = CTDB_CONTROL_DONE;
774 /* if we had a callback registered for this control, pull the response
775 and call the callback.
777 if (state->async.fn) {
778 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
784 destroy a ctdb_control in client
786 static int ctdb_control_destructor(struct ctdb_client_control_state *state)
788 ctdb_reqid_remove(state->ctdb, state->reqid);
793 /* time out handler for ctdb_control */
794 static void control_timeout_func(struct event_context *ev, struct timed_event *te,
795 struct timeval t, void *private_data)
797 struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
799 DEBUG(DEBUG_ERR,("control timed out. reqid:%d opcode:%d dstnode:%d\n", state->reqid, state->c->opcode, state->c->hdr.destnode));
801 state->state = CTDB_CONTROL_TIMEOUT;
803 /* if we had a callback registered for this control, pull the response
804 and call the callback.
806 if (state->async.fn) {
807 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
811 /* async version of send control request */
812 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb,
813 uint32_t destnode, uint64_t srvid,
814 uint32_t opcode, uint32_t flags, TDB_DATA data,
816 struct timeval *timeout,
819 struct ctdb_client_control_state *state;
821 struct ctdb_req_control *c;
828 /* if the domain socket is not yet open, open it */
829 if (ctdb->daemon.sd==-1) {
830 ctdb_socket_connect(ctdb);
833 state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
834 CTDB_NO_MEMORY_NULL(ctdb, state);
837 state->reqid = ctdb_reqid_new(ctdb, state);
838 state->state = CTDB_CONTROL_WAIT;
839 state->errormsg = NULL;
841 talloc_set_destructor(state, ctdb_control_destructor);
843 len = offsetof(struct ctdb_req_control, data) + data.dsize;
844 c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL,
845 len, struct ctdb_req_control);
847 CTDB_NO_MEMORY_NULL(ctdb, c);
848 c->hdr.reqid = state->reqid;
849 c->hdr.destnode = destnode;
850 c->hdr.reqid = state->reqid;
855 c->datalen = data.dsize;
857 memcpy(&c->data[0], data.dptr, data.dsize);
861 if (timeout && !timeval_is_zero(timeout)) {
862 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
865 ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
871 if (flags & CTDB_CTRL_FLAG_NOREPLY) {
880 /* async version of receive control reply */
881 int ctdb_control_recv(struct ctdb_context *ctdb,
882 struct ctdb_client_control_state *state,
884 TDB_DATA *outdata, int32_t *status, char **errormsg)
888 if (status != NULL) {
891 if (errormsg != NULL) {
899 /* prevent double free of state */
900 tmp_ctx = talloc_new(ctdb);
901 talloc_steal(tmp_ctx, state);
903 /* loop one event at a time until we either time out or the control
906 while (state->state == CTDB_CONTROL_WAIT) {
907 event_loop_once(ctdb->ev);
910 if (state->state != CTDB_CONTROL_DONE) {
911 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
912 if (state->async.fn) {
913 state->async.fn(state);
915 talloc_free(tmp_ctx);
919 if (state->errormsg) {
920 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
922 (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
924 if (state->async.fn) {
925 state->async.fn(state);
927 talloc_free(tmp_ctx);
932 *outdata = state->outdata;
933 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
937 *status = state->status;
940 if (state->async.fn) {
941 state->async.fn(state);
944 talloc_free(tmp_ctx);
951 send a ctdb control message
952 timeout specifies how long we should wait for a reply.
953 if timeout is NULL we wait indefinitely
955 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid,
956 uint32_t opcode, uint32_t flags, TDB_DATA data,
957 TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
958 struct timeval *timeout,
961 struct ctdb_client_control_state *state;
963 state = ctdb_control_send(ctdb, destnode, srvid, opcode,
964 flags, data, mem_ctx,
966 return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status,
974 a process exists call. Returns 0 if process exists, -1 otherwise
976 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
982 data.dptr = (uint8_t*)&pid;
983 data.dsize = sizeof(pid);
985 ret = ctdb_control(ctdb, destnode, 0,
986 CTDB_CONTROL_PROCESS_EXISTS, 0, data,
987 NULL, NULL, &status, NULL, NULL);
989 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
997 get remote statistics
999 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
1005 ret = ctdb_control(ctdb, destnode, 0,
1006 CTDB_CONTROL_STATISTICS, 0, tdb_null,
1007 ctdb, &data, &res, NULL, NULL);
1008 if (ret != 0 || res != 0) {
1009 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
1013 if (data.dsize != sizeof(struct ctdb_statistics)) {
1014 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1015 (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1019 *status = *(struct ctdb_statistics *)data.dptr;
1020 talloc_free(data.dptr);
1026 shutdown a remote ctdb node
1028 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1030 struct ctdb_client_control_state *state;
1032 state = ctdb_control_send(ctdb, destnode, 0,
1033 CTDB_CONTROL_SHUTDOWN, 0, tdb_null,
1034 NULL, &timeout, NULL);
1035 if (state == NULL) {
1036 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1044 get vnn map from a remote node
1046 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1051 struct ctdb_vnn_map_wire *map;
1053 ret = ctdb_control(ctdb, destnode, 0,
1054 CTDB_CONTROL_GETVNNMAP, 0, tdb_null,
1055 mem_ctx, &outdata, &res, &timeout, NULL);
1056 if (ret != 0 || res != 0) {
1057 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1061 map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1062 if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1063 outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1064 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1068 (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1069 CTDB_NO_MEMORY(ctdb, *vnnmap);
1070 (*vnnmap)->generation = map->generation;
1071 (*vnnmap)->size = map->size;
1072 (*vnnmap)->map = talloc_array(*vnnmap, uint32_t, map->size);
1074 CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1075 memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1076 talloc_free(outdata.dptr);
1083 get the recovery mode of a remote node
1085 struct ctdb_client_control_state *
1086 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1088 return ctdb_control_send(ctdb, destnode, 0,
1089 CTDB_CONTROL_GET_RECMODE, 0, tdb_null,
1090 mem_ctx, &timeout, NULL);
1093 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1098 ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1100 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1105 *recmode = (uint32_t)res;
1111 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1113 struct ctdb_client_control_state *state;
1115 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1116 return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1123 set the recovery mode of a remote node
1125 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1131 data.dsize = sizeof(uint32_t);
1132 data.dptr = (unsigned char *)&recmode;
1134 ret = ctdb_control(ctdb, destnode, 0,
1135 CTDB_CONTROL_SET_RECMODE, 0, data,
1136 NULL, NULL, &res, &timeout, NULL);
1137 if (ret != 0 || res != 0) {
1138 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1148 get the recovery master of a remote node
1150 struct ctdb_client_control_state *
1151 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
1152 struct timeval timeout, uint32_t destnode)
1154 return ctdb_control_send(ctdb, destnode, 0,
1155 CTDB_CONTROL_GET_RECMASTER, 0, tdb_null,
1156 mem_ctx, &timeout, NULL);
1159 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1164 ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1166 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1171 *recmaster = (uint32_t)res;
1177 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1179 struct ctdb_client_control_state *state;
1181 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1182 return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1187 set the recovery master of a remote node
1189 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1196 data.dsize = sizeof(uint32_t);
1197 data.dptr = (unsigned char *)&recmaster;
1199 ret = ctdb_control(ctdb, destnode, 0,
1200 CTDB_CONTROL_SET_RECMASTER, 0, data,
1201 NULL, NULL, &res, &timeout, NULL);
1202 if (ret != 0 || res != 0) {
1203 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1212 get a list of databases off a remote node
1214 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
1215 TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1221 ret = ctdb_control(ctdb, destnode, 0,
1222 CTDB_CONTROL_GET_DBMAP, 0, tdb_null,
1223 mem_ctx, &outdata, &res, &timeout, NULL);
1224 if (ret != 0 || res != 0) {
1225 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1229 *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1230 talloc_free(outdata.dptr);
1236 get a list of nodes (vnn and flags) from a remote node
1238 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb,
1239 struct timeval timeout, uint32_t destnode,
1240 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1246 ret = ctdb_control(ctdb, destnode, 0,
1247 CTDB_CONTROL_GET_NODEMAP, 0, tdb_null,
1248 mem_ctx, &outdata, &res, &timeout, NULL);
1249 if (ret == 0 && res == -1 && outdata.dsize == 0) {
1250 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1251 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1253 if (ret != 0 || res != 0 || outdata.dsize == 0) {
1254 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1258 *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1259 talloc_free(outdata.dptr);
1265 old-style ipv4-only version: get a list of nodes (vnn and flags) from a remote node
1267 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb,
1268 struct timeval timeout, uint32_t destnode,
1269 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1273 struct ctdb_node_mapv4 *nodemapv4;
1276 ret = ctdb_control(ctdb, destnode, 0,
1277 CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null,
1278 mem_ctx, &outdata, &res, &timeout, NULL);
1279 if (ret != 0 || res != 0 || outdata.dsize == 0) {
1280 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1284 nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1286 len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1287 (*nodemap) = talloc_zero_size(mem_ctx, len);
1288 CTDB_NO_MEMORY(ctdb, (*nodemap));
1290 (*nodemap)->num = nodemapv4->num;
1291 for (i=0; i<nodemapv4->num; i++) {
1292 (*nodemap)->nodes[i].pnn = nodemapv4->nodes[i].pnn;
1293 (*nodemap)->nodes[i].flags = nodemapv4->nodes[i].flags;
1294 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1295 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1298 talloc_free(outdata.dptr);
1304 drop the transport, reload the nodes file and restart the transport
1306 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb,
1307 struct timeval timeout, uint32_t destnode)
1312 ret = ctdb_control(ctdb, destnode, 0,
1313 CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null,
1314 NULL, NULL, &res, &timeout, NULL);
1315 if (ret != 0 || res != 0) {
1316 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1325 set vnn map on a node
1327 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
1328 TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1333 struct ctdb_vnn_map_wire *map;
1336 len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1337 map = talloc_size(mem_ctx, len);
1338 CTDB_NO_MEMORY(ctdb, map);
1340 map->generation = vnnmap->generation;
1341 map->size = vnnmap->size;
1342 memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1345 data.dptr = (uint8_t *)map;
1347 ret = ctdb_control(ctdb, destnode, 0,
1348 CTDB_CONTROL_SETVNNMAP, 0, data,
1349 NULL, NULL, &res, &timeout, NULL);
1350 if (ret != 0 || res != 0) {
1351 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1362 async send for pull database
1364 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1365 struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1366 uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1369 struct ctdb_control_pulldb *pull;
1370 struct ctdb_client_control_state *state;
1372 pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1373 CTDB_NO_MEMORY_NULL(ctdb, pull);
1376 pull->lmaster = lmaster;
1378 indata.dsize = sizeof(struct ctdb_control_pulldb);
1379 indata.dptr = (unsigned char *)pull;
1381 state = ctdb_control_send(ctdb, destnode, 0,
1382 CTDB_CONTROL_PULL_DB, 0, indata,
1383 mem_ctx, &timeout, NULL);
1390 async recv for pull database
1392 int ctdb_ctrl_pulldb_recv(
1393 struct ctdb_context *ctdb,
1394 TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state,
1400 ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1401 if ( (ret != 0) || (res != 0) ){
1402 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1410 pull all keys and records for a specific database on a node
1412 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode,
1413 uint32_t dbid, uint32_t lmaster,
1414 TALLOC_CTX *mem_ctx, struct timeval timeout,
1417 struct ctdb_client_control_state *state;
1419 state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1422 return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1427 change dmaster for all keys in the database to the new value
1429 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
1430 TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1436 indata.dsize = 2*sizeof(uint32_t);
1437 indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1439 ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1440 ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1442 ret = ctdb_control(ctdb, destnode, 0,
1443 CTDB_CONTROL_SET_DMASTER, 0, indata,
1444 NULL, NULL, &res, &timeout, NULL);
1445 if (ret != 0 || res != 0) {
1446 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1454 ping a node, return number of clients connected
1456 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1461 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0,
1462 tdb_null, NULL, NULL, &res, NULL, NULL);
1470 find the real path to a ltdb
1472 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx,
1479 data.dptr = (uint8_t *)&dbid;
1480 data.dsize = sizeof(dbid);
1482 ret = ctdb_control(ctdb, destnode, 0,
1483 CTDB_CONTROL_GETDBPATH, 0, data,
1484 mem_ctx, &data, &res, &timeout, NULL);
1485 if (ret != 0 || res != 0) {
1489 (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1490 if ((*path) == NULL) {
1494 talloc_free(data.dptr);
1500 find the name of a db
1502 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx,
1509 data.dptr = (uint8_t *)&dbid;
1510 data.dsize = sizeof(dbid);
1512 ret = ctdb_control(ctdb, destnode, 0,
1513 CTDB_CONTROL_GET_DBNAME, 0, data,
1514 mem_ctx, &data, &res, &timeout, NULL);
1515 if (ret != 0 || res != 0) {
1519 (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1520 if ((*name) == NULL) {
1524 talloc_free(data.dptr);
1532 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
1533 TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1539 data.dptr = discard_const(name);
1540 data.dsize = strlen(name)+1;
1542 ret = ctdb_control(ctdb, destnode, 0,
1543 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1545 mem_ctx, &data, &res, &timeout, NULL);
1547 if (ret != 0 || res != 0) {
1555 get debug level on a node
1557 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1563 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null,
1564 ctdb, &data, &res, NULL, NULL);
1565 if (ret != 0 || res != 0) {
1568 if (data.dsize != sizeof(int32_t)) {
1569 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1570 (unsigned)data.dsize));
1573 *level = *(int32_t *)data.dptr;
1574 talloc_free(data.dptr);
1579 set debug level on a node
1581 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1587 data.dptr = (uint8_t *)&level;
1588 data.dsize = sizeof(level);
1590 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data,
1591 NULL, NULL, &res, NULL, NULL);
1592 if (ret != 0 || res != 0) {
1600 get a list of connected nodes
1602 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb,
1603 struct timeval timeout,
1604 TALLOC_CTX *mem_ctx,
1605 uint32_t *num_nodes)
1607 struct ctdb_node_map *map=NULL;
1613 ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1618 nodes = talloc_array(mem_ctx, uint32_t, map->num);
1619 if (nodes == NULL) {
1623 for (i=0;i<map->num;i++) {
1624 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1625 nodes[*num_nodes] = map->nodes[i].pnn;
1637 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1642 ret = ctdb_control(ctdb, destnode, 0,
1643 CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null,
1644 NULL, NULL, &res, NULL, NULL);
1645 if (ret != 0 || res != 0) {
1646 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
/*
  this is the dummy null procedure that all databases support

  It does nothing and always reports success.
*/
static int ctdb_null_func(struct ctdb_call_info *call)
{
	return 0;
}
1661 this is a plain fetch procedure that all databases support
1663 static int ctdb_fetch_func(struct ctdb_call_info *call)
1665 call->reply_data = &call->record_data;
1670 attach to a specific database - client call
1672 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, bool persistent, uint32_t tdb_flags)
1674 struct ctdb_db_context *ctdb_db;
1679 ctdb_db = ctdb_db_handle(ctdb, name);
1684 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1685 CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1687 ctdb_db->ctdb = ctdb;
1688 ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1689 CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1691 data.dptr = discard_const(name);
1692 data.dsize = strlen(name)+1;
1694 /* tell ctdb daemon to attach */
1695 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags,
1696 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1697 0, data, ctdb_db, &data, &res, NULL, NULL);
1698 if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1699 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1700 talloc_free(ctdb_db);
1704 ctdb_db->db_id = *(uint32_t *)data.dptr;
1705 talloc_free(data.dptr);
1707 ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1709 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1710 talloc_free(ctdb_db);
1714 tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1715 if (!ctdb->do_setsched) {
1716 tdb_flags |= TDB_NOMMAP;
1719 ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1720 if (ctdb_db->ltdb == NULL) {
1721 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1722 talloc_free(ctdb_db);
1726 ctdb_db->persistent = persistent;
1728 DLIST_ADD(ctdb->db_list, ctdb_db);
1730 /* add well known functions */
1731 ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1732 ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1739 setup a call for a database
1741 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1743 struct ctdb_registered_call *call;
1748 struct ctdb_control_set_call c;
1751 /* this is no longer valid with the separate daemon architecture */
1752 c.db_id = ctdb_db->db_id;
1756 data.dptr = (uint8_t *)&c;
1757 data.dsize = sizeof(c);
1759 ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1760 data, NULL, NULL, &status, NULL, NULL);
1761 if (ret != 0 || status != 0) {
1762 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
1767 /* also register locally */
1768 call = talloc(ctdb_db, struct ctdb_registered_call);
1772 DLIST_ADD(ctdb_db->calls, call);
1777 struct traverse_state {
1780 ctdb_traverse_func fn;
1785 called on each key during a ctdb_traverse
1787 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1789 struct traverse_state *state = (struct traverse_state *)p;
1790 struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
1793 if (data.dsize < sizeof(uint32_t) ||
1794 d->length != data.dsize) {
1795 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
1800 key.dsize = d->keylen;
1801 key.dptr = &d->data[0];
1802 data.dsize = d->datalen;
1803 data.dptr = &d->data[d->keylen];
1805 if (key.dsize == 0 && data.dsize == 0) {
1806 /* end of traverse */
1811 if (data.dsize == sizeof(struct ctdb_ltdb_header)) {
1812 /* empty records are deleted records in ctdb */
1816 if (state->fn(ctdb, key, data, state->private_data) != 0) {
1825 start a cluster wide traverse, calling the supplied fn on each record
1826 return the number of records traversed, or -1 on error
1828 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
1831 struct ctdb_traverse_start t;
1834 uint64_t srvid = (getpid() | 0xFLL<<60);
1835 struct traverse_state state;
1839 state.private_data = private_data;
1842 ret = ctdb_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
1844 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
1848 t.db_id = ctdb_db->db_id;
1852 data.dptr = (uint8_t *)&t;
1853 data.dsize = sizeof(t);
1855 ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
1856 data, NULL, NULL, &status, NULL, NULL);
1857 if (ret != 0 || status != 0) {
1858 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
1859 ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1863 while (!state.done) {
1864 event_loop_once(ctdb_db->ctdb->ev);
1867 ret = ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1869 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
/* true for byte values 32..127; the argument is parenthesised so that
   expressions such as ISASCII(c & 0x7f) are evaluated as written */
#define ISASCII(x) (((x) > 31) && ((x) < 128))
1878 called on each key during a catdb
1880 static int dumpdb_fn(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
1883 FILE *f = (FILE *)p;
1884 struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
1886 fprintf(f, "dmaster: %u\n", h->dmaster);
1887 fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
1889 fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
1890 for (i=0;i<key.dsize;i++) {
1891 if (ISASCII(key.dptr[i])) {
1892 fprintf(f, "%c", key.dptr[i]);
1894 fprintf(f, "\\%02X", key.dptr[i]);
1899 fprintf(f, "data(%u) = \"", (unsigned)data.dsize);
1900 for (i=sizeof(*h);i<data.dsize;i++) {
1901 if (ISASCII(data.dptr[i])) {
1902 fprintf(f, "%c", data.dptr[i]);
1904 fprintf(f, "\\%02X", data.dptr[i]);
1913 convenience function to list all keys to stdout
1915 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
1917 return ctdb_traverse(ctdb_db, dumpdb_fn, f);
1921 get the pid of a ctdb daemon
1923 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
1928 ret = ctdb_control(ctdb, destnode, 0,
1929 CTDB_CONTROL_GET_PID, 0, tdb_null,
1930 NULL, NULL, &res, &timeout, NULL);
1932 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
1943 async freeze send control
1945 struct ctdb_client_control_state *
1946 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1948 return ctdb_control_send(ctdb, destnode, 0,
1949 CTDB_CONTROL_FREEZE, 0, tdb_null,
1950 mem_ctx, &timeout, NULL);
1954 async freeze recv control
1956 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
1961 ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1962 if ( (ret != 0) || (res != 0) ){
1963 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
1973 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1975 TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1976 struct ctdb_client_control_state *state;
1979 state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode);
1980 ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
1981 talloc_free(tmp_ctx);
1989 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1994 ret = ctdb_control(ctdb, destnode, 0,
1995 CTDB_CONTROL_THAW, 0, tdb_null,
1996 NULL, NULL, &res, &timeout, NULL);
1997 if (ret != 0 || res != 0) {
1998 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2006 get pnn of a node, or -1
2008 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2013 ret = ctdb_control(ctdb, destnode, 0,
2014 CTDB_CONTROL_GET_PNN, 0, tdb_null,
2015 NULL, NULL, &res, &timeout, NULL);
2017 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2025 get the monitoring mode of a remote node
2027 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2032 ret = ctdb_control(ctdb, destnode, 0,
2033 CTDB_CONTROL_GET_MONMODE, 0, tdb_null,
2034 NULL, NULL, &res, &timeout, NULL);
2036 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2047 set the monitoring mode of a remote node to active
2049 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2054 ret = ctdb_control(ctdb, destnode, 0,
2055 CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null,
2056 NULL, NULL,NULL, &timeout, NULL);
2058 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2068 set the monitoring mode of a remote node to disable
2070 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2075 ret = ctdb_control(ctdb, destnode, 0,
2076 CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null,
2077 NULL, NULL, NULL, &timeout, NULL);
2079 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2091 sent to a node to make it take over an ip address
2093 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout,
2094 uint32_t destnode, struct ctdb_public_ip *ip)
2097 struct ctdb_public_ipv4 ipv4;
2101 if (ip->addr.sa.sa_family == AF_INET) {
2103 ipv4.sin = ip->addr.ip;
2105 data.dsize = sizeof(ipv4);
2106 data.dptr = (uint8_t *)&ipv4;
2108 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2109 NULL, &res, &timeout, NULL);
2111 data.dsize = sizeof(*ip);
2112 data.dptr = (uint8_t *)ip;
2114 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2115 NULL, &res, &timeout, NULL);
2118 if (ret != 0 || res != 0) {
2119 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2128 sent to a node to make it release an ip address
2130 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout,
2131 uint32_t destnode, struct ctdb_public_ip *ip)
2134 struct ctdb_public_ipv4 ipv4;
2138 if (ip->addr.sa.sa_family == AF_INET) {
2140 ipv4.sin = ip->addr.ip;
2142 data.dsize = sizeof(ipv4);
2143 data.dptr = (uint8_t *)&ipv4;
2145 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2146 NULL, &res, &timeout, NULL);
2148 data.dsize = sizeof(*ip);
2149 data.dptr = (uint8_t *)ip;
2151 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2152 NULL, &res, &timeout, NULL);
2155 if (ret != 0 || res != 0) {
2156 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2167 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb,
2168 struct timeval timeout,
2170 const char *name, uint32_t *value)
2172 struct ctdb_control_get_tunable *t;
2173 TDB_DATA data, outdata;
2177 data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2178 data.dptr = talloc_size(ctdb, data.dsize);
2179 CTDB_NO_MEMORY(ctdb, data.dptr);
2181 t = (struct ctdb_control_get_tunable *)data.dptr;
2182 t->length = strlen(name)+1;
2183 memcpy(t->name, name, t->length);
2185 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2186 &outdata, &res, &timeout, NULL);
2187 talloc_free(data.dptr);
2188 if (ret != 0 || res != 0) {
2189 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2193 if (outdata.dsize != sizeof(uint32_t)) {
2194 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2195 talloc_free(outdata.dptr);
2199 *value = *(uint32_t *)outdata.dptr;
2200 talloc_free(outdata.dptr);
2208 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb,
2209 struct timeval timeout,
2211 const char *name, uint32_t value)
2213 struct ctdb_control_set_tunable *t;
2218 data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2219 data.dptr = talloc_size(ctdb, data.dsize);
2220 CTDB_NO_MEMORY(ctdb, data.dptr);
2222 t = (struct ctdb_control_set_tunable *)data.dptr;
2223 t->length = strlen(name)+1;
2224 memcpy(t->name, name, t->length);
2227 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2228 NULL, &res, &timeout, NULL);
2229 talloc_free(data.dptr);
2230 if (ret != 0 || res != 0) {
2231 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2241 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb,
2242 struct timeval timeout,
2244 TALLOC_CTX *mem_ctx,
2245 const char ***list, uint32_t *count)
2250 struct ctdb_control_list_tunable *t;
2253 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null,
2254 mem_ctx, &outdata, &res, &timeout, NULL);
2255 if (ret != 0 || res != 0) {
2256 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2260 t = (struct ctdb_control_list_tunable *)outdata.dptr;
2261 if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2262 t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2263 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2264 talloc_free(outdata.dptr);
2268 p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2269 CTDB_NO_MEMORY(ctdb, p);
2271 talloc_free(outdata.dptr);
2276 for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2277 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2278 CTDB_NO_MEMORY(ctdb, *list);
2279 (*list)[*count] = talloc_strdup(*list, s);
2280 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2290 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2291 struct timeval timeout, uint32_t destnode,
2292 TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2298 ret = ctdb_control(ctdb, destnode, 0,
2299 CTDB_CONTROL_GET_PUBLIC_IPS, 0, tdb_null,
2300 mem_ctx, &outdata, &res, &timeout, NULL);
2301 if (ret == 0 && res == -1) {
2302 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2303 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2305 if (ret != 0 || res != 0) {
2306 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2310 *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2311 talloc_free(outdata.dptr);
2316 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb,
2317 struct timeval timeout, uint32_t destnode,
2318 TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2323 struct ctdb_all_public_ipsv4 *ipsv4;
2325 ret = ctdb_control(ctdb, destnode, 0,
2326 CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null,
2327 mem_ctx, &outdata, &res, &timeout, NULL);
2328 if (ret != 0 || res != 0) {
2329 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2333 ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2334 len = offsetof(struct ctdb_all_public_ips, ips) +
2335 ipsv4->num*sizeof(struct ctdb_public_ip);
2336 *ips = talloc_zero_size(mem_ctx, len);
2337 (*ips)->num = ipsv4->num;
2338 for (i=0; i<ipsv4->num; i++) {
2339 (*ips)->ips[i].pnn = ipsv4->ips[i].pnn;
2340 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2343 talloc_free(outdata.dptr);
2349 set/clear the permanent disabled bit on a remote node
2351 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
2352 uint32_t set, uint32_t clear)
2356 struct ctdb_node_modflags m;
2362 data.dsize = sizeof(m);
2363 data.dptr = (unsigned char *)&m;
2365 ret = ctdb_control(ctdb, destnode, 0,
2366 CTDB_CONTROL_MODIFY_FLAGS, 0, data,
2367 NULL, NULL, &res, &timeout, NULL);
2368 if (ret != 0 || res != 0) {
2369 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for modflags failed\n"));
2380 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb,
2381 struct timeval timeout,
2383 struct ctdb_tunable *tunables)
2389 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2390 &outdata, &res, &timeout, NULL);
2391 if (ret != 0 || res != 0) {
2392 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2396 if (outdata.dsize != sizeof(*tunables)) {
2397 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2398 (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2402 *tunables = *(struct ctdb_tunable *)outdata.dptr;
2403 talloc_free(outdata.dptr);
2408 add a public address to a node
2410 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb,
2411 struct timeval timeout,
2413 struct ctdb_control_ip_iface *pub)
2419 data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2420 data.dptr = (unsigned char *)pub;
2422 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2423 NULL, &res, &timeout, NULL);
2424 if (ret != 0 || res != 0) {
2425 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2433 delete a public address from a node
2435 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb,
2436 struct timeval timeout,
2438 struct ctdb_control_ip_iface *pub)
2444 data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2445 data.dptr = (unsigned char *)pub;
2447 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2448 NULL, &res, &timeout, NULL);
2449 if (ret != 0 || res != 0) {
2450 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2458 kill a tcp connection
2460 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb,
2461 struct timeval timeout,
2463 struct ctdb_control_killtcp *killtcp)
2469 data.dsize = sizeof(struct ctdb_control_killtcp);
2470 data.dptr = (unsigned char *)killtcp;
2472 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2473 NULL, &res, &timeout, NULL);
2474 if (ret != 0 || res != 0) {
2475 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2485 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb,
2486 struct timeval timeout,
2488 ctdb_sock_addr *addr,
2494 struct ctdb_control_gratious_arp *gratious_arp;
2495 TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2498 len = strlen(ifname)+1;
2499 gratious_arp = talloc_size(tmp_ctx,
2500 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2501 CTDB_NO_MEMORY(ctdb, gratious_arp);
2503 gratious_arp->addr = *addr;
2504 gratious_arp->len = len;
2505 memcpy(&gratious_arp->iface[0], ifname, len);
2508 data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
2509 data.dptr = (unsigned char *)gratious_arp;
2511 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
2512 NULL, &res, &timeout, NULL);
2513 if (ret != 0 || res != 0) {
2514 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
2515 talloc_free(tmp_ctx);
2519 talloc_free(tmp_ctx);
2524 get a list of all tcp tickles that a node knows about for a particular vnn
2526 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb,
2527 struct timeval timeout, uint32_t destnode,
2528 TALLOC_CTX *mem_ctx,
2529 ctdb_sock_addr *addr,
2530 struct ctdb_control_tcp_tickle_list **list)
2533 TDB_DATA data, outdata;
2536 data.dptr = (uint8_t*)addr;
2537 data.dsize = sizeof(ctdb_sock_addr);
2539 ret = ctdb_control(ctdb, destnode, 0,
2540 CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data,
2541 mem_ctx, &outdata, &status, NULL, NULL);
2542 if (ret != 0 || status != 0) {
2543 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
2547 *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
2553 register a server id
2555 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb,
2556 struct timeval timeout,
2557 struct ctdb_server_id *id)
2563 data.dsize = sizeof(struct ctdb_server_id);
2564 data.dptr = (unsigned char *)id;
2566 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0,
2567 CTDB_CONTROL_REGISTER_SERVER_ID,
2569 NULL, &res, &timeout, NULL);
2570 if (ret != 0 || res != 0) {
2571 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
2579 unregister a server id
2581 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb,
2582 struct timeval timeout,
2583 struct ctdb_server_id *id)
2589 data.dsize = sizeof(struct ctdb_server_id);
2590 data.dptr = (unsigned char *)id;
2592 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0,
2593 CTDB_CONTROL_UNREGISTER_SERVER_ID,
2595 NULL, &res, &timeout, NULL);
2596 if (ret != 0 || res != 0) {
2597 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
2606 check if a server id exists
2608 if a server id does exist, return *status == 1, otherwise *status == 0
2610 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb,
2611 struct timeval timeout,
2613 struct ctdb_server_id *id,
2620 data.dsize = sizeof(struct ctdb_server_id);
2621 data.dptr = (unsigned char *)id;
2623 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID,
2625 NULL, &res, &timeout, NULL);
2627 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
2641 get the list of server ids that are registered on a node
2643 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
2644 TALLOC_CTX *mem_ctx,
2645 struct timeval timeout, uint32_t destnode,
2646 struct ctdb_server_id_list **svid_list)
2652 ret = ctdb_control(ctdb, destnode, 0,
2653 CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null,
2654 mem_ctx, &outdata, &res, &timeout, NULL);
2655 if (ret != 0 || res != 0) {
2656 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
2660 *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
2666 initialise the ctdb daemon for client applications
2668 NOTE: In current code the daemon does not fork. This is for testing purposes only
2669 and to simplify the code.
2671 struct ctdb_context *ctdb_init(struct event_context *ev)
2673 struct ctdb_context *ctdb;
2675 ctdb = talloc_zero(ev, struct ctdb_context);
2677 ctdb->idr = idr_init(ctdb);
2678 CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
2680 ctdb_set_socketname(ctdb, CTDB_PATH);
2689 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
2691 ctdb->flags |= flags;
2695 setup the local socket name
2697 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
2699 ctdb->daemon.name = talloc_strdup(ctdb, socketname);
2704 return the pnn of this node
2706 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
2713 get the uptime of a remote node
2715 struct ctdb_client_control_state *
2716 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2718 return ctdb_control_send(ctdb, destnode, 0,
2719 CTDB_CONTROL_UPTIME, 0, tdb_null,
2720 mem_ctx, &timeout, NULL);
2723 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
2729 ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
2730 if (ret != 0 || res != 0) {
2731 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
2735 *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
2740 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
2742 struct ctdb_client_control_state *state;
2744 state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
2745 return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
2749 send a control to execute the "recovered" event script on a node
2751 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2756 ret = ctdb_control(ctdb, destnode, 0,
2757 CTDB_CONTROL_END_RECOVERY, 0, tdb_null,
2758 NULL, NULL, &status, &timeout, NULL);
2759 if (ret != 0 || status != 0) {
2760 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
2768 callback for the async helpers used when sending the same control
2769 to multiple nodes in parallell.
2771 static void async_callback(struct ctdb_client_control_state *state)
2773 struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
2774 struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
2778 uint32_t destnode = state->c->hdr.destnode;
2780 /* one more node has responded with recmode data */
2783 /* if we failed to push the db, then return an error and let
2784 the main loop try again.
2786 if (state->state != CTDB_CONTROL_DONE) {
2787 if ( !data->dont_log_errors) {
2788 DEBUG(DEBUG_ERR,("Async operation failed with state %d\n opcode:%u", state->state, data->opcode));
2791 if (data->fail_callback) {
2792 data->fail_callback(ctdb, destnode, res, outdata,
2793 data->callback_data);
2798 state->async.fn = NULL;
2800 ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
2801 if ((ret != 0) || (res != 0)) {
2802 if ( !data->dont_log_errors) {
2803 DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
2806 if (data->fail_callback) {
2807 data->fail_callback(ctdb, destnode, res, outdata,
2808 data->callback_data);
2811 if ((ret == 0) && (data->callback != NULL)) {
2812 data->callback(ctdb, destnode, res, outdata,
2813 data->callback_data);
2818 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
2820 /* set up the callback functions */
2821 state->async.fn = async_callback;
2822 state->async.private_data = data;
2824 /* one more control to wait for to complete */
2829 /* wait for up to the maximum number of seconds allowed
2830 or until all nodes we expect a response from has replied
2832 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
2834 while (data->count > 0) {
2835 event_loop_once(ctdb->ev);
2837 if (data->fail_count != 0) {
2838 if (!data->dont_log_errors) {
2839 DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n",
2849 perform a simple control on the listed nodes
2850 The control cannot return data
2852 int ctdb_client_async_control(struct ctdb_context *ctdb,
2853 enum ctdb_controls opcode,
2855 struct timeval timeout,
2856 bool dont_log_errors,
2858 client_async_callback client_callback,
2859 client_async_callback fail_callback,
2860 void *callback_data)
2862 struct client_async_data *async_data;
2863 struct ctdb_client_control_state *state;
2866 async_data = talloc_zero(ctdb, struct client_async_data);
2867 CTDB_NO_MEMORY_FATAL(ctdb, async_data);
2868 async_data->dont_log_errors = dont_log_errors;
2869 async_data->callback = client_callback;
2870 async_data->fail_callback = fail_callback;
2871 async_data->callback_data = callback_data;
2872 async_data->opcode = opcode;
2874 num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
2876 /* loop over all nodes and send an async control to each of them */
2877 for (j=0; j<num_nodes; j++) {
2878 uint32_t pnn = nodes[j];
2880 state = ctdb_control_send(ctdb, pnn, 0, opcode,
2881 0, data, async_data, &timeout, NULL);
2882 if (state == NULL) {
2883 DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
2884 talloc_free(async_data);
2888 ctdb_client_async_add(async_data, state);
2891 if (ctdb_client_async_wait(ctdb, async_data) != 0) {
2892 talloc_free(async_data);
2896 talloc_free(async_data);
2900 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
2901 struct ctdb_vnn_map *vnn_map,
2902 TALLOC_CTX *mem_ctx,
2905 int i, j, num_nodes;
2908 for (i=num_nodes=0;i<vnn_map->size;i++) {
2909 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2915 nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2916 CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2918 for (i=j=0;i<vnn_map->size;i++) {
2919 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2922 nodes[j++] = vnn_map->map[i];
2928 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
2929 struct ctdb_node_map *node_map,
2930 TALLOC_CTX *mem_ctx,
2933 int i, j, num_nodes;
2936 for (i=num_nodes=0;i<node_map->num;i++) {
2937 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2940 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2946 nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2947 CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2949 for (i=j=0;i<node_map->num;i++) {
2950 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2953 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2956 nodes[j++] = node_map->nodes[i].pnn;
2963 this is used to test if a pnn lock exists and if it exists will return
2964 the number of connections that pnn has reported or -1 if that recovery
2965 daemon is not running.
2968 ctdb_read_pnn_lock(int fd, int32_t pnn)
2973 lock.l_type = F_WRLCK;
2974 lock.l_whence = SEEK_SET;
2979 if (fcntl(fd, F_GETLK, &lock) != 0) {
2980 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
2984 if (lock.l_type == F_UNLCK) {
2988 if (pread(fd, &c, 1, pnn) == -1) {
2989 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
2997 get capabilities of a remote node
2999 struct ctdb_client_control_state *
3000 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3002 return ctdb_control_send(ctdb, destnode, 0,
3003 CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null,
3004 mem_ctx, &timeout, NULL);
3007 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3013 ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3014 if ( (ret != 0) || (res != 0) ) {
3015 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3020 *capabilities = *((uint32_t *)outdata.dptr);
3026 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3028 struct ctdb_client_control_state *state;
3029 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3032 state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3033 ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3034 talloc_free(tmp_ctx);
/*
  State of one client-side transaction on a database.
 */
struct ctdb_transaction_handle {
	/* database the transaction operates on */
	struct ctdb_db_context *ctdb_db;
	/* set while the recorded operations are being replayed; fetch and
	   store then skip appending to m_all */
	bool in_replay;
	/* We store the reads and writes done under a transaction.
	   One list (m_all) stores both reads and writes, the other
	   (m_write) just the writes.
	*/
	struct ctdb_marshall_buffer *m_all;
	struct ctdb_marshall_buffer *m_write;
};
3048 /* start a transaction on a database */
3049 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3051 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3055 /* start a transaction on a database */
3056 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
3058 struct ctdb_record_handle *rh;
3060 struct ctdb_ltdb_header header;
3061 TALLOC_CTX *tmp_ctx;
3062 const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
3064 struct ctdb_db_context *ctdb_db = h->ctdb_db;
3066 key.dptr = discard_const(keyname);
3067 key.dsize = strlen(keyname);
3069 if (!ctdb_db->persistent) {
3070 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
3075 tmp_ctx = talloc_new(h);
3077 rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
3079 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
3080 talloc_free(tmp_ctx);
3085 ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3087 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
3088 talloc_free(tmp_ctx);
3092 ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, NULL);
3093 if (ret != 0 || header.dmaster != ctdb_db->ctdb->pnn) {
3094 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3095 talloc_free(tmp_ctx);
3099 talloc_free(tmp_ctx);
3105 /* start a transaction on a database */
3106 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3107 TALLOC_CTX *mem_ctx)
3109 struct ctdb_transaction_handle *h;
3112 h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3114 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));
3118 h->ctdb_db = ctdb_db;
3120 ret = ctdb_transaction_fetch_start(h);
3126 talloc_set_destructor(h, ctdb_transaction_destructor);
3134 fetch a record inside a transaction
3136 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h,
3137 TALLOC_CTX *mem_ctx,
3138 TDB_DATA key, TDB_DATA *data)
3140 struct ctdb_ltdb_header header;
3143 ZERO_STRUCT(header);
3145 ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3146 if (ret == -1 && header.dmaster == (uint32_t)-1) {
3147 /* record doesn't exist yet */
3156 if (!h->in_replay) {
3157 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3158 if (h->m_all == NULL) {
3159 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3168 stores a record inside a transaction
3170 int ctdb_transaction_store(struct ctdb_transaction_handle *h,
3171 TDB_DATA key, TDB_DATA data)
3173 TALLOC_CTX *tmp_ctx = talloc_new(h);
3174 struct ctdb_ltdb_header header;
3178 ZERO_STRUCT(header);
3180 /* we need the header so we can update the RSN */
3181 ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3182 if (ret == -1 && header.dmaster == (uint32_t)-1) {
3183 /* the record doesn't exist - create one with us as dmaster.
3184 This is only safe because we are in a transaction and this
3185 is a persistent database */
3186 header.dmaster = h->ctdb_db->ctdb->pnn;
3188 } else if (ret != 0) {
3189 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3190 talloc_free(tmp_ctx);
3194 if (data.dsize == olddata.dsize &&
3195 memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3196 /* save writing the same data */
3197 talloc_free(tmp_ctx);
3203 if (!h->in_replay) {
3204 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3205 if (h->m_all == NULL) {
3206 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3207 talloc_free(tmp_ctx);
3212 h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3213 if (h->m_write == NULL) {
3214 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3215 talloc_free(tmp_ctx);
3219 ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3221 talloc_free(tmp_ctx);
3227 replay a transaction
3229 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3232 struct ctdb_rec_data *rec = NULL;
3234 h->in_replay = true;
3235 talloc_free(h->m_write);
3238 ret = ctdb_transaction_fetch_start(h);
3243 for (i=0;i<h->m_all->count;i++) {
3246 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3248 DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3252 if (rec->reqid == 0) {
3254 if (ctdb_transaction_store(h, key, data) != 0) {
3259 TALLOC_CTX *tmp_ctx = talloc_new(h);
3261 if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3262 talloc_free(tmp_ctx);
3265 if (data2.dsize != data.dsize ||
3266 memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3267 /* the record has changed on us - we have to give up */
3268 talloc_free(tmp_ctx);
3271 talloc_free(tmp_ctx);
3278 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3284 commit a transaction
3286 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3290 struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3291 struct timeval timeout;
3292 enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3294 talloc_set_destructor(h, NULL);
3296 /* our commit strategy is quite complex.
3298 - we first try to commit the changes to all other nodes
3300 - if that works, then we commit locally and we are done
3302 - if a commit on another node fails, then we need to cancel
3303 the transaction, then restart the transaction (thus
3304 opening a window of time for a pending recovery to
3305 complete), then replay the transaction, checking all the
3306 reads and writes (checking that reads give the same data,
3307 and writes succeed). Then we retry the transaction to the
3312 if (h->m_write == NULL) {
3313 /* no changes were made */
3314 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3319 /* tell ctdbd to commit to the other nodes */
3320 timeout = timeval_current_ofs(1, 0);
3321 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
3322 retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0,
3323 ctdb_marshall_finish(h->m_write), NULL, NULL, &status,
3325 if (ret != 0 || status != 0) {
3326 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3330 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3332 /* work out what error code we will give if we
3333 have to fail the operation */
3334 switch ((enum ctdb_trans2_commit_error)status) {
3335 case CTDB_TRANS2_COMMIT_SUCCESS:
3336 case CTDB_TRANS2_COMMIT_SOMEFAIL:
3337 case CTDB_TRANS2_COMMIT_TIMEOUT:
3338 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3340 case CTDB_TRANS2_COMMIT_ALLFAIL:
3341 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
3346 if (++retries == 10) {
3347 DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n",
3348 h->ctdb_db->db_id, retries, (unsigned)failure_control));
3349 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
3350 failure_control, CTDB_CTRL_FLAG_NOREPLY,
3351 tdb_null, NULL, NULL, NULL, NULL, NULL);
3356 if (ctdb_replay_transaction(h) != 0) {
3357 DEBUG(DEBUG_ERR,(__location__ " Failed to replay transaction\n"));
3358 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
3359 failure_control, CTDB_CTRL_FLAG_NOREPLY,
3360 tdb_null, NULL, NULL, NULL, NULL, NULL);
3366 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3369 /* do the real commit locally */
3370 ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
3372 DEBUG(DEBUG_ERR,(__location__ " Failed to commit transaction\n"));
3373 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
3374 failure_control, CTDB_CTRL_FLAG_NOREPLY,
3375 tdb_null, NULL, NULL, NULL, NULL, NULL);
3380 /* tell ctdbd that we are finished with our local commit */
3381 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
3382 CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY,
3383 tdb_null, NULL, NULL, NULL, NULL, NULL);
3389 recovery daemon ping to main daemon
3391 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
3396 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null,
3397 ctdb, NULL, &res, NULL, NULL);
3398 if (ret != 0 || res != 0) {
3399 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));