2 ctdb parallel database recovery
4 Copyright (C) Amitay Isaacs 2015
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
22 #include "system/filesys.h"
29 #include "lib/tdb_wrap/tdb_wrap.h"
30 #include "lib/util/dlinklist.h"
31 #include "lib/util/sys_rw.h"
32 #include "lib/util/time.h"
33 #include "lib/util/tevent_unix.h"
34 #include "lib/util/util.h"
35 #include "lib/util/smb_strtox.h"
37 #include "protocol/protocol.h"
38 #include "protocol/protocol_api.h"
39 #include "client/client.h"
41 #include "common/logging.h"
/* Timeout value handed to timeval_current_ofs() as its first argument. */
43 static int recover_timeout = 30;
/*
 * Timeout built from recover_timeout; passed as the timeout argument of
 * the client control calls in this file.
 */
47 #define TIMEOUT() timeval_current_ofs(recover_timeout, 0)
/*
 * Shared receive helper: the *_recv() wrappers in this file forward
 * req and perr here.  Only the tevent_req_is_unix_error() test is
 * visible; presumably the extracted error is handed back through
 * perr -- confirm against the full body.
 */
53 static bool generic_recv(struct tevent_req *req, int *perr)
57 if (tevent_req_is_unix_error(req, &err)) {
/* Service id state for recovery, initialised to CTDB_SRVID_RECOVERY. */
67 static uint64_t rec_srvid = CTDB_SRVID_RECOVERY;
/*
 * Source of the srvid values stored in state->srvid by the pull and
 * push requests.  Body not visible here; presumably derived from
 * rec_srvid -- confirm.
 */
69 static uint64_t srvid_next(void)
76 * Node related functions
82 uint32_t *ban_credits;
/*
 * Create a node list with room for size entries.
 *
 * The list itself is a zeroed talloc child of mem_ctx and the three
 * per-node arrays are children of the list.  caps and ban_credits
 * start zeroed; every pnn_list slot is set to CTDB_UNKNOWN_PNN.
 */
87 static struct node_list *node_list_init(TALLOC_CTX *mem_ctx, unsigned int size)
89 struct node_list *nlist;
92 nlist = talloc_zero(mem_ctx, struct node_list);
97 nlist->pnn_list = talloc_array(nlist, uint32_t, size);
98 nlist->caps = talloc_zero_array(nlist, uint32_t, size);
99 nlist->ban_credits = talloc_zero_array(nlist, uint32_t, size);
/* a failure of any of the three array allocations is handled together */
101 if (nlist->pnn_list == NULL ||
102 nlist->caps == NULL ||
103 nlist->ban_credits == NULL) {
/* pnn_list is not zero-allocated, so mark every slot as unknown */
109 for (i=0; i<nlist->size; i++) {
110 nlist->pnn_list[i] = CTDB_UNKNOWN_PNN;
/*
 * Record pnn in the next free pnn_list slot of nlist.
 *
 * Two conditions are tested first: the list already holds size
 * entries, and pnn is already present.  The result returned for each
 * case is not visible here.
 */
116 static bool node_list_add(struct node_list *nlist, uint32_t pnn)
/* list is full */
120 if (nlist->count == nlist->size) {
/* look for an existing entry for this pnn */
124 for (i=0; i<nlist->count; i++) {
125 if (nlist->pnn_list[i] == pnn) {
130 nlist->pnn_list[nlist->count] = pnn;
/*
 * Collect the PNNs of the nodes whose capabilities include
 * CTDB_CAP_LMASTER.
 *
 * The result array is allocated on mem_ctx with room for
 * nlist->count entries.  pnn_count is presumably set to the number
 * of entries filled in -- the assignment is not visible here.
 */
136 static uint32_t *node_list_lmaster(struct node_list *nlist,
138 unsigned int *pnn_count)
141 unsigned int count, i;
143 pnn_list = talloc_zero_array(mem_ctx, uint32_t, nlist->count);
144 if (pnn_list == NULL) {
149 for (i=0; i<nlist->count; i++) {
/* node i lacks the LMASTER capability */
150 if (!(nlist->caps[i] & CTDB_CAP_LMASTER)) {
154 pnn_list[count] = nlist->pnn_list[i];
/* Add one ban credit to the entry of nlist whose PNN equals pnn. */
162 static void node_list_ban_credits(struct node_list *nlist, uint32_t pnn)
166 for (i=0; i<nlist->count; i++) {
167 if (nlist->pnn_list[i] == pnn) {
168 nlist->ban_credits[i] += 1;
175 * Database list functions
177 * Simple, naive implementation that could be updated to a db_hash or similar
181 struct db *prev, *next;
186 unsigned int num_nodes;
190 unsigned int num_dbs;
192 unsigned int num_nodes;
195 static struct db_list *db_list_init(TALLOC_CTX *mem_ctx, unsigned int num_nodes)
199 l = talloc_zero(mem_ctx, struct db_list);
200 l->num_nodes = num_nodes;
/*
 * Look up the entry for db_id in dblist by walking the list until the
 * id matches or the list ends.  A NULL dblist is checked for first.
 */
205 static struct db *db_list_find(struct db_list *dblist, uint32_t db_id)
209 if (dblist == NULL) {
214 while (db != NULL && db->db_id != db_id) {
/*
 * Append a new database entry to the end of dblist.
 *
 * The entry is a zeroed talloc child of dblist carrying db_flags; its
 * pnn_list has dblist->num_nodes slots and node is stored in slot 0.
 * A NULL dblist is checked for first.
 */
221 static int db_list_add(struct db_list *dblist,
226 struct db *db = NULL;
228 if (dblist == NULL) {
232 db = talloc_zero(dblist, struct db);
238 db->db_flags = db_flags;
239 db->pnn_list = talloc_zero_array(db, uint32_t, dblist->num_nodes);
240 if (db->pnn_list == NULL) {
/* the node that reported the database is its first known holder */
244 db->pnn_list[0] = node;
247 DLIST_ADD_END(dblist->db, db);
253 static int db_list_check_and_add(struct db_list *dblist,
258 struct db *db = NULL;
262 * These flags are masked out because they are only set on a
263 * node when a client attaches to that node, so they might not
264 be set yet. They can't be passed as part of the attach, so
265 * they're no use here.
267 db_flags &= ~(CTDB_DB_FLAGS_READONLY | CTDB_DB_FLAGS_STICKY);
269 if (dblist == NULL) {
273 db = db_list_find(dblist, db_id);
275 ret = db_list_add(dblist, db_id, db_flags, node);
279 if (db->db_flags != db_flags) {
280 D_ERR("Incompatible database flags for 0x%"PRIx32" "
281 "(0x%"PRIx32" != 0x%"PRIx32")\n",
288 if (db->num_nodes >= dblist->num_nodes) {
292 db->pnn_list[db->num_nodes] = node;
299 * Create database on nodes where it is missing
/*
 * State for db_create_missing_send/_done: the caller's event context,
 * client and node list, plus the list and count of nodes on which the
 * database still has to be created.
 */
302 struct db_create_missing_state {
303 struct tevent_context *ev;
304 struct ctdb_client_context *client;
306 struct node_list *nlist;
309 uint32_t *missing_pnn_list;
310 int missing_num_nodes;
/* completion callback for the attach control sent to the missing nodes */
313 static void db_create_missing_done(struct tevent_req *subreq);
/*
 * Attach the database on every node in nlist that is not already in
 * db->pnn_list.
 *
 * When db is already known on nlist->count nodes the request
 * completes immediately.  Otherwise the missing PNNs are collected and
 * one attach control -- persistent, replicated or plain, chosen from
 * db->db_flags -- is sent to all of them with
 * CTDB_CTRL_FLAG_ATTACH_RECOVERY set.  The result is handled in
 * db_create_missing_done.
 */
315 static struct tevent_req *db_create_missing_send(
317 struct tevent_context *ev,
318 struct ctdb_client_context *client,
319 struct node_list *nlist,
323 struct tevent_req *req, *subreq;
324 struct db_create_missing_state *state;
325 struct ctdb_req_control request;
328 req = tevent_req_create(mem_ctx,
330 struct db_create_missing_state);
336 state->client = client;
337 state->nlist = nlist;
338 state->db_name = db_name;
/* database already exists on as many nodes as the list holds */
340 if (nlist->count == db->num_nodes) {
341 tevent_req_done(req);
342 return tevent_req_post(req, ev);
/*
 * NOTE(review): the array is allocated on mem_ctx rather than on
 * state -- confirm that this ownership is intended.
 */
345 state->missing_pnn_list = talloc_array(mem_ctx, uint32_t, nlist->count);
346 if (tevent_req_nomem(state->missing_pnn_list, req)) {
347 return tevent_req_post(req, ev);
350 for (i = 0; i < nlist->count; i++) {
351 uint32_t pnn = nlist->pnn_list[i] ;
353 for (j = 0; j < db->num_nodes; j++) {
354 if (pnn == db->pnn_list[j]) {
/* j stopped early: pnn was found in db->pnn_list */
359 if (j < db->num_nodes) {
363 DBG_INFO("Create database %s on node %u\n",
366 state->missing_pnn_list[state->missing_num_nodes] = pnn;
367 state->missing_num_nodes++;
/* pick the attach control matching the database type */
370 if (db->db_flags & CTDB_DB_FLAGS_PERSISTENT) {
371 ctdb_req_control_db_attach_persistent(&request, db_name);
372 } else if (db->db_flags & CTDB_DB_FLAGS_REPLICATED) {
373 ctdb_req_control_db_attach_replicated(&request, db_name);
375 ctdb_req_control_db_attach(&request, db_name);
377 request.flags = CTDB_CTRL_FLAG_ATTACH_RECOVERY;
378 subreq = ctdb_client_control_multi_send(state,
381 state->missing_pnn_list,
382 state->missing_num_nodes,
385 if (tevent_req_nomem(subreq, req)) {
386 return tevent_req_post(req, ev);
388 tevent_req_set_callback(subreq, db_create_missing_done, req);
/*
 * Completion callback for the attach controls sent by
 * db_create_missing_send.
 *
 * The failure path logs the error (naming the node when
 * ctdb_client_control_multi_error() identifies one), gives that node a
 * ban credit and fails the request with ret.  Otherwise the request is
 * marked done.
 */
393 static void db_create_missing_done(struct tevent_req *subreq)
395 struct tevent_req *req = tevent_req_callback_data(
396 subreq, struct tevent_req);
397 struct db_create_missing_state *state = tevent_req_data(
398 req, struct db_create_missing_state);
403 status = ctdb_client_control_multi_recv(subreq,
413 ret2 = ctdb_client_control_multi_error(
414 state->missing_pnn_list,
415 state->missing_num_nodes,
419 D_ERR("control DB_ATTACH failed for db %s"
420 " on node %u, ret=%d\n",
424 node_list_ban_credits(state->nlist, pnn);
426 D_ERR("control DB_ATTACH failed for db %s, ret=%d\n",
430 tevent_req_error(req, ret);
434 tevent_req_done(req);
/* Receive the result of db_create_missing_send via generic_recv(). */
437 static bool db_create_missing_recv(struct tevent_req *req, int *perr)
439 return generic_recv(req, perr);
443 * Recovery database functions
/*
 * Handle for a recovery database.  The members are not visible here;
 * recdb_create() assigns db_name, db_id, db_path, db and persistent.
 */
446 struct recdb_context {
/*
 * Create the recovery database for db_id.
 *
 * The path has the form "<dir>/recdb.<suffix>", where <dir> depends on
 * whether the CTDB_DBDIR_STATE environment variable is set; the
 * directory part of db_path is the other candidate.  Any file already
 * at that path is unlinked, then the tdb is created with
 * O_CREAT|O_EXCL and mode 0600.  A failed open is logged.
 */
454 static struct recdb_context *recdb_create(TALLOC_CTX *mem_ctx, uint32_t db_id,
457 uint32_t hash_size, bool persistent)
459 static char *db_dir_state = NULL;
460 struct recdb_context *recdb;
461 unsigned int tdb_flags;
463 recdb = talloc(mem_ctx, struct recdb_context);
/* the environment is consulted only while the cached value is NULL */
468 if (db_dir_state == NULL) {
469 db_dir_state = getenv("CTDB_DBDIR_STATE");
472 recdb->db_name = db_name;
473 recdb->db_id = db_id;
474 recdb->db_path = talloc_asprintf(recdb, "%s/recdb.%s",
475 db_dir_state != NULL ?
477 dirname(discard_const(db_path)),
479 if (recdb->db_path == NULL) {
/* O_EXCL below requires that no stale file is left behind */
483 unlink(recdb->db_path);
485 tdb_flags = TDB_NOLOCK | TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING;
/*
 * NOTE(review): the tdb wrapper is allocated on mem_ctx, not on
 * recdb -- confirm the intended owner.
 */
486 recdb->db = tdb_wrap_open(mem_ctx, recdb->db_path, hash_size,
487 tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
488 if (recdb->db == NULL) {
490 D_ERR("failed to create recovery db %s\n", recdb->db_path);
494 recdb->persistent = persistent;
/*
 * Accessor for the database id of recdb.  Body not visible here;
 * presumably returns recdb->db_id -- confirm.
 */
499 static uint32_t recdb_id(struct recdb_context *recdb)
/* Return the database name stored in recdb. */
504 static const char *recdb_name(struct recdb_context *recdb)
506 return recdb->db_name;
/* Return the path of the recovery database file stored in recdb. */
509 static const char *recdb_path(struct recdb_context *recdb)
511 return recdb->db_path;
/* Return the tdb_context held by the tdb wrapper of recdb. */
514 static struct tdb_context *recdb_tdb(struct recdb_context *recdb)
516 return recdb->db->tdb;
/* Return the persistent flag recorded in recdb by recdb_create(). */
519 static bool recdb_persistent(struct recdb_context *recdb)
521 return recdb->persistent;
/*
 * Context passed to recdb_add_traverse: the recovery database being
 * filled.  The callback also reads a mypnn member that is not visible
 * here.
 */
524 struct recdb_add_traverse_state {
525 struct recdb_context *recdb;
/*
 * Per-record callback used by recdb_add: merge one pulled record into
 * the recovery database.
 *
 * The ctdb_ltdb_header is taken from the start of data.  If the key is
 * already stored, the stored header's rsn and dmaster are compared with
 * the incoming record (the action taken on a match is not visible
 * here).  The record is written with TDB_REPLACE.
 */
529 static int recdb_add_traverse(uint32_t reqid, struct ctdb_ltdb_header *header,
530 TDB_DATA key, TDB_DATA data,
533 struct recdb_add_traverse_state *state =
534 (struct recdb_add_traverse_state *)private_data;
535 struct ctdb_ltdb_header *hdr;
539 /* header is not marshalled separately in the pulldb control */
540 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
544 hdr = (struct ctdb_ltdb_header *)data.dptr;
546 /* fetch the existing record, if any */
547 prev_data = tdb_fetch(recdb_tdb(state->recdb), key);
549 if (prev_data.dptr != NULL) {
550 struct ctdb_ltdb_header prev_hdr;
/*
 * NOTE(review): prev_data.dsize is not checked on the visible lines
 * before the header is copied out -- confirm.
 */
552 prev_hdr = *(struct ctdb_ltdb_header *)prev_data.dptr;
/* tdb_fetch() data is released with free() once the header is copied */
553 free(prev_data.dptr);
/*
 * incoming rsn is lower, or equal while the stored copy's dmaster is
 * not this node
 */
554 if (hdr->rsn < prev_hdr.rsn ||
555 (hdr->rsn == prev_hdr.rsn &&
556 prev_hdr.dmaster != state->mypnn)) {
561 ret = tdb_store(recdb_tdb(state->recdb), key, data, TDB_REPLACE);
/*
 * Add the records in recbuf to recdb by traversing the buffer with
 * recdb_add_traverse.
 */
568 static bool recdb_add(struct recdb_context *recdb, int mypnn,
569 struct ctdb_rec_buffer *recbuf)
571 struct recdb_add_traverse_state state;
577 ret = ctdb_rec_buffer_traverse(recbuf, recdb_add_traverse, &state);
/*
 * Shared by the recdb traverse callbacks to append one record to
 * recbuf.
 *
 * Records whose data is no larger than a ctdb_ltdb_header are tested
 * for first.  The visible lines then set dmaster and the
 * MIGRATED_WITH_DATA flag in the header inside data; whether that
 * depends on persistent is not visible here.
 */
585 /* This function decides which records from recdb are retained */
586 static int recbuf_filter_add(struct ctdb_rec_buffer *recbuf, bool persistent,
587 uint32_t reqid, uint32_t dmaster,
588 TDB_DATA key, TDB_DATA data)
590 struct ctdb_ltdb_header *header;
593 /* Skip empty records */
594 if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
598 /* update the dmaster field to point to us */
599 header = (struct ctdb_ltdb_header *)data.dptr;
601 header->dmaster = dmaster;
602 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
/* the header travels inside data, so no separate header is passed */
605 ret = ctdb_rec_buffer_add(recbuf, recbuf, reqid, NULL, key, data);
/*
 * Context for recdb_records_traverse: the buffer being filled.  The
 * callback also uses persistent, reqid, dmaster and failed members
 * that are not visible here.
 */
613 struct recdb_records_traverse_state {
614 struct ctdb_rec_buffer *recbuf;
/*
 * tdb traverse callback for recdb_records: hand the record to
 * recbuf_filter_add.  state->failed is set on the lines following that
 * call; recdb_records checks it after the traverse.
 */
621 static int recdb_records_traverse(struct tdb_context *tdb,
622 TDB_DATA key, TDB_DATA data,
625 struct recdb_records_traverse_state *state =
626 (struct recdb_records_traverse_state *)private_data;
629 ret = recbuf_filter_add(state->recbuf, state->persistent,
630 state->reqid, state->dmaster, key, data);
632 state->failed = true;
/*
 * Marshal the records of recdb into a new ctdb_rec_buffer allocated on
 * mem_ctx, using recdb_records_traverse.
 *
 * If the traverse returns -1 or the callback flagged a failure, the
 * error is logged and the buffer is freed.
 */
639 static struct ctdb_rec_buffer *recdb_records(struct recdb_context *recdb,
643 struct recdb_records_traverse_state state;
646 state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
647 if (state.recbuf == NULL) {
650 state.dmaster = dmaster;
652 state.persistent = recdb_persistent(recdb);
653 state.failed = false;
655 ret = tdb_traverse_read(recdb_tdb(recdb), recdb_records_traverse,
657 if (ret == -1 || state.failed) {
658 D_ERR("Failed to marshall recovery records for %s\n",
660 TALLOC_FREE(state.recbuf);
/*
 * Context for recdb_file_traverse: the buffer currently being filled,
 * the recovery database it comes from and the number of buffers
 * written so far.  Further members used by the callback (mem_ctx,
 * dmaster, reqid, persistent, failed, fd, max_size) are not visible
 * here.
 */
667 struct recdb_file_traverse_state {
668 struct ctdb_rec_buffer *recbuf;
669 struct recdb_context *recdb;
677 unsigned int num_buffers;
/*
 * tdb traverse callback for recdb_file.
 *
 * The record is passed to recbuf_filter_add.  Once the buffer length
 * exceeds state->max_size the buffer is written to state->fd, counted
 * in num_buffers, freed and replaced by a fresh buffer on
 * state->mem_ctx.  state->failed is set on the error paths.
 */
680 static int recdb_file_traverse(struct tdb_context *tdb,
681 TDB_DATA key, TDB_DATA data,
684 struct recdb_file_traverse_state *state =
685 (struct recdb_file_traverse_state *)private_data;
688 ret = recbuf_filter_add(state->recbuf, state->persistent,
689 state->reqid, state->dmaster, key, data);
691 state->failed = true;
/* buffer has grown past the limit: flush it and start a new one */
695 if (ctdb_rec_buffer_len(state->recbuf) > state->max_size) {
696 ret = ctdb_rec_buffer_write(state->recbuf, state->fd);
698 D_ERR("Failed to collect recovery records for %s\n",
699 recdb_name(state->recdb));
700 state->failed = true;
704 state->num_buffers += 1;
706 TALLOC_FREE(state->recbuf);
707 state->recbuf = ctdb_rec_buffer_init(state->mem_ctx,
708 recdb_id(state->recdb));
709 if (state->recbuf == NULL) {
710 state->failed = true;
/*
 * Write the records of recdb to fd as a sequence of record buffers.
 *
 * recdb_file_traverse flushes a buffer whenever it grows past
 * max_size; the last buffer is written here after the traverse.
 * Returns the number of buffers written.  The error return value is
 * not visible here; the caller in this file tests for -1.
 */
718 static int recdb_file(struct recdb_context *recdb, TALLOC_CTX *mem_ctx,
719 uint32_t dmaster, int fd, int max_size)
721 struct recdb_file_traverse_state state;
724 state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
725 if (state.recbuf == NULL) {
729 state.mem_ctx = mem_ctx;
730 state.dmaster = dmaster;
732 state.persistent = recdb_persistent(recdb);
733 state.failed = false;
735 state.max_size = max_size;
736 state.num_buffers = 0;
738 ret = tdb_traverse_read(recdb_tdb(recdb), recdb_file_traverse, &state);
739 if (ret == -1 || state.failed) {
740 TALLOC_FREE(state.recbuf);
/* write out whatever is left in the final buffer */
744 ret = ctdb_rec_buffer_write(state.recbuf, fd);
746 D_ERR("Failed to collect recovery records for %s\n",
748 TALLOC_FREE(state.recbuf);
751 state.num_buffers += 1;
753 D_DEBUG("Wrote %d buffers of recovery records for %s\n",
754 state.num_buffers, recdb_name(recdb));
756 return state.num_buffers;
760 * Pull database from a single node
/*
 * State for pulling one database from one node: event context, client,
 * target recovery database and the running count of records received.
 * The callbacks also use pnn, srvid and result members that are not
 * visible here.
 */
763 struct pull_database_state {
764 struct tevent_context *ev;
765 struct ctdb_client_context *client;
766 struct recdb_context *recdb;
769 unsigned int num_records;
/* message handler and completion callbacks of the pull request */
773 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
775 static void pull_database_register_done(struct tevent_req *subreq);
776 static void pull_database_old_done(struct tevent_req *subreq);
777 static void pull_database_unregister_done(struct tevent_req *subreq);
778 static void pull_database_new_done(struct tevent_req *subreq);
/*
 * Start pulling the database behind recdb from node pnn.
 *
 * If caps contains CTDB_CAP_FRAGMENTED_CONTROLS a message handler is
 * registered for a fresh srvid first and the pull continues in
 * pull_database_register_done.  Otherwise a PULL_DB control with
 * lmaster CTDB_LMASTER_ANY is sent and handled in
 * pull_database_old_done.
 */
780 static struct tevent_req *pull_database_send(
782 struct tevent_context *ev,
783 struct ctdb_client_context *client,
784 uint32_t pnn, uint32_t caps,
785 struct recdb_context *recdb)
787 struct tevent_req *req, *subreq;
788 struct pull_database_state *state;
789 struct ctdb_req_control request;
791 req = tevent_req_create(mem_ctx, &state, struct pull_database_state);
797 state->client = client;
798 state->recdb = recdb;
800 state->srvid = srvid_next();
802 if (caps & CTDB_CAP_FRAGMENTED_CONTROLS) {
803 subreq = ctdb_client_set_message_handler_send(
804 state, state->ev, state->client,
805 state->srvid, pull_database_handler,
807 if (tevent_req_nomem(subreq, req)) {
808 return tevent_req_post(req, ev);
811 tevent_req_set_callback(subreq, pull_database_register_done,
815 struct ctdb_pulldb pulldb;
817 pulldb.db_id = recdb_id(recdb);
818 pulldb.lmaster = CTDB_LMASTER_ANY;
820 ctdb_req_control_pull_db(&request, &pulldb);
821 subreq = ctdb_client_control_send(state, state->ev,
825 if (tevent_req_nomem(subreq, req)) {
826 return tevent_req_post(req, ev);
828 tevent_req_set_callback(subreq, pull_database_old_done, req);
/*
 * Message handler for the record buffers sent in response to DB_PULL.
 *
 * The srvid is compared with state->srvid, the payload is unmarshalled
 * into a ctdb_rec_buffer, its db_id is compared with the recovery
 * database's id, and the records are added to the recovery database.
 * state->num_records accumulates the record count of each buffer.
 */
834 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
837 struct tevent_req *req = talloc_get_type_abort(
838 private_data, struct tevent_req);
839 struct pull_database_state *state = tevent_req_data(
840 req, struct pull_database_state);
841 struct ctdb_rec_buffer *recbuf;
846 if (srvid != state->srvid) {
850 ret = ctdb_rec_buffer_pull(data.dptr, data.dsize, state, &recbuf, &np);
852 D_ERR("Invalid data received for DB_PULL messages\n");
856 if (recbuf->db_id != recdb_id(state->recdb)) {
858 D_ERR("Invalid dbid:%08x for DB_PULL messages for %s\n",
859 recbuf->db_id, recdb_name(state->recdb));
863 status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
867 D_ERR("Failed to add records to recdb for %s\n",
868 recdb_name(state->recdb));
872 state->num_records += recbuf->count;
/*
 * Callback for the message handler registration.  On failure the error
 * is logged and the request fails with ret.  Otherwise a DB_PULL
 * control carrying the database id, CTDB_LMASTER_ANY and the srvid is
 * sent to state->pnn; its reply is handled in pull_database_new_done.
 */
876 static void pull_database_register_done(struct tevent_req *subreq)
878 struct tevent_req *req = tevent_req_callback_data(
879 subreq, struct tevent_req);
880 struct pull_database_state *state = tevent_req_data(
881 req, struct pull_database_state);
882 struct ctdb_req_control request;
883 struct ctdb_pulldb_ext pulldb_ext;
887 status = ctdb_client_set_message_handler_recv(subreq, &ret);
890 D_ERR("Failed to set message handler for DB_PULL for %s\n",
891 recdb_name(state->recdb));
892 tevent_req_error(req, ret);
896 pulldb_ext.db_id = recdb_id(state->recdb);
897 pulldb_ext.lmaster = CTDB_LMASTER_ANY;
/* the node sends its record buffers to this srvid */
898 pulldb_ext.srvid = state->srvid;
900 ctdb_req_control_db_pull(&request, &pulldb_ext);
901 subreq = ctdb_client_control_send(state, state->ev, state->client,
902 state->pnn, TIMEOUT(), &request);
903 if (tevent_req_nomem(subreq, req)) {
906 tevent_req_set_callback(subreq, pull_database_new_done, req);
/*
 * Callback for the PULL_DB control.  The reply carries the records in
 * a single ctdb_rec_buffer, which is added to the recovery database.
 * A failed control or reply parse fails the request with ret; a
 * failure to add the records fails it with EIO.
 */
909 static void pull_database_old_done(struct tevent_req *subreq)
911 struct tevent_req *req = tevent_req_callback_data(
912 subreq, struct tevent_req);
913 struct pull_database_state *state = tevent_req_data(
914 req, struct pull_database_state);
915 struct ctdb_reply_control *reply;
916 struct ctdb_rec_buffer *recbuf;
920 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
923 D_ERR("control PULL_DB failed for %s on node %u, ret=%d\n",
924 recdb_name(state->recdb), state->pnn, ret);
925 tevent_req_error(req, ret);
929 ret = ctdb_reply_control_pull_db(reply, state, &recbuf);
932 tevent_req_error(req, ret);
936 status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
940 tevent_req_error(req, EIO);
944 state->num_records = recbuf->count;
947 D_INFO("Pulled %d records for db %s from node %d\n",
948 state->num_records, recdb_name(state->recdb), state->pnn);
950 tevent_req_done(req);
/*
 * Callback for the DB_PULL control.  The record count reported in the
 * reply is compared with the number of records received through the
 * message handler, and a mismatch is logged.  The message handler is
 * then removed; the request finishes in
 * pull_database_unregister_done.
 */
953 static void pull_database_new_done(struct tevent_req *subreq)
955 struct tevent_req *req = tevent_req_callback_data(
956 subreq, struct tevent_req);
957 struct pull_database_state *state = tevent_req_data(
958 req, struct pull_database_state);
959 struct ctdb_reply_control *reply;
960 uint32_t num_records;
964 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
967 D_ERR("control DB_PULL failed for %s on node %u, ret=%d\n",
968 recdb_name(state->recdb), state->pnn, ret);
/*
 * NOTE(review): on the visible lines ret is not tested before
 * num_records is compared below -- confirm that a failed parse cannot
 * leave num_records unset.
 */
973 ret = ctdb_reply_control_db_pull(reply, &num_records);
975 if (num_records != state->num_records) {
976 D_ERR("mismatch (%u != %u) in DB_PULL records for db %s\n",
977 num_records, state->num_records,
978 recdb_name(state->recdb));
983 D_INFO("Pulled %d records for db %s from node %d\n",
984 state->num_records, recdb_name(state->recdb), state->pnn);
988 subreq = ctdb_client_remove_message_handler_send(
989 state, state->ev, state->client,
991 if (tevent_req_nomem(subreq, req)) {
994 tevent_req_set_callback(subreq, pull_database_unregister_done, req);
/*
 * Callback for the message handler removal; final step of the
 * fragmented pull.  A failed removal is logged and fails the request
 * with ret.  A non-zero state->result fails the request with that
 * value.  Otherwise the request is done.
 */
997 static void pull_database_unregister_done(struct tevent_req *subreq)
999 struct tevent_req *req = tevent_req_callback_data(
1000 subreq, struct tevent_req);
1001 struct pull_database_state *state = tevent_req_data(
1002 req, struct pull_database_state);
1006 status = ctdb_client_remove_message_handler_recv(subreq, &ret);
1007 TALLOC_FREE(subreq);
1009 D_ERR("failed to remove message handler for DB_PULL for db %s\n",
1010 recdb_name(state->recdb));
1011 tevent_req_error(req, ret);
/* an error recorded earlier in the pull is reported only now */
1015 if (state->result != 0) {
1016 tevent_req_error(req, state->result);
1020 tevent_req_done(req);
/* Receive the result of pull_database_send via generic_recv(). */
1023 static bool pull_database_recv(struct tevent_req *req, int *perr)
1025 return generic_recv(req, perr);
1029 * Push database to specified nodes (old style)
/*
 * State for the old-style push: event context, client, source recovery
 * database and the marshalled record buffer sent to each node in turn.
 * The callbacks also use pnn_list, count and index members that are
 * not visible here.
 */
1032 struct push_database_old_state {
1033 struct tevent_context *ev;
1034 struct ctdb_client_context *client;
1035 struct recdb_context *recdb;
1038 struct ctdb_rec_buffer *recbuf;
/* completion callback for each PUSH_DB control */
1042 static void push_database_old_push_done(struct tevent_req *subreq);
/*
 * Push the contents of recdb to the nodes in pnn_list with PUSH_DB
 * controls, one node at a time.
 *
 * All records are marshalled once into state->recbuf, with this
 * client's PNN passed as the dmaster, and the control is sent to the
 * node at state->index.  The following nodes are handled in
 * push_database_old_push_done.
 */
1044 static struct tevent_req *push_database_old_send(
1045 TALLOC_CTX *mem_ctx,
1046 struct tevent_context *ev,
1047 struct ctdb_client_context *client,
1050 struct recdb_context *recdb)
1052 struct tevent_req *req, *subreq;
1053 struct push_database_old_state *state;
1054 struct ctdb_req_control request;
1057 req = tevent_req_create(mem_ctx, &state,
1058 struct push_database_old_state);
1064 state->client = client;
1065 state->recdb = recdb;
1066 state->pnn_list = pnn_list;
1067 state->count = count;
1070 state->recbuf = recdb_records(recdb, state,
1071 ctdb_client_pnn(client));
1072 if (tevent_req_nomem(state->recbuf, req)) {
1073 return tevent_req_post(req, ev);
1076 pnn = state->pnn_list[state->index];
1078 ctdb_req_control_push_db(&request, state->recbuf);
1079 subreq = ctdb_client_control_send(state, ev, client, pnn,
1080 TIMEOUT(), &request);
1081 if (tevent_req_nomem(subreq, req)) {
1082 return tevent_req_post(req, ev);
1084 tevent_req_set_callback(subreq, push_database_old_push_done, req);
/*
 * Callback for one PUSH_DB control.  A failure is logged with the node
 * it was sent to and fails the request with ret.  When state->index
 * has reached state->count the record buffer is freed and the request
 * is done; otherwise the same buffer is pushed to the next node.
 */
1089 static void push_database_old_push_done(struct tevent_req *subreq)
1091 struct tevent_req *req = tevent_req_callback_data(
1092 subreq, struct tevent_req);
1093 struct push_database_old_state *state = tevent_req_data(
1094 req, struct push_database_old_state);
1095 struct ctdb_req_control request;
1100 status = ctdb_client_control_recv(subreq, &ret, NULL, NULL);
1101 TALLOC_FREE(subreq);
1103 D_ERR("control PUSH_DB failed for db %s on node %u, ret=%d\n",
1104 recdb_name(state->recdb), state->pnn_list[state->index],
1106 tevent_req_error(req, ret);
/* every node in the list has been served */
1111 if (state->index == state->count) {
1112 TALLOC_FREE(state->recbuf);
1113 tevent_req_done(req);
1117 pnn = state->pnn_list[state->index];
1119 ctdb_req_control_push_db(&request, state->recbuf);
1120 subreq = ctdb_client_control_send(state, state->ev, state->client,
1121 pnn, TIMEOUT(), &request);
1122 if (tevent_req_nomem(subreq, req)) {
1125 tevent_req_set_callback(subreq, push_database_old_push_done, req);
/* Receive the result of push_database_old_send via generic_recv(). */
1128 static bool push_database_old_recv(struct tevent_req *req, int *perr)
1130 return generic_recv(req, perr);
1134 * Push database to specified nodes (new style)
/*
 * State for the new-style (fragmented) push: event context, client,
 * source recovery database, and counters for the buffers sent and the
 * records pushed so far.  The callbacks also use pnn_list, count,
 * srvid, dmaster, fd and num_buffers members that are not visible
 * here.
 */
1137 struct push_database_new_state {
1138 struct tevent_context *ev;
1139 struct ctdb_client_context *client;
1140 struct recdb_context *recdb;
1147 int num_buffers_sent;
1148 unsigned int num_records;
/* steps of the fragmented push, in the order they run */
1151 static void push_database_new_started(struct tevent_req *subreq);
1152 static void push_database_new_send_msg(struct tevent_req *req);
1153 static void push_database_new_send_done(struct tevent_req *subreq);
1154 static void push_database_new_confirmed(struct tevent_req *subreq);
/*
 * Push the contents of recdb to the nodes in pnn_list using the
 * fragmented protocol.
 *
 * The records are first written to the file "<recdb path>.dat" as
 * buffers bounded by max_size, and the file offset is reset to the
 * start.  A DB_PUSH_START control carrying the database id and a fresh
 * srvid is then sent to the nodes; the push continues in
 * push_database_new_started.
 */
1156 static struct tevent_req *push_database_new_send(
1157 TALLOC_CTX *mem_ctx,
1158 struct tevent_context *ev,
1159 struct ctdb_client_context *client,
1162 struct recdb_context *recdb,
1165 struct tevent_req *req, *subreq;
1166 struct push_database_new_state *state;
1167 struct ctdb_req_control request;
1168 struct ctdb_pulldb_ext pulldb_ext;
1172 req = tevent_req_create(mem_ctx, &state,
1173 struct push_database_new_state);
1179 state->client = client;
1180 state->recdb = recdb;
1181 state->pnn_list = pnn_list;
1182 state->count = count;
1184 state->srvid = srvid_next();
1185 state->dmaster = ctdb_client_pnn(client);
1186 state->num_buffers_sent = 0;
1187 state->num_records = 0;
1189 filename = talloc_asprintf(state, "%s.dat", recdb_path(recdb));
1190 if (tevent_req_nomem(filename, req)) {
1191 return tevent_req_post(req, ev);
1194 state->fd = open(filename, O_RDWR|O_CREAT, 0644);
1195 if (state->fd == -1) {
1196 tevent_req_error(req, errno);
1197 return tevent_req_post(req, ev);
1200 talloc_free(filename);
/*
 * NOTE(review): no close of state->fd is visible on the error paths
 * below -- confirm it is released elsewhere.
 */
1202 state->num_buffers = recdb_file(recdb, state, state->dmaster,
1203 state->fd, max_size);
/* any recdb_file() failure is reported as ENOMEM */
1204 if (state->num_buffers == -1) {
1205 tevent_req_error(req, ENOMEM);
1206 return tevent_req_post(req, ev);
/* rewind so the buffers can be read back for sending */
1209 offset = lseek(state->fd, 0, SEEK_SET);
1211 tevent_req_error(req, EIO);
1212 return tevent_req_post(req, ev);
1215 pulldb_ext.db_id = recdb_id(recdb);
1216 pulldb_ext.srvid = state->srvid;
1218 ctdb_req_control_db_push_start(&request, &pulldb_ext);
1219 subreq = ctdb_client_control_multi_send(state, ev, client,
1221 TIMEOUT(), &request);
1222 if (tevent_req_nomem(subreq, req)) {
1223 return tevent_req_post(req, ev);
1225 tevent_req_set_callback(subreq, push_database_new_started, req);
/*
 * Callback for the DB_PUSH_START controls.  The failure path logs the
 * error (naming the node when ctdb_client_control_multi_error()
 * identifies one) and fails the request with ret.  Otherwise the first
 * record buffer is sent through push_database_new_send_msg.
 */
1230 static void push_database_new_started(struct tevent_req *subreq)
1232 struct tevent_req *req = tevent_req_callback_data(
1233 subreq, struct tevent_req);
1234 struct push_database_new_state *state = tevent_req_data(
1235 req, struct push_database_new_state);
1240 status = ctdb_client_control_multi_recv(subreq, &ret, state,
1242 TALLOC_FREE(subreq);
1247 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1251 D_ERR("control DB_PUSH_START failed for db %s"
1252 " on node %u, ret=%d\n",
1253 recdb_name(state->recdb), pnn, ret2);
1255 D_ERR("control DB_PUSH_START failed for db %s,"
1257 recdb_name(state->recdb), ret);
1259 talloc_free(err_list);
1261 tevent_req_error(req, ret);
1265 push_database_new_send_msg(req);
/*
 * Send the next record buffer, or finish the transfer.
 *
 * Once num_buffers_sent equals num_buffers a DB_PUSH_CONFIRM control
 * is sent to the nodes (handled in push_database_new_confirmed).
 * Otherwise the next buffer is read from state->fd, marshalled into a
 * temporary blob and sent as a message on state->srvid to every node
 * in pnn_list.  state->num_records grows by the buffer's record count.
 */
1268 static void push_database_new_send_msg(struct tevent_req *req)
1270 struct push_database_new_state *state = tevent_req_data(
1271 req, struct push_database_new_state);
1272 struct tevent_req *subreq;
1273 struct ctdb_rec_buffer *recbuf;
1274 struct ctdb_req_message message;
/* all buffers are out: ask the nodes to confirm what they received */
1279 if (state->num_buffers_sent == state->num_buffers) {
1280 struct ctdb_req_control request;
1282 ctdb_req_control_db_push_confirm(&request,
1283 recdb_id(state->recdb));
1284 subreq = ctdb_client_control_multi_send(state, state->ev,
1288 TIMEOUT(), &request);
1289 if (tevent_req_nomem(subreq, req)) {
1292 tevent_req_set_callback(subreq, push_database_new_confirmed,
1297 ret = ctdb_rec_buffer_read(state->fd, state, &recbuf);
1299 tevent_req_error(req, ret);
1303 data.dsize = ctdb_rec_buffer_len(recbuf);
1304 data.dptr = talloc_size(state, data.dsize);
1305 if (tevent_req_nomem(data.dptr, req)) {
1309 ctdb_rec_buffer_push(recbuf, data.dptr, &np);
1311 message.srvid = state->srvid;
1312 message.data.data = data;
1314 D_DEBUG("Pushing buffer %d with %d records for db %s\n",
1315 state->num_buffers_sent, recbuf->count,
1316 recdb_name(state->recdb));
1318 subreq = ctdb_client_message_multi_send(state, state->ev,
1320 state->pnn_list, state->count,
1322 if (tevent_req_nomem(subreq, req)) {
1325 tevent_req_set_callback(subreq, push_database_new_send_done, req);
1327 state->num_records += recbuf->count;
/* the temporary blob and buffer are released once the send is set up */
1329 talloc_free(data.dptr);
1330 talloc_free(recbuf);
/*
 * Callback for one record-buffer message.  A failure is logged and
 * fails the request with ret.  Otherwise the sent-buffer counter is
 * advanced and push_database_new_send_msg is called again.
 */
1333 static void push_database_new_send_done(struct tevent_req *subreq)
1335 struct tevent_req *req = tevent_req_callback_data(
1336 subreq, struct tevent_req);
1337 struct push_database_new_state *state = tevent_req_data(
1338 req, struct push_database_new_state);
1342 status = ctdb_client_message_multi_recv(subreq, &ret, NULL, NULL);
1343 TALLOC_FREE(subreq);
1345 D_ERR("Sending recovery records failed for %s\n",
1346 recdb_name(state->recdb));
1347 tevent_req_error(req, ret);
1351 state->num_buffers_sent += 1;
1353 push_database_new_send_msg(req);
/*
 * Callback for the DB_PUSH_CONFIRM controls.
 *
 * The failure path logs the error and fails the request with ret.
 * Each node's reply is then parsed, and the record count it reports is
 * compared with state->num_records.  An unparsable reply or a count
 * mismatch fails the request with EPROTO.  Otherwise the request is
 * done.
 */
1356 static void push_database_new_confirmed(struct tevent_req *subreq)
1358 struct tevent_req *req = tevent_req_callback_data(
1359 subreq, struct tevent_req);
1360 struct push_database_new_state *state = tevent_req_data(
1361 req, struct push_database_new_state);
1362 struct ctdb_reply_control **reply;
1367 uint32_t num_records;
1369 status = ctdb_client_control_multi_recv(subreq, &ret, state,
1371 TALLOC_FREE(subreq);
1376 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1377 state->count, err_list,
1380 D_ERR("control DB_PUSH_CONFIRM failed for db %s"
1381 " on node %u, ret=%d\n",
1382 recdb_name(state->recdb), pnn, ret2);
1384 D_ERR("control DB_PUSH_CONFIRM failed for db %s,"
1386 recdb_name(state->recdb), ret);
1388 tevent_req_error(req, ret);
/* reply[i] is matched with state->pnn_list[i] in the log message below */
1392 for (i=0; i<state->count; i++) {
1393 ret = ctdb_reply_control_db_push_confirm(reply[i],
1396 tevent_req_error(req, EPROTO);
1400 if (num_records != state->num_records) {
1401 D_ERR("Node %u received %d of %d records for %s\n",
1402 state->pnn_list[i], num_records,
1403 state->num_records, recdb_name(state->recdb));
1404 tevent_req_error(req, EPROTO);
1411 D_INFO("Pushed %d records for db %s\n",
1412 state->num_records, recdb_name(state->recdb));
1414 tevent_req_done(req);
/* Receive the result of push_database_new_send via generic_recv(). */
1417 static bool push_database_new_recv(struct tevent_req *req, int *perr)
1419 return generic_recv(req, perr);
1423 * wrapper for push_database_old and push_database_new
/*
 * State for the push wrapper: one completion flag for each of the two
 * push styles.  The request is done once both flags are set.
 */
1426 struct push_database_state {
1427 bool old_done, new_done;
/* completion callbacks of the two sub-requests */
1430 static void push_database_old_done(struct tevent_req *subreq);
1431 static void push_database_new_done(struct tevent_req *subreq);
/*
 * Push recdb to every node in nlist, choosing the protocol per node.
 *
 * Nodes whose caps include CTDB_CAP_FRAGMENTED_CONTROLS are put on
 * new_list, the others on old_list.  A sub-request is started for each
 * non-empty list; the new-style push is bounded by
 * tun_list->rec_buffer_size_limit.  A list with no nodes apparently
 * has its *_done flag set straight away (the else lines are not
 * visible here).
 */
1433 static struct tevent_req *push_database_send(
1434 TALLOC_CTX *mem_ctx,
1435 struct tevent_context *ev,
1436 struct ctdb_client_context *client,
1437 struct node_list *nlist,
1438 struct ctdb_tunable_list *tun_list,
1439 struct recdb_context *recdb)
1441 struct tevent_req *req, *subreq;
1442 struct push_database_state *state;
1443 uint32_t *old_list, *new_list;
1444 unsigned int old_count, new_count;
1447 req = tevent_req_create(mem_ctx, &state, struct push_database_state);
1452 state->old_done = false;
1453 state->new_done = false;
/* either list may end up holding every node, so both get full size */
1457 old_list = talloc_array(state, uint32_t, nlist->count);
1458 new_list = talloc_array(state, uint32_t, nlist->count);
1459 if (tevent_req_nomem(old_list, req) ||
1460 tevent_req_nomem(new_list,req)) {
1461 return tevent_req_post(req, ev);
1464 for (i=0; i<nlist->count; i++) {
1465 if (nlist->caps[i] & CTDB_CAP_FRAGMENTED_CONTROLS) {
1466 new_list[new_count] = nlist->pnn_list[i];
1469 old_list[old_count] = nlist->pnn_list[i];
1474 if (old_count > 0) {
1475 subreq = push_database_old_send(state, ev, client,
1476 old_list, old_count, recdb);
1477 if (tevent_req_nomem(subreq, req)) {
1478 return tevent_req_post(req, ev);
1480 tevent_req_set_callback(subreq, push_database_old_done, req);
1482 state->old_done = true;
1485 if (new_count > 0) {
1486 subreq = push_database_new_send(state, ev, client,
1487 new_list, new_count, recdb,
1488 tun_list->rec_buffer_size_limit);
1489 if (tevent_req_nomem(subreq, req)) {
1490 return tevent_req_post(req, ev);
1492 tevent_req_set_callback(subreq, push_database_new_done, req);
1494 state->new_done = true;
/*
 * Callback for the old-style sub-request.  A failure fails the wrapper
 * with ret.  Otherwise old_done is set and the wrapper completes once
 * new_done is set as well.
 */
1500 static void push_database_old_done(struct tevent_req *subreq)
1502 struct tevent_req *req = tevent_req_callback_data(
1503 subreq, struct tevent_req);
1504 struct push_database_state *state = tevent_req_data(
1505 req, struct push_database_state);
1509 status = push_database_old_recv(subreq, &ret);
1511 tevent_req_error(req, ret);
1515 state->old_done = true;
1517 if (state->old_done && state->new_done) {
1518 tevent_req_done(req);
/*
 * Callback for the new-style sub-request.  A failure fails the wrapper
 * with ret.  Otherwise new_done is set and the wrapper completes once
 * old_done is set as well.
 */
1522 static void push_database_new_done(struct tevent_req *subreq)
1524 struct tevent_req *req = tevent_req_callback_data(
1525 subreq, struct tevent_req);
1526 struct push_database_state *state = tevent_req_data(
1527 req, struct push_database_state);
1531 status = push_database_new_recv(subreq, &ret);
1533 tevent_req_error(req, ret);
1537 state->new_done = true;
1539 if (state->old_done && state->new_done) {
1540 tevent_req_done(req);
/* Receive the result of push_database_send via generic_recv(). */
1544 static bool push_database_recv(struct tevent_req *req, int *perr)
1546 return generic_recv(req, perr);
1550 * Collect databases using highest sequence number
/*
 * State for collecting a database from the node with the highest
 * sequence number: event context, client, node list and target
 * recovery database.  The callbacks also use db_id and max_pnn members
 * that are not visible here.
 */
1553 struct collect_highseqnum_db_state {
1554 struct tevent_context *ev;
1555 struct ctdb_client_context *client;
1556 struct node_list *nlist;
1558 struct recdb_context *recdb;
/* callbacks for the seqnum query and the subsequent pull */
1563 static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq);
1564 static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq);
/*
 * Start collecting database db_id into recdb from the node with the
 * highest sequence number.
 *
 * A GET_DB_SEQNUM control is sent with the multi-node helper; the
 * replies are handled in collect_highseqnum_db_seqnum_done.
 */
1566 static struct tevent_req *collect_highseqnum_db_send(
1567 TALLOC_CTX *mem_ctx,
1568 struct tevent_context *ev,
1569 struct ctdb_client_context *client,
1570 struct node_list *nlist,
1572 struct recdb_context *recdb)
1574 struct tevent_req *req, *subreq;
1575 struct collect_highseqnum_db_state *state;
1576 struct ctdb_req_control request;
1578 req = tevent_req_create(mem_ctx, &state,
1579 struct collect_highseqnum_db_state);
1585 state->client = client;
1586 state->nlist = nlist;
1587 state->db_id = db_id;
1588 state->recdb = recdb;
1590 ctdb_req_control_get_db_seqnum(&request, db_id);
/*
 * NOTE(review): the sub-request is created on mem_ctx here, while the
 * other senders in this file use state -- confirm this is intended.
 */
1591 subreq = ctdb_client_control_multi_send(mem_ctx,
1598 if (tevent_req_nomem(subreq, req)) {
1599 return tevent_req_post(req, ev);
1601 tevent_req_set_callback(subreq, collect_highseqnum_db_seqnum_done,
/*
 * Callback for the GET_DB_SEQNUM controls.
 *
 * The failure path logs the error and fails the request with ret.
 * The replies are then scanned for the highest sequence number.  The
 * first node in the list is the initial choice and is replaced only by
 * a node reporting a strictly higher value.  An unparsable reply fails
 * the request with EPROTO.  The database is then pulled from the
 * chosen node; the result is handled in
 * collect_highseqnum_db_pulldb_done.
 */
1607 static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq)
1609 struct tevent_req *req = tevent_req_callback_data(
1610 subreq, struct tevent_req);
1611 struct collect_highseqnum_db_state *state = tevent_req_data(
1612 req, struct collect_highseqnum_db_state);
1613 struct ctdb_reply_control **reply;
1618 uint64_t seqnum, max_seqnum;
1621 status = ctdb_client_control_multi_recv(subreq, &ret, state,
1623 TALLOC_FREE(subreq);
1628 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
1629 state->nlist->count,
1633 D_ERR("control GET_DB_SEQNUM failed for db %s"
1634 " on node %u, ret=%d\n",
1635 recdb_name(state->recdb), pnn, ret2);
1637 D_ERR("control GET_DB_SEQNUM failed for db %s,"
1639 recdb_name(state->recdb), ret);
1641 tevent_req_error(req, ret);
/* start with the first node as the candidate */
1646 state->max_pnn = state->nlist->pnn_list[0];
1647 max_caps = state->nlist->caps[0];
1648 for (i=0; i<state->nlist->count; i++) {
1649 ret = ctdb_reply_control_get_db_seqnum(reply[i], &seqnum);
1651 tevent_req_error(req, EPROTO);
1655 if (max_seqnum < seqnum) {
1656 max_seqnum = seqnum;
1657 state->max_pnn = state->nlist->pnn_list[i];
1658 max_caps = state->nlist->caps[i];
1664 D_INFO("Pull persistent db %s from node %d with seqnum 0x%"PRIx64"\n",
1665 recdb_name(state->recdb), state->max_pnn, max_seqnum);
1667 subreq = pull_database_send(state,
1673 if (tevent_req_nomem(subreq, req)) {
1676 tevent_req_set_callback(subreq, collect_highseqnum_db_pulldb_done,
/*
 * Completion callback for the pull from the highest-seqnum node.
 *
 * On failure the chosen node (state->max_pnn) is given ban credits and the
 * request fails with the reported error; otherwise the request completes.
 */
1680 static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq)
1682 struct tevent_req *req = tevent_req_callback_data(
1683 subreq, struct tevent_req);
1684 struct collect_highseqnum_db_state *state = tevent_req_data(
1685 req, struct collect_highseqnum_db_state);
1689 status = pull_database_recv(subreq, &ret);
1690 TALLOC_FREE(subreq);
1692 node_list_ban_credits(state->nlist, state->max_pnn);
1693 tevent_req_error(req, ret);
1697 tevent_req_done(req);
/*
 * Fetch the result of a collect_highseqnum_db request.
 * Delegates to generic_recv(), which reports a unix error through perr.
 */
1700 static bool collect_highseqnum_db_recv(struct tevent_req *req, int *perr)
1702 return generic_recv(req, perr);
1706 * Collect all databases
/* Per-request state shared by collect_all_db_send() and its callback */
1709 struct collect_all_db_state {
/* Event context and ctdb client connection supplied by the caller */
1710 struct tevent_context *ev;
1711 struct ctdb_client_context *client;
/* Nodes to pull from, visited one at a time via state->index */
1712 struct node_list *nlist;
/* Recovery database context supplied by the caller */
1714 struct recdb_context *recdb;
1716 struct ctdb_pulldb pulldb;
/* Callback invoked after each per-node pull */
1720 static void collect_all_db_pulldb_done(struct tevent_req *subreq);
/*
 * Start collecting a database from every node in nlist.
 *
 * Stores the arguments in the request state and starts a pull from the
 * node at state->index; subsequent nodes are handled from
 * collect_all_db_pulldb_done().
 *
 * Returns the new request, already posted on ev if the sub-request could
 * not be allocated.
 */
1722 static struct tevent_req *collect_all_db_send(
1723 TALLOC_CTX *mem_ctx,
1724 struct tevent_context *ev,
1725 struct ctdb_client_context *client,
1726 struct node_list *nlist,
1728 struct recdb_context *recdb)
1730 struct tevent_req *req, *subreq;
1731 struct collect_all_db_state *state;
1733 req = tevent_req_create(mem_ctx, &state,
1734 struct collect_all_db_state);
1740 state->client = client;
1741 state->nlist = nlist;
1742 state->db_id = db_id;
1743 state->recdb = recdb;
/* Pull from the node currently selected by state->index, passing its caps */
1746 subreq = pull_database_send(state,
1749 nlist->pnn_list[state->index],
1750 nlist->caps[state->index],
1752 if (tevent_req_nomem(subreq, req)) {
1753 return tevent_req_post(req, ev);
1755 tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
/*
 * Completion callback for one per-node pull.
 *
 * On failure the node at state->index is given ban credits and the request
 * fails.  Otherwise the request completes once state->index has reached
 * the node count, or else a pull from the node at state->index is started
 * with this function as its callback again.
 */
1760 static void collect_all_db_pulldb_done(struct tevent_req *subreq)
1762 struct tevent_req *req = tevent_req_callback_data(
1763 subreq, struct tevent_req);
1764 struct collect_all_db_state *state = tevent_req_data(
1765 req, struct collect_all_db_state);
1769 status = pull_database_recv(subreq, &ret);
1770 TALLOC_FREE(subreq);
1772 node_list_ban_credits(state->nlist,
1773 state->nlist->pnn_list[state->index]);
1774 tevent_req_error(req, ret);
/* All nodes have been pulled from */
1779 if (state->index == state->nlist->count) {
1780 tevent_req_done(req);
1784 subreq = pull_database_send(state,
1787 state->nlist->pnn_list[state->index],
1788 state->nlist->caps[state->index],
1790 if (tevent_req_nomem(subreq, req)) {
1793 tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
/*
 * Fetch the result of a collect_all_db request.
 * Delegates to generic_recv(), which reports a unix error through perr.
 */
1796 static bool collect_all_db_recv(struct tevent_req *req, int *perr)
1798 return generic_recv(req, perr);
1803 * For each database do the following:
1804 * - Get DB name from all nodes
1805 * - Attach database on missing nodes
1807 * - Freeze database on all nodes
1808 * - Start transaction on all nodes
1809 * - Collect database from all nodes
1810 * - Wipe database on all nodes
1811 * - Push database to all nodes
1812 * - Commit transaction on all nodes
1813 * - Thaw database on all nodes
/* Per-request state for the recovery of a single database */
1816 struct recover_db_state {
/* Event context and ctdb client connection supplied by the caller */
1817 struct tevent_context *ev;
1818 struct ctdb_client_context *client;
/* Tunables; database_hash_size is read when creating the recovery db */
1819 struct ctdb_tunable_list *tun_list;
/* Nodes that the freeze/transaction/wipe/commit/thaw controls are sent to */
1820 struct node_list *nlist;
/* Database id and transaction id (set from the generation) for controls */
1824 struct ctdb_transdb transdb;
/* Name and path as reported by GET_DBNAME and GETDBPATH */
1826 const char *db_name, *db_path;
/* Recovery database created once the transaction has started */
1827 struct recdb_context *recdb;
/* Callbacks for the chained recovery steps */
1830 static void recover_db_name_done(struct tevent_req *subreq);
1831 static void recover_db_create_missing_done(struct tevent_req *subreq);
1832 static void recover_db_path_done(struct tevent_req *subreq);
1833 static void recover_db_freeze_done(struct tevent_req *subreq);
1834 static void recover_db_transaction_started(struct tevent_req *subreq);
1835 static void recover_db_collect_done(struct tevent_req *subreq);
1836 static void recover_db_wipedb_done(struct tevent_req *subreq);
1837 static void recover_db_pushdb_done(struct tevent_req *subreq);
1838 static void recover_db_transaction_committed(struct tevent_req *subreq);
1839 static void recover_db_thaw_done(struct tevent_req *subreq);
/*
 * Start recovery of one database.
 *
 * Records the arguments in the request state, uses the local node
 * (ctdb_client_pnn()) as destnode and the generation as the transaction
 * id, then sends GET_DBNAME to the nodes listed for this database
 * (state->db->pnn_list).  Continues in recover_db_name_done().
 *
 * Returns the new request, already posted on ev if the sub-request could
 * not be allocated.
 */
1841 static struct tevent_req *recover_db_send(TALLOC_CTX *mem_ctx,
1842 struct tevent_context *ev,
1843 struct ctdb_client_context *client,
1844 struct ctdb_tunable_list *tun_list,
1845 struct node_list *nlist,
1846 uint32_t generation,
1849 struct tevent_req *req, *subreq;
1850 struct recover_db_state *state;
1851 struct ctdb_req_control request;
1853 req = tevent_req_create(mem_ctx, &state, struct recover_db_state);
1859 state->client = client;
1860 state->tun_list = tun_list;
1861 state->nlist = nlist;
1864 state->destnode = ctdb_client_pnn(client);
1865 state->transdb.db_id = db->db_id;
1866 state->transdb.tid = generation;
1868 ctdb_req_control_get_dbname(&request, db->db_id);
1869 subreq = ctdb_client_control_multi_send(state,
1872 state->db->pnn_list,
1873 state->db->num_nodes,
1876 if (tevent_req_nomem(subreq, req)) {
1877 return tevent_req_post(req, ev);
1879 tevent_req_set_callback(subreq, recover_db_name_done, req);
/*
 * Handle the GET_DBNAME replies.
 *
 * On control failure, logs the failing node if it can be identified and
 * fails the request.  Otherwise decodes each reply, adopts the first name
 * seen as state->db_name, and treats any differing name as an error that
 * also earns the reporting node ban credits.  On success continues by
 * creating the database on nodes where it is missing.
 */
1884 static void recover_db_name_done(struct tevent_req *subreq)
1886 struct tevent_req *req = tevent_req_callback_data(
1887 subreq, struct tevent_req);
1888 struct recover_db_state *state = tevent_req_data(
1889 req, struct recover_db_state);
1890 struct ctdb_reply_control **reply;
1896 status = ctdb_client_control_multi_recv(subreq,
1901 TALLOC_FREE(subreq);
1906 ret2 = ctdb_client_control_multi_error(state->db->pnn_list,
1907 state->db->num_nodes,
1911 D_ERR("control GET_DBNAME failed on node %u,"
1916 D_ERR("control GET_DBNAME failed, ret=%d\n",
1919 tevent_req_error(req, ret);
1923 for (i = 0; i < state->db->num_nodes; i++) {
1924 const char *db_name;
/*
 * NOTE(review): the control was sent to state->db->pnn_list and the loop
 * is bounded by state->db->num_nodes, but the pnn is looked up in
 * state->nlist->pnn_list.  If the two lists differ, the wrong node is
 * logged and given ban credits - confirm which list is intended.
 */
1927 pnn = state->nlist->pnn_list[i];
1929 ret = ctdb_reply_control_get_dbname(reply[i],
1933 D_ERR("control GET_DBNAME failed on node %u "
1934 "for db=0x%x, ret=%d\n",
1938 tevent_req_error(req, EPROTO);
/* The first decoded name becomes the reference for the comparison below */
1942 if (state->db_name == NULL) {
1943 state->db_name = db_name;
1947 if (strcmp(state->db_name, db_name) != 0) {
1948 D_ERR("Incompatible database name for 0x%"PRIx32" "
1949 "(%s != %s) on node %"PRIu32"\n",
1954 node_list_ban_credits(state->nlist, pnn);
/*
 * NOTE(review): ret was last set by ctdb_reply_control_get_dbname() above
 * and is presumably 0 on this path; tevent_req_error() with 0 does not
 * fail the request, so the request may never complete - confirm and
 * consider an explicit error code.
 */
1955 tevent_req_error(req, ret);
1962 subreq = db_create_missing_send(state,
1969 if (tevent_req_nomem(subreq, req)) {
1972 tevent_req_set_callback(subreq, recover_db_create_missing_done, req);
/*
 * Completion callback for db_create_missing.
 *
 * Fails the request with the reported error, otherwise asks destnode for
 * the database path with GETDBPATH.  Continues in recover_db_path_done().
 */
1975 static void recover_db_create_missing_done(struct tevent_req *subreq)
1977 struct tevent_req *req = tevent_req_callback_data(
1978 subreq, struct tevent_req);
1979 struct recover_db_state *state = tevent_req_data(
1980 req, struct recover_db_state);
1981 struct ctdb_req_control request;
1985 /* Could sanity check the db_id here */
1986 status = db_create_missing_recv(subreq, &ret);
1987 TALLOC_FREE(subreq);
1989 tevent_req_error(req, ret);
1993 ctdb_req_control_getdbpath(&request, state->db->db_id);
1994 subreq = ctdb_client_control_send(state, state->ev, state->client,
1995 state->destnode, TIMEOUT(),
1997 if (tevent_req_nomem(subreq, req)) {
2000 tevent_req_set_callback(subreq, recover_db_path_done, req);
/*
 * Handle the GETDBPATH reply.
 *
 * A failed control fails the request with the reported error; a reply that
 * cannot be decoded fails it with EPROTO.  On success the path is stored
 * in state->db_path and DB_FREEZE is sent to all nodes in state->nlist.
 * Continues in recover_db_freeze_done().
 */
2003 static void recover_db_path_done(struct tevent_req *subreq)
2005 struct tevent_req *req = tevent_req_callback_data(
2006 subreq, struct tevent_req);
2007 struct recover_db_state *state = tevent_req_data(
2008 req, struct recover_db_state);
2009 struct ctdb_reply_control *reply;
2010 struct ctdb_req_control request;
2014 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2015 TALLOC_FREE(subreq);
2017 D_ERR("control GETDBPATH failed for db %s, ret=%d\n",
2018 state->db_name, ret);
2019 tevent_req_error(req, ret);
2023 ret = ctdb_reply_control_getdbpath(reply, state, &state->db_path);
2025 D_ERR("control GETDBPATH failed for db %s, ret=%d\n",
2026 state->db_name, ret);
2027 tevent_req_error(req, EPROTO);
2033 ctdb_req_control_db_freeze(&request, state->db->db_id);
2034 subreq = ctdb_client_control_multi_send(state,
2037 state->nlist->pnn_list,
2038 state->nlist->count,
2041 if (tevent_req_nomem(subreq, req)) {
2044 tevent_req_set_callback(subreq, recover_db_freeze_done, req);
/*
 * Handle the DB_FREEZE replies.
 *
 * On failure, logs the failing node if it can be identified (that node is
 * also given ban credits) and fails the request.  On success starts a
 * transaction on all nodes using state->transdb.  Continues in
 * recover_db_transaction_started().
 */
2047 static void recover_db_freeze_done(struct tevent_req *subreq)
2049 struct tevent_req *req = tevent_req_callback_data(
2050 subreq, struct tevent_req);
2051 struct recover_db_state *state = tevent_req_data(
2052 req, struct recover_db_state);
2053 struct ctdb_req_control request;
2058 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2060 TALLOC_FREE(subreq);
2065 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2066 state->nlist->count,
2070 D_ERR("control FREEZE_DB failed for db %s"
2071 " on node %u, ret=%d\n",
2072 state->db_name, pnn, ret2);
2074 node_list_ban_credits(state->nlist, pnn);
2076 D_ERR("control FREEZE_DB failed for db %s, ret=%d\n",
2077 state->db_name, ret);
2079 tevent_req_error(req, ret);
2083 ctdb_req_control_db_transaction_start(&request, &state->transdb);
2084 subreq = ctdb_client_control_multi_send(state,
2087 state->nlist->pnn_list,
2088 state->nlist->count,
2091 if (tevent_req_nomem(subreq, req)) {
2094 tevent_req_set_callback(subreq, recover_db_transaction_started, req);
/*
 * Handle the transaction-start replies.
 *
 * On failure, logs the failing node if it can be identified and fails the
 * request.  On success creates the recovery database and starts collecting
 * records: persistent and replicated databases are collected from the node
 * with the highest sequence number, all others from every node.
 * Continues in recover_db_collect_done().
 */
2097 static void recover_db_transaction_started(struct tevent_req *subreq)
2099 struct tevent_req *req = tevent_req_callback_data(
2100 subreq, struct tevent_req);
2101 struct recover_db_state *state = tevent_req_data(
2102 req, struct recover_db_state);
2108 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2110 TALLOC_FREE(subreq);
2115 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2116 state->nlist->count,
2120 D_ERR("control TRANSACTION_DB failed for db=%s"
2121 " on node %u, ret=%d\n",
2122 state->db_name, pnn, ret2);
2124 D_ERR("control TRANSACTION_DB failed for db=%s,"
2125 " ret=%d\n", state->db_name, ret);
2127 tevent_req_error(req, ret);
2131 flags = state->db->db_flags;
2132 state->recdb = recdb_create(state,
2136 state->tun_list->database_hash_size,
2137 flags & CTDB_DB_FLAGS_PERSISTENT);
2138 if (tevent_req_nomem(state->recdb, req)) {
/* The database flags select the collection strategy */
2142 if ((flags & CTDB_DB_FLAGS_PERSISTENT) ||
2143 (flags & CTDB_DB_FLAGS_REPLICATED)) {
2144 subreq = collect_highseqnum_db_send(state,
2151 subreq = collect_all_db_send(state,
2158 if (tevent_req_nomem(subreq, req)) {
2161 tevent_req_set_callback(subreq, recover_db_collect_done, req);
/*
 * Completion callback for the collect step.
 *
 * Uses the recv function matching the collection strategy selected by the
 * database flags.  On failure the request fails with the reported error;
 * on success WIPE_DATABASE is sent to all nodes.  Continues in
 * recover_db_wipedb_done().
 */
2164 static void recover_db_collect_done(struct tevent_req *subreq)
2166 struct tevent_req *req = tevent_req_callback_data(
2167 subreq, struct tevent_req);
2168 struct recover_db_state *state = tevent_req_data(
2169 req, struct recover_db_state);
2170 struct ctdb_req_control request;
/* Must mirror the flag test used when the collect request was started */
2174 if ((state->db->db_flags & CTDB_DB_FLAGS_PERSISTENT) ||
2175 (state->db->db_flags & CTDB_DB_FLAGS_REPLICATED)) {
2176 status = collect_highseqnum_db_recv(subreq, &ret);
2178 status = collect_all_db_recv(subreq, &ret);
2180 TALLOC_FREE(subreq);
2182 tevent_req_error(req, ret);
2186 ctdb_req_control_wipe_database(&request, &state->transdb);
2187 subreq = ctdb_client_control_multi_send(state,
2190 state->nlist->pnn_list,
2191 state->nlist->count,
2194 if (tevent_req_nomem(subreq, req)) {
2197 tevent_req_set_callback(subreq, recover_db_wipedb_done, req);
/*
 * Handle the WIPE_DATABASE replies.
 *
 * On failure, logs the failing node if it can be identified and fails the
 * request.  On success starts pushing the database.  Continues in
 * recover_db_pushdb_done().
 */
2200 static void recover_db_wipedb_done(struct tevent_req *subreq)
2202 struct tevent_req *req = tevent_req_callback_data(
2203 subreq, struct tevent_req);
2204 struct recover_db_state *state = tevent_req_data(
2205 req, struct recover_db_state);
2210 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2212 TALLOC_FREE(subreq);
2217 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2218 state->nlist->count,
2222 D_ERR("control WIPEDB failed for db %s on node %u,"
2223 " ret=%d\n", state->db_name, pnn, ret2);
2225 D_ERR("control WIPEDB failed for db %s, ret=%d\n",
2226 state->db_name, ret);
2228 tevent_req_error(req, ret);
2232 subreq = push_database_send(state,
2238 if (tevent_req_nomem(subreq, req)) {
2241 tevent_req_set_callback(subreq, recover_db_pushdb_done, req);
/*
 * Completion callback for the push step.
 *
 * On failure the request fails with the reported error.  On success the
 * recovery database is released and the transaction is committed on all
 * nodes.  Continues in recover_db_transaction_committed().
 */
2244 static void recover_db_pushdb_done(struct tevent_req *subreq)
2246 struct tevent_req *req = tevent_req_callback_data(
2247 subreq, struct tevent_req);
2248 struct recover_db_state *state = tevent_req_data(
2249 req, struct recover_db_state);
2250 struct ctdb_req_control request;
2254 status = push_database_recv(subreq, &ret);
2255 TALLOC_FREE(subreq);
2257 tevent_req_error(req, ret);
/* The recovery database is no longer needed once it has been pushed */
2261 TALLOC_FREE(state->recdb);
2263 ctdb_req_control_db_transaction_commit(&request, &state->transdb);
2264 subreq = ctdb_client_control_multi_send(state,
2267 state->nlist->pnn_list,
2268 state->nlist->count,
2271 if (tevent_req_nomem(subreq, req)) {
2274 tevent_req_set_callback(subreq, recover_db_transaction_committed, req);
/*
 * Handle the transaction-commit replies.
 *
 * On failure, logs the failing node if it can be identified and fails the
 * request.  On success sends DB_THAW to all nodes.  Continues in
 * recover_db_thaw_done().
 */
2277 static void recover_db_transaction_committed(struct tevent_req *subreq)
2279 struct tevent_req *req = tevent_req_callback_data(
2280 subreq, struct tevent_req);
2281 struct recover_db_state *state = tevent_req_data(
2282 req, struct recover_db_state);
2283 struct ctdb_req_control request;
2288 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2290 TALLOC_FREE(subreq);
2295 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2296 state->nlist->count,
2300 D_ERR("control DB_TRANSACTION_COMMIT failed for db %s"
2301 " on node %u, ret=%d\n",
2302 state->db_name, pnn, ret2);
2304 D_ERR("control DB_TRANSACTION_COMMIT failed for db %s,"
2305 " ret=%d\n", state->db_name, ret);
2307 tevent_req_error(req, ret);
2311 ctdb_req_control_db_thaw(&request, state->db->db_id);
2312 subreq = ctdb_client_control_multi_send(state,
2315 state->nlist->pnn_list,
2316 state->nlist->count,
2319 if (tevent_req_nomem(subreq, req)) {
2322 tevent_req_set_callback(subreq, recover_db_thaw_done, req);
/*
 * Handle the DB_THAW replies; this is the last step of recover_db.
 *
 * On failure, logs the failing node if it can be identified and fails the
 * request; otherwise the request completes successfully.
 */
2325 static void recover_db_thaw_done(struct tevent_req *subreq)
2327 struct tevent_req *req = tevent_req_callback_data(
2328 subreq, struct tevent_req);
2329 struct recover_db_state *state = tevent_req_data(
2330 req, struct recover_db_state);
2335 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2337 TALLOC_FREE(subreq);
2342 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2343 state->nlist->count,
2347 D_ERR("control DB_THAW failed for db %s on node %u,"
2348 " ret=%d\n", state->db_name, pnn, ret2);
2350 D_ERR("control DB_THAW failed for db %s, ret=%d\n",
2351 state->db_name, ret);
2353 tevent_req_error(req, ret);
2357 tevent_req_done(req);
/*
 * Fetch the result of a recover_db request.
 * Delegates to generic_recv() with a NULL perr, so only success or failure
 * is reported to the caller.
 */
2360 static bool recover_db_recv(struct tevent_req *req)
2362 return generic_recv(req, NULL);
2367 * Start database recovery for each database
2369 * Try to recover each database 5 times before failing recovery.
/* Request state for recovering all databases in a db_list */
2372 struct db_recovery_state {
2373 struct tevent_context *ev;
/* Databases to recover; num_dbs is the completion target */
2374 struct db_list *dblist;
/* Number of databases whose recovery has finished (successfully or not) */
2375 unsigned int num_replies;
/* Number of databases whose recovery was given up on */
2376 unsigned int num_failed;
/*
 * Per-database context passed as callback data to db_recovery_one_done();
 * it keeps what is needed to start another recover_db attempt.
 */
2379 struct db_recovery_one_state {
/* Parent db_recovery request */
2380 struct tevent_req *req;
2381 struct ctdb_client_context *client;
2382 struct db_list *dblist;
2383 struct ctdb_tunable_list *tun_list;
2384 struct node_list *nlist;
2385 uint32_t generation;
/* Callback invoked when one recover_db attempt finishes */
2390 static void db_recovery_one_done(struct tevent_req *subreq);
/*
 * Start recovery of every database in dblist.
 *
 * Completes immediately when the list is empty.  Otherwise allocates a
 * db_recovery_one_state per database and starts a recover_db request for
 * each, so the recoveries are in flight concurrently; each one reports to
 * db_recovery_one_done().
 *
 * Returns the new request, already posted on ev on early completion or
 * allocation failure.
 */
2392 static struct tevent_req *db_recovery_send(TALLOC_CTX *mem_ctx,
2393 struct tevent_context *ev,
2394 struct ctdb_client_context *client,
2395 struct db_list *dblist,
2396 struct ctdb_tunable_list *tun_list,
2397 struct node_list *nlist,
2398 uint32_t generation)
2400 struct tevent_req *req, *subreq;
2401 struct db_recovery_state *state;
2404 req = tevent_req_create(mem_ctx, &state, struct db_recovery_state);
2410 state->dblist = dblist;
2411 state->num_replies = 0;
2412 state->num_failed = 0;
/* Nothing to recover */
2414 if (dblist->num_dbs == 0) {
2415 tevent_req_done(req);
2416 return tevent_req_post(req, ev);
2419 for (db = dblist->db; db != NULL; db = db->next) {
2420 struct db_recovery_one_state *substate;
2422 substate = talloc_zero(state, struct db_recovery_one_state);
2423 if (tevent_req_nomem(substate, req)) {
2424 return tevent_req_post(req, ev);
2427 substate->req = req;
2428 substate->client = client;
2429 substate->dblist = dblist;
2430 substate->tun_list = tun_list;
2431 substate->nlist = nlist;
2432 substate->generation = generation;
2435 subreq = recover_db_send(state,
2442 if (tevent_req_nomem(subreq, req)) {
2443 return tevent_req_post(req, ev);
2445 tevent_req_set_callback(subreq, db_recovery_one_done,
2447 D_NOTICE("recover database 0x%08x\n", substate->db->db_id);
/*
 * Completion callback for one recover_db attempt.
 *
 * The per-database substate is freed once recovery of that database has
 * succeeded.  After a failed attempt the failure count is incremented and,
 * while it is below NUM_RETRIES, another recover_db request is started
 * with this function as its callback.  When a database is finally given
 * up on, num_failed is incremented.  The parent request completes when
 * every database has been accounted for in num_replies.
 */
2453 static void db_recovery_one_done(struct tevent_req *subreq)
2455 struct db_recovery_one_state *substate = tevent_req_callback_data(
2456 subreq, struct db_recovery_one_state);
2457 struct tevent_req *req = substate->req;
2458 struct db_recovery_state *state = tevent_req_data(
2459 req, struct db_recovery_state);
2462 status = recover_db_recv(subreq);
2463 TALLOC_FREE(subreq);
2466 talloc_free(substate);
2470 substate->num_fails += 1;
2471 if (substate->num_fails < NUM_RETRIES) {
2472 subreq = recover_db_send(state,
2477 substate->generation,
2479 if (tevent_req_nomem(subreq, req)) {
2482 tevent_req_set_callback(subreq, db_recovery_one_done, substate);
2483 D_NOTICE("recover database 0x%08x, attempt %d\n",
2484 substate->db->db_id, substate->num_fails+1);
2489 state->num_failed += 1;
2492 state->num_replies += 1;
/* Every database has reported, whether it was recovered or not */
2494 if (state->num_replies == state->dblist->num_dbs) {
2495 tevent_req_done(req);
/*
 * Fetch the result of a db_recovery request.
 *
 * Checks the request for a unix error, stores the number of databases
 * that were recovered (replies minus failures) in *count, and takes the
 * presence of any failed database into account for the return value.
 */
2499 static bool db_recovery_recv(struct tevent_req *req, unsigned int *count)
2501 struct db_recovery_state *state = tevent_req_data(
2502 req, struct db_recovery_state);
2505 if (tevent_req_is_unix_error(req, &err)) {
2510 *count = state->num_replies - state->num_failed;
2512 if (state->num_failed > 0) {
/* Request state for banning the node with the most ban credits */
2519 struct ban_node_state {
2520 struct tevent_context *ev;
2521 struct ctdb_client_context *client;
/* Tunables; enable_bans and recovery_ban_period are read */
2522 struct ctdb_tunable_list *tun_list;
/* Node list whose ban_credits decide which node, if any, gets banned */
2523 struct node_list *nlist;
/* Helper that picks the candidate, and callbacks for the two controls */
2529 static bool ban_node_check(struct tevent_req *req);
2530 static void ban_node_check_done(struct tevent_req *subreq);
2531 static void ban_node_done(struct tevent_req *subreq);
/*
 * Start a request that bans the node which accumulated the most ban
 * credits, if any node qualifies.
 *
 * Completes immediately when the enable_bans tunable is 0.  Otherwise
 * ban_node_check() selects a candidate and either completes the request
 * or starts the first control.
 */
2533 static struct tevent_req *ban_node_send(TALLOC_CTX *mem_ctx,
2534 struct tevent_context *ev,
2535 struct ctdb_client_context *client,
2536 struct ctdb_tunable_list *tun_list,
2537 struct node_list *nlist)
2539 struct tevent_req *req;
2540 struct ban_node_state *state;
2543 req = tevent_req_create(mem_ctx, &state, struct ban_node_state);
2549 state->client = client;
2550 state->tun_list = tun_list;
2551 state->nlist = nlist;
2552 state->destnode = ctdb_client_pnn(client);
2554 /* Bans are not enabled */
2555 if (state->tun_list->enable_bans == 0) {
2556 D_ERR("Bans are not enabled\n");
2557 tevent_req_done(req);
2558 return tevent_req_post(req, ev);
2561 ok = ban_node_check(req);
2563 return tevent_req_post(req, ev);
/*
 * Pick the node with the highest ban credits and decide whether to act.
 *
 * If the highest credit count is below NUM_RETRIES the request is
 * completed without banning anyone.  Otherwise a GET_NODEMAP control is
 * sent so that ban_node_check_done() can verify the candidate's state.
 */
2569 static bool ban_node_check(struct tevent_req *req)
2571 struct tevent_req *subreq;
2572 struct ban_node_state *state = tevent_req_data(
2573 req, struct ban_node_state);
2574 struct ctdb_req_control request;
2575 unsigned max_credits = 0, i;
/* Strict '>' keeps the earliest node on ties and ignores zero credits */
2577 for (i=0; i<state->nlist->count; i++) {
2578 if (state->nlist->ban_credits[i] > max_credits) {
2579 state->max_pnn = state->nlist->pnn_list[i];
2580 max_credits = state->nlist->ban_credits[i];
2584 if (max_credits < NUM_RETRIES) {
2585 tevent_req_done(req);
2589 ctdb_req_control_get_nodemap(&request);
2590 subreq = ctdb_client_control_send(state,
2596 if (tevent_req_nomem(subreq, req)) {
2599 tevent_req_set_callback(subreq, ban_node_check_done, req);
/*
 * Handle the GET_NODEMAP reply used to validate the ban candidate.
 *
 * If the candidate is already inactive its ban credits are reset, the
 * candidate is cleared and ban_node_check() is run again to pick another
 * node.  Otherwise a SET_BAN_STATE control banning the candidate for the
 * recovery_ban_period tunable is sent.  Continues in ban_node_done().
 */
2604 static void ban_node_check_done(struct tevent_req *subreq)
2606 struct tevent_req *req = tevent_req_callback_data(
2607 subreq, struct tevent_req);
2608 struct ban_node_state *state = tevent_req_data(
2609 req, struct ban_node_state);
2610 struct ctdb_reply_control *reply;
2611 struct ctdb_node_map *nodemap;
2612 struct ctdb_req_control request;
2613 struct ctdb_ban_state ban;
2618 ok = ctdb_client_control_recv(subreq, &ret, state, &reply);
2619 TALLOC_FREE(subreq);
2621 D_ERR("control GET_NODEMAP failed to node %u, ret=%d\n",
2622 state->max_pnn, ret);
2623 tevent_req_error(req, ret);
2627 ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
2629 D_ERR("control GET_NODEMAP failed, ret=%d\n", ret);
2630 tevent_req_error(req, ret);
/* Locate the candidate in the node map */
2634 for (i=0; i<nodemap->num; i++) {
2635 if (nodemap->node[i].pnn != state->max_pnn) {
2639 /* If the node became inactive, reset ban_credits */
2640 if (nodemap->node[i].flags & NODE_FLAGS_INACTIVE) {
2643 for (j=0; j<state->nlist->count; j++) {
2644 if (state->nlist->pnn_list[j] ==
2646 state->nlist->ban_credits[j] = 0;
2650 state->max_pnn = CTDB_UNKNOWN_PNN;
2654 talloc_free(nodemap);
2657 /* If node becomes inactive during recovery, pick next */
2658 if (state->max_pnn == CTDB_UNKNOWN_PNN) {
2659 (void) ban_node_check(req);
2663 ban = (struct ctdb_ban_state) {
2664 .pnn = state->max_pnn,
2665 .time = state->tun_list->recovery_ban_period,
2668 D_ERR("Banning node %u for %u seconds\n", ban.pnn, ban.time);
2670 ctdb_req_control_set_ban_state(&request, &ban);
2671 subreq = ctdb_client_control_send(state,
2677 if (tevent_req_nomem(subreq, req)) {
2680 tevent_req_set_callback(subreq, ban_node_done, req);
2683 static void ban_node_done(struct tevent_req *subreq)
2685 struct tevent_req *req = tevent_req_callback_data(
2686 subreq, struct tevent_req);
2687 struct node_ban_state *state = tevent_req_data(
2688 req, struct node_ban_state);
2689 struct ctdb_reply_control *reply;
2693 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2694 TALLOC_FREE(subreq);
2696 tevent_req_error(req, ret);
2700 ret = ctdb_reply_control_set_ban_state(reply);
2702 D_ERR("control SET_BAN_STATE failed, ret=%d\n", ret);
2703 tevent_req_error(req, ret);
2708 tevent_req_done(req);
/*
 * Fetch the result of a ban_node request.
 * A unix error recorded on the request is reported through perr.
 */
2711 static bool ban_node_recv(struct tevent_req *req, int *perr)
2713 if (tevent_req_is_unix_error(req, perr)) {
2721 * Run the parallel database recovery
2724 * - Get nodemap from all nodes
2725 * - Get capabilities from all nodes
2727 * - Set RECOVERY_ACTIVE
2728 * - Send START_RECOVERY
2729 * - Update vnnmap on all nodes
2730 * - Run database recovery
2731 * - Set RECOVERY_NORMAL
2732 * - Send END_RECOVERY
/* Request state for one complete recovery run */
2735 struct recovery_state {
2736 struct tevent_context *ev;
2737 struct ctdb_client_context *client;
/* Generation for this recovery; copied into the new vnnmap */
2738 uint32_t generation;
/* Nodes taking part, built from the node map and then re-verified */
2740 struct node_list *nlist;
/* Tunables fetched with GET_ALL_TUNABLES */
2741 struct ctdb_tunable_list *tun_list;
/* New VNN map computed once recovery mode is active */
2742 struct ctdb_vnn_map *vnnmap;
/* Databases gathered from the GET_DBMAP replies */
2743 struct db_list *dblist;
/* Callbacks for the chained recovery steps */
2746 static void recovery_tunables_done(struct tevent_req *subreq);
2747 static void recovery_nodemap_done(struct tevent_req *subreq);
2748 static void recovery_nodemap_verify(struct tevent_req *subreq);
2749 static void recovery_capabilities_done(struct tevent_req *subreq);
2750 static void recovery_dbmap_done(struct tevent_req *subreq);
2751 static void recovery_active_done(struct tevent_req *subreq);
2752 static void recovery_start_recovery_done(struct tevent_req *subreq);
2753 static void recovery_vnnmap_update_done(struct tevent_req *subreq);
2754 static void recovery_db_recovery_done(struct tevent_req *subreq);
2755 static void recovery_failed_done(struct tevent_req *subreq);
2756 static void recovery_normal_done(struct tevent_req *subreq);
2757 static void recovery_end_recovery_done(struct tevent_req *subreq);
/*
 * Start a recovery run.
 *
 * Records the client and generation, uses the local node
 * (ctdb_client_pnn()) as destnode, and begins by requesting all tunables
 * from it.  Continues in recovery_tunables_done().
 *
 * Returns the new request, already posted on ev if the sub-request could
 * not be allocated.
 */
2759 static struct tevent_req *recovery_send(TALLOC_CTX *mem_ctx,
2760 struct tevent_context *ev,
2761 struct ctdb_client_context *client,
2762 uint32_t generation)
2764 struct tevent_req *req, *subreq;
2765 struct recovery_state *state;
2766 struct ctdb_req_control request;
2768 req = tevent_req_create(mem_ctx, &state, struct recovery_state);
2774 state->client = client;
2775 state->generation = generation;
2776 state->destnode = ctdb_client_pnn(client);
2778 ctdb_req_control_get_all_tunables(&request);
2779 subreq = ctdb_client_control_send(state, state->ev, state->client,
2780 state->destnode, TIMEOUT(),
2782 if (tevent_req_nomem(subreq, req)) {
2783 return tevent_req_post(req, ev);
2785 tevent_req_set_callback(subreq, recovery_tunables_done, req);
/*
 * Handle the GET_ALL_TUNABLES reply.
 *
 * A failed control fails the request with the reported error; a reply that
 * cannot be decoded fails it with EPROTO.  On success the file-level
 * recover_timeout is updated from the tunables, which changes the value
 * used by every later TIMEOUT(), and the node map is requested from
 * destnode.  Continues in recovery_nodemap_done().
 */
2790 static void recovery_tunables_done(struct tevent_req *subreq)
2792 struct tevent_req *req = tevent_req_callback_data(
2793 subreq, struct tevent_req);
2794 struct recovery_state *state = tevent_req_data(
2795 req, struct recovery_state);
2796 struct ctdb_reply_control *reply;
2797 struct ctdb_req_control request;
2801 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2802 TALLOC_FREE(subreq);
2804 D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2805 tevent_req_error(req, ret);
2809 ret = ctdb_reply_control_get_all_tunables(reply, state,
2812 D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2813 tevent_req_error(req, EPROTO);
2819 recover_timeout = state->tun_list->recover_timeout;
2821 ctdb_req_control_get_nodemap(&request);
2822 subreq = ctdb_client_control_send(state, state->ev, state->client,
2823 state->destnode, TIMEOUT(),
2825 if (tevent_req_nomem(subreq, req)) {
2828 tevent_req_set_callback(subreq, recovery_nodemap_done, req);
/*
 * Handle the GET_NODEMAP reply from destnode.
 *
 * Builds state->nlist from the node map, with the DISCONNECTED flag
 * checked for each node before it is added, then sends GET_NODEMAP to
 * every node in the list so that each node's own view of its flags can be
 * verified.  Continues in recovery_nodemap_verify().
 */
2831 static void recovery_nodemap_done(struct tevent_req *subreq)
2833 struct tevent_req *req = tevent_req_callback_data(
2834 subreq, struct tevent_req);
2835 struct recovery_state *state = tevent_req_data(
2836 req, struct recovery_state);
2837 struct ctdb_reply_control *reply;
2838 struct ctdb_req_control request;
2839 struct ctdb_node_map *nodemap;
2844 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2845 TALLOC_FREE(subreq);
2847 D_ERR("control GET_NODEMAP failed to node %u, ret=%d\n",
2848 state->destnode, ret);
2849 tevent_req_error(req, ret);
2853 ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
2855 D_ERR("control GET_NODEMAP failed, ret=%d\n", ret);
2856 tevent_req_error(req, ret);
/* Size the list for every node in the map; not all of them may be added */
2860 state->nlist = node_list_init(state, nodemap->num);
2861 if (tevent_req_nomem(state->nlist, req)) {
2865 for (i=0; i<nodemap->num; i++) {
2868 if (nodemap->node[i].flags & NODE_FLAGS_DISCONNECTED) {
2872 ok = node_list_add(state->nlist, nodemap->node[i].pnn);
2874 tevent_req_error(req, EINVAL);
2879 talloc_free(nodemap);
2882 /* Verify flags by getting local node information from each node */
2883 ctdb_req_control_get_nodemap(&request);
2884 subreq = ctdb_client_control_multi_send(state,
2887 state->nlist->pnn_list,
2888 state->nlist->count,
2891 if (tevent_req_nomem(subreq, req)) {
2894 tevent_req_set_callback(subreq, recovery_nodemap_verify, req);
/*
 * Handle the per-node GET_NODEMAP replies.
 *
 * For each node, looks up that node's own entry in the map it returned
 * and reads its flags; a node missing from its own map is treated as
 * DISCONNECTED.  The INACTIVE flag is checked before a node is added to a
 * fresh node list, which then replaces state->nlist.  Finally
 * GET_CAPABILITIES is sent to the nodes in the new list.  Continues in
 * recovery_capabilities_done().
 */
2897 static void recovery_nodemap_verify(struct tevent_req *subreq)
2899 struct tevent_req *req = tevent_req_callback_data(
2900 subreq, struct tevent_req);
2901 struct recovery_state *state = tevent_req_data(
2902 req, struct recovery_state);
2903 struct ctdb_req_control request;
2904 struct ctdb_reply_control **reply;
2905 struct node_list *nlist;
2911 status = ctdb_client_control_multi_recv(subreq,
2916 TALLOC_FREE(subreq);
2921 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2922 state->nlist->count,
2926 D_ERR("control GET_NODEMAP failed on node %u,"
2927 " ret=%d\n", pnn, ret2);
2929 D_ERR("control GET_NODEMAP failed, ret=%d\n", ret);
2931 tevent_req_error(req, ret);
/* The replacement list has the same capacity as the current one */
2935 nlist = node_list_init(state, state->nlist->size);
2936 if (tevent_req_nomem(nlist, req)) {
2940 for (i=0; i<state->nlist->count; i++) {
2941 struct ctdb_node_map *nodemap = NULL;
2942 uint32_t pnn, flags;
2946 pnn = state->nlist->pnn_list[i];
2947 ret = ctdb_reply_control_get_nodemap(reply[i],
2951 D_ERR("control GET_NODEMAP failed on node %u\n", pnn);
2952 tevent_req_error(req, EPROTO);
/* Default used when the node does not appear in its own node map */
2956 flags = NODE_FLAGS_DISCONNECTED;
2957 for (j=0; j<nodemap->num; j++) {
2958 if (nodemap->node[j].pnn == pnn) {
2959 flags = nodemap->node[j].flags;
2964 TALLOC_FREE(nodemap);
2966 if (flags & NODE_FLAGS_INACTIVE) {
2970 ok = node_list_add(nlist, pnn);
2972 tevent_req_error(req, EINVAL);
/* Swap in the verified list */
2979 talloc_free(state->nlist);
2980 state->nlist = nlist;
2982 ctdb_req_control_get_capabilities(&request);
2983 subreq = ctdb_client_control_multi_send(state,
2986 state->nlist->pnn_list,
2987 state->nlist->count,
2990 if (tevent_req_nomem(subreq, req)) {
2993 tevent_req_set_callback(subreq, recovery_capabilities_done, req);
/*
 * Handle the GET_CAPABILITIES replies.
 *
 * On failure, logs the failing node if it can be identified and fails the
 * request.  Otherwise stores each node's capabilities in
 * state->nlist->caps (an undecodable reply fails with EPROTO) and sends
 * GET_DBMAP to all nodes.  Continues in recovery_dbmap_done().
 */
2996 static void recovery_capabilities_done(struct tevent_req *subreq)
2998 struct tevent_req *req = tevent_req_callback_data(
2999 subreq, struct tevent_req);
3000 struct recovery_state *state = tevent_req_data(
3001 req, struct recovery_state);
3002 struct ctdb_reply_control **reply;
3003 struct ctdb_req_control request;
3009 status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
3011 TALLOC_FREE(subreq);
3016 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
3017 state->nlist->count,
3021 D_ERR("control GET_CAPABILITIES failed on node %u,"
3022 " ret=%d\n", pnn, ret2);
3024 D_ERR("control GET_CAPABILITIES failed, ret=%d\n",
3027 tevent_req_error(req, ret);
/* reply[i] is matched with pnn_list[i] */
3031 for (i=0; i<state->nlist->count; i++) {
3034 ret = ctdb_reply_control_get_capabilities(reply[i], &caps);
3036 D_ERR("control GET_CAPABILITIES failed on node %u\n",
3037 state->nlist->pnn_list[i]);
3038 tevent_req_error(req, EPROTO);
3042 state->nlist->caps[i] = caps;
3047 ctdb_req_control_get_dbmap(&request);
3048 subreq = ctdb_client_control_multi_send(state,
3051 state->nlist->pnn_list,
3052 state->nlist->count,
3055 if (tevent_req_nomem(subreq, req)) {
3058 tevent_req_set_callback(subreq, recovery_dbmap_done, req);
/*
 * Handle the GET_DBMAP replies.
 *
 * On failure, logs the failing node if it can be identified and fails the
 * request.  Otherwise creates state->dblist and feeds every database from
 * every node's map into it via db_list_check_and_add().  Then sets
 * recovery mode to ACTIVE on all nodes.  Continues in
 * recovery_active_done().
 */
3061 static void recovery_dbmap_done(struct tevent_req *subreq)
3063 struct tevent_req *req = tevent_req_callback_data(
3064 subreq, struct tevent_req);
3065 struct recovery_state *state = tevent_req_data(
3066 req, struct recovery_state);
3067 struct ctdb_reply_control **reply;
3068 struct ctdb_req_control request;
3074 status = ctdb_client_control_multi_recv(subreq,
3079 TALLOC_FREE(subreq);
3084 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
3085 state->nlist->count,
3089 D_ERR("control GET_DBMAP failed on node %u,"
3090 " ret=%d\n", pnn, ret2);
3092 D_ERR("control GET_DBMAP failed, ret=%d\n",
3095 tevent_req_error(req, ret);
3099 state->dblist = db_list_init(state, state->nlist->count);
3100 if (tevent_req_nomem(state->dblist, req)) {
3101 D_ERR("memory allocation error\n");
/* Merge the database map of every node into the list */
3105 for (i = 0; i < state->nlist->count; i++) {
3106 struct ctdb_dbid_map *dbmap = NULL;
3109 pnn = state->nlist->pnn_list[i];
3111 ret = ctdb_reply_control_get_dbmap(reply[i], state, &dbmap);
3113 D_ERR("control GET_DBMAP failed on node %u\n",
3115 tevent_req_error(req, EPROTO);
3119 for (j = 0; j < dbmap->num; j++) {
3120 ret = db_list_check_and_add(state->dblist,
3121 dbmap->dbs[j].db_id,
3122 dbmap->dbs[j].flags,
3125 D_ERR("failed to add database list entry, "
3128 tevent_req_error(req, ret);
3136 ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_ACTIVE);
3137 subreq = ctdb_client_control_multi_send(state,
3140 state->nlist->pnn_list,
3141 state->nlist->count,
3144 if (tevent_req_nomem(subreq, req)) {
3147 tevent_req_set_callback(subreq, recovery_active_done, req);
/*
 * Handle the replies to setting recovery mode ACTIVE.
 *
 * On failure, logs the failing node if it can be identified and fails the
 * request.  Otherwise computes the new VNN map from the lmaster-capable
 * nodes (falling back to destnode when there are none), stamps it with
 * the recovery generation, stores it in state->vnnmap and sends
 * START_RECOVERY to all nodes.  Continues in
 * recovery_start_recovery_done().
 */
3150 static void recovery_active_done(struct tevent_req *subreq)
3152 struct tevent_req *req = tevent_req_callback_data(
3153 subreq, struct tevent_req);
3154 struct recovery_state *state = tevent_req_data(
3155 req, struct recovery_state);
3156 struct ctdb_req_control request;
3157 struct ctdb_vnn_map *vnnmap;
3162 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
3164 TALLOC_FREE(subreq);
3169 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
3170 state->nlist->count,
3174 D_ERR("failed to set recovery mode ACTIVE on node %u,"
3175 " ret=%d\n", pnn, ret2);
3177 D_ERR("failed to set recovery mode ACTIVE, ret=%d\n",
3180 tevent_req_error(req, ret);
3184 D_ERR("Set recovery mode to ACTIVE\n");
3186 /* Calculate new VNNMAP */
3187 vnnmap = talloc_zero(state, struct ctdb_vnn_map);
3188 if (tevent_req_nomem(vnnmap, req)) {
/* node_list_lmaster() returns the map array and stores its length in size */
3192 vnnmap->map = node_list_lmaster(state->nlist, vnnmap, &vnnmap->size);
3193 if (tevent_req_nomem(vnnmap->map, req)) {
3197 if (vnnmap->size == 0) {
3198 D_WARNING("No active lmasters found. Adding recmaster anyway\n");
3199 vnnmap->map[0] = state->destnode;
3203 vnnmap->generation = state->generation;
3205 state->vnnmap = vnnmap;
3207 ctdb_req_control_start_recovery(&request);
3208 subreq = ctdb_client_control_multi_send(state,
3211 state->nlist->pnn_list,
3212 state->nlist->count,
3215 if (tevent_req_nomem(subreq, req)) {
3218 tevent_req_set_callback(subreq, recovery_start_recovery_done, req);
/*
 * Handle completion of the START_RECOVERY control.
 *
 * Error path: the failure is logged (with a node number when one is
 * available) and the request is failed with ret.
 *
 * Success path: a SETVNNMAP control carrying state->vnnmap is sent to the
 * nodes in state->nlist, continuing in recovery_vnnmap_update_done().
 */
3221 static void recovery_start_recovery_done(struct tevent_req *subreq)
3223 struct tevent_req *req = tevent_req_callback_data(
3224 subreq, struct tevent_req);
3225 struct recovery_state *state = tevent_req_data(
3226 req, struct recovery_state);
3227 struct ctdb_req_control request;
3232 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
3234 TALLOC_FREE(subreq);
/* Failure reporting: pnn and ret2 are only used for the log message. */
3239 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
3240 state->nlist->count,
3244 D_ERR("failed to run start_recovery event on node %u,"
3245 " ret=%d\n", pnn, ret2);
3247 D_ERR("failed to run start_recovery event, ret=%d\n",
3250 tevent_req_error(req, ret);
3254 D_ERR("start_recovery event finished\n");
/* Next step of the chain: push the VNN map stored in state to the nodes. */
3256 ctdb_req_control_setvnnmap(&request, state->vnnmap);
3257 subreq = ctdb_client_control_multi_send(state,
3260 state->nlist->pnn_list,
3261 state->nlist->count,
3264 if (tevent_req_nomem(subreq, req)) {
3267 tevent_req_set_callback(subreq, recovery_vnnmap_update_done, req);
/*
 * Handle completion of the SETVNNMAP control.
 *
 * Error path: the failure is logged (with a node number when one is
 * available) and the request is failed with ret.
 *
 * Success path: database recovery is started with db_recovery_send(),
 * passing the generation of state->vnnmap, and continues in
 * recovery_db_recovery_done().
 */
3270 static void recovery_vnnmap_update_done(struct tevent_req *subreq)
3272 struct tevent_req *req = tevent_req_callback_data(
3273 subreq, struct tevent_req);
3274 struct recovery_state *state = tevent_req_data(
3275 req, struct recovery_state);
3280 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
3282 TALLOC_FREE(subreq);
/* Failure reporting: pnn and ret2 are only used for the log message. */
3287 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
3288 state->nlist->count,
3292 D_ERR("failed to update VNNMAP on node %u, ret=%d\n",
3295 D_ERR("failed to update VNNMAP, ret=%d\n", ret);
3297 tevent_req_error(req, ret);
3301 D_NOTICE("updated VNNMAP\n");
/* Next step of the chain: recover the databases. */
3303 subreq = db_recovery_send(state,
3309 state->vnnmap->generation);
3310 if (tevent_req_nomem(subreq, req)) {
3313 tevent_req_set_callback(subreq, recovery_db_recovery_done, req);
/*
 * Handle completion of the database recovery started with
 * db_recovery_send().
 *
 * The number of recovered databases is collected with db_recovery_recv()
 * and logged next to state->dblist->num_dbs.  Two continuations are
 * visible:
 *   - ban_node_send(), completing in recovery_failed_done();
 *   - a SET_RECMODE control for CTDB_RECOVERY_NORMAL sent to the nodes in
 *     state->nlist, completing in recovery_normal_done().
 *
 * NOTE(review): presumably the ban path is taken when db_recovery_recv()
 * reports failure and the NORMAL path otherwise; the selecting condition
 * is not visible in this excerpt -- confirm.
 */
3316 static void recovery_db_recovery_done(struct tevent_req *subreq)
3318 struct tevent_req *req = tevent_req_callback_data(
3319 subreq, struct tevent_req);
3320 struct recovery_state *state = tevent_req_data(
3321 req, struct recovery_state);
3322 struct ctdb_req_control request;
3326 status = db_recovery_recv(subreq, &count);
3327 TALLOC_FREE(subreq);
3329 D_ERR("%d of %d databases recovered\n", count, state->dblist->num_dbs);
/* Continuation 1: ban a node, then finish in recovery_failed_done(). */
3332 subreq = ban_node_send(state,
3337 if (tevent_req_nomem(subreq, req)) {
3340 tevent_req_set_callback(subreq, recovery_failed_done, req);
/* Continuation 2: return the nodes to recovery mode NORMAL. */
3344 ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_NORMAL);
3345 subreq = ctdb_client_control_multi_send(state,
3348 state->nlist->pnn_list,
3349 state->nlist->count,
3352 if (tevent_req_nomem(subreq, req)) {
3355 tevent_req_set_callback(subreq, recovery_normal_done, req);
/*
 * Handle completion of the ban_node request.
 *
 * The result is collected with ban_node_recv(); a ban failure is logged.
 * The recovery request is then failed with EIO.
 *
 * NOTE(review): no success completion is visible in this callback, so it
 * appears to end the request with EIO whether or not the ban succeeded --
 * confirm against the full function.
 */
3358 static void recovery_failed_done(struct tevent_req *subreq)
3360 struct tevent_req *req = tevent_req_callback_data(
3361 subreq, struct tevent_req);
3365 status = ban_node_recv(subreq, &ret);
3366 TALLOC_FREE(subreq);
3368 D_ERR("failed to ban node, ret=%d\n", ret);
3371 tevent_req_error(req, EIO);
/*
 * Handle completion of the SET_RECMODE control that requested
 * CTDB_RECOVERY_NORMAL.
 *
 * Error path: the failure is logged (with a node number when one is
 * available) and the request is failed with ret.
 *
 * Success path: an END_RECOVERY control is sent to the nodes in
 * state->nlist, continuing in recovery_end_recovery_done().
 */
3374 static void recovery_normal_done(struct tevent_req *subreq)
3376 struct tevent_req *req = tevent_req_callback_data(
3377 subreq, struct tevent_req);
3378 struct recovery_state *state = tevent_req_data(
3379 req, struct recovery_state);
3380 struct ctdb_req_control request;
3385 status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
3387 TALLOC_FREE(subreq);
/* Failure reporting: pnn and ret2 are only used for the log message. */
3392 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
3393 state->nlist->count,
3397 D_ERR("failed to set recovery mode NORMAL on node %u,"
3398 " ret=%d\n", pnn, ret2);
3400 D_ERR("failed to set recovery mode NORMAL, ret=%d\n",
3403 tevent_req_error(req, ret);
3407 D_ERR("Set recovery mode to NORMAL\n");
/* Next step of the chain: run the recovered event on the nodes. */
3409 ctdb_req_control_end_recovery(&request);
3410 subreq = ctdb_client_control_multi_send(state,
3413 state->nlist->pnn_list,
3414 state->nlist->count,
3417 if (tevent_req_nomem(subreq, req)) {
3420 tevent_req_set_callback(subreq, recovery_end_recovery_done, req);
/*
 * Handle completion of the END_RECOVERY control; last callback of the
 * chain visible here.
 *
 * Error path: the failure is logged (with a node number when one is
 * available) and the request is failed with ret.
 *
 * Success path: the recovery request is marked done with
 * tevent_req_done().
 */
3423 static void recovery_end_recovery_done(struct tevent_req *subreq)
3425 struct tevent_req *req = tevent_req_callback_data(
3426 subreq, struct tevent_req);
3427 struct recovery_state *state = tevent_req_data(
3428 req, struct recovery_state);
3433 status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
3435 TALLOC_FREE(subreq);
/* Failure reporting: pnn and ret2 are only used for the log message. */
3440 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
3441 state->nlist->count,
3445 D_ERR("failed to run recovered event on node %u,"
3446 " ret=%d\n", pnn, ret2);
3448 D_ERR("failed to run recovered event, ret=%d\n", ret);
3450 tevent_req_error(req, ret);
3454 D_ERR("recovered event finished\n");
3456 tevent_req_done(req);
/*
 * Collect the result of a recovery request.
 *
 * Thin wrapper around generic_recv(); the bool that generic_recv() returns
 * is discarded, so a caller can only learn the outcome through *perr.
 *
 * NOTE(review): presumably *perr is written only when the request ended
 * with an error, so callers should initialise it -- confirm against
 * generic_recv().
 */
3459 static void recovery_recv(struct tevent_req *req, int *perr)
3461 generic_recv(req, perr);
/*
 * Print the command-line synopsis to stderr.
 *
 * The synopsis names three arguments: output fd, ctdb socket path and
 * generation.
 */
3464 static void usage(const char *progname)
3466 fprintf(stderr, "\nUsage: %s <output-fd> <ctdb-socket-path> <generation>\n",
3472 * Arguments - write fd, socket path, generation
/*
 * Program entry point: run one recovery request to completion and write
 * its result code to the fd given on the command line.
 *
 * argv[1] is read as the write fd and argv[3] as the generation.  A talloc
 * context, logging, a tevent context and a ctdb client are then set up
 * before recovery_send() is started and polled with tevent_req_poll().
 */
3474 int main(int argc, char *argv[])
3477 const char *sockpath;
3478 TALLOC_CTX *mem_ctx = NULL;
3479 struct tevent_context *ev;
3480 struct ctdb_client_context *client;
3482 struct tevent_req *req;
3483 uint32_t generation;
/* NOTE(review): atoi() cannot report a malformed fd argument. */
3490 write_fd = atoi(argv[1]);
3492 generation = (uint32_t)smb_strtoul(argv[3],
3498 fprintf(stderr, "recovery: unable to initialize generation\n");
/* Everything allocated below hangs off mem_ctx. */
3502 mem_ctx = talloc_new(NULL);
3503 if (mem_ctx == NULL) {
3504 fprintf(stderr, "recovery: talloc_new() failed\n");
/* Until logging is initialised, errors are reported on stderr directly. */
3508 ret = logging_init(mem_ctx, NULL, NULL, "ctdb-recovery");
3510 fprintf(stderr, "recovery: Unable to initialize logging\n");
3514 ev = tevent_context_init(mem_ctx);
3516 D_ERR("tevent_context_init() failed\n");
3520 ret = ctdb_client_init(mem_ctx, ev, sockpath, &client);
3522 D_ERR("ctdb_client_init() failed, ret=%d\n", ret);
3526 req = recovery_send(mem_ctx, ev, client, generation);
/*
 * NOTE(review): the message below names database_recover_send(), but the
 * function called above is recovery_send().
 */
3528 D_ERR("database_recover_send() failed\n");
/* Run the event loop until the recovery request completes. */
3532 if (! tevent_req_poll(req, ev)) {
3533 D_ERR("tevent_req_poll() failed\n");
3537 recovery_recv(req, &ret);
3540 D_ERR("database recovery failed, ret=%d\n", ret);
/* Write the result code to write_fd; the return value is not checked. */
3544 sys_write(write_fd, &ret, sizeof(ret));
3548 TALLOC_FREE(mem_ctx);