4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/events/events.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "../include/ctdb_private.h"
29 #include "lib/util/dlinklist.h"
32 allocate a packet for use in client<->daemon communication
34 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
36 enum ctdb_operation operation,
37 size_t length, size_t slength,
41 struct ctdb_req_header *hdr;
43 length = MAX(length, slength);
44 size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
46 hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
48 DEBUG(0,("Unable to allocate packet for operation %u of length %u\n",
49 operation, (unsigned)length));
52 talloc_set_name_const(hdr, type);
53 memset(hdr, 0, slength);
55 hdr->operation = operation;
56 hdr->ctdb_magic = CTDB_MAGIC;
57 hdr->ctdb_version = CTDB_VERSION;
58 hdr->srcnode = ctdb->vnn;
60 hdr->generation = ctdb->vnn_map->generation;
67 local version of ctdb_call
69 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
70 struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
71 TDB_DATA *data, uint32_t caller)
73 struct ctdb_call_info *c;
74 struct ctdb_registered_call *fn;
75 struct ctdb_context *ctdb = ctdb_db->ctdb;
77 c = talloc(ctdb, struct ctdb_call_info);
78 CTDB_NO_MEMORY(ctdb, c);
81 c->call_data = &call->call_data;
82 c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
83 c->record_data.dsize = data->dsize;
84 CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
89 for (fn=ctdb_db->calls;fn;fn=fn->next) {
90 if (fn->id == call->call_id) break;
93 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
99 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
104 if (header->laccessor != caller) {
107 header->laccessor = caller;
110 /* we need to force the record to be written out if this was a remote access,
111 so that the lacount is updated */
112 if (c->new_data == NULL && header->laccessor != ctdb->vnn) {
113 c->new_data = &c->record_data;
117 /* XXX check that we always have the lock here? */
118 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
119 ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
126 call->reply_data = *c->reply_data;
127 talloc_steal(ctdb, call->reply_data.dptr);
128 talloc_set_name_const(call->reply_data.dptr, __location__);
130 call->reply_data.dptr = NULL;
131 call->reply_data.dsize = 0;
133 call->status = c->status;
142 queue a packet for sending from client to daemon
144 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
146 return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
151 state of a in-progress ctdb call in client
153 struct ctdb_client_call_state {
154 enum call_state state;
156 struct ctdb_db_context *ctdb_db;
157 struct ctdb_call call;
161 called when a CTDB_REPLY_CALL packet comes in in the client
163 This packet comes in response to a CTDB_REQ_CALL request packet. It
164 contains any reply data from the call
166 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
168 struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
169 struct ctdb_client_call_state *state;
171 state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
173 DEBUG(0,(__location__ " reqid %u not found\n", hdr->reqid));
177 if (hdr->reqid != state->reqid) {
178 /* we found a record but it was the wrong one */
179 DEBUG(0, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
183 state->call.reply_data.dptr = c->data;
184 state->call.reply_data.dsize = c->datalen;
185 state->call.status = c->status;
187 talloc_steal(state, c);
189 state->state = CTDB_CALL_DONE;
192 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
195 this is called in the client, when data comes in from the daemon
197 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
199 struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
200 struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
203 /* place the packet as a child of a tmp_ctx. We then use
204 talloc_free() below to free it. If any of the calls want
205 to keep it, then they will steal it somewhere else, and the
206 talloc_free() will be a no-op */
207 tmp_ctx = talloc_new(ctdb);
208 talloc_steal(tmp_ctx, hdr);
211 DEBUG(2,("Daemon has exited - shutting down client\n"));
215 if (cnt < sizeof(*hdr)) {
216 DEBUG(0,("Bad packet length %u in client\n", (unsigned)cnt));
219 if (cnt != hdr->length) {
220 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n",
221 (unsigned)hdr->length, (unsigned)cnt);
225 if (hdr->ctdb_magic != CTDB_MAGIC) {
226 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
230 if (hdr->ctdb_version != CTDB_VERSION) {
231 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
235 switch (hdr->operation) {
236 case CTDB_REPLY_CALL:
237 ctdb_client_reply_call(ctdb, hdr);
240 case CTDB_REQ_MESSAGE:
241 ctdb_request_message(ctdb, hdr);
244 case CTDB_REPLY_CONTROL:
245 ctdb_client_reply_control(ctdb, hdr);
249 DEBUG(0,("bogus operation code:%u\n",hdr->operation));
253 talloc_free(tmp_ctx);
257 connect to a unix domain socket
259 int ctdb_socket_connect(struct ctdb_context *ctdb)
261 struct sockaddr_un addr;
263 memset(&addr, 0, sizeof(addr));
264 addr.sun_family = AF_UNIX;
265 strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
267 ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
268 if (ctdb->daemon.sd == -1) {
272 set_nonblocking(ctdb->daemon.sd);
273 set_close_on_exec(ctdb->daemon.sd);
275 if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
276 close(ctdb->daemon.sd);
277 ctdb->daemon.sd = -1;
281 ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd,
283 ctdb_client_read_cb, ctdb);
288 struct ctdb_record_handle {
289 struct ctdb_db_context *ctdb_db;
292 struct ctdb_ltdb_header header;
297 make a recv call to the local ctdb daemon - called from client context
299 This is called when the program wants to wait for a ctdb_call to complete and get the
300 results. This call will block unless the call has already completed.
302 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
304 while (state->state < CTDB_CALL_DONE) {
305 event_loop_once(state->ctdb_db->ctdb->ev);
307 if (state->state != CTDB_CALL_DONE) {
308 DEBUG(0,(__location__ " ctdb_call_recv failed\n"));
313 if (state->call.reply_data.dsize) {
314 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
315 state->call.reply_data.dptr,
316 state->call.reply_data.dsize);
317 call->reply_data.dsize = state->call.reply_data.dsize;
319 call->reply_data.dptr = NULL;
320 call->reply_data.dsize = 0;
322 call->status = state->call.status;
332 destroy a ctdb_call in client
334 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)
336 ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
341 construct an event driven local ctdb_call
343 this is used so that locally processed ctdb_call requests are processed
344 in an event driven manner
346 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db,
347 struct ctdb_call *call,
348 struct ctdb_ltdb_header *header,
351 struct ctdb_client_call_state *state;
352 struct ctdb_context *ctdb = ctdb_db->ctdb;
355 state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
356 CTDB_NO_MEMORY_NULL(ctdb, state);
358 talloc_steal(state, data->dptr);
360 state->state = CTDB_CALL_DONE;
362 state->ctdb_db = ctdb_db;
364 ret = ctdb_call_local(ctdb_db, &state->call, header, state, data, ctdb->vnn);
370 make a ctdb call to the local daemon - async send. Called from client context.
372 This constructs a ctdb_call request and queues it for processing.
373 This call never blocks.
375 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db,
376 struct ctdb_call *call)
378 struct ctdb_client_call_state *state;
379 struct ctdb_context *ctdb = ctdb_db->ctdb;
380 struct ctdb_ltdb_header header;
384 struct ctdb_req_call *c;
386 /* if the domain socket is not yet open, open it */
387 if (ctdb->daemon.sd==-1) {
388 ctdb_socket_connect(ctdb);
391 ret = ctdb_ltdb_lock(ctdb_db, call->key);
393 DEBUG(0,(__location__ " Failed to get chainlock\n"));
397 ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
399 if (ret == 0 && header.dmaster == ctdb->vnn) {
400 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
401 talloc_free(data.dptr);
402 ctdb_ltdb_unlock(ctdb_db, call->key);
406 ctdb_ltdb_unlock(ctdb_db, call->key);
407 talloc_free(data.dptr);
409 state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
411 DEBUG(0, (__location__ " failed to allocate state\n"));
415 len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
416 c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
418 DEBUG(0, (__location__ " failed to allocate packet\n"));
422 state->reqid = ctdb_reqid_new(ctdb, state);
423 state->ctdb_db = ctdb_db;
424 talloc_set_destructor(state, ctdb_client_call_destructor);
426 c->hdr.reqid = state->reqid;
427 c->flags = call->flags;
428 c->db_id = ctdb_db->db_id;
429 c->callid = call->call_id;
431 c->keylen = call->key.dsize;
432 c->calldatalen = call->call_data.dsize;
433 memcpy(&c->data[0], call->key.dptr, call->key.dsize);
434 memcpy(&c->data[call->key.dsize],
435 call->call_data.dptr, call->call_data.dsize);
437 state->call.call_data.dptr = &c->data[call->key.dsize];
438 state->call.key.dptr = &c->data[0];
440 state->state = CTDB_CALL_WAIT;
443 ctdb_client_queue_pkt(ctdb, &c->hdr);
/*
  full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()

  Returns -1 straight away if the request could not be started, otherwise
  the result of ctdb_call_recv().
*/
int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
{
	struct ctdb_client_call_state *state;

	state = ctdb_call_send(ctdb_db, call);
	if (state == NULL) {
		/* ctdb_call_recv() reads state->state unconditionally, so a
		   failed send must not be passed on to it */
		return -1;
	}

	return ctdb_call_recv(state, call);
}
462 tell the daemon what messaging srvid we will use, and register the message
463 handler function in the client
465 int ctdb_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
466 ctdb_message_fn_t handler,
473 res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0,
474 tdb_null, NULL, NULL, &status, NULL, NULL);
475 if (res != 0 || status != 0) {
476 DEBUG(0,("Failed to register srvid %llu\n", (unsigned long long)srvid));
480 /* also need to register the handler with our own ctdb structure */
481 return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
485 tell the daemon we no longer want a srvid
487 int ctdb_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
492 res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0,
493 tdb_null, NULL, NULL, &status, NULL, NULL);
494 if (res != 0 || status != 0) {
495 DEBUG(0,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
499 /* also need to register the handler with our own ctdb structure */
500 ctdb_deregister_message_handler(ctdb, srvid, private_data);
506 send a message - from client context
508 int ctdb_send_message(struct ctdb_context *ctdb, uint32_t vnn,
509 uint64_t srvid, TDB_DATA data)
511 struct ctdb_req_message *r;
514 len = offsetof(struct ctdb_req_message, data) + data.dsize;
515 r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE,
516 len, struct ctdb_req_message);
517 CTDB_NO_MEMORY(ctdb, r);
519 r->hdr.destnode = vnn;
521 r->datalen = data.dsize;
522 memcpy(&r->data[0], data.dptr, data.dsize);
524 res = ctdb_client_queue_pkt(ctdb, &r->hdr);
535 cancel a ctdb_fetch_lock operation, releasing the lock
537 static int fetch_lock_destructor(struct ctdb_record_handle *h)
539 ctdb_ltdb_unlock(h->ctdb_db, h->key);
544 force the migration of a record to this node
546 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
548 struct ctdb_call call;
550 call.call_id = CTDB_NULL_FUNC;
552 call.flags = CTDB_IMMEDIATE_MIGRATION;
553 return ctdb_call(ctdb_db, &call);
557 get a lock on a record, and return the records data. Blocks until it gets the lock
559 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx,
560 TDB_DATA key, TDB_DATA *data)
563 struct ctdb_record_handle *h;
566 procedure is as follows:
568 1) get the chain lock.
569 2) check if we are dmaster
570 3) if we are the dmaster then return handle
571 4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
573 5) when we get the reply, goto (1)
576 h = talloc_zero(mem_ctx, struct ctdb_record_handle);
581 h->ctdb_db = ctdb_db;
583 h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
584 if (h->key.dptr == NULL) {
590 DEBUG(3,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize,
591 (const char *)key.dptr));
594 /* step 1 - get the chain lock */
595 ret = ctdb_ltdb_lock(ctdb_db, key);
597 DEBUG(0, (__location__ " failed to lock ltdb record\n"));
602 DEBUG(4,("ctdb_fetch_lock: got chain lock\n"));
604 talloc_set_destructor(h, fetch_lock_destructor);
606 ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
608 /* when torturing, ensure we test the remote path */
609 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
611 h->header.dmaster = (uint32_t)-1;
615 DEBUG(4,("ctdb_fetch_lock: done local fetch\n"));
617 if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->vnn) {
618 ctdb_ltdb_unlock(ctdb_db, key);
619 ret = ctdb_client_force_migration(ctdb_db, key);
621 DEBUG(4,("ctdb_fetch_lock: force_migration failed\n"));
628 DEBUG(4,("ctdb_fetch_lock: we are dmaster - done\n"));
633 store some data to the record that was locked with ctdb_fetch_lock()
635 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
637 return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
640 struct ctdb_client_control_state {
641 struct ctdb_context *ctdb;
645 enum call_state state;
650 called when a CTDB_REPLY_CONTROL packet comes in in the client
652 This packet comes in response to a CTDB_REQ_CONTROL request packet. It
653 contains any reply data from the control
655 static void ctdb_client_reply_control(struct ctdb_context *ctdb,
656 struct ctdb_req_header *hdr)
658 struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
659 struct ctdb_client_control_state *state;
661 state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
663 DEBUG(0,(__location__ " reqid %u not found\n", hdr->reqid));
667 if (hdr->reqid != state->reqid) {
668 /* we found a record but it was the wrong one */
669 DEBUG(0, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
673 state->outdata.dptr = c->data;
674 state->outdata.dsize = c->datalen;
675 state->status = c->status;
677 state->errormsg = talloc_strndup(state,
678 (char *)&c->data[c->datalen],
682 talloc_steal(state, c);
684 state->state = CTDB_CALL_DONE;
/*
  timed event callback used by ctdb_control

  private_data points at the caller's uint32_t "timed out" flag; the flag
  is set to 1 so the caller's wait loop terminates.  The ev, te and t
  arguments are not used.
*/
static void timeout_func(struct event_context *ev, struct timed_event *te,
	struct timeval t, void *private_data)
{
	uint32_t *expired = private_data;

	*expired = 1;
}
698 destroy a ctdb_control in client
700 static int ctdb_control_destructor(struct ctdb_client_control_state *state)
702 ctdb_reqid_remove(state->ctdb, state->reqid);
707 send a ctdb control message
708 timeout specifies how long we should wait for a reply.
709 if timeout is NULL we wait indefinitely
711 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid,
712 uint32_t opcode, uint32_t flags, TDB_DATA data,
713 TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
714 struct timeval *timeout,
717 struct ctdb_client_control_state *state;
718 struct ctdb_req_control *c;
727 /* if the domain socket is not yet open, open it */
728 if (ctdb->daemon.sd==-1) {
729 ctdb_socket_connect(ctdb);
732 state = talloc_zero(ctdb, struct ctdb_client_control_state);
733 CTDB_NO_MEMORY(ctdb, state);
736 state->reqid = ctdb_reqid_new(ctdb, state);
737 state->state = CTDB_CALL_WAIT;
738 state->errormsg = NULL;
740 talloc_set_destructor(state, ctdb_control_destructor);
742 len = offsetof(struct ctdb_req_control, data) + data.dsize;
743 c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL,
744 len, struct ctdb_req_control);
745 CTDB_NO_MEMORY(ctdb, c);
747 c->hdr.reqid = state->reqid;
748 c->hdr.destnode = destnode;
749 c->hdr.reqid = state->reqid;
754 c->datalen = data.dsize;
756 memcpy(&c->data[0], data.dptr, data.dsize);
759 ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
765 if (flags & CTDB_CTRL_FLAG_NOREPLY) {
770 /* semi-async operation */
772 if (timeout && !timeval_is_zero(timeout)) {
773 event_add_timed(ctdb->ev, state, *timeout, timeout_func, &timed_out);
775 while ((state->state == CTDB_CALL_WAIT)
776 && (timed_out == 0) ){
777 event_loop_once(ctdb->ev);
782 (*errormsg) = talloc_strdup(mem_ctx, "control timed out");
784 DEBUG(0,("ctdb_control timed out\n"));
790 *outdata = state->outdata;
791 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
794 *status = state->status;
796 if (!errormsg && state->errormsg) {
797 DEBUG(0,("ctdb_control error: '%s'\n", state->errormsg));
800 if (errormsg && state->errormsg) {
801 (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
812 a process exists call. Returns 0 if process exists, -1 otherwise
814 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
820 data.dptr = (uint8_t*)&pid;
821 data.dsize = sizeof(pid);
823 ret = ctdb_control(ctdb, destnode, 0,
824 CTDB_CONTROL_PROCESS_EXISTS, 0, data,
825 NULL, NULL, &status, NULL, NULL);
827 DEBUG(0,(__location__ " ctdb_control for process_exists failed\n"));
835 get remote statistics
837 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
843 ret = ctdb_control(ctdb, destnode, 0,
844 CTDB_CONTROL_STATISTICS, 0, tdb_null,
845 ctdb, &data, &res, NULL, NULL);
846 if (ret != 0 || res != 0) {
847 DEBUG(0,(__location__ " ctdb_control for statistics failed\n"));
851 if (data.dsize != sizeof(struct ctdb_statistics)) {
852 DEBUG(0,(__location__ " Wrong statistics size %u - expected %u\n",
853 (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
857 *status = *(struct ctdb_statistics *)data.dptr;
858 talloc_free(data.dptr);
864 shutdown a remote ctdb node
866 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
871 ret = ctdb_control(ctdb, destnode, 0,
872 CTDB_CONTROL_SHUTDOWN, CTDB_CTRL_FLAG_NOREPLY, tdb_null,
873 NULL, NULL, &res, &timeout, NULL);
875 DEBUG(0,(__location__ " ctdb_control for shutdown failed\n"));
883 get vnn map from a remote node
885 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
890 struct ctdb_vnn_map_wire *map;
892 ret = ctdb_control(ctdb, destnode, 0,
893 CTDB_CONTROL_GETVNNMAP, 0, tdb_null,
894 mem_ctx, &outdata, &res, &timeout, NULL);
895 if (ret != 0 || res != 0) {
896 DEBUG(0,(__location__ " ctdb_control for getvnnmap failed\n"));
900 map = (struct ctdb_vnn_map_wire *)outdata.dptr;
901 if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
902 outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
903 DEBUG(0,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
907 (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
908 CTDB_NO_MEMORY(ctdb, *vnnmap);
909 (*vnnmap)->generation = map->generation;
910 (*vnnmap)->size = map->size;
911 (*vnnmap)->map = talloc_array(*vnnmap, uint32_t, map->size);
913 CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
914 memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
915 talloc_free(outdata.dptr);
921 get the recovery mode of a remote node
923 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
928 ret = ctdb_control(ctdb, destnode, 0,
929 CTDB_CONTROL_GET_RECMODE, 0, tdb_null,
930 NULL, NULL, &res, &timeout, NULL);
932 DEBUG(0,(__location__ " ctdb_control for getrecmode failed\n"));
942 set the recovery mode of a remote node
944 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
950 data.dsize = sizeof(uint32_t);
951 data.dptr = (unsigned char *)&recmode;
953 ret = ctdb_control(ctdb, destnode, 0,
954 CTDB_CONTROL_SET_RECMODE, 0, data,
955 NULL, NULL, &res, &timeout, NULL);
956 if (ret != 0 || res != 0) {
957 DEBUG(0,(__location__ " ctdb_control for setrecmode failed\n"));
965 get the recovery master of a remote node
967 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
972 ret = ctdb_control(ctdb, destnode, 0,
973 CTDB_CONTROL_GET_RECMASTER, 0, tdb_null,
974 NULL, NULL, &res, &timeout, NULL);
976 DEBUG(0,(__location__ " ctdb_control for getrecmaster failed\n"));
986 set the recovery master of a remote node
988 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
995 data.dsize = sizeof(uint32_t);
996 data.dptr = (unsigned char *)&recmaster;
998 ret = ctdb_control(ctdb, destnode, 0,
999 CTDB_CONTROL_SET_RECMASTER, 0, data,
1000 NULL, NULL, &res, &timeout, NULL);
1001 if (ret != 0 || res != 0) {
1002 DEBUG(0,(__location__ " ctdb_control for setrecmaster failed\n"));
1011 get a list of databases off a remote node
1013 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
1014 TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1020 ret = ctdb_control(ctdb, destnode, 0,
1021 CTDB_CONTROL_GET_DBMAP, 0, tdb_null,
1022 mem_ctx, &outdata, &res, &timeout, NULL);
1023 if (ret != 0 || res != 0) {
1024 DEBUG(0,(__location__ " ctdb_control for getdbmap failed\n"));
1028 *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1029 talloc_free(outdata.dptr);
1036 get a list of nodes (vnn and flags ) from a remote node
1038 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb,
1039 struct timeval timeout, uint32_t destnode,
1040 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1046 ret = ctdb_control(ctdb, destnode, 0,
1047 CTDB_CONTROL_GET_NODEMAP, 0, tdb_null,
1048 mem_ctx, &outdata, &res, &timeout, NULL);
1049 if (ret != 0 || res != 0) {
1050 DEBUG(0,(__location__ " ctdb_control for getnodes failed\n"));
1054 *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1055 talloc_free(outdata.dptr);
1061 set vnn map on a node
1063 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
1064 TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1069 struct ctdb_vnn_map_wire *map;
1072 len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1073 map = talloc_size(mem_ctx, len);
1074 CTDB_NO_MEMORY_VOID(ctdb, map);
1076 map->generation = vnnmap->generation;
1077 map->size = vnnmap->size;
1078 memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1081 data.dptr = (uint8_t *)map;
1083 ret = ctdb_control(ctdb, destnode, 0,
1084 CTDB_CONTROL_SETVNNMAP, 0, data,
1085 NULL, NULL, &res, &timeout, NULL);
1086 if (ret != 0 || res != 0) {
1087 DEBUG(0,(__location__ " ctdb_control for setvnnmap failed\n"));
1097 get all keys and records for a specific database
1099 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid, uint32_t lmaster,
1100 TALLOC_CTX *mem_ctx, struct ctdb_key_list *keys)
1103 TDB_DATA indata, outdata;
1104 struct ctdb_control_pulldb pull;
1105 struct ctdb_control_pulldb_reply *reply;
1106 struct ctdb_rec_data *rec;
1110 pull.lmaster = lmaster;
1112 indata.dsize = sizeof(struct ctdb_control_pulldb);
1113 indata.dptr = (unsigned char *)&pull;
1115 ret = ctdb_control(ctdb, destnode, 0,
1116 CTDB_CONTROL_PULL_DB, 0, indata,
1117 mem_ctx, &outdata, &res, NULL, NULL);
1118 if (ret != 0 || res != 0) {
1119 DEBUG(0,(__location__ " ctdb_control for pulldb failed\n"));
1124 reply = (struct ctdb_control_pulldb_reply *)outdata.dptr;
1125 keys->dbid = reply->db_id;
1126 keys->num = reply->count;
1128 keys->keys = talloc_array(mem_ctx, TDB_DATA, keys->num);
1129 keys->headers = talloc_array(mem_ctx, struct ctdb_ltdb_header, keys->num);
1130 keys->data = talloc_array(mem_ctx, TDB_DATA, keys->num);
1132 rec = (struct ctdb_rec_data *)&reply->data[0];
1134 for (i=0;i<reply->count;i++) {
1135 keys->keys[i].dptr = talloc_memdup(mem_ctx, &rec->data[0], rec->keylen);
1136 keys->keys[i].dsize = rec->keylen;
1138 keys->data[i].dptr = talloc_memdup(mem_ctx, &rec->data[keys->keys[i].dsize], rec->datalen);
1139 keys->data[i].dsize = rec->datalen;
1141 if (keys->data[i].dsize < sizeof(struct ctdb_ltdb_header)) {
1142 DEBUG(0,(__location__ " bad ltdb record\n"));
1145 memcpy(&keys->headers[i], keys->data[i].dptr, sizeof(struct ctdb_ltdb_header));
1146 keys->data[i].dptr += sizeof(struct ctdb_ltdb_header);
1147 keys->data[i].dsize -= sizeof(struct ctdb_ltdb_header);
1149 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
1152 talloc_free(outdata.dptr);
1158 copy a tdb from one node to another node
1160 int ctdb_ctrl_copydb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t sourcenode,
1161 uint32_t destnode, uint32_t dbid, uint32_t lmaster, TALLOC_CTX *mem_ctx)
1164 TDB_DATA indata, outdata;
1167 indata.dsize = 2*sizeof(uint32_t);
1168 indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1170 ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1171 ((uint32_t *)(&indata.dptr[0]))[1] = lmaster;
1173 DEBUG(3,("pulling dbid 0x%x from %u\n", dbid, sourcenode));
1175 ret = ctdb_control(ctdb, sourcenode, 0,
1176 CTDB_CONTROL_PULL_DB, 0, indata,
1177 mem_ctx, &outdata, &res, &timeout, NULL);
1178 if (ret != 0 || res != 0) {
1179 DEBUG(0,(__location__ " ctdb_control for pulldb failed\n"));
1183 DEBUG(3,("pushing dbid 0x%x to %u\n", dbid, destnode));
1185 ret = ctdb_control(ctdb, destnode, 0,
1186 CTDB_CONTROL_PUSH_DB, 0, outdata,
1187 mem_ctx, NULL, &res, &timeout, NULL);
1188 talloc_free(outdata.dptr);
1189 if (ret != 0 || res != 0) {
1190 DEBUG(0,(__location__ " ctdb_control for pushdb failed\n"));
1194 DEBUG(3,("copydb for dbid 0x%x done for %u to %u\n",
1195 dbid, sourcenode, destnode));
1201 change dmaster for all keys in the database to the new value
1203 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
1204 TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1210 indata.dsize = 2*sizeof(uint32_t);
1211 indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1213 ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1214 ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1216 ret = ctdb_control(ctdb, destnode, 0,
1217 CTDB_CONTROL_SET_DMASTER, 0, indata,
1218 NULL, NULL, &res, &timeout, NULL);
1219 if (ret != 0 || res != 0) {
1220 DEBUG(0,(__location__ " ctdb_control for setdmaster failed\n"));
1228 ping a node, return number of clients connected
1230 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1235 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0,
1236 tdb_null, NULL, NULL, &res, NULL, NULL);
1244 find the real path to a ltdb
1246 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx,
1253 data.dptr = (uint8_t *)&dbid;
1254 data.dsize = sizeof(dbid);
1256 ret = ctdb_control(ctdb, destnode, 0,
1257 CTDB_CONTROL_GETDBPATH, 0, data,
1258 mem_ctx, &data, &res, &timeout, NULL);
1259 if (ret != 0 || res != 0) {
1263 (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1264 if ((*path) == NULL) {
1268 talloc_free(data.dptr);
1274 find the name of a db
1276 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx,
1283 data.dptr = (uint8_t *)&dbid;
1284 data.dsize = sizeof(dbid);
1286 ret = ctdb_control(ctdb, destnode, 0,
1287 CTDB_CONTROL_GET_DBNAME, 0, data,
1288 mem_ctx, &data, &res, &timeout, NULL);
1289 if (ret != 0 || res != 0) {
1293 (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1294 if ((*name) == NULL) {
1298 talloc_free(data.dptr);
1306 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, const char *name)
1312 data.dptr = discard_const(name);
1313 data.dsize = strlen(name)+1;
1315 ret = ctdb_control(ctdb, destnode, 0,
1316 CTDB_CONTROL_DB_ATTACH, 0, data,
1317 mem_ctx, &data, &res, &timeout, NULL);
1319 if (ret != 0 || res != 0) {
1327 get debug level on a node
1329 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, uint32_t *level)
1335 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null,
1336 ctdb, &data, &res, NULL, NULL);
1337 if (ret != 0 || res != 0) {
1340 if (data.dsize != sizeof(uint32_t)) {
1341 DEBUG(0,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1342 (unsigned)data.dsize));
1345 *level = *(uint32_t *)data.dptr;
1346 talloc_free(data.dptr);
1351 set debug level on a node
1353 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, uint32_t level)
1359 data.dptr = (uint8_t *)&level;
1360 data.dsize = sizeof(level);
1362 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data,
1363 NULL, NULL, &res, NULL, NULL);
1364 if (ret != 0 || res != 0) {
1372 get a list of connected nodes
1374 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb,
1375 struct timeval timeout,
1376 TALLOC_CTX *mem_ctx,
1377 uint32_t *num_nodes)
1379 struct ctdb_node_map *map=NULL;
1385 ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1390 nodes = talloc_array(mem_ctx, uint32_t, map->num);
1391 if (nodes == NULL) {
1395 for (i=0;i<map->num;i++) {
1396 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1397 nodes[*num_nodes] = map->nodes[i].vnn;
1409 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1414 ret = ctdb_control(ctdb, destnode, 0,
1415 CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null,
1416 NULL, NULL, &res, NULL, NULL);
1417 if (ret != 0 || res != 0) {
1418 DEBUG(0,(__location__ " ctdb_control for reset statistics failed\n"));
1426 attach to a specific database - client call
/*
 * Client-side attach to the database called `name`.
 *
 * Steps visible in this listing:
 *   1. allocate a zeroed ctdb_db_context on ctdb and record the name;
 *   2. send CTDB_CONTROL_DB_ATTACH to CTDB_CURRENT_NODE with the
 *      NUL-terminated name as payload and read back the database id;
 *   3. query the on-disk path of that database id;
 *   4. open the tdb at that path read/write;
 *   5. link the new context into ctdb->db_list.
 *
 * On the three visible failure paths the partially built context is
 * released with talloc_free(ctdb_db).
 *
 * NOTE(review): the return statements are on lines omitted from this
 * listing.
 */
1428 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name)
1430 struct ctdb_db_context *ctdb_db;
1435 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1436 CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1438 ctdb_db->ctdb = ctdb;
1439 ctdb_db->db_name = talloc_strdup(ctdb_db, name);
/*
 * NOTE(review): if this macro returns on allocation failure, ctdb_db
 * itself is not freed on that path - confirm the macro's behaviour.
 */
1440 CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
/* request payload: the database name including its terminating NUL */
1442 data.dptr = discard_const(name);
1443 data.dsize = strlen(name)+1;
1445 /* tell ctdb daemon to attach */
/* `data` is reused as the reply buffer; the reply is allocated on ctdb_db */
1446 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_DB_ATTACH,
1447 0, data, ctdb_db, &data, &res, NULL, NULL);
/* a valid reply is exactly one uint32_t, the database id */
1448 if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1449 DEBUG(0,("Failed to attach to database '%s'\n", name));
1450 talloc_free(ctdb_db);
1454 ctdb_db->db_id = *(uint32_t *)data.dptr;
1455 talloc_free(data.dptr);
/* timeval_current_ofs(2, 0): presumably a two second timeout - TODO confirm */
1457 ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1459 DEBUG(0,("Failed to get dbpath for database '%s'\n", name));
1460 talloc_free(ctdb_db);
/*
 * NOTE(review): the first argument here is ctdb, not ctdb_db.  If it is
 * the talloc parent of the wrapper, the open tdb would outlive a later
 * talloc_free(ctdb_db) - confirm this is intended.
 */
1464 ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, 0, O_RDWR, 0);
1465 if (ctdb_db->ltdb == NULL) {
1466 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1467 talloc_free(ctdb_db);
1471 DLIST_ADD(ctdb->db_list, ctdb_db);
1478 setup a call for a database
/*
 * Register call function `fn` under call id `id` for ctdb_db.
 *
 * A struct ctdb_control_set_call carrying the database id is sent to
 * CTDB_CURRENT_NODE with CTDB_CONTROL_SET_CALL.  When the control
 * succeeds with status 0, a ctdb_registered_call allocated on ctdb_db
 * is also added to the local ctdb_db->calls list.
 *
 * NOTE(review): the remaining fields of `c` and of `call`, any NULL
 * check on the talloc() result, and the return statements are on lines
 * omitted from this listing.
 */
1480 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1484 struct ctdb_control_set_call c;
1486 struct ctdb_registered_call *call;
1488 c.db_id = ctdb_db->db_id;
/* the control payload is the raw bytes of the stack structure `c` */
1492 data.dptr = (uint8_t *)&c;
1493 data.dsize = sizeof(c);
1495 ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1496 data, NULL, NULL, &status, NULL, NULL);
1497 if (ret != 0 || status != 0) {
1498 DEBUG(0,("ctdb_set_call failed for call %u\n", id));
1502 /* also register locally */
1503 call = talloc(ctdb_db, struct ctdb_registered_call);
1507 DLIST_ADD(ctdb_db->calls, call);
/*
 * State shared between ctdb_traverse() and traverse_handler() for one
 * traverse.  ctdb_traverse() keeps an instance on its stack and passes
 * its address as the private pointer of the message handler.
 *
 * NOTE(review): only the `fn` member is visible in this listing; the
 * code below also uses `done` and `private_data` members.
 */
1512 struct traverse_state {
/* per-record callback invoked from traverse_handler() */
1515 ctdb_traverse_func fn;
1520 called on each key during a ctdb_traverse
/*
 * Message handler installed by ctdb_traverse(); called once for each
 * record message of a traverse.
 *
 * `data` is interpreted as a struct ctdb_rec_data: a fixed header
 * followed by keylen bytes of key and datalen bytes of record data in
 * the trailing data[] member.  `p` is the struct traverse_state set up
 * by ctdb_traverse().
 *
 * A message with keylen == 0 and datalen == 0 marks the end of the
 * traverse.  Every other message is passed to state->fn together with
 * state->private_data.
 *
 * NOTE(review): several lines of this function are omitted from this
 * listing (locals, the failure handling inside the size check, and the
 * bodies of the last two conditionals).
 */
1522 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1524 struct traverse_state *state = (struct traverse_state *)p;
1525 struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
/*
 * Validate the message before any header field is trusted:
 *  - it must contain the whole fixed header, so that length, keylen
 *    and datalen can be read (the previous sizeof(uint32_t) test only
 *    covered the length field);
 *  - the advertised length must match the message size;
 *  - key and data must both fit in the bytes that follow the header,
 *    otherwise the pointers built below would reach past the buffer.
 * The subtractions cannot wrap because each is guarded by the clause
 * before it.
 */
1528 if (data.dsize < offsetof(struct ctdb_rec_data, data) ||
1529 d->length != data.dsize ||
d->keylen > data.dsize - offsetof(struct ctdb_rec_data, data) ||
d->datalen > data.dsize - offsetof(struct ctdb_rec_data, data) - d->keylen) {
1530 DEBUG(0,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
/* the key comes first in the payload, the record data directly after it */
1535 key.dsize = d->keylen;
1536 key.dptr = &d->data[0];
1537 data.dsize = d->datalen;
1538 data.dptr = &d->data[d->keylen];
1540 if (key.dsize == 0 && data.dsize == 0) {
1541 /* end of traverse */
/* a non-zero return from the callback is handled on lines not shown here */
1546 if (state->fn(ctdb, key, data, state->private_data) != 0) {
1555 start a cluster wide traverse, calling the supplied fn on each record
1556 return the number of records traversed, or -1 on error
/*
 * Run a traverse of ctdb_db, calling fn(ctdb, key, data, private_data)
 * for each record delivered to traverse_handler().
 *
 * Sequence visible in this listing:
 *   1. install traverse_handler for a srvid derived from the pid;
 *   2. send CTDB_CONTROL_TRAVERSE_START to CTDB_CURRENT_NODE;
 *   3. run the event loop until traverse_handler sets state.done;
 *   4. remove the message handler again.
 *
 * `state` lives on this function's stack, which is why the handler is
 * removed on the visible control-failure path as well as at the end.
 *
 * NOTE(review): the remaining initialisation of `state` and `t`, the
 * `if` lines guarding two of the DEBUG calls, and the return
 * statements are on lines omitted from this listing.
 */
1558 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
1561 struct ctdb_traverse_start t;
/* << binds tighter than |, so this is getpid() | (0xFLL << 60) */
1564 uint64_t srvid = (getpid() | 0xFLL<<60);
1565 struct traverse_state state;
1569 state.private_data = private_data;
1572 ret = ctdb_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
1574 DEBUG(0,("Failed to setup traverse handler\n"));
1578 t.db_id = ctdb_db->db_id;
/* the control payload is the raw bytes of the stack structure `t` */
1582 data.dptr = (uint8_t *)&t;
1583 data.dsize = sizeof(t);
1585 ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
1586 data, NULL, NULL, &status, NULL, NULL);
1587 if (ret != 0 || status != 0) {
1588 DEBUG(0,("ctdb_traverse_all failed\n"));
/* do not leave a handler registered that points at our stack */
1589 ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
/* pump events until the handler marks the traverse as done */
1593 while (!state.done) {
1594 event_loop_once(ctdb_db->ctdb->ev);
1597 ret = ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1599 DEBUG(0,("Failed to remove ctdb_traverse handler\n"));
1607 called on each key during a catdb
/*
 * ctdb_traverse() callback used by ctdb_dump_db(): print one record to
 * the FILE passed as the private pointer `p`.
 *
 * The record data is treated as a struct ctdb_ltdb_header followed by
 * the payload.  dmaster and rsn are printed from the header; the key
 * and the payload after the header are printed hex-encoded.
 *
 * NOTE(review): nothing visible here checks data.dsize >= sizeof(*h)
 * before the header is read and before data.dsize-sizeof(*h) is
 * computed; with a shorter record that unsigned subtraction would
 * wrap.  Confirm whether callers guarantee a full header.
 * NOTE(review): the return statement is on a line omitted from this
 * listing.
 */
1609 static int dumpdb_fn(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
1611 FILE *f = (FILE *)p;
1612 char *keystr, *datastr;
1613 struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
/* both strings are temporary and are released again below */
1615 keystr = hex_encode(ctdb, key.dptr, key.dsize);
/* skip the header so that only the payload is hex-encoded */
1616 datastr = hex_encode(ctdb, data.dptr+sizeof(*h), data.dsize-sizeof(*h));
1618 fprintf(f, "dmaster: %u\n", h->dmaster);
1619 fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
1620 fprintf(f, "key: %s\ndata: %s\n", keystr, datastr);
1622 talloc_free(keystr);
1623 talloc_free(datastr);
1628 convenience function to dump all records of a database to the given FILE
/*
 * Dump ctdb_db to the stream `f` by traversing it with dumpdb_fn as the
 * per-record callback.  The result of ctdb_traverse() is returned
 * unchanged.
 */
1630 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
1632 return ctdb_traverse(ctdb_db, dumpdb_fn, f);
1636 get the pid of a ctdb daemon
/*
 * Send CTDB_CONTROL_GET_PID with an empty payload to destnode, bounded
 * by `timeout`, and log a failure at debug level 0.
 *
 * No reply buffer is requested, so the pid presumably comes back in the
 * control status `res` and is stored in *pid on a line omitted from
 * this listing - confirm against the full source.
 */
1638 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
1643 ret = ctdb_control(ctdb, destnode, 0,
1644 CTDB_CONTROL_GET_PID, 0, tdb_null,
1645 NULL, NULL, &res, &timeout, NULL);
1647 DEBUG(0,(__location__ " ctdb_control for getpid failed\n"));
/*
 * Send CTDB_CONTROL_FREEZE with an empty payload to destnode, bounded
 * by `timeout`.  A failed control call or a non-zero status is logged
 * at debug level 0.
 */
1660 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1665 ret = ctdb_control(ctdb, destnode, 0,
1666 CTDB_CONTROL_FREEZE, 0, tdb_null,
1667 NULL, NULL, &res, &timeout, NULL);
1668 if (ret != 0 || res != 0) {
1669 DEBUG(0,(__location__ " ctdb_control freeze failed\n"));
/*
 * Send CTDB_CONTROL_THAW with an empty payload to destnode, bounded by
 * `timeout`.  A failed control call or a non-zero status is logged at
 * debug level 0.
 */
1679 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1684 ret = ctdb_control(ctdb, destnode, 0,
1685 CTDB_CONTROL_THAW, 0, tdb_null,
1686 NULL, NULL, &res, &timeout, NULL);
1687 if (ret != 0 || res != 0) {
1688 DEBUG(0,(__location__ " ctdb_control thaw failed\n"));
1696 get vnn of a node, or -1
/*
 * Send CTDB_CONTROL_GET_VNN with an empty payload to destnode, bounded
 * by `timeout`, and log a failure at debug level 0.
 *
 * No reply buffer is requested, so the vnn presumably comes back in the
 * control status `res` and is returned from a line omitted from this
 * listing - confirm against the full source.
 */
1698 int ctdb_ctrl_getvnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1703 ret = ctdb_control(ctdb, destnode, 0,
1704 CTDB_CONTROL_GET_VNN, 0, tdb_null,
1705 NULL, NULL, &res, &timeout, NULL);
1707 DEBUG(0,(__location__ " ctdb_control for getvnn failed\n"));
1715 set the monitoring mode of a remote node
/*
 * Send `monmode` to destnode with CTDB_CONTROL_SET_MONMODE, bounded by
 * `timeout`.  The payload is the raw uint32_t value.  A failed control
 * call or a non-zero status is logged at debug level 0.
 */
1717 int ctdb_ctrl_setmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t monmode)
/* point the payload at the by-value parameter; it stays valid for the call */
1723 data.dsize = sizeof(uint32_t);
1724 data.dptr = (uint8_t *)&monmode;
1726 ret = ctdb_control(ctdb, destnode, 0,
1727 CTDB_CONTROL_SET_MONMODE, 0, data,
1728 NULL, NULL, &res, &timeout, NULL);
1729 if (ret != 0 || res != 0) {
1730 DEBUG(0,(__location__ " ctdb_control for setmonmode failed\n"));
1738 get the monitoring mode of a remote node
/*
 * Send CTDB_CONTROL_GET_MONMODE with an empty payload to destnode,
 * bounded by `timeout`, and log a failure at debug level 0.
 *
 * No reply buffer is requested, so the monitoring mode presumably comes
 * back in the control status `res` and is stored in *monmode on a line
 * omitted from this listing - confirm against the full source.
 */
1740 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
1745 ret = ctdb_control(ctdb, destnode, 0,
1746 CTDB_CONTROL_GET_MONMODE, 0, tdb_null,
1747 NULL, NULL, &res, &timeout, NULL);
/* the message names this control; it previously said "getrecmode" */
1749 DEBUG(0,(__location__ " ctdb_control for getmonmode failed\n"));
1760 get maximum rsn for a db on a node
/*
 * Ask destnode for the maximum rsn of database db_id using
 * CTDB_CONTROL_MAX_RSN, bounded by `timeout`.
 *
 * The request payload is the raw db_id.  The reply must be exactly one
 * uint64_t, which is copied to *max_rsn before the reply buffer is
 * freed.
 *
 * NOTE(review): the reply buffer is allocated on ctdb.  On the visible
 * lines nothing frees outdata.dptr when the control succeeds but the
 * reply has the wrong size (ctdb_ctrl_get_tunable frees it in that
 * case) - confirm against the omitted lines.
 */
1762 int ctdb_ctrl_get_max_rsn(struct ctdb_context *ctdb, struct timeval timeout,
1763 uint32_t destnode, uint32_t db_id, uint64_t *max_rsn)
1765 TDB_DATA data, outdata;
1769 data.dptr = (uint8_t *)&db_id;
1770 data.dsize = sizeof(db_id);
1772 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_MAX_RSN, 0, data, ctdb,
1773 &outdata, &res, &timeout, NULL);
1774 if (ret != 0 || res != 0 || outdata.dsize != sizeof(uint64_t)) {
1775 DEBUG(0,(__location__ " ctdb_control for get_max_rsn failed\n"));
1779 *max_rsn = *(uint64_t *)outdata.dptr;
1780 talloc_free(outdata.dptr);
1786 set the rsn on non-empty records to the given rsn
/*
 * Send CTDB_CONTROL_SET_RSN_NONEMPTY to destnode, bounded by `timeout`.
 * The payload is a struct ctdb_control_set_rsn_nonempty; a failed
 * control call or a non-zero status is logged at debug level 0.
 *
 * NOTE(review): the assignments that fill `p` (presumably from db_id
 * and rsn) are on lines omitted from this listing.
 */
1788 int ctdb_ctrl_set_rsn_nonempty(struct ctdb_context *ctdb, struct timeval timeout,
1789 uint32_t destnode, uint32_t db_id, uint64_t rsn)
1794 struct ctdb_control_set_rsn_nonempty p;
/* the control payload is the raw bytes of the stack structure `p` */
1799 data.dptr = (uint8_t *)&p;
1800 data.dsize = sizeof(p);
1802 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_RSN_NONEMPTY, 0, data, NULL,
1803 NULL, &res, &timeout, NULL);
1804 if (ret != 0 || res != 0) {
1805 DEBUG(0,(__location__ " ctdb_control for set_rsn_nonempty failed\n"));
1813 delete records which have a rsn below the given rsn
/*
 * Send CTDB_CONTROL_DELETE_LOW_RSN to destnode, bounded by `timeout`.
 * The payload is a struct ctdb_control_delete_low_rsn; a failed control
 * call or a non-zero status is logged at debug level 0.
 *
 * NOTE(review): the assignments that fill `p` (presumably from db_id
 * and rsn) are on lines omitted from this listing.
 */
1815 int ctdb_ctrl_delete_low_rsn(struct ctdb_context *ctdb, struct timeval timeout,
1816 uint32_t destnode, uint32_t db_id, uint64_t rsn)
1821 struct ctdb_control_delete_low_rsn p;
/* the control payload is the raw bytes of the stack structure `p` */
1826 data.dptr = (uint8_t *)&p;
1827 data.dsize = sizeof(p);
1829 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DELETE_LOW_RSN, 0, data, NULL,
1830 NULL, &res, &timeout, NULL);
1831 if (ret != 0 || res != 0) {
1832 DEBUG(0,(__location__ " ctdb_control for delete_low_rsn failed\n"));
1840 sent to a node to make it take over an ip address
/*
 * Send CTDB_CONTROL_TAKEOVER_IP to destnode, bounded by `timeout`.
 * The payload is the caller's struct ctdb_public_ip, sent as raw
 * bytes without being copied.  A failed control call or a non-zero
 * status is logged at debug level 0.
 */
1842 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout,
1843 uint32_t destnode, struct ctdb_public_ip *ip)
1849 data.dsize = sizeof(*ip);
1850 data.dptr = (uint8_t *)ip;
1852 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
1853 NULL, &res, &timeout, NULL);
1855 if (ret != 0 || res != 0) {
1856 DEBUG(0,(__location__ " ctdb_control for takeover_ip failed\n"));
1865 sent to a node to make it release an ip address
/*
 * Send CTDB_CONTROL_RELEASE_IP to destnode, bounded by `timeout`.
 * The payload is the caller's struct ctdb_public_ip, sent as raw
 * bytes without being copied.  A failed control call or a non-zero
 * status is logged at debug level 0.
 */
1867 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout,
1868 uint32_t destnode, struct ctdb_public_ip *ip)
1874 data.dsize = sizeof(*ip);
1875 data.dptr = (uint8_t *)ip;
1877 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
1878 NULL, &res, &timeout, NULL);
1880 if (ret != 0 || res != 0) {
1881 DEBUG(0,(__location__ " ctdb_control for release_ip failed\n"));
/*
 * Fetch the value of the tunable called `name` with
 * CTDB_CONTROL_GET_TUNABLE, bounded by `timeout`.
 *
 * The request is a struct ctdb_control_get_tunable with the
 * NUL-terminated name copied into its trailing `name` member.  The
 * reply must be exactly one uint32_t, which is stored in *value.  Both
 * the request and the reply buffer are allocated on ctdb and freed
 * before returning on the paths visible here.
 *
 * NOTE(review): one parameter line (the one declaring `destnode`) and
 * the return statements are omitted from this listing.
 */
1892 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb,
1893 struct timeval timeout,
1895 const char *name, uint32_t *value)
1897 struct ctdb_control_get_tunable *t;
1898 TDB_DATA data, outdata;
/* request size: everything before `name`, plus the string and its NUL */
1902 data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
1903 data.dptr = talloc_size(ctdb, data.dsize);
1904 CTDB_NO_MEMORY(ctdb, data.dptr);
1906 t = (struct ctdb_control_get_tunable *)data.dptr;
/* length counts the terminating NUL, so the memcpy copies it too */
1907 t->length = strlen(name)+1;
1908 memcpy(t->name, name, t->length);
1910 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
1911 &outdata, &res, &timeout, NULL);
/* the request buffer is not needed once the control has returned */
1912 talloc_free(data.dptr);
1913 if (ret != 0 || res != 0) {
1914 DEBUG(0,(__location__ " ctdb_control for get_tunable failed\n"));
1918 if (outdata.dsize != sizeof(uint32_t)) {
1919 DEBUG(0,("Invalid return data in get_tunable\n"));
1920 talloc_free(outdata.dptr);
1924 *value = *(uint32_t *)outdata.dptr;
1925 talloc_free(outdata.dptr);
/*
 * Set the tunable called `name` with CTDB_CONTROL_SET_TUNABLE, bounded
 * by `timeout`.
 *
 * The request is a struct ctdb_control_set_tunable with the
 * NUL-terminated name copied into its trailing `name` member.  The
 * request buffer is allocated on ctdb and freed as soon as the control
 * has returned.  A failed control call or a non-zero status is logged
 * at debug level 0.
 *
 * NOTE(review): one parameter line (the one declaring `destnode`), the
 * store of `value` into the request, and the return statements are
 * omitted from this listing.
 */
1933 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb,
1934 struct timeval timeout,
1936 const char *name, uint32_t value)
1938 struct ctdb_control_set_tunable *t;
/* request size: everything before `name`, plus the string and its NUL */
1943 data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
1944 data.dptr = talloc_size(ctdb, data.dsize);
1945 CTDB_NO_MEMORY(ctdb, data.dptr);
1947 t = (struct ctdb_control_set_tunable *)data.dptr;
/* length counts the terminating NUL, so the memcpy copies it too */
1948 t->length = strlen(name)+1;
1949 memcpy(t->name, name, t->length);
1952 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
1953 NULL, &res, &timeout, NULL);
1954 talloc_free(data.dptr);
1955 if (ret != 0 || res != 0) {
1956 DEBUG(0,(__location__ " ctdb_control for set_tunable failed\n"));
/*
 * Fetch the names of all tunables with CTDB_CONTROL_LIST_TUNABLES,
 * bounded by `timeout`.
 *
 * The reply is a struct ctdb_control_list_tunable whose data member
 * holds t->length bytes of ':'-separated names.  The names are split
 * with strtok_r and appended one by one to *list, which is grown on
 * mem_ctx; each name is a copy owned by the *list array.
 *
 * NOTE(review): one parameter line (the one declaring `destnode`), the
 * initialisation of *list and *count, the increment of *count, and the
 * return statements are omitted from this listing.
 */
1966 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb,
1967 struct timeval timeout,
1969 TALLOC_CTX *mem_ctx,
1970 const char ***list, uint32_t *count)
1975 struct ctdb_control_list_tunable *t;
1978 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null,
1979 mem_ctx, &outdata, &res, &timeout, NULL);
1980 if (ret != 0 || res != 0) {
1981 DEBUG(0,(__location__ " ctdb_control for list_tunables failed\n"));
1985 t = (struct ctdb_control_list_tunable *)outdata.dptr;
/* the reply must hold the fixed header and at least t->length bytes after it */
1986 if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
1987 t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
1988 DEBUG(0,("Invalid data in list_tunables reply\n"));
1989 talloc_free(outdata.dptr);
/* private NUL-terminated copy, because strtok_r modifies its input */
1993 p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
/*
 * NOTE(review): this check sits before the talloc_free below, so if the
 * macro returns on failure the reply buffer stays allocated on mem_ctx
 * - confirm the macro's behaviour.
 */
1994 CTDB_NO_MEMORY(ctdb, p);
1996 talloc_free(outdata.dptr);
2001 for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
/* grow the array by one slot for each name found */
2002 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2003 CTDB_NO_MEMORY(ctdb, *list);
2004 (*list)[*count] = talloc_strdup(*list, s);
2005 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
/*
 * Fetch the public ip list of destnode with
 * CTDB_CONTROL_GET_PUBLIC_IPS, bounded by `timeout`.
 *
 * The reply buffer is duplicated onto mem_ctx and handed back through
 * *ips as a struct ctdb_all_public_ips; the original reply buffer is
 * then freed.
 *
 * NOTE(review): on the visible lines the talloc_memdup result is not
 * checked for NULL and outdata.dsize is not validated against the
 * structure size - confirm against the omitted lines.
 */
2015 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2016 struct timeval timeout, uint32_t destnode,
2017 TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2023 ret = ctdb_control(ctdb, destnode, 0,
2024 CTDB_CONTROL_GET_PUBLIC_IPS, 0, tdb_null,
2025 mem_ctx, &outdata, &res, &timeout, NULL);
2026 if (ret != 0 || res != 0) {
2027 DEBUG(0,(__location__ " ctdb_control for getpublicips failed\n"));
2031 *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2032 talloc_free(outdata.dptr);
2038 set/clear the permanent disabled bit on a remote node
/*
 * Send CTDB_CONTROL_MODIFY_FLAGS to destnode, bounded by `timeout`.
 * The payload is a struct ctdb_node_modflags; a failed control call or
 * a non-zero status is logged at debug level 0.
 *
 * NOTE(review): the assignments that fill `m` (presumably from `set`
 * and `clear`) are on lines omitted from this listing.
 */
2040 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
2041 uint32_t set, uint32_t clear)
2045 struct ctdb_node_modflags m;
/* the control payload is the raw bytes of the stack structure `m` */
2051 data.dsize = sizeof(m);
2052 data.dptr = (unsigned char *)&m;
2054 ret = ctdb_control(ctdb, destnode, 0,
2055 CTDB_CONTROL_MODIFY_FLAGS, 0, data,
2056 NULL, NULL, &res, &timeout, NULL);
2057 if (ret != 0 || res != 0) {
2058 DEBUG(0,(__location__ " ctdb_control for modflags failed\n"));
/*
 * Fetch the complete tunables structure with
 * CTDB_CONTROL_GET_ALL_TUNABLES, bounded by `timeout`.
 *
 * The reply must be exactly sizeof(struct ctdb_tunable) bytes; it is
 * copied by value into *tunables and the reply buffer is freed.
 *
 * NOTE(review): one parameter line (the one declaring `destnode`) and
 * the return statements are omitted from this listing.
 * NOTE(review): the reply buffer is allocated on ctdb.  The visible
 * lines of the size-mismatch branch contain no
 * talloc_free(outdata.dptr), unlike ctdb_ctrl_get_tunable; if the
 * branch simply returns, the buffer stays allocated until ctdb is
 * freed - confirm against the omitted lines.
 */
2069 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb,
2070 struct timeval timeout,
2072 struct ctdb_tunable *tunables)
2078 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2079 &outdata, &res, &timeout, NULL);
2080 if (ret != 0 || res != 0) {
2081 DEBUG(0,(__location__ " ctdb_control for get all tunables failed\n"));
2085 if (outdata.dsize != sizeof(*tunables)) {
2086 DEBUG(0,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2087 (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
/* structure copy out of the reply buffer, which is released right after */
2091 *tunables = *(struct ctdb_tunable *)outdata.dptr;
2092 talloc_free(outdata.dptr);
2098 initialise the ctdb daemon for client applications
2100 NOTE: In current code the daemon does not fork. This is for testing purposes only
2101 and to simplify the code.
/*
 * Allocate a zeroed ctdb_context as a talloc child of the event context
 * `ev` and create its idr tree.
 *
 * NOTE(review): no NULL check of the talloc_zero result is visible
 * before ctdb is handed to idr_init(); one line between the two calls
 * is omitted from this listing, so confirm against the full source.
 * The return statement is also on an omitted line.
 */
2103 struct ctdb_context *ctdb_init(struct event_context *ev)
2105 struct ctdb_context *ctdb;
2107 ctdb = talloc_zero(ev, struct ctdb_context);
2109 ctdb->idr = idr_init(ctdb);
2110 CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
/*
 * OR the given bits into ctdb->flags.  Bits that are already set are
 * left untouched; this function never clears a flag.
 */
2119 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
2121 ctdb->flags |= flags;
2125 setup the local socket name
/*
 * Store a copy of `socketname`, allocated on ctdb, in
 * ctdb->daemon.name.
 *
 * NOTE(review): the talloc_strdup result is not checked on the visible
 * line, and a previous daemon.name is not freed here; the return
 * statement is on a line omitted from this listing.
 */
2127 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
2129 ctdb->daemon.name = talloc_strdup(ctdb, socketname);
2134 return the vnn of this node
2136 uint32_t ctdb_get_vnn(struct ctdb_context *ctdb)