2 ctdb parallel database recovery
4 Copyright (C) Amitay Isaacs 2015
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
22 #include "system/filesys.h"
29 #include "lib/tdb_wrap/tdb_wrap.h"
30 #include "lib/util/sys_rw.h"
31 #include "lib/util/time.h"
32 #include "lib/util/tevent_unix.h"
34 #include "protocol/protocol.h"
35 #include "protocol/protocol_api.h"
36 #include "client/client.h"
38 #include "common/logging.h"
/* Value passed as the first argument of timeval_current_ofs() by TIMEOUT(). */
40 static int recover_timeout = 30;
/* Timeout for the controls sent in this file, derived from recover_timeout. */
44 #define TIMEOUT() timeval_current_ofs(recover_timeout, 0)
/* Logging shorthand: forwards its arguments to DEBUG() at DEBUG_NOTICE. */
46 #define LOG(...) DEBUG(DEBUG_NOTICE, (__VA_ARGS__))
/*
 * Shared receive helper: the *_recv() wrappers in this file simply
 * forward their req and perr arguments to it.
 *
 * It checks the request with tevent_req_is_unix_error().
 */
52 static bool generic_recv(struct tevent_req *req, int *perr)
56 if (tevent_req_is_unix_error(req, &err)) {
/* File-scope srvid value, initialised to CTDB_SRVID_RECOVERY. */
66 static uint64_t rec_srvid = CTDB_SRVID_RECOVERY;
/*
 * Supplies the value stored in state->srvid by pull_database_send() and
 * push_database_new_send().
 * NOTE(review): presumably derived from rec_srvid - confirm against the body.
 */
68 static uint64_t srvid_next(void)
75 * Recovery database functions
/*
 * Handle for a recovery database.  Code in this file reads and writes its
 * db_name, db_id, db_path, db and persistent members.
 */
78 struct recdb_context {
/*
 * Allocate a recdb_context on mem_ctx and create its backing tdb file.
 *
 * The file path has the form "<dir>/recdb.<...>".  The directory is
 * chosen by a conditional on the CTDB_DBDIR_STATE environment value;
 * dirname() of db_path is the other candidate.
 * db_id and persistent are recorded in the context, hash_size is passed to
 * tdb_wrap_open().
 */
86 static struct recdb_context *recdb_create(TALLOC_CTX *mem_ctx, uint32_t db_id,
89 uint32_t hash_size, bool persistent)
/* static: the environment lookup result is kept across calls */
91 static char *db_dir_state = NULL;
92 struct recdb_context *recdb;
93 unsigned int tdb_flags;
95 recdb = talloc(mem_ctx, struct recdb_context);
100 if (db_dir_state == NULL) {
101 db_dir_state = getenv("CTDB_DBDIR_STATE");
104 recdb->db_name = db_name;
105 recdb->db_id = db_id;
/* the path string is allocated as a talloc child of recdb */
106 recdb->db_path = talloc_asprintf(recdb, "%s/recdb.%s",
107 db_dir_state != NULL ?
109 dirname(discard_const(db_path)),
111 if (recdb->db_path == NULL) {
/* remove any file left at this path; the open below uses O_EXCL */
115 unlink(recdb->db_path);
117 tdb_flags = TDB_NOLOCK | TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING;
/* NOTE(review): the tdb handle is created on mem_ctx, not on recdb */
118 recdb->db = tdb_wrap_open(mem_ctx, recdb->db_path, hash_size,
119 tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
120 if (recdb->db == NULL) {
122 LOG("failed to create recovery db %s\n", recdb->db_path);
126 recdb->persistent = persistent;
/*
 * Returns the id associated with a recovery database; callers use it as
 * the db_id for record buffers and for the pull/push controls.
 */
131 static uint32_t recdb_id(struct recdb_context *recdb)
/* Accessor for the db_name member (used in log messages in this file). */
136 static const char *recdb_name(struct recdb_context *recdb)
138 return recdb->db_name;
/* Accessor for the db_path member, i.e. the path built in recdb_create(). */
141 static const char *recdb_path(struct recdb_context *recdb)
143 return recdb->db_path;
/* Accessor for the tdb_context held inside the tdb_wrap handle. */
146 static struct tdb_context *recdb_tdb(struct recdb_context *recdb)
148 return recdb->db->tdb;
/* Accessor for the persistent flag recorded by recdb_create(). */
151 static bool recdb_persistent(struct recdb_context *recdb)
153 return recdb->persistent;
/*
 * Private data for recdb_add_traverse(); the callback reads the recdb and
 * mypnn members.
 */
156 struct recdb_add_traverse_state {
157 struct recdb_context *recdb;
/*
 * Per-record callback handed to ctdb_rec_buffer_traverse() by recdb_add().
 *
 * Stores the incoming record in the recovery db with TDB_REPLACE.  If a
 * record with the same key already exists, the incoming and existing
 * ltdb headers are first compared: by rsn, and on equal rsn by whether
 * the existing copy's dmaster differs from state->mypnn.
 * NOTE(review): the body of that comparison branch is not visible here;
 * presumably it skips the store - confirm.
 */
161 static int recdb_add_traverse(uint32_t reqid, struct ctdb_ltdb_header *header,
162 TDB_DATA key, TDB_DATA data,
165 struct recdb_add_traverse_state *state =
166 (struct recdb_add_traverse_state *)private_data;
167 struct ctdb_ltdb_header *hdr;
171 /* header is not marshalled separately in the pulldb control */
/* data must be large enough to hold a header before it is cast below */
172 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
176 hdr = (struct ctdb_ltdb_header *)data.dptr;
178 /* fetch the existing record, if any */
179 prev_data = tdb_fetch(recdb_tdb(state->recdb), key);
181 if (prev_data.dptr != NULL) {
182 struct ctdb_ltdb_header prev_hdr;
/* copy the header out by value so the fetched buffer can be freed at once */
184 prev_hdr = *(struct ctdb_ltdb_header *)prev_data.dptr;
185 free(prev_data.dptr);
186 if (hdr->rsn < prev_hdr.rsn ||
187 (hdr->rsn == prev_hdr.rsn &&
188 prev_hdr.dmaster != state->mypnn)) {
193 ret = tdb_store(recdb_tdb(state->recdb), key, data, TDB_REPLACE);
/*
 * Feed every record of recbuf to recdb_add_traverse(), which stores them
 * in the recovery db.  mypnn is the caller's node number; callers in this
 * file pass ctdb_client_pnn().
 */
200 static bool recdb_add(struct recdb_context *recdb, int mypnn,
201 struct ctdb_rec_buffer *recbuf)
203 struct recdb_add_traverse_state state;
209 ret = ctdb_rec_buffer_traverse(recbuf, recdb_add_traverse, &state);
217 /* This function decides which records from recdb are retained */
/*
 * recbuf     - buffer the retained record is appended to
 * persistent - when false, records no larger than an ltdb header are
 *              tested by the size check below
 * reqid      - passed through to ctdb_rec_buffer_add()
 * dmaster    - value written into the record header's dmaster field
 * key, data  - the record; data.dptr is modified in place
 */
218 static int recbuf_filter_add(struct ctdb_rec_buffer *recbuf, bool persistent,
219 uint32_t reqid, uint32_t dmaster,
220 TDB_DATA key, TDB_DATA data)
222 struct ctdb_ltdb_header *header;
226 * skip empty records - but NOT for persistent databases:
228 * The record-by-record mode of recovery deletes empty records.
229 * For persistent databases, this can lead to data corruption
230 * by deleting records that should be there:
232 * - Assume the cluster has been running for a while.
234 * - A record R in a persistent database has been created and
235 * deleted a couple of times, the last operation being deletion,
236 * leaving an empty record with a high RSN, say 10.
238 * - Now a node N is turned off.
240 * - This leaves the local database copy of D on N with the empty
241 * copy of R and RSN 10. On all other nodes, the recovery has deleted
242 * the copy of record R.
244 * - Now the record is created again while node N is turned off.
245 * This creates R with RSN = 1 on all nodes except for N.
247 * - Now node N is turned on again. The following recovery will chose
248 * the older empty copy of R due to RSN 10 > RSN 1.
250 * ==> Hence the record is gone after the recovery.
252 * On databases like Samba's registry, this can damage the higher-level
253 * data structures built from the various tdb-level records.
/* "empty" here means the data holds nothing beyond the ltdb header */
255 if (!persistent && data.dsize <= sizeof(struct ctdb_ltdb_header)) {
259 /* update the dmaster field to point to us */
260 header = (struct ctdb_ltdb_header *)data.dptr;
262 header->dmaster = dmaster;
263 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
/* recbuf is passed twice: as the first argument and as the target buffer */
266 ret = ctdb_rec_buffer_add(recbuf, recbuf, reqid, NULL, key, data);
/*
 * Private data for recdb_records_traverse(): the target buffer plus the
 * persistent, reqid, dmaster and failed members the callback uses.
 */
274 struct recdb_records_traverse_state {
275 struct ctdb_rec_buffer *recbuf;
/*
 * tdb traverse callback used by recdb_records(): passes each record
 * through recbuf_filter_add() and records a failure in state->failed.
 */
282 static int recdb_records_traverse(struct tdb_context *tdb,
283 TDB_DATA key, TDB_DATA data,
286 struct recdb_records_traverse_state *state =
287 (struct recdb_records_traverse_state *)private_data;
290 ret = recbuf_filter_add(state->recbuf, state->persistent,
291 state->reqid, state->dmaster, key, data);
293 state->failed = true;
/*
 * Marshal the recovery db into a single ctdb_rec_buffer allocated on
 * mem_ctx, filtering records through recdb_records_traverse().
 *
 * If the traverse returns -1 or the callback flagged a failure, the
 * buffer is freed and a message is logged.
 */
300 static struct ctdb_rec_buffer *recdb_records(struct recdb_context *recdb,
304 struct recdb_records_traverse_state state;
307 state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
308 if (state.recbuf == NULL) {
311 state.dmaster = dmaster;
313 state.persistent = recdb_persistent(recdb);
314 state.failed = false;
316 ret = tdb_traverse_read(recdb_tdb(recdb), recdb_records_traverse,
318 if (ret == -1 || state.failed) {
319 LOG("Failed to marshall recovery records for %s\n",
321 TALLOC_FREE(state.recbuf);
/*
 * Private data for recdb_file_traverse().  Besides the two members shown,
 * the callback uses mem_ctx, dmaster, reqid, persistent, failed, fd,
 * max_size and num_buffers.
 */
328 struct recdb_file_traverse_state {
329 struct ctdb_rec_buffer *recbuf;
330 struct recdb_context *recdb;
/*
 * tdb traverse callback used by recdb_file().
 *
 * Adds the record to the current buffer via recbuf_filter_add().  Once
 * the buffer length exceeds state->max_size, the buffer is written to
 * state->fd, counted in state->num_buffers, freed, and replaced by a
 * newly initialised buffer.  Failures are recorded in state->failed.
 */
341 static int recdb_file_traverse(struct tdb_context *tdb,
342 TDB_DATA key, TDB_DATA data,
345 struct recdb_file_traverse_state *state =
346 (struct recdb_file_traverse_state *)private_data;
349 ret = recbuf_filter_add(state->recbuf, state->persistent,
350 state->reqid, state->dmaster, key, data);
352 state->failed = true;
/* the size check runs after the add, so a buffer may exceed max_size */
356 if (ctdb_rec_buffer_len(state->recbuf) > state->max_size) {
357 ret = ctdb_rec_buffer_write(state->recbuf, state->fd);
359 LOG("Failed to collect recovery records for %s\n",
360 recdb_name(state->recdb));
361 state->failed = true;
365 state->num_buffers += 1;
/* start a new buffer for the following records */
367 TALLOC_FREE(state->recbuf);
368 state->recbuf = ctdb_rec_buffer_init(state->mem_ctx,
369 recdb_id(state->recdb));
370 if (state->recbuf == NULL) {
371 state->failed = true;
/*
 * Write the recovery db to fd as a sequence of record buffers, starting a
 * new buffer whenever the current one grows beyond max_size (see
 * recdb_file_traverse()).
 *
 * Returns the number of buffers written.  The caller in this file treats
 * a return value of -1 as failure.
 */
379 static int recdb_file(struct recdb_context *recdb, TALLOC_CTX *mem_ctx,
380 uint32_t dmaster, int fd, int max_size)
382 struct recdb_file_traverse_state state;
385 state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
386 if (state.recbuf == NULL) {
390 state.mem_ctx = mem_ctx;
391 state.dmaster = dmaster;
393 state.persistent = recdb_persistent(recdb);
394 state.failed = false;
396 state.max_size = max_size;
397 state.num_buffers = 0;
399 ret = tdb_traverse_read(recdb_tdb(recdb), recdb_file_traverse, &state);
400 if (ret == -1 || state.failed) {
401 TALLOC_FREE(state.recbuf);
/* write out the last buffer left over after the traverse */
405 ret = ctdb_rec_buffer_write(state.recbuf, fd);
407 LOG("Failed to collect recovery records for %s\n",
409 TALLOC_FREE(state.recbuf);
412 state.num_buffers += 1;
414 LOG("Wrote %d buffers of recovery records for %s\n",
415 state.num_buffers, recdb_name(recdb));
417 return state.num_buffers;
421 * Pull database from a single node
/*
 * Request state for pull_database_send().  The pull_database_* functions
 * also use its pnn, srvid and num_records members.
 */
424 struct pull_database_state {
425 struct tevent_context *ev;
426 struct ctdb_client_context *client;
427 struct recdb_context *recdb;
/* Forward declarations: message handler and callbacks of the pull request. */
433 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
435 static void pull_database_register_done(struct tevent_req *subreq);
436 static void pull_database_old_done(struct tevent_req *subreq);
437 static void pull_database_unregister_done(struct tevent_req *subreq);
438 static void pull_database_new_done(struct tevent_req *subreq);
/*
 * Start pulling the database behind recdb from node pnn.
 *
 * caps selects one of two paths:
 *  - CTDB_CAP_FRAGMENTED_CONTROLS set: register pull_database_handler for
 *    a fresh srvid; pull_database_register_done() then issues the control
 *    built by ctdb_req_control_db_pull().
 *  - otherwise: send the control built by ctdb_req_control_pull_db()
 *    directly; pull_database_old_done() handles the reply.
 *
 * Collect the result with pull_database_recv().
 */
440 static struct tevent_req *pull_database_send(
442 struct tevent_context *ev,
443 struct ctdb_client_context *client,
444 uint32_t pnn, uint32_t caps,
445 struct recdb_context *recdb)
447 struct tevent_req *req, *subreq;
448 struct pull_database_state *state;
449 struct ctdb_req_control request;
451 req = tevent_req_create(mem_ctx, &state, struct pull_database_state);
457 state->client = client;
458 state->recdb = recdb;
/* the srvid identifies the messages belonging to this pull */
460 state->srvid = srvid_next();
462 if (caps & CTDB_CAP_FRAGMENTED_CONTROLS) {
463 subreq = ctdb_client_set_message_handler_send(
464 state, state->ev, state->client,
465 state->srvid, pull_database_handler,
467 if (tevent_req_nomem(subreq, req)) {
468 return tevent_req_post(req, ev);
471 tevent_req_set_callback(subreq, pull_database_register_done,
475 struct ctdb_pulldb pulldb;
477 pulldb.db_id = recdb_id(recdb);
478 pulldb.lmaster = CTDB_LMASTER_ANY;
480 ctdb_req_control_pull_db(&request, &pulldb);
481 subreq = ctdb_client_control_send(state, state->ev,
485 if (tevent_req_nomem(subreq, req)) {
486 return tevent_req_post(req, ev);
488 tevent_req_set_callback(subreq, pull_database_old_done, req);
/*
 * Message handler registered by pull_database_send() for state->srvid.
 *
 * private_data is the pull request.  The message payload is unmarshalled
 * into a ctdb_rec_buffer, its db_id is checked against the recovery db,
 * the records are added with recdb_add(), and recbuf->count is
 * accumulated into state->num_records.
 */
494 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
497 struct tevent_req *req = talloc_get_type_abort(
498 private_data, struct tevent_req);
499 struct pull_database_state *state = tevent_req_data(
500 req, struct pull_database_state);
501 struct ctdb_rec_buffer *recbuf;
/* check that the message carries the srvid of this pull */
505 if (srvid != state->srvid) {
509 ret = ctdb_rec_buffer_pull(data.dptr, data.dsize, state, &recbuf);
511 LOG("Invalid data received for DB_PULL messages\n");
515 if (recbuf->db_id != recdb_id(state->recdb)) {
517 LOG("Invalid dbid:%08x for DB_PULL messages for %s\n",
518 recbuf->db_id, recdb_name(state->recdb));
522 status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
526 LOG("Failed to add records to recdb for %s\n",
527 recdb_name(state->recdb));
/* running total, compared with the control reply in pull_database_new_done() */
531 state->num_records += recbuf->count;
/*
 * Callback for the message handler registration started in
 * pull_database_send().  Fails the request with the returned error if the
 * registration did not succeed; otherwise sends the control built by
 * ctdb_req_control_db_pull() to state->pnn, carrying the db id,
 * CTDB_LMASTER_ANY and the srvid of the registered handler.
 */
535 static void pull_database_register_done(struct tevent_req *subreq)
537 struct tevent_req *req = tevent_req_callback_data(
538 subreq, struct tevent_req);
539 struct pull_database_state *state = tevent_req_data(
540 req, struct pull_database_state);
541 struct ctdb_req_control request;
542 struct ctdb_pulldb_ext pulldb_ext;
546 status = ctdb_client_set_message_handler_recv(subreq, &ret);
549 LOG("failed to set message handler for DB_PULL for %s\n",
550 recdb_name(state->recdb));
551 tevent_req_error(req, ret);
555 pulldb_ext.db_id = recdb_id(state->recdb);
556 pulldb_ext.lmaster = CTDB_LMASTER_ANY;
557 pulldb_ext.srvid = state->srvid;
559 ctdb_req_control_db_pull(&request, &pulldb_ext);
560 subreq = ctdb_client_control_send(state, state->ev, state->client,
561 state->pnn, TIMEOUT(), &request);
562 if (tevent_req_nomem(subreq, req)) {
565 tevent_req_set_callback(subreq, pull_database_new_done, req);
/*
 * Callback for the PULL_DB control (path without fragmented controls).
 * The reply carries the whole record buffer: it is unmarshalled with
 * ctdb_reply_control_pull_db(), added to the recovery db, and its record
 * count becomes state->num_records.  An EIO error path is visible after
 * recdb_add().
 */
568 static void pull_database_old_done(struct tevent_req *subreq)
570 struct tevent_req *req = tevent_req_callback_data(
571 subreq, struct tevent_req);
572 struct pull_database_state *state = tevent_req_data(
573 req, struct pull_database_state);
574 struct ctdb_reply_control *reply;
575 struct ctdb_rec_buffer *recbuf;
579 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
582 LOG("control PULL_DB failed for %s on node %u, ret=%d\n",
583 recdb_name(state->recdb), state->pnn, ret);
584 tevent_req_error(req, ret);
588 ret = ctdb_reply_control_pull_db(reply, state, &recbuf);
591 tevent_req_error(req, ret);
595 status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
599 tevent_req_error(req, EIO);
603 state->num_records = recbuf->count;
606 LOG("Pulled %d records for db %s from node %d\n",
607 state->num_records, recdb_name(state->recdb), state->pnn);
609 tevent_req_done(req);
/*
 * Callback for the DB_PULL control (fragmented-controls path).
 * The reply only carries a record count; the records themselves arrive
 * through pull_database_handler().  A count that differs from
 * state->num_records fails the request with EIO.  On success the
 * message handler is removed before the request completes in
 * pull_database_unregister_done().
 */
612 static void pull_database_new_done(struct tevent_req *subreq)
614 struct tevent_req *req = tevent_req_callback_data(
615 subreq, struct tevent_req);
616 struct pull_database_state *state = tevent_req_data(
617 req, struct pull_database_state);
618 struct ctdb_reply_control *reply;
619 uint32_t num_records;
623 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
626 LOG("control DB_PULL failed for %s on node %u, ret=%d\n",
627 recdb_name(state->recdb), state->pnn, ret);
628 tevent_req_error(req, ret);
632 ret = ctdb_reply_control_db_pull(reply, &num_records);
634 if (num_records != state->num_records) {
635 LOG("mismatch (%u != %u) in DB_PULL records for %s\n",
636 num_records, state->num_records, recdb_name(state->recdb));
637 tevent_req_error(req, EIO);
641 LOG("Pulled %d records for db %s from node %d\n",
642 state->num_records, recdb_name(state->recdb), state->pnn);
644 subreq = ctdb_client_remove_message_handler_send(
645 state, state->ev, state->client,
647 if (tevent_req_nomem(subreq, req)) {
650 tevent_req_set_callback(subreq, pull_database_unregister_done, req);
/*
 * Callback for the message handler removal; last step of the
 * fragmented-controls pull.  Has an error path that logs and fails the
 * request with the returned code, and a success path that marks it done.
 */
653 static void pull_database_unregister_done(struct tevent_req *subreq)
655 struct tevent_req *req = tevent_req_callback_data(
656 subreq, struct tevent_req);
657 struct pull_database_state *state = tevent_req_data(
658 req, struct pull_database_state);
662 status = ctdb_client_remove_message_handler_recv(subreq, &ret);
665 LOG("failed to remove message handler for DB_PULL for %s\n",
666 recdb_name(state->recdb));
667 tevent_req_error(req, ret);
671 tevent_req_done(req);
/* Result of pull_database_send(); delegates to generic_recv(). */
674 static bool pull_database_recv(struct tevent_req *req, int *perr)
676 return generic_recv(req, perr);
680 * Push database to specified nodes (old style)
/*
 * Request state for push_database_old_send().  The functions using it
 * also access its pnn_list, count and index members.
 */
683 struct push_database_old_state {
684 struct tevent_context *ev;
685 struct ctdb_client_context *client;
686 struct recdb_context *recdb;
/* marshalled records, built once and sent to each node in turn */
689 struct ctdb_rec_buffer *recbuf;
/* Forward declaration of the per-node PUSH_DB completion callback. */
693 static void push_database_old_push_done(struct tevent_req *subreq);
/*
 * Push the recovery db to the nodes in pnn_list using the PUSH_DB control.
 *
 * All records are marshalled into one buffer with recdb_records(), with
 * the local node number as dmaster.  The buffer is sent to the node at
 * pnn_list[state->index]; push_database_old_push_done() continues with
 * the remaining nodes, one at a time.
 */
695 static struct tevent_req *push_database_old_send(
697 struct tevent_context *ev,
698 struct ctdb_client_context *client,
699 uint32_t *pnn_list, int count,
700 struct recdb_context *recdb)
702 struct tevent_req *req, *subreq;
703 struct push_database_old_state *state;
704 struct ctdb_req_control request;
707 req = tevent_req_create(mem_ctx, &state,
708 struct push_database_old_state);
714 state->client = client;
715 state->recdb = recdb;
716 state->pnn_list = pnn_list;
717 state->count = count;
720 state->recbuf = recdb_records(recdb, state,
721 ctdb_client_pnn(client));
722 if (tevent_req_nomem(state->recbuf, req)) {
723 return tevent_req_post(req, ev);
726 pnn = state->pnn_list[state->index];
728 ctdb_req_control_push_db(&request, state->recbuf);
729 subreq = ctdb_client_control_send(state, ev, client, pnn,
730 TIMEOUT(), &request);
731 if (tevent_req_nomem(subreq, req)) {
732 return tevent_req_post(req, ev);
734 tevent_req_set_callback(subreq, push_database_old_push_done, req);
/*
 * Callback for one PUSH_DB control.  A failed control is logged and fails
 * the request.  When state->index has reached state->count the buffer
 * is freed and the request is done; otherwise the same buffer is sent to
 * the next node in pnn_list.
 */
739 static void push_database_old_push_done(struct tevent_req *subreq)
741 struct tevent_req *req = tevent_req_callback_data(
742 subreq, struct tevent_req);
743 struct push_database_old_state *state = tevent_req_data(
744 req, struct push_database_old_state);
745 struct ctdb_req_control request;
/* only the status is needed; no reply is requested */
750 status = ctdb_client_control_recv(subreq, &ret, NULL, NULL);
753 LOG("control PUSH_DB failed for db %s on node %u, ret=%d\n",
754 recdb_name(state->recdb), state->pnn_list[state->index],
756 tevent_req_error(req, ret);
761 if (state->index == state->count) {
762 TALLOC_FREE(state->recbuf);
763 tevent_req_done(req);
767 pnn = state->pnn_list[state->index];
769 ctdb_req_control_push_db(&request, state->recbuf);
770 subreq = ctdb_client_control_send(state, state->ev, state->client,
771 pnn, TIMEOUT(), &request);
772 if (tevent_req_nomem(subreq, req)) {
775 tevent_req_set_callback(subreq, push_database_old_push_done, req);
/* Result of push_database_old_send(); delegates to generic_recv(). */
778 static bool push_database_old_recv(struct tevent_req *req, int *perr)
780 return generic_recv(req, perr);
784 * Push database to specified nodes (new style)
/*
 * Request state for push_database_new_send().  The functions using it
 * also access pnn_list, count, srvid, dmaster, fd, num_buffers and
 * num_records.
 */
787 struct push_database_new_state {
788 struct tevent_context *ev;
789 struct ctdb_client_context *client;
790 struct recdb_context *recdb;
/* number of buffers already sent as messages */
797 int num_buffers_sent;
/* Forward declarations for the steps of the message-based push. */
801 static void push_database_new_started(struct tevent_req *subreq);
802 static void push_database_new_send_msg(struct tevent_req *req);
803 static void push_database_new_send_done(struct tevent_req *subreq);
804 static void push_database_new_confirmed(struct tevent_req *subreq);
/*
 * Push the recovery db to the nodes in pnn_list using messages.
 *
 * Steps visible here:
 *  1. write the records to the file "<recdb path>.dat" as buffers limited
 *     by max_size (recdb_file());
 *  2. seek back to the start of that file;
 *  3. send the control built by ctdb_req_control_db_push_start() to all
 *     nodes, carrying the db id and a fresh srvid.
 * push_database_new_started() continues once the control completes.
 */
806 static struct tevent_req *push_database_new_send(
808 struct tevent_context *ev,
809 struct ctdb_client_context *client,
810 uint32_t *pnn_list, int count,
811 struct recdb_context *recdb,
814 struct tevent_req *req, *subreq;
815 struct push_database_new_state *state;
816 struct ctdb_req_control request;
817 struct ctdb_pulldb_ext pulldb_ext;
821 req = tevent_req_create(mem_ctx, &state,
822 struct push_database_new_state);
828 state->client = client;
829 state->recdb = recdb;
830 state->pnn_list = pnn_list;
831 state->count = count;
833 state->srvid = srvid_next();
834 state->dmaster = ctdb_client_pnn(client);
835 state->num_buffers_sent = 0;
836 state->num_records = 0;
838 filename = talloc_asprintf(state, "%s.dat", recdb_path(recdb));
839 if (tevent_req_nomem(filename, req)) {
840 return tevent_req_post(req, ev);
/*
 * NOTE(review): no close() of this descriptor appears in the visible
 * lines of this file - confirm where it is released.
 */
843 state->fd = open(filename, O_RDWR|O_CREAT, 0644);
844 if (state->fd == -1) {
845 tevent_req_error(req, errno);
846 return tevent_req_post(req, ev);
849 talloc_free(filename);
851 state->num_buffers = recdb_file(recdb, state, state->dmaster,
852 state->fd, max_size);
853 if (state->num_buffers == -1) {
854 tevent_req_error(req, ENOMEM);
855 return tevent_req_post(req, ev);
/* rewind so the buffers can be read back in push_database_new_send_msg() */
858 offset = lseek(state->fd, 0, SEEK_SET);
860 tevent_req_error(req, EIO);
861 return tevent_req_post(req, ev);
864 pulldb_ext.db_id = recdb_id(recdb);
865 pulldb_ext.srvid = state->srvid;
867 ctdb_req_control_db_push_start(&request, &pulldb_ext);
868 subreq = ctdb_client_control_multi_send(state, ev, client,
870 TIMEOUT(), &request);
871 if (tevent_req_nomem(subreq, req)) {
872 return tevent_req_post(req, ev);
874 tevent_req_set_callback(subreq, push_database_new_started, req);
/*
 * Callback for the DB_PUSH_START multi-node control.  On failure, logs
 * either the failing node (from ctdb_client_control_multi_error()) or
 * just the error code, and fails the request.  Otherwise starts sending
 * the record buffers with push_database_new_send_msg().
 */
879 static void push_database_new_started(struct tevent_req *subreq)
881 struct tevent_req *req = tevent_req_callback_data(
882 subreq, struct tevent_req);
883 struct push_database_new_state *state = tevent_req_data(
884 req, struct push_database_new_state);
889 status = ctdb_client_control_multi_recv(subreq, &ret, state,
896 ret2 = ctdb_client_control_multi_error(state->pnn_list,
900 LOG("control DB_PUSH_START failed for db %s "
901 "on node %u, ret=%d\n",
902 recdb_name(state->recdb), pnn, ret2);
904 LOG("control DB_PUSH_START failed for db %s, ret=%d\n",
905 recdb_name(state->recdb), ret);
907 talloc_free(err_list);
909 tevent_req_error(req, ret);
913 push_database_new_send_msg(req);
/*
 * Send the next record buffer, or finish the push.
 *
 * When every buffer has been sent, the control built by
 * ctdb_req_control_db_push_confirm() goes to all nodes and
 * push_database_new_confirmed() takes over.  Otherwise one buffer is read
 * back from state->fd, marshalled, and sent as a message with
 * state->srvid to all nodes; its record count is added to
 * state->num_records.
 */
916 static void push_database_new_send_msg(struct tevent_req *req)
918 struct push_database_new_state *state = tevent_req_data(
919 req, struct push_database_new_state);
920 struct tevent_req *subreq;
921 struct ctdb_rec_buffer *recbuf;
922 struct ctdb_req_message message;
926 if (state->num_buffers_sent == state->num_buffers) {
927 struct ctdb_req_control request;
929 ctdb_req_control_db_push_confirm(&request,
930 recdb_id(state->recdb));
931 subreq = ctdb_client_control_multi_send(state, state->ev,
935 TIMEOUT(), &request);
936 if (tevent_req_nomem(subreq, req)) {
939 tevent_req_set_callback(subreq, push_database_new_confirmed,
944 ret = ctdb_rec_buffer_read(state->fd, state, &recbuf);
946 tevent_req_error(req, ret);
/* marshal the buffer into a flat blob for the message payload */
950 data.dsize = ctdb_rec_buffer_len(recbuf);
951 data.dptr = talloc_size(state, data.dsize);
952 if (tevent_req_nomem(data.dptr, req)) {
956 ctdb_rec_buffer_push(recbuf, data.dptr);
958 message.srvid = state->srvid;
959 message.data.data = data;
961 LOG("Pushing buffer %d with %d records for %s\n",
962 state->num_buffers_sent, recbuf->count, recdb_name(state->recdb));
964 subreq = ctdb_client_message_multi_send(state, state->ev,
966 state->pnn_list, state->count,
968 if (tevent_req_nomem(subreq, req)) {
971 tevent_req_set_callback(subreq, push_database_new_send_done, req);
973 state->num_records += recbuf->count;
/*
 * NOTE(review): the blob is released right after the send call, which
 * presumably relies on the send having copied it - confirm.
 */
975 talloc_free(data.dptr);
/*
 * Callback for one multi-node message send.  Has an error path that logs
 * and fails the request; otherwise counts the buffer as sent and
 * continues with push_database_new_send_msg().
 */
979 static void push_database_new_send_done(struct tevent_req *subreq)
981 struct tevent_req *req = tevent_req_callback_data(
982 subreq, struct tevent_req);
983 struct push_database_new_state *state = tevent_req_data(
984 req, struct push_database_new_state);
988 status = ctdb_client_message_multi_recv(subreq, &ret, NULL, NULL);
991 LOG("Sending recovery records failed for %s\n",
992 recdb_name(state->recdb));
993 tevent_req_error(req, ret);
997 state->num_buffers_sent += 1;
999 push_database_new_send_msg(req);
/*
 * Callback for the DB_PUSH_CONFIRM multi-node control.
 *
 * Each node's reply is unmarshalled into a record count.  A count that
 * differs from state->num_records is logged and fails the request with
 * EPROTO.  After the loop the request is marked done.
 */
1002 static void push_database_new_confirmed(struct tevent_req *subreq)
1004 struct tevent_req *req = tevent_req_callback_data(
1005 subreq, struct tevent_req);
1006 struct push_database_new_state *state = tevent_req_data(
1007 req, struct push_database_new_state);
1008 struct ctdb_reply_control **reply;
1012 uint32_t num_records;
1014 status = ctdb_client_control_multi_recv(subreq, &ret, state,
1016 TALLOC_FREE(subreq);
1021 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1022 state->count, err_list,
1025 LOG("control DB_PUSH_CONFIRM failed for %s on node %u,"
1026 " ret=%d\n", recdb_name(state->recdb), pnn, ret2);
1028 LOG("control DB_PUSH_CONFIRM failed for %s, ret=%d\n",
1029 recdb_name(state->recdb), ret);
1031 tevent_req_error(req, ret);
/* reply[i] is matched with pnn_list[i] in the log message below */
1035 for (i=0; i<state->count; i++) {
1036 ret = ctdb_reply_control_db_push_confirm(reply[i],
1039 tevent_req_error(req, EPROTO);
1043 if (num_records != state->num_records) {
1044 LOG("Node %u received %d of %d records for %s\n",
1045 state->pnn_list[i], num_records,
1046 state->num_records, recdb_name(state->recdb));
1047 tevent_req_error(req, EPROTO);
1054 LOG("Pushed %d records for db %s\n",
1055 state->num_records, recdb_name(state->recdb));
1057 tevent_req_done(req);
/* Result of push_database_new_send(); delegates to generic_recv(). */
1060 static bool push_database_new_recv(struct tevent_req *req, int *perr)
1062 return generic_recv(req, perr);
1066 * wrapper for push_database_old and push_database_new
/*
 * State of the push wrapper: one completion flag for each of the two
 * push variants.  The request is done once both flags are true.
 */
1069 struct push_database_state {
1070 bool old_done, new_done;
/* Forward declarations of the two sub-request completion callbacks. */
1073 static void push_database_old_done(struct tevent_req *subreq);
1074 static void push_database_new_done(struct tevent_req *subreq);
/*
 * Push the recovery db to all nodes in pnn_list, choosing the push
 * variant per node.
 *
 * Nodes whose caps[pnn] has CTDB_CAP_FRAGMENTED_CONTROLS go into the
 * "new" list (push_database_new_send(), buffer size limited by
 * tun_list->rec_buffer_size_limit); all others go into the "old" list
 * (push_database_old_send()).  A variant with no nodes is marked done
 * immediately.
 *
 * caps is indexed by node number, not by position in pnn_list.
 */
1076 static struct tevent_req *push_database_send(
1077 TALLOC_CTX *mem_ctx,
1078 struct tevent_context *ev,
1079 struct ctdb_client_context *client,
1080 uint32_t *pnn_list, int count, uint32_t *caps,
1081 struct ctdb_tunable_list *tun_list,
1082 struct recdb_context *recdb)
1084 struct tevent_req *req, *subreq;
1085 struct push_database_state *state;
1086 uint32_t *old_list, *new_list;
1087 int old_count, new_count;
1090 req = tevent_req_create(mem_ctx, &state, struct push_database_state);
1095 state->old_done = false;
1096 state->new_done = false;
/* both lists are sized for the worst case of every node landing in one list */
1100 old_list = talloc_array(state, uint32_t, count);
1101 new_list = talloc_array(state, uint32_t, count);
1102 if (tevent_req_nomem(old_list, req) ||
1103 tevent_req_nomem(new_list,req)) {
1104 return tevent_req_post(req, ev);
1107 for (i=0; i<count; i++) {
1108 uint32_t pnn = pnn_list[i];
1110 if (caps[pnn] & CTDB_CAP_FRAGMENTED_CONTROLS) {
1111 new_list[new_count] = pnn;
1114 old_list[old_count] = pnn;
1119 if (old_count > 0) {
1120 subreq = push_database_old_send(state, ev, client,
1121 old_list, old_count, recdb);
1122 if (tevent_req_nomem(subreq, req)) {
1123 return tevent_req_post(req, ev);
1125 tevent_req_set_callback(subreq, push_database_old_done, req);
1127 state->old_done = true;
1130 if (new_count > 0) {
1131 subreq = push_database_new_send(state, ev, client,
1132 new_list, new_count, recdb,
1133 tun_list->rec_buffer_size_limit);
1134 if (tevent_req_nomem(subreq, req)) {
1135 return tevent_req_post(req, ev);
1137 tevent_req_set_callback(subreq, push_database_new_done, req);
1139 state->new_done = true;
/*
 * Completion of the old-style push sub-request.  Has an error path that
 * fails the wrapper request; otherwise sets old_done and finishes the
 * wrapper once new_done is set as well.
 */
1145 static void push_database_old_done(struct tevent_req *subreq)
1147 struct tevent_req *req = tevent_req_callback_data(
1148 subreq, struct tevent_req);
1149 struct push_database_state *state = tevent_req_data(
1150 req, struct push_database_state);
1154 status = push_database_old_recv(subreq, &ret);
1156 tevent_req_error(req, ret);
1160 state->old_done = true;
1162 if (state->old_done && state->new_done) {
1163 tevent_req_done(req);
/*
 * Completion of the message-based push sub-request.  Has an error path
 * that fails the wrapper request; otherwise sets new_done and finishes
 * the wrapper once old_done is set as well.
 */
1167 static void push_database_new_done(struct tevent_req *subreq)
1169 struct tevent_req *req = tevent_req_callback_data(
1170 subreq, struct tevent_req);
1171 struct push_database_state *state = tevent_req_data(
1172 req, struct push_database_state);
1176 status = push_database_new_recv(subreq, &ret);
1178 tevent_req_error(req, ret);
1182 state->new_done = true;
1184 if (state->old_done && state->new_done) {
1185 tevent_req_done(req);
/* Result of push_database_send(); delegates to generic_recv(). */
1189 static bool push_database_recv(struct tevent_req *req, int *perr)
1191 return generic_recv(req, perr);
1195 * Collect databases using highest sequence number
/*
 * Request state for collect_highseqnum_db_send().  The functions using it
 * also access pnn_list, count, caps, db_id and max_pnn.
 */
1198 struct collect_highseqnum_db_state {
1199 struct tevent_context *ev;
1200 struct ctdb_client_context *client;
/* indexed by node number; incremented when a pull from that node fails */
1204 uint32_t *ban_credits;
1206 struct recdb_context *recdb;
/* Forward declarations of the two completion callbacks. */
1210 static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq);
1211 static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq);
/*
 * Collect a database by pulling it from the single node that reports the
 * highest sequence number.
 *
 * This first step sends the control built by
 * ctdb_req_control_get_db_seqnum() to every node in pnn_list;
 * collect_highseqnum_db_seqnum_done() picks the node and starts the pull.
 */
1213 static struct tevent_req *collect_highseqnum_db_send(
1214 TALLOC_CTX *mem_ctx,
1215 struct tevent_context *ev,
1216 struct ctdb_client_context *client,
1217 uint32_t *pnn_list, int count, uint32_t *caps,
1218 uint32_t *ban_credits, uint32_t db_id,
1219 struct recdb_context *recdb)
1221 struct tevent_req *req, *subreq;
1222 struct collect_highseqnum_db_state *state;
1223 struct ctdb_req_control request;
1225 req = tevent_req_create(mem_ctx, &state,
1226 struct collect_highseqnum_db_state);
1232 state->client = client;
1233 state->pnn_list = pnn_list;
1234 state->count = count;
1236 state->ban_credits = ban_credits;
1237 state->db_id = db_id;
1238 state->recdb = recdb;
1240 ctdb_req_control_get_db_seqnum(&request, db_id);
/*
 * NOTE(review): this sub-request is created on mem_ctx, whereas the other
 * senders in this file create theirs on state - confirm this is intended.
 */
1241 subreq = ctdb_client_control_multi_send(mem_ctx, ev, client,
1242 state->pnn_list, state->count,
1243 TIMEOUT(), &request);
1244 if (tevent_req_nomem(subreq, req)) {
1245 return tevent_req_post(req, ev);
1247 tevent_req_set_callback(subreq, collect_highseqnum_db_seqnum_done,
/*
 * Callback for the GET_DB_SEQNUM multi-node control.
 *
 * Has an error path that logs and fails the request.  Each reply is
 * unmarshalled into a sequence number; a reply that cannot be
 * unmarshalled has an EPROTO error path.  The node with the largest
 * sequence number is remembered in state->max_pnn (the first node in
 * pnn_list is the initial choice) and the database is pulled from it
 * with pull_database_send().
 */
1253 static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq)
1255 struct tevent_req *req = tevent_req_callback_data(
1256 subreq, struct tevent_req);
1257 struct collect_highseqnum_db_state *state = tevent_req_data(
1258 req, struct collect_highseqnum_db_state);
1259 struct ctdb_reply_control **reply;
1263 uint64_t seqnum, max_seqnum;
1265 status = ctdb_client_control_multi_recv(subreq, &ret, state,
1267 TALLOC_FREE(subreq);
1272 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1273 state->count, err_list,
1276 LOG("control GET_DB_SEQNUM failed for %s on node %u,"
1277 " ret=%d\n", recdb_name(state->recdb), pnn, ret2);
1279 LOG("control GET_DB_SEQNUM failed for %s, ret=%d\n",
1280 recdb_name(state->recdb), ret);
1282 tevent_req_error(req, ret);
1287 state->max_pnn = state->pnn_list[0];
1288 for (i=0; i<state->count; i++) {
1289 ret = ctdb_reply_control_get_db_seqnum(reply[i], &seqnum);
1291 tevent_req_error(req, EPROTO);
/* strict comparison: on equal sequence numbers the earlier node is kept */
1295 if (max_seqnum < seqnum) {
1296 max_seqnum = seqnum;
1297 state->max_pnn = state->pnn_list[i];
1303 LOG("Pull persistent db %s from node %d with seqnum 0x%"PRIx64"\n",
1304 recdb_name(state->recdb), state->max_pnn, max_seqnum);
1306 subreq = pull_database_send(state, state->ev, state->client,
1308 state->caps[state->max_pnn],
1310 if (tevent_req_nomem(subreq, req)) {
1313 tevent_req_set_callback(subreq, collect_highseqnum_db_pulldb_done,
/*
 * Callback for the pull from the chosen node.  Has an error path that
 * adds one ban credit for that node and fails the request; otherwise the
 * request is done.
 */
1317 static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq)
1319 struct tevent_req *req = tevent_req_callback_data(
1320 subreq, struct tevent_req);
1321 struct collect_highseqnum_db_state *state = tevent_req_data(
1322 req, struct collect_highseqnum_db_state);
1326 status = pull_database_recv(subreq, &ret);
1327 TALLOC_FREE(subreq);
1329 state->ban_credits[state->max_pnn] += 1;
1330 tevent_req_error(req, ret);
1334 tevent_req_done(req);
/* Result of collect_highseqnum_db_send(); delegates to generic_recv(). */
1337 static bool collect_highseqnum_db_recv(struct tevent_req *req, int *perr)
1339 return generic_recv(req, perr);
1343 * Collect all databases
/*
 * Request state for collect_all_db_send().  The functions using it also
 * access pnn_list, count, caps, db_id and index.
 */
1346 struct collect_all_db_state {
1347 struct tevent_context *ev;
1348 struct ctdb_client_context *client;
/* indexed by node number; incremented when a pull from that node fails */
1352 uint32_t *ban_credits;
1354 struct recdb_context *recdb;
1355 struct ctdb_pulldb pulldb;
/* Forward declaration of the per-node pull completion callback. */
1359 static void collect_all_db_pulldb_done(struct tevent_req *subreq);
/*
 * Collect a database by pulling it from the nodes in pnn_list, one node
 * at a time, into recdb.
 *
 * This starts the pull from pnn_list[state->index];
 * collect_all_db_pulldb_done() continues with the following nodes.
 * caps is indexed by node number.
 */
1361 static struct tevent_req *collect_all_db_send(
1362 TALLOC_CTX *mem_ctx,
1363 struct tevent_context *ev,
1364 struct ctdb_client_context *client,
1365 uint32_t *pnn_list, int count, uint32_t *caps,
1366 uint32_t *ban_credits, uint32_t db_id,
1367 struct recdb_context *recdb)
1369 struct tevent_req *req, *subreq;
1370 struct collect_all_db_state *state;
1373 req = tevent_req_create(mem_ctx, &state,
1374 struct collect_all_db_state);
1380 state->client = client;
1381 state->pnn_list = pnn_list;
1382 state->count = count;
1384 state->ban_credits = ban_credits;
1385 state->db_id = db_id;
1386 state->recdb = recdb;
1389 pnn = state->pnn_list[state->index];
1391 subreq = pull_database_send(state, ev, client, pnn, caps[pnn], recdb);
1392 if (tevent_req_nomem(subreq, req)) {
1393 return tevent_req_post(req, ev);
1395 tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
/*
 * Callback for one per-node pull.  Has an error path that adds one ban
 * credit for the current node and fails the request.  When state->index
 * has reached state->count the request is done; otherwise the pull
 * from the next node in pnn_list is started.
 */
1400 static void collect_all_db_pulldb_done(struct tevent_req *subreq)
1402 struct tevent_req *req = tevent_req_callback_data(
1403 subreq, struct tevent_req);
1404 struct collect_all_db_state *state = tevent_req_data(
1405 req, struct collect_all_db_state);
1410 status = pull_database_recv(subreq, &ret);
1411 TALLOC_FREE(subreq);
1413 pnn = state->pnn_list[state->index];
1414 state->ban_credits[pnn] += 1;
1415 tevent_req_error(req, ret);
1420 if (state->index == state->count) {
1421 tevent_req_done(req);
1425 pnn = state->pnn_list[state->index];
1426 subreq = pull_database_send(state, state->ev, state->client,
1427 pnn, state->caps[pnn], state->recdb);
1428 if (tevent_req_nomem(subreq, req)) {
1431 tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
/* Result of collect_all_db_send(); delegates to generic_recv(). */
1434 static bool collect_all_db_recv(struct tevent_req *req, int *perr)
1436 return generic_recv(req, perr);
1441 * For each database do the following:
1444 * - Freeze database on all nodes
1445 * - Start transaction on all nodes
1446 * - Collect database from all nodes
1447 * - Wipe database on all nodes
1448 * - Push database to all nodes
1449 * - Commit transaction on all nodes
1450 * - Thaw database on all nodes
/*
 * Request state for recover_db_send().  recover_db_send() also sets
 * pnn_list, count, db_id, persistent and destnode.
 */
1453 struct recover_db_state {
1454 struct tevent_context *ev;
1455 struct ctdb_client_context *client;
1456 struct ctdb_tunable_list *tun_list;
1460 uint32_t *ban_credits;
/* db_id and tid (set from the generation argument) */
1465 struct ctdb_transdb transdb;
1467 const char *db_name, *db_path;
1468 struct recdb_context *recdb;
/* Forward declarations of the callbacks for the per-database recovery steps. */
1471 static void recover_db_name_done(struct tevent_req *subreq);
1472 static void recover_db_path_done(struct tevent_req *subreq);
1473 static void recover_db_freeze_done(struct tevent_req *subreq);
1474 static void recover_db_transaction_started(struct tevent_req *subreq);
1475 static void recover_db_collect_done(struct tevent_req *subreq);
1476 static void recover_db_wipedb_done(struct tevent_req *subreq);
1477 static void recover_db_pushdb_done(struct tevent_req *subreq);
1478 static void recover_db_transaction_committed(struct tevent_req *subreq);
1479 static void recover_db_thaw_done(struct tevent_req *subreq);
/*
 * Start recovering the database identified by db_id.
 *
 * Creates a request with a recover_db_state, stores the arguments in it,
 * records the client's own PNN as the destination for single-node controls
 * and seeds state->transdb with db_id and generation (as tid).  The first
 * step sends a GET_DBNAME control to that node; recover_db_name_done
 * continues from there.
 *
 * If the control sub-request cannot be allocated the request is posted back
 * through tevent_req_post().
 */
1481 static struct tevent_req *recover_db_send(TALLOC_CTX *mem_ctx,
1482 struct tevent_context *ev,
1483 struct ctdb_client_context *client,
1484 struct ctdb_tunable_list *tun_list,
1485 uint32_t *pnn_list, int count,
1487 uint32_t *ban_credits,
1488 uint32_t generation,
1489 uint32_t db_id, bool persistent)
1491 struct tevent_req *req, *subreq;
1492 struct recover_db_state *state;
1493 struct ctdb_req_control request;
1495 req = tevent_req_create(mem_ctx, &state, struct recover_db_state);
1501 state->client = client;
1502 state->tun_list = tun_list;
1503 state->pnn_list = pnn_list;
1504 state->count = count;
1506 state->ban_credits = ban_credits;
1507 state->db_id = db_id;
1508 state->persistent = persistent;
/* Single-node controls are addressed to the node this client is on. */
1510 state->destnode = ctdb_client_pnn(client);
1511 state->transdb.db_id = db_id;
1512 state->transdb.tid = generation;
1514 ctdb_req_control_get_dbname(&request, db_id);
1515 subreq = ctdb_client_control_send(state, ev, client, state->destnode,
1516 TIMEOUT(), &request);
1517 if (tevent_req_nomem(subreq, req)) {
1518 return tevent_req_post(req, ev);
1520 tevent_req_set_callback(subreq, recover_db_name_done, req);
/*
 * Callback for the GET_DBNAME control.
 *
 * A failed control is logged and fails the request with the returned code;
 * a reply that cannot be parsed is logged and fails it with EPROTO.  On
 * success the name is stored in state->db_name and a GETDBPATH control is
 * sent to the same node, continuing in recover_db_path_done.
 */
1525 static void recover_db_name_done(struct tevent_req *subreq)
1527 struct tevent_req *req = tevent_req_callback_data(
1528 subreq, struct tevent_req);
1529 struct recover_db_state *state = tevent_req_data(
1530 req, struct recover_db_state);
1531 struct ctdb_reply_control *reply;
1532 struct ctdb_req_control request;
1536 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1537 TALLOC_FREE(subreq);
1539 LOG("control GET_DBNAME failed for db=0x%x, ret=%d\n",
1541 tevent_req_error(req, ret);
/* Extract the database name from the reply. */
1545 ret = ctdb_reply_control_get_dbname(reply, state, &state->db_name);
1547 LOG("control GET_DBNAME failed for db=0x%x, ret=%d\n",
1549 tevent_req_error(req, EPROTO);
1555 ctdb_req_control_getdbpath(&request, state->db_id);
1556 subreq = ctdb_client_control_send(state, state->ev, state->client,
1557 state->destnode, TIMEOUT(),
1559 if (tevent_req_nomem(subreq, req)) {
1562 tevent_req_set_callback(subreq, recover_db_path_done, req);
/*
 * Callback for the GETDBPATH control.
 *
 * A failed control is logged and fails the request with the returned code;
 * a reply that cannot be parsed is logged and fails it with EPROTO.  On
 * success the path is stored in state->db_path and a database freeze
 * control is sent to every node in state->pnn_list, continuing in
 * recover_db_freeze_done.
 */
1565 static void recover_db_path_done(struct tevent_req *subreq)
1567 struct tevent_req *req = tevent_req_callback_data(
1568 subreq, struct tevent_req);
1569 struct recover_db_state *state = tevent_req_data(
1570 req, struct recover_db_state);
1571 struct ctdb_reply_control *reply;
1572 struct ctdb_req_control request;
1576 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1577 TALLOC_FREE(subreq);
1579 LOG("control GETDBPATH failed for db %s, ret=%d\n",
1580 state->db_name, ret);
1581 tevent_req_error(req, ret);
/* Extract the database path from the reply. */
1585 ret = ctdb_reply_control_getdbpath(reply, state, &state->db_path);
1587 LOG("control GETDBPATH failed for db %s, ret=%d\n",
1588 state->db_name, ret);
1589 tevent_req_error(req, EPROTO);
/* Next step: freeze the database on all nodes in the list. */
1595 ctdb_req_control_db_freeze(&request, state->db_id);
1596 subreq = ctdb_client_control_multi_send(state, state->ev,
1598 state->pnn_list, state->count,
1599 TIMEOUT(), &request);
1600 if (tevent_req_nomem(subreq, req)) {
1603 tevent_req_set_callback(subreq, recover_db_freeze_done, req);
/*
 * Callback for the multi-node database freeze control.
 *
 * On failure, ctdb_client_control_multi_error() is consulted so that the log
 * message can name the failing node when one is reported; otherwise a
 * generic message is logged.  The request then fails with the returned code.
 * On success a transaction start control carrying state->transdb is sent to
 * every node, continuing in recover_db_transaction_started.
 */
1606 static void recover_db_freeze_done(struct tevent_req *subreq)
1608 struct tevent_req *req = tevent_req_callback_data(
1609 subreq, struct tevent_req);
1610 struct recover_db_state *state = tevent_req_data(
1611 req, struct recover_db_state);
1612 struct ctdb_req_control request;
1617 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1619 TALLOC_FREE(subreq);
1624 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1625 state->count, err_list,
1628 LOG("control FREEZE_DB failed for db %s on node %u,"
1629 " ret=%d\n", state->db_name, pnn, ret2);
1631 LOG("control FREEZE_DB failed for db %s, ret=%d\n",
1632 state->db_name, ret);
1634 tevent_req_error(req, ret);
/* Next step: start a transaction on all nodes in the list. */
1638 ctdb_req_control_db_transaction_start(&request, &state->transdb);
1639 subreq = ctdb_client_control_multi_send(state, state->ev,
1641 state->pnn_list, state->count,
1642 TIMEOUT(), &request);
1643 if (tevent_req_nomem(subreq, req)) {
1646 tevent_req_set_callback(subreq, recover_db_transaction_started, req);
/*
 * Callback for the multi-node transaction start control.
 *
 * On failure the error is logged (naming the node when one is reported) and
 * the request fails with the returned code.  On success the local recovery
 * database is created with recdb_create() and collection of the database
 * contents starts: persistent databases use collect_highseqnum_db_send(),
 * all others use collect_all_db_send().  recover_db_collect_done continues.
 *
 * NOTE(review): the log messages label this control "TRANSACTION_DB" while
 * the request is built with ctdb_req_control_db_transaction_start() -
 * confirm whether the label should be updated.
 */
1649 static void recover_db_transaction_started(struct tevent_req *subreq)
1651 struct tevent_req *req = tevent_req_callback_data(
1652 subreq, struct tevent_req);
1653 struct recover_db_state *state = tevent_req_data(
1654 req, struct recover_db_state);
1659 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1661 TALLOC_FREE(subreq);
1666 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1670 LOG("control TRANSACTION_DB failed for db=%s on node %u,"
1671 " ret=%d\n", state->db_name, pnn, ret2);
1673 LOG("control TRANSACTION_DB failed for db=%s,"
1674 " ret=%d\n", state->db_name, ret);
1676 tevent_req_error(req, ret);
/* Scratch database that the collected records are gathered into. */
1680 state->recdb = recdb_create(state, state->db_id, state->db_name,
1682 state->tun_list->database_hash_size,
1684 if (tevent_req_nomem(state->recdb, req)) {
/* The collection strategy depends on whether the database is persistent. */
1688 if (state->persistent) {
1689 subreq = collect_highseqnum_db_send(
1690 state, state->ev, state->client,
1691 state->pnn_list, state->count, state->caps,
1692 state->ban_credits, state->db_id,
1695 subreq = collect_all_db_send(
1696 state, state->ev, state->client,
1697 state->pnn_list, state->count, state->caps,
1698 state->ban_credits, state->db_id,
1701 if (tevent_req_nomem(subreq, req)) {
1704 tevent_req_set_callback(subreq, recover_db_collect_done, req);
/*
 * Callback for the database collection step.
 *
 * The result is received with the recv function matching the send that was
 * used (selected by state->persistent).  A failure fails the request with
 * the returned code.  On success a wipe database control carrying
 * state->transdb is sent to every node, continuing in
 * recover_db_wipedb_done.
 */
1707 static void recover_db_collect_done(struct tevent_req *subreq)
1709 struct tevent_req *req = tevent_req_callback_data(
1710 subreq, struct tevent_req);
1711 struct recover_db_state *state = tevent_req_data(
1712 req, struct recover_db_state);
1713 struct ctdb_req_control request;
1717 if (state->persistent) {
1718 status = collect_highseqnum_db_recv(subreq, &ret);
1720 status = collect_all_db_recv(subreq, &ret);
1722 TALLOC_FREE(subreq);
1724 tevent_req_error(req, ret);
/* Next step: wipe the database on all nodes in the list. */
1728 ctdb_req_control_wipe_database(&request, &state->transdb);
1729 subreq = ctdb_client_control_multi_send(state, state->ev,
1731 state->pnn_list, state->count,
1732 TIMEOUT(), &request);
1733 if (tevent_req_nomem(subreq, req)) {
1736 tevent_req_set_callback(subreq, recover_db_wipedb_done, req);
/*
 * Callback for the multi-node wipe database control.
 *
 * On failure the error is logged (naming the node when one is reported) and
 * the request fails with the returned code.  On success the collected
 * database is pushed out to the nodes with push_database_send(), continuing
 * in recover_db_pushdb_done.
 */
1739 static void recover_db_wipedb_done(struct tevent_req *subreq)
1741 struct tevent_req *req = tevent_req_callback_data(
1742 subreq, struct tevent_req);
1743 struct recover_db_state *state = tevent_req_data(
1744 req, struct recover_db_state);
1749 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1751 TALLOC_FREE(subreq);
1756 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1760 LOG("control WIPEDB failed for db %s on node %u,"
1761 " ret=%d\n", state->db_name, pnn, ret2);
1763 LOG("control WIPEDB failed for db %s, ret=%d\n",
1764 state->db_name, ret);
1766 tevent_req_error(req, ret);
/* Next step: push the recovered database to the nodes. */
1770 subreq = push_database_send(state, state->ev, state->client,
1771 state->pnn_list, state->count,
1772 state->caps, state->tun_list,
1774 if (tevent_req_nomem(subreq, req)) {
1777 tevent_req_set_callback(subreq, recover_db_pushdb_done, req);
/*
 * Callback for the database push step.
 *
 * A failed push fails the request with the returned code.  On success the
 * local recovery database is released and a transaction commit control
 * carrying state->transdb is sent to every node, continuing in
 * recover_db_transaction_committed.
 */
1780 static void recover_db_pushdb_done(struct tevent_req *subreq)
1782 struct tevent_req *req = tevent_req_callback_data(
1783 subreq, struct tevent_req);
1784 struct recover_db_state *state = tevent_req_data(
1785 req, struct recover_db_state);
1786 struct ctdb_req_control request;
1790 status = push_database_recv(subreq, &ret);
1791 TALLOC_FREE(subreq);
1793 tevent_req_error(req, ret);
/* The scratch database is no longer needed once it has been pushed. */
1797 TALLOC_FREE(state->recdb);
1799 ctdb_req_control_db_transaction_commit(&request, &state->transdb);
1800 subreq = ctdb_client_control_multi_send(state, state->ev,
1802 state->pnn_list, state->count,
1803 TIMEOUT(), &request);
1804 if (tevent_req_nomem(subreq, req)) {
1807 tevent_req_set_callback(subreq, recover_db_transaction_committed, req);
/*
 * Callback for the multi-node transaction commit control.
 *
 * On failure the error is logged (naming the node when one is reported) and
 * the request fails with the returned code.  On success a database thaw
 * control is sent to every node, continuing in recover_db_thaw_done.
 */
1810 static void recover_db_transaction_committed(struct tevent_req *subreq)
1812 struct tevent_req *req = tevent_req_callback_data(
1813 subreq, struct tevent_req);
1814 struct recover_db_state *state = tevent_req_data(
1815 req, struct recover_db_state);
1816 struct ctdb_req_control request;
1821 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1823 TALLOC_FREE(subreq);
1828 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1832 LOG("control DB_TRANSACTION_COMMIT failed for db %s"
1833 " on node %u, ret=%d\n", state->db_name, pnn, ret2);
1835 LOG("control DB_TRANSACTION_COMMIT failed for db %s,"
1836 " ret=%d\n", state->db_name, ret);
1838 tevent_req_error(req, ret);
/* Final step: thaw the database on all nodes in the list. */
1842 ctdb_req_control_db_thaw(&request, state->db_id);
1843 subreq = ctdb_client_control_multi_send(state, state->ev,
1845 state->pnn_list, state->count,
1846 TIMEOUT(), &request);
1847 if (tevent_req_nomem(subreq, req)) {
1850 tevent_req_set_callback(subreq, recover_db_thaw_done, req);
/*
 * Callback for the multi-node database thaw control, the last step of
 * recovering one database.
 *
 * On failure the error is logged (naming the node when one is reported) and
 * the request fails with the returned code; otherwise the recover_db
 * request is marked done.
 */
1853 static void recover_db_thaw_done(struct tevent_req *subreq)
1855 struct tevent_req *req = tevent_req_callback_data(
1856 subreq, struct tevent_req);
1857 struct recover_db_state *state = tevent_req_data(
1858 req, struct recover_db_state);
1863 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1865 TALLOC_FREE(subreq);
1870 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1874 LOG("control DB_THAW failed for db %s on node %u,"
1875 " ret=%d\n", state->db_name, pnn, ret2);
1877 LOG("control DB_THAW failed for db %s, ret=%d\n",
1878 state->db_name, ret);
1880 tevent_req_error(req, ret);
1884 tevent_req_done(req);
/*
 * Receive the result of a recover_db request.  Only the boolean result of
 * generic_recv() is returned; NULL is passed for the error pointer, so no
 * error code is reported to the caller.
 */
1887 static bool recover_db_recv(struct tevent_req *req)
1889 return generic_recv(req, NULL);
1894 * Start database recovery for each database
1896 * Try to recover each database up to NUM_RETRIES times before failing recovery.
/*
 * State of the db_recovery request, which recovers every database listed
 * in dbmap.
 */
1899 struct db_recovery_state {
1900 struct tevent_context *ev;
/* Databases to recover; its num field is the expected number of replies */
1901 struct ctdb_dbid_map *dbmap;
/*
 * Per-database state handed to db_recovery_one_done as callback data.  It
 * keeps the arguments needed to issue recover_db_send() again when an
 * attempt fails.
 */
1906 struct db_recovery_one_state {
/* The enclosing db_recovery request */
1907 struct tevent_req *req;
1908 struct ctdb_client_context *client;
1909 struct ctdb_dbid_map *dbmap;
1910 struct ctdb_tunable_list *tun_list;
1914 uint32_t *ban_credits;
1915 uint32_t generation;
/* Callback invoked when one recover_db request (one database) completes */
1921 static void db_recovery_one_done(struct tevent_req *subreq);
/*
 * Start recovery of every database listed in dbmap.
 *
 * If dbmap is empty the request completes immediately.  Otherwise one
 * db_recovery_one_state is allocated per database, filled with the
 * arguments and the database's id and persistence flag, and a recover_db
 * request is started for it with db_recovery_one_done as callback.  The
 * loop starts all recover_db requests without waiting for earlier ones to
 * finish.
 *
 * An allocation failure posts the request back through tevent_req_post().
 */
1923 static struct tevent_req *db_recovery_send(TALLOC_CTX *mem_ctx,
1924 struct tevent_context *ev,
1925 struct ctdb_client_context *client,
1926 struct ctdb_dbid_map *dbmap,
1927 struct ctdb_tunable_list *tun_list,
1928 uint32_t *pnn_list, int count,
1930 uint32_t *ban_credits,
1931 uint32_t generation)
1933 struct tevent_req *req, *subreq;
1934 struct db_recovery_state *state;
1937 req = tevent_req_create(mem_ctx, &state, struct db_recovery_state);
1943 state->dbmap = dbmap;
1944 state->num_replies = 0;
1945 state->num_failed = 0;
/* Nothing to recover. */
1947 if (dbmap->num == 0) {
1948 tevent_req_done(req);
1949 return tevent_req_post(req, ev);
1952 for (i=0; i<dbmap->num; i++) {
1953 struct db_recovery_one_state *substate;
1955 substate = talloc_zero(state, struct db_recovery_one_state);
1956 if (tevent_req_nomem(substate, req)) {
1957 return tevent_req_post(req, ev);
1960 substate->req = req;
1961 substate->client = client;
1962 substate->dbmap = dbmap;
1963 substate->tun_list = tun_list;
1964 substate->pnn_list = pnn_list;
1965 substate->count = count;
1966 substate->caps = caps;
1967 substate->ban_credits = ban_credits;
1968 substate->generation = generation;
1969 substate->db_id = dbmap->dbs[i].db_id;
/* Persistence is taken from the database's flags in the dbmap entry. */
1970 substate->persistent = dbmap->dbs[i].flags &
1971 CTDB_DB_FLAGS_PERSISTENT;
1973 subreq = recover_db_send(state, ev, client, tun_list,
1974 pnn_list, count, caps, ban_credits,
1975 generation, substate->db_id,
1976 substate->persistent);
1977 if (tevent_req_nomem(subreq, req)) {
1978 return tevent_req_post(req, ev);
1980 tevent_req_set_callback(subreq, db_recovery_one_done,
1982 LOG("recover database 0x%08x\n", substate->db_id);
/*
 * Callback for one recover_db request.
 *
 * The per-database substate is freed when it is no longer needed.  A failed
 * attempt increments substate->num_fails and, while that stays below
 * NUM_RETRIES, recovery of the same database is started again with the
 * saved arguments.  Otherwise the database is counted in
 * state->num_failed.  Each finished database increments
 * state->num_replies, and the db_recovery request is completed once that
 * count equals the number of databases in the dbmap.
 */
1988 static void db_recovery_one_done(struct tevent_req *subreq)
1990 struct db_recovery_one_state *substate = tevent_req_callback_data(
1991 subreq, struct db_recovery_one_state);
1992 struct tevent_req *req = substate->req;
1993 struct db_recovery_state *state = tevent_req_data(
1994 req, struct db_recovery_state);
1997 status = recover_db_recv(subreq);
1998 TALLOC_FREE(subreq);
2001 talloc_free(substate);
/* Retry the same database until NUM_RETRIES attempts have failed. */
2005 substate->num_fails += 1;
2006 if (substate->num_fails < NUM_RETRIES) {
2007 subreq = recover_db_send(state, state->ev, substate->client,
2009 substate->pnn_list, substate->count,
2010 substate->caps, substate->ban_credits,
2011 substate->generation, substate->db_id,
2012 substate->persistent);
2013 if (tevent_req_nomem(subreq, req)) {
2016 tevent_req_set_callback(subreq, db_recovery_one_done, substate);
2017 LOG("recover database 0x%08x, attempt %d\n", substate->db_id,
2018 substate->num_fails+1);
2023 state->num_failed += 1;
2026 state->num_replies += 1;
/* All databases have reported back. */
2028 if (state->num_replies == state->dbmap->num) {
2029 tevent_req_done(req);
/*
 * Receive the result of a db_recovery request.
 *
 * *count is set to the number of databases that were recovered, computed
 * as replies minus failures.  The function also checks the request for a
 * unix error and checks whether any database failed; the return values
 * chosen for those cases are not shown here.
 */
2033 static bool db_recovery_recv(struct tevent_req *req, int *count)
2035 struct db_recovery_state *state = tevent_req_data(
2036 req, struct db_recovery_state);
2039 if (tevent_req_is_unix_error(req, &err)) {
2044 *count = state->num_replies - state->num_failed;
2046 if (state->num_failed > 0) {
2055 * Run the parallel database recovery
2060 * - Get capabilities from all nodes
2062 * - Set RECOVERY_ACTIVE
2063 * - Send START_RECOVERY
2064 * - Update vnnmap on all nodes
2065 * - Run database recovery
2066 * - Set RECOVERY_NORMAL
2067 * - Send END_RECOVERY
/*
 * State carried through the chain of callbacks that runs a complete
 * recovery (started by recovery_send).
 */
2070 struct recovery_state {
2071 struct tevent_context *ev;
2072 struct ctdb_client_context *client;
/* Generation stamped on the newly calculated vnnmap */
2073 uint32_t generation;
2077 struct ctdb_node_map *nodemap;
/* One counter per nodemap entry, zero-initialised */
2079 uint32_t *ban_credits;
2080 struct ctdb_tunable_list *tun_list;
/* Fetched from the local node, later replaced by the recalculated map */
2081 struct ctdb_vnn_map *vnnmap;
2082 struct ctdb_dbid_map *dbmap;
/*
 * Callbacks of the recovery request, declared in the order in which the
 * steps are chained.  After the database recovery step the chain continues
 * with either recovery_failed_done (banning message sent) or
 * recovery_normal_done.
 */
2085 static void recovery_tunables_done(struct tevent_req *subreq);
2086 static void recovery_nodemap_done(struct tevent_req *subreq);
2087 static void recovery_vnnmap_done(struct tevent_req *subreq);
2088 static void recovery_capabilities_done(struct tevent_req *subreq);
2089 static void recovery_dbmap_done(struct tevent_req *subreq);
2090 static void recovery_active_done(struct tevent_req *subreq);
2091 static void recovery_start_recovery_done(struct tevent_req *subreq);
2092 static void recovery_vnnmap_update_done(struct tevent_req *subreq);
2093 static void recovery_db_recovery_done(struct tevent_req *subreq);
2094 static void recovery_failed_done(struct tevent_req *subreq);
2095 static void recovery_normal_done(struct tevent_req *subreq);
2096 static void recovery_end_recovery_done(struct tevent_req *subreq);
/*
 * Start a complete recovery run.
 *
 * Creates a request with a recovery_state, stores client and generation,
 * records the client's own PNN as the destination for single-node controls
 * and sends a GET_ALL_TUNABLES control to that node.
 * recovery_tunables_done continues from there.
 *
 * If the control sub-request cannot be allocated the request is posted back
 * through tevent_req_post().
 */
2098 static struct tevent_req *recovery_send(TALLOC_CTX *mem_ctx,
2099 struct tevent_context *ev,
2100 struct ctdb_client_context *client,
2101 uint32_t generation)
2103 struct tevent_req *req, *subreq;
2104 struct recovery_state *state;
2105 struct ctdb_req_control request;
2107 req = tevent_req_create(mem_ctx, &state, struct recovery_state);
2113 state->client = client;
2114 state->generation = generation;
2115 state->destnode = ctdb_client_pnn(client);
2117 ctdb_req_control_get_all_tunables(&request);
2118 subreq = ctdb_client_control_send(state, state->ev, state->client,
2119 state->destnode, TIMEOUT(),
2121 if (tevent_req_nomem(subreq, req)) {
2122 return tevent_req_post(req, ev);
2124 tevent_req_set_callback(subreq, recovery_tunables_done, req);
/*
 * Callback for the GET_ALL_TUNABLES control.
 *
 * A failed control is logged and fails the request with the returned code;
 * a reply that cannot be parsed is logged and fails it with EPROTO.  On
 * success the file-level recover_timeout is replaced by the value from the
 * tunables, which changes the timeout produced by TIMEOUT() for every
 * control sent afterwards.  A GET_NODEMAP control is then sent to the local
 * node, continuing in recovery_nodemap_done.
 */
2129 static void recovery_tunables_done(struct tevent_req *subreq)
2131 struct tevent_req *req = tevent_req_callback_data(
2132 subreq, struct tevent_req);
2133 struct recovery_state *state = tevent_req_data(
2134 req, struct recovery_state);
2135 struct ctdb_reply_control *reply;
2136 struct ctdb_req_control request;
2140 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2141 TALLOC_FREE(subreq);
2143 LOG("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2144 tevent_req_error(req, ret);
2148 ret = ctdb_reply_control_get_all_tunables(reply, state,
2151 LOG("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2152 tevent_req_error(req, EPROTO);
/* From here on TIMEOUT() uses the cluster's configured recover timeout. */
2158 recover_timeout = state->tun_list->recover_timeout;
2160 ctdb_req_control_get_nodemap(&request);
2161 subreq = ctdb_client_control_send(state, state->ev, state->client,
2162 state->destnode, TIMEOUT(),
2164 if (tevent_req_nomem(subreq, req)) {
2167 tevent_req_set_callback(subreq, recovery_nodemap_done, req);
/*
 * Callback for the GET_NODEMAP control.
 *
 * A failed control or an unparsable reply is logged and fails the request
 * with the returned code.  On success the list of active nodes is derived
 * from the nodemap into state->pnn_list / state->count (a count of zero or
 * less fails the request with ENOMEM), a zeroed ban_credits array with one
 * entry per nodemap node is allocated, and a GETVNNMAP control is sent to
 * the local node, continuing in recovery_vnnmap_done.
 */
2170 static void recovery_nodemap_done(struct tevent_req *subreq)
2172 struct tevent_req *req = tevent_req_callback_data(
2173 subreq, struct tevent_req);
2174 struct recovery_state *state = tevent_req_data(
2175 req, struct recovery_state);
2176 struct ctdb_reply_control *reply;
2177 struct ctdb_req_control request;
2181 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2182 TALLOC_FREE(subreq);
2184 LOG("control GET_NODEMAP failed to node %u, ret=%d\n",
2185 state->destnode, ret);
2186 tevent_req_error(req, ret);
2190 ret = ctdb_reply_control_get_nodemap(reply, state, &state->nodemap);
2192 LOG("control GET_NODEMAP failed, ret=%d\n", ret);
2193 tevent_req_error(req, ret);
/* All later multi-node controls are sent to this list of active nodes. */
2197 state->count = list_of_active_nodes(state->nodemap, CTDB_UNKNOWN_PNN,
2198 state, &state->pnn_list);
2199 if (state->count <= 0) {
2200 tevent_req_error(req, ENOMEM);
2204 state->ban_credits = talloc_zero_array(state, uint32_t,
2205 state->nodemap->num);
2206 if (tevent_req_nomem(state->ban_credits, req)) {
2210 ctdb_req_control_getvnnmap(&request);
2211 subreq = ctdb_client_control_send(state, state->ev, state->client,
2212 state->destnode, TIMEOUT(),
2214 if (tevent_req_nomem(subreq, req)) {
2217 tevent_req_set_callback(subreq, recovery_vnnmap_done, req);
/*
 * Callback for the GETVNNMAP control.
 *
 * A failed control or an unparsable reply is logged and fails the request
 * with the returned code.  On success the map is stored in state->vnnmap
 * and a GET_CAPABILITIES control is sent to every active node, continuing
 * in recovery_capabilities_done.
 */
2220 static void recovery_vnnmap_done(struct tevent_req *subreq)
2222 struct tevent_req *req = tevent_req_callback_data(
2223 subreq, struct tevent_req);
2224 struct recovery_state *state = tevent_req_data(
2225 req, struct recovery_state);
2226 struct ctdb_reply_control *reply;
2227 struct ctdb_req_control request;
2231 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2232 TALLOC_FREE(subreq);
2234 LOG("control GETVNNMAP failed to node %u, ret=%d\n",
2235 state->destnode, ret);
2236 tevent_req_error(req, ret);
2240 ret = ctdb_reply_control_getvnnmap(reply, state, &state->vnnmap);
2242 LOG("control GETVNNMAP failed, ret=%d\n", ret);
2243 tevent_req_error(req, ret);
2247 ctdb_req_control_get_capabilities(&request);
2248 subreq = ctdb_client_control_multi_send(state, state->ev,
2250 state->pnn_list, state->count,
2251 TIMEOUT(), &request);
2252 if (tevent_req_nomem(subreq, req)) {
2255 tevent_req_set_callback(subreq, recovery_capabilities_done, req);
/*
 * Callback for the multi-node GET_CAPABILITIES control.
 *
 * On failure the error is logged (naming the node when one is reported) and
 * the request fails with the returned code.  On success a zeroed caps array
 * with one entry per nodemap node is allocated and each reply is parsed in
 * pnn_list order; a reply that cannot be parsed is logged and fails the
 * request with EPROTO.  A GET_DBMAP control is then sent to the local node,
 * continuing in recovery_dbmap_done.
 */
2258 static void recovery_capabilities_done(struct tevent_req *subreq)
2260 struct tevent_req *req = tevent_req_callback_data(
2261 subreq, struct tevent_req);
2262 struct recovery_state *state = tevent_req_data(
2263 req, struct recovery_state);
2264 struct ctdb_reply_control **reply;
2265 struct ctdb_req_control request;
2270 status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2272 TALLOC_FREE(subreq);
2277 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2281 LOG("control GET_CAPABILITIES failed on node %u,"
2282 " ret=%d\n", pnn, ret2);
2284 LOG("control GET_CAPABILITIES failed, ret=%d\n", ret);
2286 tevent_req_error(req, ret);
2290 /* Make the array size same as nodemap */
2291 state->caps = talloc_zero_array(state, uint32_t,
2292 state->nodemap->num);
2293 if (tevent_req_nomem(state->caps, req)) {
/* reply[i] belongs to pnn_list[i]; presumably the parsed value is stored
 * at caps[pnn] - TODO confirm */
2297 for (i=0; i<state->count; i++) {
2300 pnn = state->pnn_list[i];
2301 ret = ctdb_reply_control_get_capabilities(reply[i],
2304 LOG("control GET_CAPABILITIES failed on node %u\n", pnn);
2305 tevent_req_error(req, EPROTO);
2312 ctdb_req_control_get_dbmap(&request);
2313 subreq = ctdb_client_control_send(state, state->ev, state->client,
2314 state->destnode, TIMEOUT(),
2316 if (tevent_req_nomem(subreq, req)) {
2319 tevent_req_set_callback(subreq, recovery_dbmap_done, req);
/*
 * Callback for the GET_DBMAP control.
 *
 * A failed control or an unparsable reply is logged and fails the request
 * with the returned code.  On success the map is stored in state->dbmap and
 * a control setting the recovery mode to CTDB_RECOVERY_ACTIVE is sent to
 * every active node, continuing in recovery_active_done.
 */
2322 static void recovery_dbmap_done(struct tevent_req *subreq)
2324 struct tevent_req *req = tevent_req_callback_data(
2325 subreq, struct tevent_req);
2326 struct recovery_state *state = tevent_req_data(
2327 req, struct recovery_state);
2328 struct ctdb_reply_control *reply;
2329 struct ctdb_req_control request;
2333 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2334 TALLOC_FREE(subreq);
2336 LOG("control GET_DBMAP failed to node %u, ret=%d\n",
2337 state->destnode, ret);
2338 tevent_req_error(req, ret);
2342 ret = ctdb_reply_control_get_dbmap(reply, state, &state->dbmap);
2344 LOG("control GET_DBMAP failed, ret=%d\n", ret);
2345 tevent_req_error(req, ret);
2349 ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_ACTIVE);
2350 subreq = ctdb_client_control_multi_send(state, state->ev,
2352 state->pnn_list, state->count,
2353 TIMEOUT(), &request);
2354 if (tevent_req_nomem(subreq, req)) {
2357 tevent_req_set_callback(subreq, recovery_active_done, req);
/*
 * Callback for the multi-node "set recovery mode ACTIVE" control.
 *
 * On failure the error is logged (naming the node when one is reported) and
 * the request fails with the returned code.  On success a new VNN map is
 * calculated from the nodemap: nodes flagged NODE_FLAGS_INACTIVE and nodes
 * without CTDB_CAP_LMASTER are filtered out in both passes.  When no
 * candidate is found the map has a single entry, the local node.  The new
 * map is stamped with state->generation and replaces state->vnnmap.  A
 * START_RECOVERY control is then sent to every active node, continuing in
 * recovery_start_recovery_done.
 *
 * NOTE(review): state->caps is indexed by the nodemap index i here, while
 * other code in this file indexes caps by PNN - this assumes node[i].pnn
 * equals i; confirm.
 */
2360 static void recovery_active_done(struct tevent_req *subreq)
2362 struct tevent_req *req = tevent_req_callback_data(
2363 subreq, struct tevent_req);
2364 struct recovery_state *state = tevent_req_data(
2365 req, struct recovery_state);
2366 struct ctdb_req_control request;
2367 struct ctdb_vnn_map *vnnmap;
2372 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2374 TALLOC_FREE(subreq);
2379 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2383 LOG("failed to set recovery mode to ACTIVE on node %u,"
2384 " ret=%d\n", pnn, ret2);
2386 LOG("failed to set recovery mode to ACTIVE, ret=%d\n",
2389 tevent_req_error(req, ret);
2393 LOG("set recovery mode to ACTIVE\n");
2395 /* Calculate new VNNMAP */
/* First pass over the nodemap: determines how many entries are needed. */
2397 for (i=0; i<state->nodemap->num; i++) {
2398 if (state->nodemap->node[i].flags & NODE_FLAGS_INACTIVE) {
2401 if (!(state->caps[i] & CTDB_CAP_LMASTER)) {
2408 LOG("no active lmasters found. Adding recmaster anyway\n");
2411 vnnmap = talloc_zero(state, struct ctdb_vnn_map);
2412 if (tevent_req_nomem(vnnmap, req)) {
/* At least one slot is always allocated, for the fallback entry. */
2416 vnnmap->size = (count == 0 ? 1 : count);
2417 vnnmap->map = talloc_array(vnnmap, uint32_t, vnnmap->size);
2418 if (tevent_req_nomem(vnnmap->map, req)) {
2423 vnnmap->map[0] = state->destnode;
/* Second pass: fill in the PNNs of the qualifying nodes. */
2426 for (i=0; i<state->nodemap->num; i++) {
2427 if (state->nodemap->node[i].flags &
2428 NODE_FLAGS_INACTIVE) {
2431 if (!(state->caps[i] & CTDB_CAP_LMASTER)) {
2435 vnnmap->map[count] = state->nodemap->node[i].pnn;
2440 vnnmap->generation = state->generation;
/* Replace the map fetched earlier with the recalculated one. */
2442 talloc_free(state->vnnmap);
2443 state->vnnmap = vnnmap;
2445 ctdb_req_control_start_recovery(&request);
2446 subreq = ctdb_client_control_multi_send(state, state->ev,
2448 state->pnn_list, state->count,
2449 TIMEOUT(), &request);
2450 if (tevent_req_nomem(subreq, req)) {
2453 tevent_req_set_callback(subreq, recovery_start_recovery_done, req);
/*
 * Callback for the multi-node START_RECOVERY control.
 *
 * On failure the error is logged (naming the node when one is reported) and
 * the request fails with the returned code.  On success the recalculated
 * state->vnnmap is sent to every active node with a SETVNNMAP control,
 * continuing in recovery_vnnmap_update_done.
 */
2456 static void recovery_start_recovery_done(struct tevent_req *subreq)
2458 struct tevent_req *req = tevent_req_callback_data(
2459 subreq, struct tevent_req);
2460 struct recovery_state *state = tevent_req_data(
2461 req, struct recovery_state);
2462 struct ctdb_req_control request;
2467 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2469 TALLOC_FREE(subreq);
2474 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2478 LOG("failed to run start_recovery event on node %u,"
2479 " ret=%d\n", pnn, ret2);
2481 LOG("failed to run start_recovery event, ret=%d\n",
2484 tevent_req_error(req, ret);
2488 LOG("start_recovery event finished\n");
2490 ctdb_req_control_setvnnmap(&request, state->vnnmap);
2491 subreq = ctdb_client_control_multi_send(state, state->ev,
2493 state->pnn_list, state->count,
2494 TIMEOUT(), &request);
2495 if (tevent_req_nomem(subreq, req)) {
2498 tevent_req_set_callback(subreq, recovery_vnnmap_update_done, req);
/*
 * Callback for the multi-node SETVNNMAP control.
 *
 * On failure the error is logged (naming the node when one is reported) and
 * the request fails with the returned code.  On success recovery of all
 * databases in state->dbmap is started with db_recovery_send(), using the
 * generation of the new VNN map, and continues in
 * recovery_db_recovery_done.
 */
2501 static void recovery_vnnmap_update_done(struct tevent_req *subreq)
2503 struct tevent_req *req = tevent_req_callback_data(
2504 subreq, struct tevent_req);
2505 struct recovery_state *state = tevent_req_data(
2506 req, struct recovery_state);
2511 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2513 TALLOC_FREE(subreq);
2518 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2522 LOG("failed to update VNNMAP on node %u, ret=%d\n",
2525 LOG("failed to update VNNMAP, ret=%d\n", ret);
2527 tevent_req_error(req, ret);
2531 LOG("updated VNNMAP\n");
2533 subreq = db_recovery_send(state, state->ev, state->client,
2534 state->dbmap, state->tun_list,
2535 state->pnn_list, state->count,
2536 state->caps, state->ban_credits,
2537 state->vnnmap->generation);
2538 if (tevent_req_nomem(subreq, req)) {
2541 tevent_req_set_callback(subreq, recovery_db_recovery_done, req);
/*
 * Callback for the database recovery step.
 *
 * Logs how many of the databases were recovered.  In the failure branch:
 * if the enable_bans tunable is 0 the request fails with EIO right away;
 * otherwise the active node with the most ban credits is determined and,
 * when its credits reach NUM_RETRIES, a CTDB_SRVID_BANNING message naming
 * that node is sent to the local node (continuing in
 * recovery_failed_done).  The failure branch otherwise ends with EIO.
 *
 * On success a control setting the recovery mode to CTDB_RECOVERY_NORMAL
 * is sent to every active node, continuing in recovery_normal_done.
 */
2544 static void recovery_db_recovery_done(struct tevent_req *subreq)
2546 struct tevent_req *req = tevent_req_callback_data(
2547 subreq, struct tevent_req);
2548 struct recovery_state *state = tevent_req_data(
2549 req, struct recovery_state);
2550 struct ctdb_req_control request;
2554 status = db_recovery_recv(subreq, &count);
2555 TALLOC_FREE(subreq);
2557 LOG("%d of %d databases recovered\n", count, state->dbmap->num);
2560 uint32_t max_pnn = CTDB_UNKNOWN_PNN, max_credits = 0;
2563 /* Bans are not enabled */
2564 if (state->tun_list->enable_bans == 0) {
2565 tevent_req_error(req, EIO);
/* Look for the active node that accumulated the most ban credits. */
2569 for (i=0; i<state->count; i++) {
2571 pnn = state->pnn_list[i];
2572 if (state->ban_credits[pnn] > max_credits) {
2574 max_credits = state->ban_credits[pnn];
2578 /* If pulling database fails multiple times */
2579 if (max_credits >= NUM_RETRIES) {
2580 struct ctdb_req_message message;
2582 LOG("Assigning banning credits to node %u\n", max_pnn);
2584 message.srvid = CTDB_SRVID_BANNING;
2585 message.data.pnn = max_pnn;
2587 subreq = ctdb_client_message_send(
2588 state, state->ev, state->client,
2589 ctdb_client_pnn(state->client),
2591 if (tevent_req_nomem(subreq, req)) {
2594 tevent_req_set_callback(subreq, recovery_failed_done,
2597 tevent_req_error(req, EIO);
/* Databases recovered: switch all nodes back to normal mode. */
2602 ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_NORMAL);
2603 subreq = ctdb_client_control_multi_send(state, state->ev,
2605 state->pnn_list, state->count,
2606 TIMEOUT(), &request);
2607 if (tevent_req_nomem(subreq, req)) {
2610 tevent_req_set_callback(subreq, recovery_normal_done, req);
/*
 * Callback for the banning message sent after a failed database recovery.
 *
 * A failure to deliver the message is logged.  The recovery request ends
 * with EIO here, since this path is only entered after database recovery
 * has already failed.
 */
2613 static void recovery_failed_done(struct tevent_req *subreq)
2615 struct tevent_req *req = tevent_req_callback_data(
2616 subreq, struct tevent_req);
2620 status = ctdb_client_message_recv(subreq, &ret);
2621 TALLOC_FREE(subreq);
2623 LOG("failed to assign banning credits, ret=%d\n", ret);
2626 tevent_req_error(req, EIO);
/*
 * Callback for the multi-node "set recovery mode NORMAL" control.
 *
 * On failure the error is logged (naming the node when one is reported) and
 * the request fails with the returned code.  On success an END_RECOVERY
 * control is sent to every active node, continuing in
 * recovery_end_recovery_done.
 */
2629 static void recovery_normal_done(struct tevent_req *subreq)
2631 struct tevent_req *req = tevent_req_callback_data(
2632 subreq, struct tevent_req);
2633 struct recovery_state *state = tevent_req_data(
2634 req, struct recovery_state);
2635 struct ctdb_req_control request;
2640 status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2642 TALLOC_FREE(subreq);
2647 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2651 LOG("failed to set recovery mode to NORMAL on node %u,"
2652 " ret=%d\n", pnn, ret2);
2654 LOG("failed to set recovery mode to NORMAL, ret=%d\n",
2657 tevent_req_error(req, ret);
2661 LOG("set recovery mode to NORMAL\n");
2663 ctdb_req_control_end_recovery(&request);
2664 subreq = ctdb_client_control_multi_send(state, state->ev,
2666 state->pnn_list, state->count,
2667 TIMEOUT(), &request);
2668 if (tevent_req_nomem(subreq, req)) {
2671 tevent_req_set_callback(subreq, recovery_end_recovery_done, req);
/*
 * Callback for the multi-node END_RECOVERY control, the last step of a
 * recovery run.
 *
 * On failure the error is logged (naming the node when one is reported) and
 * the request fails with the returned code; otherwise completion is logged
 * and the recovery request is marked done.
 */
2674 static void recovery_end_recovery_done(struct tevent_req *subreq)
2676 struct tevent_req *req = tevent_req_callback_data(
2677 subreq, struct tevent_req);
2678 struct recovery_state *state = tevent_req_data(
2679 req, struct recovery_state);
2684 status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2686 TALLOC_FREE(subreq);
2691 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2695 LOG("failed to run recovered event on node %u,"
2696 " ret=%d\n", pnn, ret2);
2698 LOG("failed to run recovered event, ret=%d\n", ret);
2700 tevent_req_error(req, ret);
2704 LOG("recovered event finished\n");
2706 tevent_req_done(req);
/*
 * Receive the result of a recovery request.  The boolean result of
 * generic_recv() is discarded; the caller learns the outcome only through
 * *perr.
 *
 * NOTE(review): generic_recv() presumably writes *perr only when the
 * request carries an error, so callers should initialise it - confirm.
 */
2709 static void recovery_recv(struct tevent_req *req, int *perr)
2711 generic_recv(req, perr);
/* Print the command line synopsis for progname to stderr. */
2714 static void usage(const char *progname)
2716 fprintf(stderr, "\nUsage: %s <output-fd> <ctdb-socket-path> <generation>\n",
2722 * Arguments - write fd, socket path, generation
/*
 * Entry point of the recovery helper.
 *
 * argv[1] is parsed as the file descriptor the result is written to and
 * argv[3] as the generation for the new VNN map.  The function sets up a
 * talloc context, logging, a tevent context and a ctdb client connection,
 * runs one recovery request to completion with tevent_req_poll() and
 * writes the integer result to the output descriptor with sys_write().
 * Setup failures are reported on stderr until logging is initialised and
 * through LOG() afterwards.
 *
 * NOTE(review): atoi() and strtoul() are used without validating the
 * arguments, so malformed input is not detected here.
 */
2724 int main(int argc, char *argv[])
2727 const char *sockpath;
2728 TALLOC_CTX *mem_ctx;
2729 struct tevent_context *ev;
2730 struct ctdb_client_context *client;
2732 struct tevent_req *req;
2733 uint32_t generation;
2740 write_fd = atoi(argv[1]);
2742 generation = (uint32_t)strtoul(argv[3], NULL, 0);
2744 mem_ctx = talloc_new(NULL);
2745 if (mem_ctx == NULL) {
2746 fprintf(stderr, "recovery: talloc_new() failed\n");
2750 ret = logging_init(mem_ctx, NULL, NULL, "ctdb-recovery");
2752 fprintf(stderr, "recovery: Unable to initialize logging\n");
2756 ev = tevent_context_init(mem_ctx);
2758 LOG("tevent_context_init() failed\n");
2762 ret = ctdb_client_init(mem_ctx, ev, sockpath, &client);
2764 LOG("ctdb_client_init() failed, ret=%d\n", ret);
2768 req = recovery_send(mem_ctx, ev, client, generation);
/* Name the function that was actually called, so the log can be matched
 * to the code. */
2770 LOG("recovery_send() failed\n");
/* Drive the event loop until the recovery request has finished. */
2774 if (! tevent_req_poll(req, ev)) {
2775 LOG("tevent_req_poll() failed\n");
2779 recovery_recv(req, &ret);
2782 LOG("database recovery failed, ret=%d\n", ret);
/* Report the result to the process that passed in the descriptor. */
2786 sys_write(write_fd, &ret, sizeof(ret));
2790 TALLOC_FREE(mem_ctx);