4 Copyright (C) Amitay Isaacs 2015
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
22 #include "system/filesys.h"
28 #include "common/logging.h"
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/tevent_unix.h"
32 #include "lib/util/dlinklist.h"
33 #include "lib/util/debug.h"
35 #include "protocol/protocol.h"
36 #include "protocol/protocol_api.h"
37 #include "client/client_private.h"
38 #include "client/client.h"
40 static struct ctdb_db_context *client_db_handle(
41 struct ctdb_client_context *client,
44 struct ctdb_db_context *db;
46 for (db = client->db; db != NULL; db = db->next) {
47 if (strcmp(db_name, db->db_name) == 0) {
55 struct ctdb_set_db_flags_state {
56 struct tevent_context *ev;
57 struct ctdb_client_context *client;
58 struct timeval timeout;
61 bool readonly_done, sticky_done;
66 static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq);
67 static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq);
68 static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq);
70 static struct tevent_req *ctdb_set_db_flags_send(
72 struct tevent_context *ev,
73 struct ctdb_client_context *client,
74 uint32_t destnode, struct timeval timeout,
75 uint32_t db_id, uint8_t db_flags)
77 struct tevent_req *req, *subreq;
78 struct ctdb_set_db_flags_state *state;
79 struct ctdb_req_control request;
81 req = tevent_req_create(mem_ctx, &state,
82 struct ctdb_set_db_flags_state);
87 if (! (db_flags & (CTDB_DB_FLAGS_READONLY | CTDB_DB_FLAGS_STICKY))) {
89 return tevent_req_post(req, ev);
93 state->client = client;
94 state->timeout = timeout;
96 state->db_flags = db_flags;
98 ctdb_req_control_get_nodemap(&request);
99 subreq = ctdb_client_control_send(state, ev, client, destnode, timeout,
101 if (tevent_req_nomem(subreq, req)) {
102 return tevent_req_post(req, ev);
104 tevent_req_set_callback(subreq, ctdb_set_db_flags_nodemap_done, req);
109 static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq)
111 struct tevent_req *req = tevent_req_callback_data(
112 subreq, struct tevent_req);
113 struct ctdb_set_db_flags_state *state = tevent_req_data(
114 req, struct ctdb_set_db_flags_state);
115 struct ctdb_req_control request;
116 struct ctdb_reply_control *reply;
117 struct ctdb_node_map *nodemap;
121 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
125 ("set_db_flags: 0x%08x GET_NODEMAP failed, ret=%d\n",
127 tevent_req_error(req, ret);
131 ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
135 ("set_db_flags: 0x%08x GET_NODEMAP parse failed, ret=%d\n",
137 tevent_req_error(req, ret);
141 state->count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
142 state, &state->pnn_list);
143 talloc_free(nodemap);
144 if (state->count <= 0) {
146 ("set_db_flags: 0x%08x no connected nodes, count=%d\n",
147 state->db_id, state->count));
148 tevent_req_error(req, ENOMEM);
152 if (state->db_flags & CTDB_DB_FLAGS_READONLY) {
153 ctdb_req_control_set_db_readonly(&request, state->db_id);
154 subreq = ctdb_client_control_multi_send(
155 state, state->ev, state->client,
156 state->pnn_list, state->count,
157 state->timeout, &request);
158 if (tevent_req_nomem(subreq, req)) {
161 tevent_req_set_callback(subreq,
162 ctdb_set_db_flags_readonly_done, req);
164 state->readonly_done = true;
167 if (state->db_flags & CTDB_DB_FLAGS_STICKY) {
168 ctdb_req_control_set_db_sticky(&request, state->db_id);
169 subreq = ctdb_client_control_multi_send(
170 state, state->ev, state->client,
171 state->pnn_list, state->count,
172 state->timeout, &request);
173 if (tevent_req_nomem(subreq, req)) {
176 tevent_req_set_callback(subreq, ctdb_set_db_flags_sticky_done,
179 state->sticky_done = true;
183 static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq)
185 struct tevent_req *req = tevent_req_callback_data(
186 subreq, struct tevent_req);
187 struct ctdb_set_db_flags_state *state = tevent_req_data(
188 req, struct ctdb_set_db_flags_state);
192 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
197 ("set_db_flags: 0x%08x SET_DB_READONLY failed, ret=%d\n",
199 tevent_req_error(req, ret);
203 state->readonly_done = true;
205 if (state->readonly_done && state->sticky_done) {
206 tevent_req_done(req);
210 static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq)
212 struct tevent_req *req = tevent_req_callback_data(
213 subreq, struct tevent_req);
214 struct ctdb_set_db_flags_state *state = tevent_req_data(
215 req, struct ctdb_set_db_flags_state);
219 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
224 ("set_db_flags: 0x%08x SET_DB_STICKY failed, ret=%d\n",
226 tevent_req_error(req, ret);
230 state->sticky_done = true;
232 if (state->readonly_done && state->sticky_done) {
233 tevent_req_done(req);
237 static bool ctdb_set_db_flags_recv(struct tevent_req *req, int *perr)
241 if (tevent_req_is_unix_error(req, &err)) {
250 struct ctdb_attach_state {
251 struct tevent_context *ev;
252 struct ctdb_client_context *client;
253 struct timeval timeout;
256 struct ctdb_db_context *db;
259 static void ctdb_attach_dbid_done(struct tevent_req *subreq);
260 static void ctdb_attach_dbpath_done(struct tevent_req *subreq);
261 static void ctdb_attach_health_done(struct tevent_req *subreq);
262 static void ctdb_attach_flags_done(struct tevent_req *subreq);
263 static void ctdb_attach_open_flags_done(struct tevent_req *subreq);
265 struct tevent_req *ctdb_attach_send(TALLOC_CTX *mem_ctx,
266 struct tevent_context *ev,
267 struct ctdb_client_context *client,
268 struct timeval timeout,
269 const char *db_name, uint8_t db_flags)
271 struct tevent_req *req, *subreq;
272 struct ctdb_attach_state *state;
273 struct ctdb_req_control request;
275 req = tevent_req_create(mem_ctx, &state, struct ctdb_attach_state);
280 state->db = client_db_handle(client, db_name);
281 if (state->db != NULL) {
282 tevent_req_done(req);
283 return tevent_req_post(req, ev);
287 state->client = client;
288 state->timeout = timeout;
289 state->destnode = ctdb_client_pnn(client);
290 state->db_flags = db_flags;
292 state->db = talloc_zero(client, struct ctdb_db_context);
293 if (tevent_req_nomem(state->db, req)) {
294 return tevent_req_post(req, ev);
297 state->db->db_name = talloc_strdup(state->db, db_name);
298 if (tevent_req_nomem(state->db, req)) {
299 return tevent_req_post(req, ev);
302 if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
303 state->db->persistent = true;
306 if (state->db->persistent) {
307 ctdb_req_control_db_attach_persistent(&request,
308 state->db->db_name, 0);
310 ctdb_req_control_db_attach(&request, state->db->db_name, 0);
313 subreq = ctdb_client_control_send(state, state->ev, state->client,
314 state->destnode, state->timeout,
316 if (tevent_req_nomem(subreq, req)) {
317 return tevent_req_post(req, ev);
319 tevent_req_set_callback(subreq, ctdb_attach_dbid_done, req);
324 static void ctdb_attach_dbid_done(struct tevent_req *subreq)
326 struct tevent_req *req = tevent_req_callback_data(
327 subreq, struct tevent_req);
328 struct ctdb_attach_state *state = tevent_req_data(
329 req, struct ctdb_attach_state);
330 struct ctdb_req_control request;
331 struct ctdb_reply_control *reply;
335 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
338 DEBUG(DEBUG_ERR, ("attach: %s %s failed, ret=%d\n",
340 (state->db->persistent
341 ? "DB_ATTACH_PERSISTENT"
344 tevent_req_error(req, ret);
348 if (state->db->persistent) {
349 ret = ctdb_reply_control_db_attach_persistent(
350 reply, &state->db->db_id);
352 ret = ctdb_reply_control_db_attach(reply, &state->db->db_id);
356 DEBUG(DEBUG_ERR, ("attach: %s failed to get db_id, ret=%d\n",
357 state->db->db_name, ret));
358 tevent_req_error(req, ret);
362 ctdb_req_control_getdbpath(&request, state->db->db_id);
363 subreq = ctdb_client_control_send(state, state->ev, state->client,
364 state->destnode, state->timeout,
366 if (tevent_req_nomem(subreq, req)) {
369 tevent_req_set_callback(subreq, ctdb_attach_dbpath_done, req);
372 static void ctdb_attach_dbpath_done(struct tevent_req *subreq)
374 struct tevent_req *req = tevent_req_callback_data(
375 subreq, struct tevent_req);
376 struct ctdb_attach_state *state = tevent_req_data(
377 req, struct ctdb_attach_state);
378 struct ctdb_reply_control *reply;
379 struct ctdb_req_control request;
383 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
386 DEBUG(DEBUG_ERR, ("attach: %s GETDBPATH failed, ret=%d\n",
387 state->db->db_name, ret));
388 tevent_req_error(req, ret);
392 ret = ctdb_reply_control_getdbpath(reply, state->db,
393 &state->db->db_path);
396 DEBUG(DEBUG_ERR, ("attach: %s GETDBPATH parse failed, ret=%d\n",
397 state->db->db_name, ret));
398 tevent_req_error(req, ret);
402 ctdb_req_control_db_get_health(&request, state->db->db_id);
403 subreq = ctdb_client_control_send(state, state->ev, state->client,
404 state->destnode, state->timeout,
406 if (tevent_req_nomem(subreq, req)) {
409 tevent_req_set_callback(subreq, ctdb_attach_health_done, req);
412 static void ctdb_attach_health_done(struct tevent_req *subreq)
414 struct tevent_req *req = tevent_req_callback_data(
415 subreq, struct tevent_req);
416 struct ctdb_attach_state *state = tevent_req_data(
417 req, struct ctdb_attach_state);
418 struct ctdb_reply_control *reply;
423 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
426 DEBUG(DEBUG_ERR, ("attach: %s DB_GET_HEALTH failed, ret=%d\n",
427 state->db->db_name, ret));
428 tevent_req_error(req, ret);
432 ret = ctdb_reply_control_db_get_health(reply, state, &reason);
435 ("attach: %s DB_GET_HEALTH parse failed, ret=%d\n",
436 state->db->db_name, ret));
437 tevent_req_error(req, ret);
441 if (reason != NULL) {
442 /* Database unhealthy, avoid attach */
443 DEBUG(DEBUG_ERR, ("attach: %s database unhealthy (%s)\n",
444 state->db->db_name, reason));
445 tevent_req_error(req, EIO);
449 subreq = ctdb_set_db_flags_send(state, state->ev, state->client,
450 state->destnode, state->timeout,
451 state->db->db_id, state->db_flags);
452 if (tevent_req_nomem(subreq, req)) {
455 tevent_req_set_callback(subreq, ctdb_attach_flags_done, req);
458 static void ctdb_attach_flags_done(struct tevent_req *subreq)
460 struct tevent_req *req = tevent_req_callback_data(
461 subreq, struct tevent_req);
462 struct ctdb_attach_state *state = tevent_req_data(
463 req, struct ctdb_attach_state);
464 struct ctdb_req_control request;
468 status = ctdb_set_db_flags_recv(subreq, &ret);
471 DEBUG(DEBUG_ERR, ("attach: %s set db flags 0x%08x failed\n",
472 state->db->db_name, state->db_flags));
473 tevent_req_error(req, ret);
477 ctdb_req_control_db_open_flags(&request, state->db->db_id);
478 subreq = ctdb_client_control_send(state, state->ev, state->client,
479 state->destnode, state->timeout,
481 if (tevent_req_nomem(subreq, req)) {
484 tevent_req_set_callback(subreq, ctdb_attach_open_flags_done, req);
487 static void ctdb_attach_open_flags_done(struct tevent_req *subreq)
489 struct tevent_req *req = tevent_req_callback_data(
490 subreq, struct tevent_req);
491 struct ctdb_attach_state *state = tevent_req_data(
492 req, struct ctdb_attach_state);
493 struct ctdb_reply_control *reply;
497 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
500 DEBUG(DEBUG_ERR, ("attach: %s DB_OPEN_FLAGS failed, ret=%d\n",
501 state->db->db_name, ret));
502 tevent_req_error(req, ret);
506 ret = ctdb_reply_control_db_open_flags(reply, &tdb_flags);
509 DEBUG(DEBUG_ERR, ("attach: %s DB_OPEN_FLAGS parse failed,"
510 " ret=%d\n", state->db->db_name, ret));
511 tevent_req_error(req, ret);
515 state->db->ltdb = tdb_wrap_open(state->db, state->db->db_path, 0,
516 tdb_flags, O_RDWR, 0);
517 if (tevent_req_nomem(state->db->ltdb, req)) {
518 DEBUG(DEBUG_ERR, ("attach: %s tdb_wrap_open failed\n",
519 state->db->db_name));
522 DLIST_ADD(state->client->db, state->db);
524 tevent_req_done(req);
527 bool ctdb_attach_recv(struct tevent_req *req, int *perr,
528 struct ctdb_db_context **out)
530 struct ctdb_attach_state *state = tevent_req_data(
531 req, struct ctdb_attach_state);
534 if (tevent_req_is_unix_error(req, &err)) {
547 int ctdb_attach(struct tevent_context *ev,
548 struct ctdb_client_context *client,
549 struct timeval timeout,
550 const char *db_name, uint8_t db_flags,
551 struct ctdb_db_context **out)
554 struct tevent_req *req;
558 mem_ctx = talloc_new(client);
559 if (mem_ctx == NULL) {
563 req = ctdb_attach_send(mem_ctx, ev, client, timeout,
566 talloc_free(mem_ctx);
570 tevent_req_poll(req, ev);
572 status = ctdb_attach_recv(req, &ret, out);
574 talloc_free(mem_ctx);
579 ctdb_set_call(db, CTDB_NULL_FUNC, ctdb_null_func);
580 ctdb_set_call(db, CTDB_FETCH_FUNC, ctdb_fetch_func);
581 ctdb_set_call(db, CTDB_FETCH_WITH_HEADER_FUNC, ctdb_fetch_with_header_func);
584 talloc_free(mem_ctx);
588 struct ctdb_detach_state {
589 struct ctdb_client_context *client;
590 struct tevent_context *ev;
591 struct timeval timeout;
596 static void ctdb_detach_dbname_done(struct tevent_req *subreq);
597 static void ctdb_detach_done(struct tevent_req *subreq);
599 struct tevent_req *ctdb_detach_send(TALLOC_CTX *mem_ctx,
600 struct tevent_context *ev,
601 struct ctdb_client_context *client,
602 struct timeval timeout, uint32_t db_id)
604 struct tevent_req *req, *subreq;
605 struct ctdb_detach_state *state;
606 struct ctdb_req_control request;
608 req = tevent_req_create(mem_ctx, &state, struct ctdb_detach_state);
613 state->client = client;
615 state->timeout = timeout;
616 state->db_id = db_id;
618 ctdb_req_control_get_dbname(&request, db_id);
619 subreq = ctdb_client_control_send(state, ev, client,
620 ctdb_client_pnn(client), timeout,
622 if (tevent_req_nomem(subreq, req)) {
623 return tevent_req_post(req, ev);
625 tevent_req_set_callback(subreq, ctdb_detach_dbname_done, req);
630 static void ctdb_detach_dbname_done(struct tevent_req *subreq)
632 struct tevent_req *req = tevent_req_callback_data(
633 subreq, struct tevent_req);
634 struct ctdb_detach_state *state = tevent_req_data(
635 req, struct ctdb_detach_state);
636 struct ctdb_reply_control *reply;
637 struct ctdb_req_control request;
641 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
644 DEBUG(DEBUG_ERR, ("detach: 0x%x GET_DBNAME failed, ret=%d\n",
646 tevent_req_error(req, ret);
650 ret = ctdb_reply_control_get_dbname(reply, state, &state->db_name);
652 DEBUG(DEBUG_ERR, ("detach: 0x%x GET_DBNAME failed, ret=%d\n",
654 tevent_req_error(req, ret);
658 ctdb_req_control_db_detach(&request, state->db_id);
659 subreq = ctdb_client_control_send(state, state->ev, state->client,
660 ctdb_client_pnn(state->client),
661 state->timeout, &request);
662 if (tevent_req_nomem(subreq, req)) {
665 tevent_req_set_callback(subreq, ctdb_detach_done, req);
669 static void ctdb_detach_done(struct tevent_req *subreq)
671 struct tevent_req *req = tevent_req_callback_data(
672 subreq, struct tevent_req);
673 struct ctdb_detach_state *state = tevent_req_data(
674 req, struct ctdb_detach_state);
675 struct ctdb_reply_control *reply;
676 struct ctdb_db_context *db;
680 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
683 DEBUG(DEBUG_ERR, ("detach: %s DB_DETACH failed, ret=%d\n",
684 state->db_name, ret));
685 tevent_req_error(req, ret);
689 ret = ctdb_reply_control_db_detach(reply);
691 DEBUG(DEBUG_ERR, ("detach: %s DB_DETACH failed, ret=%d\n",
692 state->db_name, ret));
693 tevent_req_error(req, ret);
697 db = client_db_handle(state->client, state->db_name);
699 DLIST_REMOVE(state->client->db, db);
703 tevent_req_done(req);
706 bool ctdb_detach_recv(struct tevent_req *req, int *perr)
710 if (tevent_req_is_unix_error(req, &ret)) {
720 int ctdb_detach(struct tevent_context *ev,
721 struct ctdb_client_context *client,
722 struct timeval timeout, uint32_t db_id)
725 struct tevent_req *req;
729 mem_ctx = talloc_new(client);
730 if (mem_ctx == NULL) {
734 req = ctdb_detach_send(mem_ctx, ev, client, timeout, db_id);
736 talloc_free(mem_ctx);
740 tevent_req_poll(req, ev);
742 status = ctdb_detach_recv(req, &ret);
744 talloc_free(mem_ctx);
748 talloc_free(mem_ctx);
752 uint32_t ctdb_db_id(struct ctdb_db_context *db)
757 struct ctdb_db_traverse_local_state {
758 ctdb_rec_parser_func_t parser;
764 static int ctdb_db_traverse_local_handler(struct tdb_context *tdb,
765 TDB_DATA key, TDB_DATA data,
768 struct ctdb_db_traverse_local_state *state =
769 (struct ctdb_db_traverse_local_state *)private_data;
772 if (state->extract_header) {
773 struct ctdb_ltdb_header header;
775 ret = ctdb_ltdb_header_extract(&data, &header);
781 ret = state->parser(0, &header, key, data, state->private_data);
783 ret = state->parser(0, NULL, key, data, state->private_data);
794 int ctdb_db_traverse_local(struct ctdb_db_context *db, bool readonly,
796 ctdb_rec_parser_func_t parser, void *private_data)
798 struct ctdb_db_traverse_local_state state;
801 state.parser = parser;
802 state.private_data = private_data;
803 state.extract_header = extract_header;
807 ret = tdb_traverse_read(db->ltdb->tdb,
808 ctdb_db_traverse_local_handler,
811 ret = tdb_traverse(db->ltdb->tdb,
812 ctdb_db_traverse_local_handler, &state);
822 struct ctdb_db_traverse_state {
823 struct tevent_context *ev;
824 struct ctdb_client_context *client;
825 struct ctdb_db_context *db;
828 struct timeval timeout;
829 ctdb_rec_parser_func_t parser;
834 static void ctdb_db_traverse_handler_set(struct tevent_req *subreq);
835 static void ctdb_db_traverse_started(struct tevent_req *subreq);
836 static void ctdb_db_traverse_handler(uint64_t srvid, TDB_DATA data,
838 static void ctdb_db_traverse_remove_handler(struct tevent_req *req);
839 static void ctdb_db_traverse_handler_removed(struct tevent_req *subreq);
841 struct tevent_req *ctdb_db_traverse_send(TALLOC_CTX *mem_ctx,
842 struct tevent_context *ev,
843 struct ctdb_client_context *client,
844 struct ctdb_db_context *db,
846 struct timeval timeout,
847 ctdb_rec_parser_func_t parser,
850 struct tevent_req *req, *subreq;
851 struct ctdb_db_traverse_state *state;
853 req = tevent_req_create(mem_ctx, &state,
854 struct ctdb_db_traverse_state);
860 state->client = client;
862 state->destnode = destnode;
863 state->srvid = CTDB_SRVID_CLIENT_RANGE | getpid();
864 state->timeout = timeout;
865 state->parser = parser;
866 state->private_data = private_data;
868 subreq = ctdb_client_set_message_handler_send(state, ev, client,
870 ctdb_db_traverse_handler,
872 if (tevent_req_nomem(subreq, req)) {
873 return tevent_req_post(req, ev);
875 tevent_req_set_callback(subreq, ctdb_db_traverse_handler_set, req);
880 static void ctdb_db_traverse_handler_set(struct tevent_req *subreq)
882 struct tevent_req *req = tevent_req_callback_data(
883 subreq, struct tevent_req);
884 struct ctdb_db_traverse_state *state = tevent_req_data(
885 req, struct ctdb_db_traverse_state);
886 struct ctdb_traverse_start_ext traverse;
887 struct ctdb_req_control request;
891 status = ctdb_client_set_message_handler_recv(subreq, &ret);
894 tevent_req_error(req, ret);
898 traverse = (struct ctdb_traverse_start_ext) {
899 .db_id = ctdb_db_id(state->db),
901 .srvid = state->srvid,
902 .withemptyrecords = false,
905 ctdb_req_control_traverse_start_ext(&request, &traverse);
906 subreq = ctdb_client_control_send(state, state->ev, state->client,
907 state->destnode, state->timeout,
909 if (subreq == NULL) {
910 state->result = ENOMEM;
911 ctdb_db_traverse_remove_handler(req);
914 tevent_req_set_callback(subreq, ctdb_db_traverse_started, req);
917 static void ctdb_db_traverse_started(struct tevent_req *subreq)
919 struct tevent_req *req = tevent_req_callback_data(
920 subreq, struct tevent_req);
921 struct ctdb_db_traverse_state *state = tevent_req_data(
922 req, struct ctdb_db_traverse_state);
923 struct ctdb_reply_control *reply;
927 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
930 DEBUG(DEBUG_ERR, ("traverse: control failed, ret=%d\n", ret));
932 ctdb_db_traverse_remove_handler(req);
936 ret = ctdb_reply_control_traverse_start_ext(reply);
939 DEBUG(DEBUG_ERR, ("traverse: control reply failed, ret=%d\n",
942 ctdb_db_traverse_remove_handler(req);
947 static void ctdb_db_traverse_handler(uint64_t srvid, TDB_DATA data,
950 struct tevent_req *req = talloc_get_type_abort(
951 private_data, struct tevent_req);
952 struct ctdb_db_traverse_state *state = tevent_req_data(
953 req, struct ctdb_db_traverse_state);
954 struct ctdb_rec_data *rec;
955 struct ctdb_ltdb_header header;
958 ret = ctdb_rec_data_pull(data.dptr, data.dsize, state, &rec);
963 if (rec->key.dsize == 0 && rec->data.dsize == 0) {
965 ctdb_db_traverse_remove_handler(req);
969 ret = ctdb_ltdb_header_extract(&rec->data, &header);
975 if (rec->data.dsize == 0) {
980 ret = state->parser(rec->reqid, &header, rec->key, rec->data,
981 state->private_data);
985 ctdb_db_traverse_remove_handler(req);
989 static void ctdb_db_traverse_remove_handler(struct tevent_req *req)
991 struct ctdb_db_traverse_state *state = tevent_req_data(
992 req, struct ctdb_db_traverse_state);
993 struct tevent_req *subreq;
995 subreq = ctdb_client_remove_message_handler_send(state, state->ev,
998 if (tevent_req_nomem(subreq, req)) {
1001 tevent_req_set_callback(subreq, ctdb_db_traverse_handler_removed, req);
1004 static void ctdb_db_traverse_handler_removed(struct tevent_req *subreq)
1006 struct tevent_req *req = tevent_req_callback_data(
1007 subreq, struct tevent_req);
1008 struct ctdb_db_traverse_state *state = tevent_req_data(
1009 req, struct ctdb_db_traverse_state);
1013 status = ctdb_client_remove_message_handler_recv(subreq, &ret);
1014 TALLOC_FREE(subreq);
1016 tevent_req_error(req, ret);
1020 if (state->result != 0) {
1021 tevent_req_error(req, state->result);
1025 tevent_req_done(req);
1028 bool ctdb_db_traverse_recv(struct tevent_req *req, int *perr)
1032 if (tevent_req_is_unix_error(req, &ret)) {
1042 int ctdb_db_traverse(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1043 struct ctdb_client_context *client,
1044 struct ctdb_db_context *db,
1045 uint32_t destnode, struct timeval timeout,
1046 ctdb_rec_parser_func_t parser, void *private_data)
1048 struct tevent_req *req;
1052 req = ctdb_db_traverse_send(mem_ctx, ev, client, db, destnode,
1053 timeout, parser, private_data);
1058 tevent_req_poll(req, ev);
1060 status = ctdb_db_traverse_recv(req, &ret);
1068 int ctdb_ltdb_fetch(struct ctdb_db_context *db, TDB_DATA key,
1069 struct ctdb_ltdb_header *header,
1070 TALLOC_CTX *mem_ctx, TDB_DATA *data)
1075 rec = tdb_fetch(db->ltdb->tdb, key);
1076 if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
1077 /* No record present */
1078 if (rec.dptr != NULL) {
1082 if (tdb_error(db->ltdb->tdb) != TDB_ERR_NOEXIST) {
1087 header->dmaster = CTDB_UNKNOWN_PNN;
1096 ret = ctdb_ltdb_header_pull(rec.dptr, rec.dsize, header);
1103 size_t offset = ctdb_ltdb_header_len(header);
1105 data->dsize = rec.dsize - offset;
1106 data->dptr = talloc_memdup(mem_ctx, rec.dptr + offset,
1108 if (data->dptr == NULL) {
1118 * Fetch a record from volatile database
1121 * 1. Get a lock on the hash chain
1122 * 2. If the record does not exist, migrate the record
1123 * 3. If readonly=true and delegations do not exist, migrate the record.
1124 * 4. If readonly=false and delegations exist, migrate the record.
1125 * 5. If the local node is not dmaster, migrate the record.
1129 struct ctdb_fetch_lock_state {
1130 struct tevent_context *ev;
1131 struct ctdb_client_context *client;
1132 struct ctdb_record_handle *h;
1137 static int ctdb_fetch_lock_check(struct tevent_req *req);
1138 static void ctdb_fetch_lock_migrate(struct tevent_req *req);
1139 static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq);
1141 struct tevent_req *ctdb_fetch_lock_send(TALLOC_CTX *mem_ctx,
1142 struct tevent_context *ev,
1143 struct ctdb_client_context *client,
1144 struct ctdb_db_context *db,
1145 TDB_DATA key, bool readonly)
1147 struct ctdb_fetch_lock_state *state;
1148 struct tevent_req *req;
1151 req = tevent_req_create(mem_ctx, &state, struct ctdb_fetch_lock_state);
1157 state->client = client;
1159 state->h = talloc_zero(db, struct ctdb_record_handle);
1160 if (tevent_req_nomem(state->h, req)) {
1161 return tevent_req_post(req, ev);
1163 state->h->client = client;
1165 state->h->key.dptr = talloc_memdup(state->h, key.dptr, key.dsize);
1166 if (tevent_req_nomem(state->h->key.dptr, req)) {
1167 return tevent_req_post(req, ev);
1169 state->h->key.dsize = key.dsize;
1170 state->h->readonly = false;
1172 state->readonly = readonly;
1173 state->pnn = ctdb_client_pnn(client);
1175 /* Check that database is not persistent */
1176 if (db->persistent) {
1177 DEBUG(DEBUG_ERR, ("fetch_lock: %s database not volatile\n",
1179 tevent_req_error(req, EINVAL);
1180 return tevent_req_post(req, ev);
1183 ret = ctdb_fetch_lock_check(req);
1185 tevent_req_done(req);
1186 return tevent_req_post(req, ev);
1188 if (ret != EAGAIN) {
1189 tevent_req_error(req, ret);
1190 return tevent_req_post(req, ev);
1195 static int ctdb_fetch_lock_check(struct tevent_req *req)
1197 struct ctdb_fetch_lock_state *state = tevent_req_data(
1198 req, struct ctdb_fetch_lock_state);
1199 struct ctdb_record_handle *h = state->h;
1200 struct ctdb_ltdb_header header;
1201 TDB_DATA data = tdb_null;
1203 bool do_migrate = false;
1205 ret = tdb_chainlock(h->db->ltdb->tdb, h->key);
1208 ("fetch_lock: %s tdb_chainlock failed, %s\n",
1209 h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1214 data = tdb_fetch(h->db->ltdb->tdb, h->key);
1215 if (data.dptr == NULL) {
1216 if (tdb_error(h->db->ltdb->tdb) == TDB_ERR_NOEXIST) {
1224 /* Got the record */
1225 ret = ctdb_ltdb_header_pull(data.dptr, data.dsize, &header);
1231 if (! state->readonly) {
1232 /* Read/write access */
1233 if (header.dmaster == state->pnn &&
1234 header.flags & CTDB_REC_RO_HAVE_DELEGATIONS) {
1238 if (header.dmaster != state->pnn) {
1242 /* Readonly access */
1243 if (header.dmaster != state->pnn &&
1244 ! (header.flags & (CTDB_REC_RO_HAVE_READONLY |
1245 CTDB_REC_RO_HAVE_DELEGATIONS))) {
1250 /* We are the dmaster or readonly delegation */
1253 if (header.flags & (CTDB_REC_RO_HAVE_READONLY |
1254 CTDB_REC_RO_HAVE_DELEGATIONS)) {
1264 if (data.dptr != NULL) {
1267 ret = tdb_chainunlock(h->db->ltdb->tdb, h->key);
1270 ("fetch_lock: %s tdb_chainunlock failed, %s\n",
1271 h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1276 ctdb_fetch_lock_migrate(req);
1281 static void ctdb_fetch_lock_migrate(struct tevent_req *req)
1283 struct ctdb_fetch_lock_state *state = tevent_req_data(
1284 req, struct ctdb_fetch_lock_state);
1285 struct ctdb_req_call request;
1286 struct tevent_req *subreq;
1288 ZERO_STRUCT(request);
1289 request.flags = CTDB_IMMEDIATE_MIGRATION;
1290 if (state->readonly) {
1291 request.flags |= CTDB_WANT_READONLY;
1293 request.db_id = state->h->db->db_id;
1294 request.callid = CTDB_NULL_FUNC;
1295 request.key = state->h->key;
1296 request.calldata = tdb_null;
1298 subreq = ctdb_client_call_send(state, state->ev, state->client,
1300 if (tevent_req_nomem(subreq, req)) {
1304 tevent_req_set_callback(subreq, ctdb_fetch_lock_migrate_done, req);
1307 static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq)
1309 struct tevent_req *req = tevent_req_callback_data(
1310 subreq, struct tevent_req);
1311 struct ctdb_fetch_lock_state *state = tevent_req_data(
1312 req, struct ctdb_fetch_lock_state);
1313 struct ctdb_reply_call *reply;
1317 status = ctdb_client_call_recv(subreq, state, &reply, &ret);
1318 TALLOC_FREE(subreq);
1320 DEBUG(DEBUG_ERR, ("fetch_lock: %s CALL failed, ret=%d\n",
1321 state->h->db->db_name, ret));
1322 tevent_req_error(req, ret);
1326 if (reply->status != 0) {
1327 tevent_req_error(req, EIO);
1332 ret = ctdb_fetch_lock_check(req);
1334 if (ret != EAGAIN) {
1335 tevent_req_error(req, ret);
1340 tevent_req_done(req);
1343 static int ctdb_record_handle_destructor(struct ctdb_record_handle *h)
1347 ret = tdb_chainunlock(h->db->ltdb->tdb, h->key);
1350 ("fetch_lock: %s tdb_chainunlock failed, %s\n",
1351 h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1357 struct ctdb_record_handle *ctdb_fetch_lock_recv(struct tevent_req *req,
1358 struct ctdb_ltdb_header *header,
1359 TALLOC_CTX *mem_ctx,
1360 TDB_DATA *data, int *perr)
1362 struct ctdb_fetch_lock_state *state = tevent_req_data(
1363 req, struct ctdb_fetch_lock_state);
1364 struct ctdb_record_handle *h = state->h;
1367 if (tevent_req_is_unix_error(req, &err)) {
1369 TALLOC_FREE(state->h);
1375 if (header != NULL) {
1376 *header = h->header;
1381 offset = ctdb_ltdb_header_len(&h->header);
1383 data->dsize = h->data.dsize - offset;
1384 data->dptr = talloc_memdup(mem_ctx, h->data.dptr + offset,
1386 if (data->dptr == NULL) {
1387 TALLOC_FREE(state->h);
1395 talloc_set_destructor(h, ctdb_record_handle_destructor);
1399 int ctdb_fetch_lock(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1400 struct ctdb_client_context *client,
1401 struct ctdb_db_context *db, TDB_DATA key, bool readonly,
1402 struct ctdb_record_handle **out,
1403 struct ctdb_ltdb_header *header, TDB_DATA *data)
1405 struct tevent_req *req;
1406 struct ctdb_record_handle *h;
1409 req = ctdb_fetch_lock_send(mem_ctx, ev, client, db, key, readonly);
1414 tevent_req_poll(req, ev);
1416 h = ctdb_fetch_lock_recv(req, header, mem_ctx, data, &ret);
1425 int ctdb_store_record(struct ctdb_record_handle *h, TDB_DATA data)
1427 uint8_t header[sizeof(struct ctdb_ltdb_header)];
1431 /* Cannot modify the record if it was obtained as a readonly copy */
1436 /* Check if the new data is same */
1437 if (h->data.dsize == data.dsize &&
1438 memcmp(h->data.dptr, data.dptr, data.dsize) == 0) {
1439 /* No need to do anything */
1443 ctdb_ltdb_header_push(&h->header, header);
1445 rec[0].dsize = ctdb_ltdb_header_len(&h->header);
1446 rec[0].dptr = header;
1448 rec[1].dsize = data.dsize;
1449 rec[1].dptr = data.dptr;
1451 ret = tdb_storev(h->db->ltdb->tdb, h->key, rec, 2, TDB_REPLACE);
1454 ("store_record: %s tdb_storev failed, %s\n",
1455 h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1462 struct ctdb_delete_record_state {
1463 struct ctdb_record_handle *h;
1466 static void ctdb_delete_record_done(struct tevent_req *subreq);
1468 struct tevent_req *ctdb_delete_record_send(TALLOC_CTX *mem_ctx,
1469 struct tevent_context *ev,
1470 struct ctdb_record_handle *h)
1472 struct tevent_req *req, *subreq;
1473 struct ctdb_delete_record_state *state;
1474 struct ctdb_key_data key;
1475 struct ctdb_req_control request;
1476 uint8_t header[sizeof(struct ctdb_ltdb_header)];
1480 req = tevent_req_create(mem_ctx, &state,
1481 struct ctdb_delete_record_state);
1488 /* Cannot delete the record if it was obtained as a readonly copy */
1490 DEBUG(DEBUG_ERR, ("fetch_lock delete: %s readonly record\n",
1492 tevent_req_error(req, EINVAL);
1493 return tevent_req_post(req, ev);
1496 ctdb_ltdb_header_push(&h->header, header);
1498 rec.dsize = ctdb_ltdb_header_len(&h->header);
1501 ret = tdb_store(h->db->ltdb->tdb, h->key, rec, TDB_REPLACE);
1504 ("fetch_lock delete: %s tdb_sore failed, %s\n",
1505 h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1506 tevent_req_error(req, EIO);
1507 return tevent_req_post(req, ev);
1510 key.db_id = h->db->db_id;
1511 key.header = h->header;
1514 ctdb_req_control_schedule_for_deletion(&request, &key);
1515 subreq = ctdb_client_control_send(state, ev, h->client,
1516 ctdb_client_pnn(h->client),
1517 tevent_timeval_zero(),
1519 if (tevent_req_nomem(subreq, req)) {
1520 return tevent_req_post(req, ev);
1522 tevent_req_set_callback(subreq, ctdb_delete_record_done, req);
1527 static void ctdb_delete_record_done(struct tevent_req *subreq)
1529 struct tevent_req *req = tevent_req_callback_data(
1530 subreq, struct tevent_req);
1531 struct ctdb_delete_record_state *state = tevent_req_data(
1532 req, struct ctdb_delete_record_state);
1536 status = ctdb_client_control_recv(subreq, &ret, NULL, NULL);
1537 TALLOC_FREE(subreq);
1540 ("delete_record: %s SCHDULE_FOR_DELETION failed, "
1541 "ret=%d\n", state->h->db->db_name, ret));
1542 tevent_req_error(req, ret);
1546 tevent_req_done(req);
1549 bool ctdb_delete_record_recv(struct tevent_req *req, int *perr)
1553 if (tevent_req_is_unix_error(req, &err)) {
1564 int ctdb_delete_record(struct ctdb_record_handle *h)
1566 struct tevent_context *ev = h->ev;
1567 TALLOC_CTX *mem_ctx;
1568 struct tevent_req *req;
1572 mem_ctx = talloc_new(NULL);
1573 if (mem_ctx == NULL) {
1577 req = ctdb_delete_record_send(mem_ctx, ev, h);
1579 talloc_free(mem_ctx);
1583 tevent_req_poll(req, ev);
1585 status = ctdb_delete_record_recv(req, &ret);
1586 talloc_free(mem_ctx);
1595 * Global lock functions
1598 struct ctdb_g_lock_lock_state {
1599 struct tevent_context *ev;
1600 struct ctdb_client_context *client;
1601 struct ctdb_db_context *db;
1603 struct ctdb_server_id my_sid;
1604 enum ctdb_g_lock_type lock_type;
1605 struct ctdb_record_handle *h;
1606 /* state for verification of active locks */
1607 struct ctdb_g_lock_list *lock_list;
1608 unsigned int current;
1611 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq);
1612 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req);
1613 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq);
1614 static int ctdb_g_lock_lock_update(struct tevent_req *req);
1615 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq);
1617 static bool ctdb_g_lock_conflicts(enum ctdb_g_lock_type l1,
1618 enum ctdb_g_lock_type l2)
1620 if ((l1 == CTDB_G_LOCK_READ) && (l2 == CTDB_G_LOCK_READ)) {
1626 struct tevent_req *ctdb_g_lock_lock_send(TALLOC_CTX *mem_ctx,
1627 struct tevent_context *ev,
1628 struct ctdb_client_context *client,
1629 struct ctdb_db_context *db,
1630 const char *keyname,
1631 struct ctdb_server_id *sid,
1634 struct tevent_req *req, *subreq;
1635 struct ctdb_g_lock_lock_state *state;
1637 req = tevent_req_create(mem_ctx, &state,
1638 struct ctdb_g_lock_lock_state);
1644 state->client = client;
1646 state->key.dptr = discard_const(keyname);
1647 state->key.dsize = strlen(keyname) + 1;
1648 state->my_sid = *sid;
1649 state->lock_type = (readonly ? CTDB_G_LOCK_READ : CTDB_G_LOCK_WRITE);
1651 subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1653 if (tevent_req_nomem(subreq, req)) {
1654 return tevent_req_post(req, ev);
1656 tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
1661 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq)
1663 struct tevent_req *req = tevent_req_callback_data(
1664 subreq, struct tevent_req);
1665 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1666 req, struct ctdb_g_lock_lock_state);
1670 state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1671 TALLOC_FREE(subreq);
1672 if (state->h == NULL) {
1673 DEBUG(DEBUG_ERR, ("g_lock_lock: %s fetch lock failed\n",
1674 (char *)state->key.dptr));
1675 tevent_req_error(req, ret);
1679 if (state->lock_list != NULL) {
1680 TALLOC_FREE(state->lock_list);
1684 ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1686 talloc_free(data.dptr);
1688 DEBUG(DEBUG_ERR, ("g_lock_lock: %s invalid lock data\n",
1689 (char *)state->key.dptr));
1690 tevent_req_error(req, ret);
1694 ctdb_g_lock_lock_process_locks(req);
1697 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req)
1699 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1700 req, struct ctdb_g_lock_lock_state);
1701 struct tevent_req *subreq;
1702 struct ctdb_g_lock *lock;
1703 bool check_server = false;
1706 while (state->current < state->lock_list->num) {
1707 lock = &state->lock_list->lock[state->current];
1709 /* We should not ask for the same lock more than once */
1710 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1711 DEBUG(DEBUG_ERR, ("g_lock_lock: %s deadlock\n",
1712 (char *)state->key.dptr));
1713 tevent_req_error(req, EDEADLK);
1717 if (ctdb_g_lock_conflicts(lock->type, state->lock_type)) {
1718 check_server = true;
1722 state->current += 1;
1726 struct ctdb_req_control request;
1728 ctdb_req_control_process_exists(&request, lock->sid.pid);
1729 subreq = ctdb_client_control_send(state, state->ev,
1732 tevent_timeval_zero(),
1734 if (tevent_req_nomem(subreq, req)) {
1737 tevent_req_set_callback(subreq, ctdb_g_lock_lock_checked, req);
1741 /* There is no conflict, add ourself to the lock_list */
1742 state->lock_list->lock = talloc_realloc(state->lock_list,
1743 state->lock_list->lock,
1745 state->lock_list->num + 1);
1746 if (state->lock_list->lock == NULL) {
1747 tevent_req_error(req, ENOMEM);
1751 lock = &state->lock_list->lock[state->lock_list->num];
1752 lock->type = state->lock_type;
1753 lock->sid = state->my_sid;
1754 state->lock_list->num += 1;
1756 ret = ctdb_g_lock_lock_update(req);
1758 tevent_req_error(req, ret);
1762 TALLOC_FREE(state->h);
1763 tevent_req_done(req);
1766 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq)
1768 struct tevent_req *req = tevent_req_callback_data(
1769 subreq, struct tevent_req);
1770 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1771 req, struct ctdb_g_lock_lock_state);
1772 struct ctdb_reply_control *reply;
1776 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1777 TALLOC_FREE(subreq);
1780 ("g_lock_lock: %s PROCESS_EXISTS failed, ret=%d\n",
1781 (char *)state->key.dptr, ret));
1782 tevent_req_error(req, ret);
1786 ret = ctdb_reply_control_process_exists(reply, &value);
1788 tevent_req_error(req, ret);
1794 /* server process exists, need to retry */
1795 TALLOC_FREE(state->h);
1796 subreq = tevent_wakeup_send(state, state->ev,
1797 tevent_timeval_current_ofs(0,1000));
1798 if (tevent_req_nomem(subreq, req)) {
1801 tevent_req_set_callback(subreq, ctdb_g_lock_lock_retry, req);
1805 /* server process does not exist, remove conflicting entry */
1806 state->lock_list->lock[state->current] =
1807 state->lock_list->lock[state->lock_list->num-1];
1808 state->lock_list->num -= 1;
1810 ret = ctdb_g_lock_lock_update(req);
1812 tevent_req_error(req, ret);
1816 ctdb_g_lock_lock_process_locks(req);
1819 static int ctdb_g_lock_lock_update(struct tevent_req *req)
1821 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1822 req, struct ctdb_g_lock_lock_state);
1826 data.dsize = ctdb_g_lock_list_len(state->lock_list);
1827 data.dptr = talloc_size(state, data.dsize);
1828 if (data.dptr == NULL) {
1832 ctdb_g_lock_list_push(state->lock_list, data.dptr);
1833 ret = ctdb_store_record(state->h, data);
1834 talloc_free(data.dptr);
1838 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq)
1840 struct tevent_req *req = tevent_req_callback_data(
1841 subreq, struct tevent_req);
1842 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1843 req, struct ctdb_g_lock_lock_state);
1846 success = tevent_wakeup_recv(subreq);
1847 TALLOC_FREE(subreq);
1849 tevent_req_error(req, ENOMEM);
1853 subreq = ctdb_fetch_lock_send(state, state->ev, state->client,
1854 state->db, state->key, false);
1855 if (tevent_req_nomem(subreq, req)) {
1858 tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
1861 bool ctdb_g_lock_lock_recv(struct tevent_req *req, int *perr)
1863 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1864 req, struct ctdb_g_lock_lock_state);
1867 TALLOC_FREE(state->h);
1869 if (tevent_req_is_unix_error(req, &err)) {
1879 struct ctdb_g_lock_unlock_state {
1880 struct tevent_context *ev;
1881 struct ctdb_client_context *client;
1882 struct ctdb_db_context *db;
1884 struct ctdb_server_id my_sid;
1885 struct ctdb_record_handle *h;
1886 struct ctdb_g_lock_list *lock_list;
1889 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq);
1890 static int ctdb_g_lock_unlock_update(struct tevent_req *req);
1891 static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq);
1893 struct tevent_req *ctdb_g_lock_unlock_send(TALLOC_CTX *mem_ctx,
1894 struct tevent_context *ev,
1895 struct ctdb_client_context *client,
1896 struct ctdb_db_context *db,
1897 const char *keyname,
1898 struct ctdb_server_id sid)
1900 struct tevent_req *req, *subreq;
1901 struct ctdb_g_lock_unlock_state *state;
1903 req = tevent_req_create(mem_ctx, &state,
1904 struct ctdb_g_lock_unlock_state);
1910 state->client = client;
1912 state->key.dptr = discard_const(keyname);
1913 state->key.dsize = strlen(keyname) + 1;
1914 state->my_sid = sid;
1916 subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1918 if (tevent_req_nomem(subreq, req)) {
1919 return tevent_req_post(req, ev);
1921 tevent_req_set_callback(subreq, ctdb_g_lock_unlock_fetched, req);
1926 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq)
1928 struct tevent_req *req = tevent_req_callback_data(
1929 subreq, struct tevent_req);
1930 struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1931 req, struct ctdb_g_lock_unlock_state);
1935 state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1936 TALLOC_FREE(subreq);
1937 if (state->h == NULL) {
1938 DEBUG(DEBUG_ERR, ("g_lock_unlock: %s fetch lock failed\n",
1939 (char *)state->key.dptr));
1940 tevent_req_error(req, ret);
1944 ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1947 DEBUG(DEBUG_ERR, ("g_lock_unlock: %s invalid lock data\n",
1948 (char *)state->key.dptr));
1949 tevent_req_error(req, ret);
1953 ret = ctdb_g_lock_unlock_update(req);
1955 tevent_req_error(req, ret);
1959 if (state->lock_list->num == 0) {
1960 subreq = ctdb_delete_record_send(state, state->ev, state->h);
1961 if (tevent_req_nomem(subreq, req)) {
1964 tevent_req_set_callback(subreq, ctdb_g_lock_unlock_deleted,
1969 TALLOC_FREE(state->h);
1970 tevent_req_done(req);
1973 static int ctdb_g_lock_unlock_update(struct tevent_req *req)
1975 struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1976 req, struct ctdb_g_lock_unlock_state);
1977 struct ctdb_g_lock *lock;
1980 for (i=0; i<state->lock_list->num; i++) {
1981 lock = &state->lock_list->lock[i];
1983 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1988 if (i < state->lock_list->num) {
1989 state->lock_list->lock[i] =
1990 state->lock_list->lock[state->lock_list->num-1];
1991 state->lock_list->num -= 1;
1994 if (state->lock_list->num != 0) {
1997 data.dsize = ctdb_g_lock_list_len(state->lock_list);
1998 data.dptr = talloc_size(state, data.dsize);
1999 if (data.dptr == NULL) {
2003 ctdb_g_lock_list_push(state->lock_list, data.dptr);
2004 ret = ctdb_store_record(state->h, data);
2005 talloc_free(data.dptr);
2014 static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq)
2016 struct tevent_req *req = tevent_req_callback_data(
2017 subreq, struct tevent_req);
2018 struct ctdb_g_lock_unlock_state *state = tevent_req_data(
2019 req, struct ctdb_g_lock_unlock_state);
2023 status = ctdb_delete_record_recv(subreq, &ret);
2026 ("g_lock_unlock %s delete record failed, ret=%d\n",
2027 (char *)state->key.dptr, ret));
2028 tevent_req_error(req, ret);
2032 TALLOC_FREE(state->h);
2033 tevent_req_done(req);
2036 bool ctdb_g_lock_unlock_recv(struct tevent_req *req, int *perr)
2038 struct ctdb_g_lock_unlock_state *state = tevent_req_data(
2039 req, struct ctdb_g_lock_unlock_state);
2042 TALLOC_FREE(state->h);
2044 if (tevent_req_is_unix_error(req, &err)) {
2055 * Persistent database functions
2057 struct ctdb_transaction_start_state {
2058 struct tevent_context *ev;
2059 struct ctdb_client_context *client;
2060 struct timeval timeout;
2061 struct ctdb_transaction_handle *h;
2065 static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq);
2066 static void ctdb_transaction_g_lock_done(struct tevent_req *subreq);
2068 struct tevent_req *ctdb_transaction_start_send(TALLOC_CTX *mem_ctx,
2069 struct tevent_context *ev,
2070 struct ctdb_client_context *client,
2071 struct timeval timeout,
2072 struct ctdb_db_context *db,
2075 struct ctdb_transaction_start_state *state;
2076 struct tevent_req *req, *subreq;
2077 struct ctdb_transaction_handle *h;
2079 req = tevent_req_create(mem_ctx, &state,
2080 struct ctdb_transaction_start_state);
2085 if (! db->persistent) {
2086 tevent_req_error(req, EINVAL);
2087 return tevent_req_post(req, ev);
2091 state->client = client;
2092 state->destnode = ctdb_client_pnn(client);
2094 h = talloc_zero(db, struct ctdb_transaction_handle);
2095 if (tevent_req_nomem(h, req)) {
2096 return tevent_req_post(req, ev);
2102 h->readonly = readonly;
2105 /* SRVID is unique for databases, so client can have transactions
2106 * active for multiple databases */
2107 h->sid = ctdb_client_get_server_id(client, db->db_id);
2109 h->recbuf = ctdb_rec_buffer_init(h, db->db_id);
2110 if (tevent_req_nomem(h->recbuf, req)) {
2111 return tevent_req_post(req, ev);
2114 h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x", db->db_id);
2115 if (tevent_req_nomem(h->lock_name, req)) {
2116 return tevent_req_post(req, ev);
2121 subreq = ctdb_attach_send(state, ev, client, timeout, "g_lock.tdb", 0);
2122 if (tevent_req_nomem(subreq, req)) {
2123 return tevent_req_post(req, ev);
2125 tevent_req_set_callback(subreq, ctdb_transaction_g_lock_attached, req);
2130 static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq)
2132 struct tevent_req *req = tevent_req_callback_data(
2133 subreq, struct tevent_req);
2134 struct ctdb_transaction_start_state *state = tevent_req_data(
2135 req, struct ctdb_transaction_start_state);
2139 status = ctdb_attach_recv(subreq, &ret, &state->h->db_g_lock);
2140 TALLOC_FREE(subreq);
2143 ("transaction_start: %s attach g_lock.tdb failed\n",
2144 state->h->db->db_name));
2145 tevent_req_error(req, ret);
2149 subreq = ctdb_g_lock_lock_send(state, state->ev, state->client,
2150 state->h->db_g_lock,
2151 state->h->lock_name,
2152 &state->h->sid, state->h->readonly);
2153 if (tevent_req_nomem(subreq, req)) {
2156 tevent_req_set_callback(subreq, ctdb_transaction_g_lock_done, req);
2159 static void ctdb_transaction_g_lock_done(struct tevent_req *subreq)
2161 struct tevent_req *req = tevent_req_callback_data(
2162 subreq, struct tevent_req);
2163 struct ctdb_transaction_start_state *state = tevent_req_data(
2164 req, struct ctdb_transaction_start_state);
2168 status = ctdb_g_lock_lock_recv(subreq, &ret);
2169 TALLOC_FREE(subreq);
2172 ("transaction_start: %s g_lock lock failed, ret=%d\n",
2173 state->h->db->db_name, ret));
2174 tevent_req_error(req, ret);
2178 tevent_req_done(req);
2181 struct ctdb_transaction_handle *ctdb_transaction_start_recv(
2182 struct tevent_req *req,
2185 struct ctdb_transaction_start_state *state = tevent_req_data(
2186 req, struct ctdb_transaction_start_state);
2189 if (tevent_req_is_unix_error(req, &err)) {
2199 int ctdb_transaction_start(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
2200 struct ctdb_client_context *client,
2201 struct timeval timeout,
2202 struct ctdb_db_context *db, bool readonly,
2203 struct ctdb_transaction_handle **out)
2205 struct tevent_req *req;
2206 struct ctdb_transaction_handle *h;
2209 req = ctdb_transaction_start_send(mem_ctx, ev, client, timeout, db,
2215 tevent_req_poll(req, ev);
2217 h = ctdb_transaction_start_recv(req, &ret);
2226 struct ctdb_transaction_record_fetch_state {
2228 struct ctdb_ltdb_header header;
2232 static int ctdb_transaction_record_fetch_traverse(
2234 struct ctdb_ltdb_header *nullheader,
2235 TDB_DATA key, TDB_DATA data,
2238 struct ctdb_transaction_record_fetch_state *state =
2239 (struct ctdb_transaction_record_fetch_state *)private_data;
2241 if (state->key.dsize == key.dsize &&
2242 memcmp(state->key.dptr, key.dptr, key.dsize) == 0) {
2245 ret = ctdb_ltdb_header_extract(&data, &state->header);
2248 ("record_fetch: Failed to extract header, "
2254 state->found = true;
2260 static int ctdb_transaction_record_fetch(struct ctdb_transaction_handle *h,
2262 struct ctdb_ltdb_header *header,
2265 struct ctdb_transaction_record_fetch_state state;
2269 state.found = false;
2271 ret = ctdb_rec_buffer_traverse(h->recbuf,
2272 ctdb_transaction_record_fetch_traverse,
2279 if (header != NULL) {
2280 *header = state.header;
2291 int ctdb_transaction_fetch_record(struct ctdb_transaction_handle *h,
2293 TALLOC_CTX *mem_ctx, TDB_DATA *data)
2296 struct ctdb_ltdb_header header;
2299 ret = ctdb_transaction_record_fetch(h, key, NULL, &tmp_data);
2301 data->dptr = talloc_memdup(mem_ctx, tmp_data.dptr,
2303 if (data->dptr == NULL) {
2306 data->dsize = tmp_data.dsize;
2310 ret = ctdb_ltdb_fetch(h->db, key, &header, mem_ctx, data);
2315 ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, *data);
2323 int ctdb_transaction_store_record(struct ctdb_transaction_handle *h,
2324 TDB_DATA key, TDB_DATA data)
2326 TALLOC_CTX *tmp_ctx;
2327 struct ctdb_ltdb_header header;
2335 tmp_ctx = talloc_new(h);
2336 if (tmp_ctx == NULL) {
2340 ret = ctdb_transaction_record_fetch(h, key, &header, &old_data);
2342 ret = ctdb_ltdb_fetch(h->db, key, &header, tmp_ctx, &old_data);
2348 if (old_data.dsize == data.dsize &&
2349 memcmp(old_data.dptr, data.dptr, data.dsize) == 0) {
2350 talloc_free(tmp_ctx);
2354 header.dmaster = ctdb_client_pnn(h->client);
2357 ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, data);
2358 talloc_free(tmp_ctx);
2367 int ctdb_transaction_delete_record(struct ctdb_transaction_handle *h,
2370 return ctdb_transaction_store_record(h, key, tdb_null);
2373 static int ctdb_transaction_fetch_db_seqnum(struct ctdb_transaction_handle *h,
2376 const char *keyname = CTDB_DB_SEQNUM_KEY;
2378 struct ctdb_ltdb_header header;
2381 key.dptr = discard_const(keyname);
2382 key.dsize = strlen(keyname) + 1;
2384 ret = ctdb_ltdb_fetch(h->db, key, &header, h, &data);
2387 ("transaction_commit: %s seqnum fetch failed, ret=%d\n",
2388 h->db->db_name, ret));
2392 if (data.dsize == 0) {
2398 if (data.dsize != sizeof(uint64_t)) {
2399 talloc_free(data.dptr);
2403 *seqnum = *(uint64_t *)data.dptr;
2405 talloc_free(data.dptr);
2409 static int ctdb_transaction_store_db_seqnum(struct ctdb_transaction_handle *h,
2412 const char *keyname = CTDB_DB_SEQNUM_KEY;
2415 key.dptr = discard_const(keyname);
2416 key.dsize = strlen(keyname) + 1;
2418 data.dptr = (uint8_t *)&seqnum;
2419 data.dsize = sizeof(seqnum);
2421 return ctdb_transaction_store_record(h, key, data);
2424 struct ctdb_transaction_commit_state {
2425 struct tevent_context *ev;
2426 struct timeval timeout;
2427 struct ctdb_transaction_handle *h;
2431 static void ctdb_transaction_commit_done(struct tevent_req *subreq);
2432 static void ctdb_transaction_commit_g_lock_done(struct tevent_req *subreq);
2434 struct tevent_req *ctdb_transaction_commit_send(
2435 TALLOC_CTX *mem_ctx,
2436 struct tevent_context *ev,
2437 struct timeval timeout,
2438 struct ctdb_transaction_handle *h)
2440 struct tevent_req *req, *subreq;
2441 struct ctdb_transaction_commit_state *state;
2442 struct ctdb_req_control request;
2445 req = tevent_req_create(mem_ctx, &state,
2446 struct ctdb_transaction_commit_state);
2452 state->timeout = timeout;
2455 ret = ctdb_transaction_fetch_db_seqnum(h, &state->seqnum);
2457 tevent_req_error(req, ret);
2458 return tevent_req_post(req, ev);
2461 ret = ctdb_transaction_store_db_seqnum(h, state->seqnum+1);
2463 tevent_req_error(req, ret);
2464 return tevent_req_post(req, ev);
2467 ctdb_req_control_trans3_commit(&request, h->recbuf);
2468 subreq = ctdb_client_control_send(state, ev, h->client,
2469 ctdb_client_pnn(h->client),
2471 if (tevent_req_nomem(subreq, req)) {
2472 return tevent_req_post(req, ev);
2474 tevent_req_set_callback(subreq, ctdb_transaction_commit_done, req);
2479 static void ctdb_transaction_commit_done(struct tevent_req *subreq)
2481 struct tevent_req *req = tevent_req_callback_data(
2482 subreq, struct tevent_req);
2483 struct ctdb_transaction_commit_state *state = tevent_req_data(
2484 req, struct ctdb_transaction_commit_state);
2485 struct ctdb_transaction_handle *h = state->h;
2486 struct ctdb_reply_control *reply;
2491 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2492 TALLOC_FREE(subreq);
2495 ("transaction_commit: %s TRANS3_COMMIT failed, ret=%d\n",
2496 h->db->db_name, ret));
2497 tevent_req_error(req, ret);
2501 ret = ctdb_reply_control_trans3_commit(reply);
2505 /* Control failed due to recovery */
2507 ret = ctdb_transaction_fetch_db_seqnum(h, &seqnum);
2509 tevent_req_error(req, ret);
2513 if (seqnum == state->seqnum) {
2514 struct ctdb_req_control request;
2517 ctdb_req_control_trans3_commit(&request,
2519 subreq = ctdb_client_control_send(
2520 state, state->ev, state->h->client,
2521 ctdb_client_pnn(state->h->client),
2522 state->timeout, &request);
2523 if (tevent_req_nomem(subreq, req)) {
2526 tevent_req_set_callback(subreq,
2527 ctdb_transaction_commit_done,
2532 if (seqnum != state->seqnum + 1) {
2534 ("transaction_commit: %s seqnum mismatch "
2535 "0x%"PRIx64" != 0x%"PRIx64" + 1\n",
2536 state->h->db->db_name, seqnum, state->seqnum));
2537 tevent_req_error(req, EIO);
2542 /* trans3_commit successful */
2543 subreq = ctdb_g_lock_unlock_send(state, state->ev, h->client,
2544 h->db_g_lock, h->lock_name, h->sid);
2545 if (tevent_req_nomem(subreq, req)) {
2548 tevent_req_set_callback(subreq, ctdb_transaction_commit_g_lock_done,
2552 static void ctdb_transaction_commit_g_lock_done(struct tevent_req *subreq)
2554 struct tevent_req *req = tevent_req_callback_data(
2555 subreq, struct tevent_req);
2556 struct ctdb_transaction_commit_state *state = tevent_req_data(
2557 req, struct ctdb_transaction_commit_state);
2561 status = ctdb_g_lock_unlock_recv(subreq, &ret);
2562 TALLOC_FREE(subreq);
2565 ("transaction_commit: %s g_lock unlock failed, ret=%d\n",
2566 state->h->db->db_name, ret));
2567 tevent_req_error(req, ret);
2571 talloc_free(state->h);
2572 tevent_req_done(req);
2575 bool ctdb_transaction_commit_recv(struct tevent_req *req, int *perr)
2579 if (tevent_req_is_unix_error(req, &err)) {
2589 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
2591 struct tevent_context *ev = h->ev;
2592 TALLOC_CTX *mem_ctx;
2593 struct tevent_req *req;
2597 if (h->readonly || ! h->updated) {
2598 return ctdb_transaction_cancel(h);
2601 mem_ctx = talloc_new(NULL);
2602 if (mem_ctx == NULL) {
2606 req = ctdb_transaction_commit_send(mem_ctx, ev,
2607 tevent_timeval_zero(), h);
2609 talloc_free(mem_ctx);
2613 tevent_req_poll(req, ev);
2615 status = ctdb_transaction_commit_recv(req, &ret);
2617 talloc_free(mem_ctx);
2621 talloc_free(mem_ctx);
2625 struct ctdb_transaction_cancel_state {
2626 struct tevent_context *ev;
2627 struct ctdb_transaction_handle *h;
2628 struct timeval timeout;
2631 static void ctdb_transaction_cancel_done(struct tevent_req *subreq);
2633 struct tevent_req *ctdb_transaction_cancel_send(
2634 TALLOC_CTX *mem_ctx,
2635 struct tevent_context *ev,
2636 struct timeval timeout,
2637 struct ctdb_transaction_handle *h)
2639 struct tevent_req *req, *subreq;
2640 struct ctdb_transaction_cancel_state *state;
2642 req = tevent_req_create(mem_ctx, &state,
2643 struct ctdb_transaction_cancel_state);
2650 state->timeout = timeout;
2652 subreq = ctdb_g_lock_unlock_send(state, state->ev, state->h->client,
2653 state->h->db_g_lock,
2654 state->h->lock_name, state->h->sid);
2655 if (tevent_req_nomem(subreq, req)) {
2656 return tevent_req_post(req, ev);
2658 tevent_req_set_callback(subreq, ctdb_transaction_cancel_done,
2664 static void ctdb_transaction_cancel_done(struct tevent_req *subreq)
2666 struct tevent_req *req = tevent_req_callback_data(
2667 subreq, struct tevent_req);
2668 struct ctdb_transaction_cancel_state *state = tevent_req_data(
2669 req, struct ctdb_transaction_cancel_state);
2673 status = ctdb_g_lock_unlock_recv(subreq, &ret);
2674 TALLOC_FREE(subreq);
2677 ("transaction_cancel: %s g_lock unlock failed, ret=%d\n",
2678 state->h->db->db_name, ret));
2679 talloc_free(state->h);
2680 tevent_req_error(req, ret);
2684 talloc_free(state->h);
2685 tevent_req_done(req);
2688 bool ctdb_transaction_cancel_recv(struct tevent_req *req, int *perr)
2692 if (tevent_req_is_unix_error(req, &err)) {
2702 int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
2704 struct tevent_context *ev = h->ev;
2705 struct tevent_req *req;
2706 TALLOC_CTX *mem_ctx;
2710 mem_ctx = talloc_new(NULL);
2711 if (mem_ctx == NULL) {
2716 req = ctdb_transaction_cancel_send(mem_ctx, ev,
2717 tevent_timeval_zero(), h);
2719 talloc_free(mem_ctx);
2724 tevent_req_poll(req, ev);
2726 status = ctdb_transaction_cancel_recv(req, &ret);
2728 talloc_free(mem_ctx);
2732 talloc_free(mem_ctx);
2739 * In future Samba should register SERVER_ID.
2740 * Make that structure same as struct srvid {}.