2 ctdb parallel database recovery
4 Copyright (C) Amitay Isaacs 2015
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
22 #include "system/filesys.h"
29 #include "lib/tdb_wrap/tdb_wrap.h"
30 #include "lib/util/sys_rw.h"
31 #include "lib/util/time.h"
32 #include "lib/util/tevent_unix.h"
34 #include "protocol/protocol.h"
35 #include "protocol/protocol_api.h"
36 #include "client/client.h"
38 #include "common/logging.h"
/* Per-control timeout (seconds) used for every control sent by recovery */
static int recover_timeout = 30;

/* Absolute deadline: recover_timeout seconds from the current time */
#define TIMEOUT()	timeval_current_ofs(recover_timeout, 0)
/*
 * Shared _recv() implementation for requests that return no data.
 *
 * Returns true if req completed successfully.  Otherwise returns false
 * and, if perr is not NULL, stores the unix error code in *perr.
 */
static bool generic_recv(struct tevent_req *req, int *perr)
{
	int err = 0;

	if (!tevent_req_is_unix_error(req, &err)) {
		return true;
	}

	if (perr != NULL) {
		*perr = err;
	}
	return false;
}
64 static uint64_t rec_srvid = CTDB_SRVID_RECOVERY;
66 static uint64_t srvid_next(void)
/*
 * Recovery database functions
 */

/* A private, local TDB used to merge the records pulled from all nodes */
struct recdb_context {
	uint32_t db_id;			/* id of the database being recovered */
	const char *db_name;		/* name of the database being recovered */
	const char *db_path;		/* path of the recovery TDB file */
	struct tdb_wrap *db;		/* open handle on the recovery TDB */
	bool persistent;		/* whether the database is persistent */
};
84 static struct recdb_context *recdb_create(TALLOC_CTX *mem_ctx, uint32_t db_id,
87 uint32_t hash_size, bool persistent)
89 static char *db_dir_state = NULL;
90 struct recdb_context *recdb;
91 unsigned int tdb_flags;
93 recdb = talloc(mem_ctx, struct recdb_context);
98 if (db_dir_state == NULL) {
99 db_dir_state = getenv("CTDB_DBDIR_STATE");
102 recdb->db_name = db_name;
103 recdb->db_id = db_id;
104 recdb->db_path = talloc_asprintf(recdb, "%s/recdb.%s",
105 db_dir_state != NULL ?
107 dirname(discard_const(db_path)),
109 if (recdb->db_path == NULL) {
113 unlink(recdb->db_path);
115 tdb_flags = TDB_NOLOCK | TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING;
116 recdb->db = tdb_wrap_open(mem_ctx, recdb->db_path, hash_size,
117 tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
118 if (recdb->db == NULL) {
120 D_ERR("failed to create recovery db %s\n", recdb->db_path);
124 recdb->persistent = persistent;
129 static uint32_t recdb_id(struct recdb_context *recdb)
134 static const char *recdb_name(struct recdb_context *recdb)
136 return recdb->db_name;
139 static const char *recdb_path(struct recdb_context *recdb)
141 return recdb->db_path;
144 static struct tdb_context *recdb_tdb(struct recdb_context *recdb)
146 return recdb->db->tdb;
149 static bool recdb_persistent(struct recdb_context *recdb)
151 return recdb->persistent;
/* Private data for recdb_add_traverse() */
struct recdb_add_traverse_state {
	struct recdb_context *recdb;	/* destination recovery database */
	uint32_t mypnn;			/* pnn of this (recovery master) node */
};
159 static int recdb_add_traverse(uint32_t reqid, struct ctdb_ltdb_header *header,
160 TDB_DATA key, TDB_DATA data,
163 struct recdb_add_traverse_state *state =
164 (struct recdb_add_traverse_state *)private_data;
165 struct ctdb_ltdb_header *hdr;
169 /* header is not marshalled separately in the pulldb control */
170 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
174 hdr = (struct ctdb_ltdb_header *)data.dptr;
176 /* fetch the existing record, if any */
177 prev_data = tdb_fetch(recdb_tdb(state->recdb), key);
179 if (prev_data.dptr != NULL) {
180 struct ctdb_ltdb_header prev_hdr;
182 prev_hdr = *(struct ctdb_ltdb_header *)prev_data.dptr;
183 free(prev_data.dptr);
184 if (hdr->rsn < prev_hdr.rsn ||
185 (hdr->rsn == prev_hdr.rsn &&
186 prev_hdr.dmaster != state->mypnn)) {
191 ret = tdb_store(recdb_tdb(state->recdb), key, data, TDB_REPLACE);
198 static bool recdb_add(struct recdb_context *recdb, int mypnn,
199 struct ctdb_rec_buffer *recbuf)
201 struct recdb_add_traverse_state state;
207 ret = ctdb_rec_buffer_traverse(recbuf, recdb_add_traverse, &state);
215 /* This function decides which records from recdb are retained */
216 static int recbuf_filter_add(struct ctdb_rec_buffer *recbuf, bool persistent,
217 uint32_t reqid, uint32_t dmaster,
218 TDB_DATA key, TDB_DATA data)
220 struct ctdb_ltdb_header *header;
223 /* Skip empty records */
224 if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
228 /* update the dmaster field to point to us */
229 header = (struct ctdb_ltdb_header *)data.dptr;
231 header->dmaster = dmaster;
232 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
235 ret = ctdb_rec_buffer_add(recbuf, recbuf, reqid, NULL, key, data);
/* Private data for recdb_records_traverse() */
struct recdb_records_traverse_state {
	struct ctdb_rec_buffer *recbuf;	/* buffer being filled */
	uint32_t dmaster;		/* dmaster to stamp on records */
	uint32_t reqid;			/* reqid passed to ctdb_rec_buffer_add */
	bool persistent;		/* database is persistent */
	bool failed;			/* set if any record could not be added */
};
251 static int recdb_records_traverse(struct tdb_context *tdb,
252 TDB_DATA key, TDB_DATA data,
255 struct recdb_records_traverse_state *state =
256 (struct recdb_records_traverse_state *)private_data;
259 ret = recbuf_filter_add(state->recbuf, state->persistent,
260 state->reqid, state->dmaster, key, data);
262 state->failed = true;
269 static struct ctdb_rec_buffer *recdb_records(struct recdb_context *recdb,
273 struct recdb_records_traverse_state state;
276 state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
277 if (state.recbuf == NULL) {
280 state.dmaster = dmaster;
282 state.persistent = recdb_persistent(recdb);
283 state.failed = false;
285 ret = tdb_traverse_read(recdb_tdb(recdb), recdb_records_traverse,
287 if (ret == -1 || state.failed) {
288 D_ERR("Failed to marshall recovery records for %s\n",
290 TALLOC_FREE(state.recbuf);
297 struct recdb_file_traverse_state {
298 struct ctdb_rec_buffer *recbuf;
299 struct recdb_context *recdb;
310 static int recdb_file_traverse(struct tdb_context *tdb,
311 TDB_DATA key, TDB_DATA data,
314 struct recdb_file_traverse_state *state =
315 (struct recdb_file_traverse_state *)private_data;
318 ret = recbuf_filter_add(state->recbuf, state->persistent,
319 state->reqid, state->dmaster, key, data);
321 state->failed = true;
325 if (ctdb_rec_buffer_len(state->recbuf) > state->max_size) {
326 ret = ctdb_rec_buffer_write(state->recbuf, state->fd);
328 D_ERR("Failed to collect recovery records for %s\n",
329 recdb_name(state->recdb));
330 state->failed = true;
334 state->num_buffers += 1;
336 TALLOC_FREE(state->recbuf);
337 state->recbuf = ctdb_rec_buffer_init(state->mem_ctx,
338 recdb_id(state->recdb));
339 if (state->recbuf == NULL) {
340 state->failed = true;
348 static int recdb_file(struct recdb_context *recdb, TALLOC_CTX *mem_ctx,
349 uint32_t dmaster, int fd, int max_size)
351 struct recdb_file_traverse_state state;
354 state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
355 if (state.recbuf == NULL) {
359 state.mem_ctx = mem_ctx;
360 state.dmaster = dmaster;
362 state.persistent = recdb_persistent(recdb);
363 state.failed = false;
365 state.max_size = max_size;
366 state.num_buffers = 0;
368 ret = tdb_traverse_read(recdb_tdb(recdb), recdb_file_traverse, &state);
369 if (ret == -1 || state.failed) {
370 TALLOC_FREE(state.recbuf);
374 ret = ctdb_rec_buffer_write(state.recbuf, fd);
376 D_ERR("Failed to collect recovery records for %s\n",
378 TALLOC_FREE(state.recbuf);
381 state.num_buffers += 1;
383 D_DEBUG("Wrote %d buffers of recovery records for %s\n",
384 state.num_buffers, recdb_name(recdb));
386 return state.num_buffers;
390 * Pull database from a single node
393 struct pull_database_state {
394 struct tevent_context *ev;
395 struct ctdb_client_context *client;
396 struct recdb_context *recdb;
402 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
404 static void pull_database_register_done(struct tevent_req *subreq);
405 static void pull_database_old_done(struct tevent_req *subreq);
406 static void pull_database_unregister_done(struct tevent_req *subreq);
407 static void pull_database_new_done(struct tevent_req *subreq);
409 static struct tevent_req *pull_database_send(
411 struct tevent_context *ev,
412 struct ctdb_client_context *client,
413 uint32_t pnn, uint32_t caps,
414 struct recdb_context *recdb)
416 struct tevent_req *req, *subreq;
417 struct pull_database_state *state;
418 struct ctdb_req_control request;
420 req = tevent_req_create(mem_ctx, &state, struct pull_database_state);
426 state->client = client;
427 state->recdb = recdb;
429 state->srvid = srvid_next();
431 if (caps & CTDB_CAP_FRAGMENTED_CONTROLS) {
432 subreq = ctdb_client_set_message_handler_send(
433 state, state->ev, state->client,
434 state->srvid, pull_database_handler,
436 if (tevent_req_nomem(subreq, req)) {
437 return tevent_req_post(req, ev);
440 tevent_req_set_callback(subreq, pull_database_register_done,
444 struct ctdb_pulldb pulldb;
446 pulldb.db_id = recdb_id(recdb);
447 pulldb.lmaster = CTDB_LMASTER_ANY;
449 ctdb_req_control_pull_db(&request, &pulldb);
450 subreq = ctdb_client_control_send(state, state->ev,
454 if (tevent_req_nomem(subreq, req)) {
455 return tevent_req_post(req, ev);
457 tevent_req_set_callback(subreq, pull_database_old_done, req);
463 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
466 struct tevent_req *req = talloc_get_type_abort(
467 private_data, struct tevent_req);
468 struct pull_database_state *state = tevent_req_data(
469 req, struct pull_database_state);
470 struct ctdb_rec_buffer *recbuf;
475 if (srvid != state->srvid) {
479 ret = ctdb_rec_buffer_pull(data.dptr, data.dsize, state, &recbuf, &np);
481 D_ERR("Invalid data received for DB_PULL messages\n");
485 if (recbuf->db_id != recdb_id(state->recdb)) {
487 D_ERR("Invalid dbid:%08x for DB_PULL messages for %s\n",
488 recbuf->db_id, recdb_name(state->recdb));
492 status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
496 D_ERR("Failed to add records to recdb for %s\n",
497 recdb_name(state->recdb));
501 state->num_records += recbuf->count;
505 static void pull_database_register_done(struct tevent_req *subreq)
507 struct tevent_req *req = tevent_req_callback_data(
508 subreq, struct tevent_req);
509 struct pull_database_state *state = tevent_req_data(
510 req, struct pull_database_state);
511 struct ctdb_req_control request;
512 struct ctdb_pulldb_ext pulldb_ext;
516 status = ctdb_client_set_message_handler_recv(subreq, &ret);
519 D_ERR("Failed to set message handler for DB_PULL for %s\n",
520 recdb_name(state->recdb));
521 tevent_req_error(req, ret);
525 pulldb_ext.db_id = recdb_id(state->recdb);
526 pulldb_ext.lmaster = CTDB_LMASTER_ANY;
527 pulldb_ext.srvid = state->srvid;
529 ctdb_req_control_db_pull(&request, &pulldb_ext);
530 subreq = ctdb_client_control_send(state, state->ev, state->client,
531 state->pnn, TIMEOUT(), &request);
532 if (tevent_req_nomem(subreq, req)) {
535 tevent_req_set_callback(subreq, pull_database_new_done, req);
538 static void pull_database_old_done(struct tevent_req *subreq)
540 struct tevent_req *req = tevent_req_callback_data(
541 subreq, struct tevent_req);
542 struct pull_database_state *state = tevent_req_data(
543 req, struct pull_database_state);
544 struct ctdb_reply_control *reply;
545 struct ctdb_rec_buffer *recbuf;
549 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
552 D_ERR("control PULL_DB failed for %s on node %u, ret=%d\n",
553 recdb_name(state->recdb), state->pnn, ret);
554 tevent_req_error(req, ret);
558 ret = ctdb_reply_control_pull_db(reply, state, &recbuf);
561 tevent_req_error(req, ret);
565 status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
569 tevent_req_error(req, EIO);
573 state->num_records = recbuf->count;
576 D_INFO("Pulled %d records for db %s from node %d\n",
577 state->num_records, recdb_name(state->recdb), state->pnn);
579 tevent_req_done(req);
582 static void pull_database_new_done(struct tevent_req *subreq)
584 struct tevent_req *req = tevent_req_callback_data(
585 subreq, struct tevent_req);
586 struct pull_database_state *state = tevent_req_data(
587 req, struct pull_database_state);
588 struct ctdb_reply_control *reply;
589 uint32_t num_records;
593 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
596 D_ERR("control DB_PULL failed for %s on node %u, ret=%d\n",
597 recdb_name(state->recdb), state->pnn, ret);
598 tevent_req_error(req, ret);
602 ret = ctdb_reply_control_db_pull(reply, &num_records);
604 if (num_records != state->num_records) {
605 D_ERR("mismatch (%u != %u) in DB_PULL records for db %s\n",
606 num_records, state->num_records,
607 recdb_name(state->recdb));
608 tevent_req_error(req, EIO);
612 D_INFO("Pulled %d records for db %s from node %d\n",
613 state->num_records, recdb_name(state->recdb), state->pnn);
615 subreq = ctdb_client_remove_message_handler_send(
616 state, state->ev, state->client,
618 if (tevent_req_nomem(subreq, req)) {
621 tevent_req_set_callback(subreq, pull_database_unregister_done, req);
624 static void pull_database_unregister_done(struct tevent_req *subreq)
626 struct tevent_req *req = tevent_req_callback_data(
627 subreq, struct tevent_req);
628 struct pull_database_state *state = tevent_req_data(
629 req, struct pull_database_state);
633 status = ctdb_client_remove_message_handler_recv(subreq, &ret);
636 D_ERR("failed to remove message handler for DB_PULL for db %s\n",
637 recdb_name(state->recdb));
638 tevent_req_error(req, ret);
642 tevent_req_done(req);
645 static bool pull_database_recv(struct tevent_req *req, int *perr)
647 return generic_recv(req, perr);
/*
 * Push database to specified nodes (old style)
 *
 * The whole database is sent to one node at a time with PUSH_DB.
 */

struct push_database_old_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	struct recdb_context *recdb;
	uint32_t *pnn_list;		/* nodes to push to */
	int count;			/* number of entries in pnn_list */
	struct ctdb_rec_buffer *recbuf;	/* all records, marshalled once */
	int index;			/* position in pnn_list being pushed */
};

static void push_database_old_push_done(struct tevent_req *subreq);
666 static struct tevent_req *push_database_old_send(
668 struct tevent_context *ev,
669 struct ctdb_client_context *client,
670 uint32_t *pnn_list, int count,
671 struct recdb_context *recdb)
673 struct tevent_req *req, *subreq;
674 struct push_database_old_state *state;
675 struct ctdb_req_control request;
678 req = tevent_req_create(mem_ctx, &state,
679 struct push_database_old_state);
685 state->client = client;
686 state->recdb = recdb;
687 state->pnn_list = pnn_list;
688 state->count = count;
691 state->recbuf = recdb_records(recdb, state,
692 ctdb_client_pnn(client));
693 if (tevent_req_nomem(state->recbuf, req)) {
694 return tevent_req_post(req, ev);
697 pnn = state->pnn_list[state->index];
699 ctdb_req_control_push_db(&request, state->recbuf);
700 subreq = ctdb_client_control_send(state, ev, client, pnn,
701 TIMEOUT(), &request);
702 if (tevent_req_nomem(subreq, req)) {
703 return tevent_req_post(req, ev);
705 tevent_req_set_callback(subreq, push_database_old_push_done, req);
710 static void push_database_old_push_done(struct tevent_req *subreq)
712 struct tevent_req *req = tevent_req_callback_data(
713 subreq, struct tevent_req);
714 struct push_database_old_state *state = tevent_req_data(
715 req, struct push_database_old_state);
716 struct ctdb_req_control request;
721 status = ctdb_client_control_recv(subreq, &ret, NULL, NULL);
724 D_ERR("control PUSH_DB failed for db %s on node %u, ret=%d\n",
725 recdb_name(state->recdb), state->pnn_list[state->index],
727 tevent_req_error(req, ret);
732 if (state->index == state->count) {
733 TALLOC_FREE(state->recbuf);
734 tevent_req_done(req);
738 pnn = state->pnn_list[state->index];
740 ctdb_req_control_push_db(&request, state->recbuf);
741 subreq = ctdb_client_control_send(state, state->ev, state->client,
742 pnn, TIMEOUT(), &request);
743 if (tevent_req_nomem(subreq, req)) {
746 tevent_req_set_callback(subreq, push_database_old_push_done, req);
/* Collect the result of push_database_old_send(); see generic_recv() */
static bool push_database_old_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
755 * Push database to specified nodes (new style)
758 struct push_database_new_state {
759 struct tevent_context *ev;
760 struct ctdb_client_context *client;
761 struct recdb_context *recdb;
768 int num_buffers_sent;
772 static void push_database_new_started(struct tevent_req *subreq);
773 static void push_database_new_send_msg(struct tevent_req *req);
774 static void push_database_new_send_done(struct tevent_req *subreq);
775 static void push_database_new_confirmed(struct tevent_req *subreq);
777 static struct tevent_req *push_database_new_send(
779 struct tevent_context *ev,
780 struct ctdb_client_context *client,
781 uint32_t *pnn_list, int count,
782 struct recdb_context *recdb,
785 struct tevent_req *req, *subreq;
786 struct push_database_new_state *state;
787 struct ctdb_req_control request;
788 struct ctdb_pulldb_ext pulldb_ext;
792 req = tevent_req_create(mem_ctx, &state,
793 struct push_database_new_state);
799 state->client = client;
800 state->recdb = recdb;
801 state->pnn_list = pnn_list;
802 state->count = count;
804 state->srvid = srvid_next();
805 state->dmaster = ctdb_client_pnn(client);
806 state->num_buffers_sent = 0;
807 state->num_records = 0;
809 filename = talloc_asprintf(state, "%s.dat", recdb_path(recdb));
810 if (tevent_req_nomem(filename, req)) {
811 return tevent_req_post(req, ev);
814 state->fd = open(filename, O_RDWR|O_CREAT, 0644);
815 if (state->fd == -1) {
816 tevent_req_error(req, errno);
817 return tevent_req_post(req, ev);
820 talloc_free(filename);
822 state->num_buffers = recdb_file(recdb, state, state->dmaster,
823 state->fd, max_size);
824 if (state->num_buffers == -1) {
825 tevent_req_error(req, ENOMEM);
826 return tevent_req_post(req, ev);
829 offset = lseek(state->fd, 0, SEEK_SET);
831 tevent_req_error(req, EIO);
832 return tevent_req_post(req, ev);
835 pulldb_ext.db_id = recdb_id(recdb);
836 pulldb_ext.srvid = state->srvid;
838 ctdb_req_control_db_push_start(&request, &pulldb_ext);
839 subreq = ctdb_client_control_multi_send(state, ev, client,
841 TIMEOUT(), &request);
842 if (tevent_req_nomem(subreq, req)) {
843 return tevent_req_post(req, ev);
845 tevent_req_set_callback(subreq, push_database_new_started, req);
850 static void push_database_new_started(struct tevent_req *subreq)
852 struct tevent_req *req = tevent_req_callback_data(
853 subreq, struct tevent_req);
854 struct push_database_new_state *state = tevent_req_data(
855 req, struct push_database_new_state);
860 status = ctdb_client_control_multi_recv(subreq, &ret, state,
867 ret2 = ctdb_client_control_multi_error(state->pnn_list,
871 D_ERR("control DB_PUSH_START failed for db %s"
872 " on node %u, ret=%d\n",
873 recdb_name(state->recdb), pnn, ret2);
875 D_ERR("control DB_PUSH_START failed for db %s,"
877 recdb_name(state->recdb), ret);
879 talloc_free(err_list);
881 tevent_req_error(req, ret);
885 push_database_new_send_msg(req);
888 static void push_database_new_send_msg(struct tevent_req *req)
890 struct push_database_new_state *state = tevent_req_data(
891 req, struct push_database_new_state);
892 struct tevent_req *subreq;
893 struct ctdb_rec_buffer *recbuf;
894 struct ctdb_req_message message;
899 if (state->num_buffers_sent == state->num_buffers) {
900 struct ctdb_req_control request;
902 ctdb_req_control_db_push_confirm(&request,
903 recdb_id(state->recdb));
904 subreq = ctdb_client_control_multi_send(state, state->ev,
908 TIMEOUT(), &request);
909 if (tevent_req_nomem(subreq, req)) {
912 tevent_req_set_callback(subreq, push_database_new_confirmed,
917 ret = ctdb_rec_buffer_read(state->fd, state, &recbuf);
919 tevent_req_error(req, ret);
923 data.dsize = ctdb_rec_buffer_len(recbuf);
924 data.dptr = talloc_size(state, data.dsize);
925 if (tevent_req_nomem(data.dptr, req)) {
929 ctdb_rec_buffer_push(recbuf, data.dptr, &np);
931 message.srvid = state->srvid;
932 message.data.data = data;
934 D_DEBUG("Pushing buffer %d with %d records for db %s\n",
935 state->num_buffers_sent, recbuf->count,
936 recdb_name(state->recdb));
938 subreq = ctdb_client_message_multi_send(state, state->ev,
940 state->pnn_list, state->count,
942 if (tevent_req_nomem(subreq, req)) {
945 tevent_req_set_callback(subreq, push_database_new_send_done, req);
947 state->num_records += recbuf->count;
949 talloc_free(data.dptr);
953 static void push_database_new_send_done(struct tevent_req *subreq)
955 struct tevent_req *req = tevent_req_callback_data(
956 subreq, struct tevent_req);
957 struct push_database_new_state *state = tevent_req_data(
958 req, struct push_database_new_state);
962 status = ctdb_client_message_multi_recv(subreq, &ret, NULL, NULL);
965 D_ERR("Sending recovery records failed for %s\n",
966 recdb_name(state->recdb));
967 tevent_req_error(req, ret);
971 state->num_buffers_sent += 1;
973 push_database_new_send_msg(req);
976 static void push_database_new_confirmed(struct tevent_req *subreq)
978 struct tevent_req *req = tevent_req_callback_data(
979 subreq, struct tevent_req);
980 struct push_database_new_state *state = tevent_req_data(
981 req, struct push_database_new_state);
982 struct ctdb_reply_control **reply;
986 uint32_t num_records;
988 status = ctdb_client_control_multi_recv(subreq, &ret, state,
995 ret2 = ctdb_client_control_multi_error(state->pnn_list,
996 state->count, err_list,
999 D_ERR("control DB_PUSH_CONFIRM failed for db %s"
1000 " on node %u, ret=%d\n",
1001 recdb_name(state->recdb), pnn, ret2);
1003 D_ERR("control DB_PUSH_CONFIRM failed for db %s,"
1005 recdb_name(state->recdb), ret);
1007 tevent_req_error(req, ret);
1011 for (i=0; i<state->count; i++) {
1012 ret = ctdb_reply_control_db_push_confirm(reply[i],
1015 tevent_req_error(req, EPROTO);
1019 if (num_records != state->num_records) {
1020 D_ERR("Node %u received %d of %d records for %s\n",
1021 state->pnn_list[i], num_records,
1022 state->num_records, recdb_name(state->recdb));
1023 tevent_req_error(req, EPROTO);
1030 D_INFO("Pushed %d records for db %s\n",
1031 state->num_records, recdb_name(state->recdb));
1033 tevent_req_done(req);
1036 static bool push_database_new_recv(struct tevent_req *req, int *perr)
1038 return generic_recv(req, perr);
/*
 * wrapper for push_database_old and push_database_new
 *
 * Nodes are split by capability.  Both push styles run concurrently, and
 * the wrapper completes once both halves have finished.
 */

struct push_database_state {
	bool old_done;	/* old-style push finished (or not needed) */
	bool new_done;	/* new-style push finished (or not needed) */
};

static void push_database_old_done(struct tevent_req *subreq);
static void push_database_new_done(struct tevent_req *subreq);
1052 static struct tevent_req *push_database_send(
1053 TALLOC_CTX *mem_ctx,
1054 struct tevent_context *ev,
1055 struct ctdb_client_context *client,
1056 uint32_t *pnn_list, int count, uint32_t *caps,
1057 struct ctdb_tunable_list *tun_list,
1058 struct recdb_context *recdb)
1060 struct tevent_req *req, *subreq;
1061 struct push_database_state *state;
1062 uint32_t *old_list, *new_list;
1063 int old_count, new_count;
1066 req = tevent_req_create(mem_ctx, &state, struct push_database_state);
1071 state->old_done = false;
1072 state->new_done = false;
1076 old_list = talloc_array(state, uint32_t, count);
1077 new_list = talloc_array(state, uint32_t, count);
1078 if (tevent_req_nomem(old_list, req) ||
1079 tevent_req_nomem(new_list,req)) {
1080 return tevent_req_post(req, ev);
1083 for (i=0; i<count; i++) {
1084 uint32_t pnn = pnn_list[i];
1086 if (caps[pnn] & CTDB_CAP_FRAGMENTED_CONTROLS) {
1087 new_list[new_count] = pnn;
1090 old_list[old_count] = pnn;
1095 if (old_count > 0) {
1096 subreq = push_database_old_send(state, ev, client,
1097 old_list, old_count, recdb);
1098 if (tevent_req_nomem(subreq, req)) {
1099 return tevent_req_post(req, ev);
1101 tevent_req_set_callback(subreq, push_database_old_done, req);
1103 state->old_done = true;
1106 if (new_count > 0) {
1107 subreq = push_database_new_send(state, ev, client,
1108 new_list, new_count, recdb,
1109 tun_list->rec_buffer_size_limit);
1110 if (tevent_req_nomem(subreq, req)) {
1111 return tevent_req_post(req, ev);
1113 tevent_req_set_callback(subreq, push_database_new_done, req);
1115 state->new_done = true;
1121 static void push_database_old_done(struct tevent_req *subreq)
1123 struct tevent_req *req = tevent_req_callback_data(
1124 subreq, struct tevent_req);
1125 struct push_database_state *state = tevent_req_data(
1126 req, struct push_database_state);
1130 status = push_database_old_recv(subreq, &ret);
1132 tevent_req_error(req, ret);
1136 state->old_done = true;
1138 if (state->old_done && state->new_done) {
1139 tevent_req_done(req);
1143 static void push_database_new_done(struct tevent_req *subreq)
1145 struct tevent_req *req = tevent_req_callback_data(
1146 subreq, struct tevent_req);
1147 struct push_database_state *state = tevent_req_data(
1148 req, struct push_database_state);
1152 status = push_database_new_recv(subreq, &ret);
1154 tevent_req_error(req, ret);
1158 state->new_done = true;
1160 if (state->old_done && state->new_done) {
1161 tevent_req_done(req);
/* Collect the result of push_database_send(); see generic_recv() */
static bool push_database_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
/*
 * Collect databases using highest sequence number
 *
 * Ask every node for the database sequence number, then pull the whole
 * database from the node that reports the highest one.
 */

struct collect_highseqnum_db_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	uint32_t *pnn_list;
	int count;
	uint32_t *caps;			/* capabilities, indexed by pnn */
	uint32_t *ban_credits;		/* ban credits, indexed by pnn */
	uint32_t db_id;
	struct recdb_context *recdb;
	uint32_t max_pnn;		/* node with the highest seqnum */
};

static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq);
static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq);
1189 static struct tevent_req *collect_highseqnum_db_send(
1190 TALLOC_CTX *mem_ctx,
1191 struct tevent_context *ev,
1192 struct ctdb_client_context *client,
1193 uint32_t *pnn_list, int count, uint32_t *caps,
1194 uint32_t *ban_credits, uint32_t db_id,
1195 struct recdb_context *recdb)
1197 struct tevent_req *req, *subreq;
1198 struct collect_highseqnum_db_state *state;
1199 struct ctdb_req_control request;
1201 req = tevent_req_create(mem_ctx, &state,
1202 struct collect_highseqnum_db_state);
1208 state->client = client;
1209 state->pnn_list = pnn_list;
1210 state->count = count;
1212 state->ban_credits = ban_credits;
1213 state->db_id = db_id;
1214 state->recdb = recdb;
1216 ctdb_req_control_get_db_seqnum(&request, db_id);
1217 subreq = ctdb_client_control_multi_send(mem_ctx, ev, client,
1218 state->pnn_list, state->count,
1219 TIMEOUT(), &request);
1220 if (tevent_req_nomem(subreq, req)) {
1221 return tevent_req_post(req, ev);
1223 tevent_req_set_callback(subreq, collect_highseqnum_db_seqnum_done,
1229 static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq)
1231 struct tevent_req *req = tevent_req_callback_data(
1232 subreq, struct tevent_req);
1233 struct collect_highseqnum_db_state *state = tevent_req_data(
1234 req, struct collect_highseqnum_db_state);
1235 struct ctdb_reply_control **reply;
1239 uint64_t seqnum, max_seqnum;
1241 status = ctdb_client_control_multi_recv(subreq, &ret, state,
1243 TALLOC_FREE(subreq);
1248 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1249 state->count, err_list,
1252 D_ERR("control GET_DB_SEQNUM failed for db %s"
1253 " on node %u, ret=%d\n",
1254 recdb_name(state->recdb), pnn, ret2);
1256 D_ERR("control GET_DB_SEQNUM failed for db %s,"
1258 recdb_name(state->recdb), ret);
1260 tevent_req_error(req, ret);
1265 state->max_pnn = state->pnn_list[0];
1266 for (i=0; i<state->count; i++) {
1267 ret = ctdb_reply_control_get_db_seqnum(reply[i], &seqnum);
1269 tevent_req_error(req, EPROTO);
1273 if (max_seqnum < seqnum) {
1274 max_seqnum = seqnum;
1275 state->max_pnn = state->pnn_list[i];
1281 D_INFO("Pull persistent db %s from node %d with seqnum 0x%"PRIx64"\n",
1282 recdb_name(state->recdb), state->max_pnn, max_seqnum);
1284 subreq = pull_database_send(state, state->ev, state->client,
1286 state->caps[state->max_pnn],
1288 if (tevent_req_nomem(subreq, req)) {
1291 tevent_req_set_callback(subreq, collect_highseqnum_db_pulldb_done,
1295 static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq)
1297 struct tevent_req *req = tevent_req_callback_data(
1298 subreq, struct tevent_req);
1299 struct collect_highseqnum_db_state *state = tevent_req_data(
1300 req, struct collect_highseqnum_db_state);
1304 status = pull_database_recv(subreq, &ret);
1305 TALLOC_FREE(subreq);
1307 state->ban_credits[state->max_pnn] += 1;
1308 tevent_req_error(req, ret);
1312 tevent_req_done(req);
/* Collect the result of collect_highseqnum_db_send(); see generic_recv() */
static bool collect_highseqnum_db_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
1321 * Collect all databases
1324 struct collect_all_db_state {
1325 struct tevent_context *ev;
1326 struct ctdb_client_context *client;
1330 uint32_t *ban_credits;
1332 struct recdb_context *recdb;
1333 struct ctdb_pulldb pulldb;
1337 static void collect_all_db_pulldb_done(struct tevent_req *subreq);
1339 static struct tevent_req *collect_all_db_send(
1340 TALLOC_CTX *mem_ctx,
1341 struct tevent_context *ev,
1342 struct ctdb_client_context *client,
1343 uint32_t *pnn_list, int count, uint32_t *caps,
1344 uint32_t *ban_credits, uint32_t db_id,
1345 struct recdb_context *recdb)
1347 struct tevent_req *req, *subreq;
1348 struct collect_all_db_state *state;
1351 req = tevent_req_create(mem_ctx, &state,
1352 struct collect_all_db_state);
1358 state->client = client;
1359 state->pnn_list = pnn_list;
1360 state->count = count;
1362 state->ban_credits = ban_credits;
1363 state->db_id = db_id;
1364 state->recdb = recdb;
1367 pnn = state->pnn_list[state->index];
1369 subreq = pull_database_send(state, ev, client, pnn, caps[pnn], recdb);
1370 if (tevent_req_nomem(subreq, req)) {
1371 return tevent_req_post(req, ev);
1373 tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
1378 static void collect_all_db_pulldb_done(struct tevent_req *subreq)
1380 struct tevent_req *req = tevent_req_callback_data(
1381 subreq, struct tevent_req);
1382 struct collect_all_db_state *state = tevent_req_data(
1383 req, struct collect_all_db_state);
1388 status = pull_database_recv(subreq, &ret);
1389 TALLOC_FREE(subreq);
1391 pnn = state->pnn_list[state->index];
1392 state->ban_credits[pnn] += 1;
1393 tevent_req_error(req, ret);
1398 if (state->index == state->count) {
1399 tevent_req_done(req);
1403 pnn = state->pnn_list[state->index];
1404 subreq = pull_database_send(state, state->ev, state->client,
1405 pnn, state->caps[pnn], state->recdb);
1406 if (tevent_req_nomem(subreq, req)) {
1409 tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
/* Collect the result of collect_all_db_send(); see generic_recv() */
static bool collect_all_db_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
1419 * For each database do the following:
1422 * - Freeze database on all nodes
1423 * - Start transaction on all nodes
1424 * - Collect database from all nodes
1425 * - Wipe database on all nodes
1426 * - Push database to all nodes
1427 * - Commit transaction on all nodes
1428 * - Thaw database on all nodes
1431 struct recover_db_state {
1432 struct tevent_context *ev;
1433 struct ctdb_client_context *client;
1434 struct ctdb_tunable_list *tun_list;
1438 uint32_t *ban_credits;
1443 struct ctdb_transdb transdb;
1445 const char *db_name, *db_path;
1446 struct recdb_context *recdb;
1449 static void recover_db_name_done(struct tevent_req *subreq);
1450 static void recover_db_path_done(struct tevent_req *subreq);
1451 static void recover_db_freeze_done(struct tevent_req *subreq);
1452 static void recover_db_transaction_started(struct tevent_req *subreq);
1453 static void recover_db_collect_done(struct tevent_req *subreq);
1454 static void recover_db_wipedb_done(struct tevent_req *subreq);
1455 static void recover_db_pushdb_done(struct tevent_req *subreq);
1456 static void recover_db_transaction_committed(struct tevent_req *subreq);
1457 static void recover_db_thaw_done(struct tevent_req *subreq);
/*
 * Recover one database across the nodes in pnn_list.
 *
 * The first step asks the local node (this client's PNN) for the database
 * name; each completion callback then issues the next control in the chain.
 * NOTE(review): ban_credits is indexed by PNN further down the chain, so it
 * is assumed to be large enough for every PNN in pnn_list - confirm with
 * the caller.
 */
1459 static struct tevent_req *recover_db_send(TALLOC_CTX *mem_ctx,
1460 struct tevent_context *ev,
1461 struct ctdb_client_context *client,
1462 struct ctdb_tunable_list *tun_list,
1463 uint32_t *pnn_list, int count,
1465 uint32_t *ban_credits,
1466 uint32_t generation,
1467 uint32_t db_id, uint8_t db_flags)
1469 struct tevent_req *req, *subreq;
1470 struct recover_db_state *state;
1471 struct ctdb_req_control request;
1473 req = tevent_req_create(mem_ctx, &state, struct recover_db_state);
1479 state->client = client;
1480 state->tun_list = tun_list;
1481 state->pnn_list = pnn_list;
1482 state->count = count;
1484 state->ban_credits = ban_credits;
1485 state->db_id = db_id;
1486 state->db_flags = db_flags;
/* Name and path lookups are sent to the local node only */
1488 state->destnode = ctdb_client_pnn(client);
1489 state->transdb.db_id = db_id;
/* The recovery generation doubles as the per-database transaction id */
1490 state->transdb.tid = generation;
1492 ctdb_req_control_get_dbname(&request, db_id);
1493 subreq = ctdb_client_control_send(state, ev, client, state->destnode,
1494 TIMEOUT(), &request);
1495 if (tevent_req_nomem(subreq, req)) {
1496 return tevent_req_post(req, ev);
1498 tevent_req_set_callback(subreq, recover_db_name_done, req);
/*
 * GET_DBNAME reply handler: store the name in state->db_name, then ask the
 * local node for the database path (GETDBPATH).
 * A failed control is reported with its own error code; an undecodable
 * reply is reported as EPROTO.
 */
1503 static void recover_db_name_done(struct tevent_req *subreq)
1505 struct tevent_req *req = tevent_req_callback_data(
1506 subreq, struct tevent_req);
1507 struct recover_db_state *state = tevent_req_data(
1508 req, struct recover_db_state);
1509 struct ctdb_reply_control *reply;
1510 struct ctdb_req_control request;
1514 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1515 TALLOC_FREE(subreq);
1517 D_ERR("control GET_DBNAME failed for db=0x%x, ret=%d\n",
1519 tevent_req_error(req, ret);
1523 ret = ctdb_reply_control_get_dbname(reply, state, &state->db_name);
1525 D_ERR("control GET_DBNAME failed for db=0x%x, ret=%d\n",
1527 tevent_req_error(req, EPROTO);
1533 ctdb_req_control_getdbpath(&request, state->db_id);
1534 subreq = ctdb_client_control_send(state, state->ev, state->client,
1535 state->destnode, TIMEOUT(),
1537 if (tevent_req_nomem(subreq, req)) {
1540 tevent_req_set_callback(subreq, recover_db_path_done, req);
/*
 * GETDBPATH reply handler: store the path in state->db_path, then send
 * FREEZE_DB for this database to every node in pnn_list.
 * A failed control is reported with its own error code; an undecodable
 * reply is reported as EPROTO.
 */
1543 static void recover_db_path_done(struct tevent_req *subreq)
1545 struct tevent_req *req = tevent_req_callback_data(
1546 subreq, struct tevent_req);
1547 struct recover_db_state *state = tevent_req_data(
1548 req, struct recover_db_state);
1549 struct ctdb_reply_control *reply;
1550 struct ctdb_req_control request;
1554 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1555 TALLOC_FREE(subreq);
1557 D_ERR("control GETDBPATH failed for db %s, ret=%d\n",
1558 state->db_name, ret);
1559 tevent_req_error(req, ret);
1563 ret = ctdb_reply_control_getdbpath(reply, state, &state->db_path);
1565 D_ERR("control GETDBPATH failed for db %s, ret=%d\n",
1566 state->db_name, ret);
1567 tevent_req_error(req, EPROTO);
1573 ctdb_req_control_db_freeze(&request, state->db_id);
1574 subreq = ctdb_client_control_multi_send(state, state->ev,
1576 state->pnn_list, state->count,
1577 TIMEOUT(), &request);
1578 if (tevent_req_nomem(subreq, req)) {
1581 tevent_req_set_callback(subreq, recover_db_freeze_done, req);
/*
 * FREEZE_DB completion.  If a specific node is identified as the failure,
 * log it and add one ban credit to that node; the request fails either way.
 * On success, start a transaction (state->transdb) on all nodes.
 */
1584 static void recover_db_freeze_done(struct tevent_req *subreq)
1586 struct tevent_req *req = tevent_req_callback_data(
1587 subreq, struct tevent_req);
1588 struct recover_db_state *state = tevent_req_data(
1589 req, struct recover_db_state);
1590 struct ctdb_req_control request;
1595 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1597 TALLOC_FREE(subreq);
/* Map the per-node error list back to the failing PNN */
1602 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1603 state->count, err_list,
1606 D_ERR("control FREEZE_DB failed for db %s"
1607 " on node %u, ret=%d\n",
1608 state->db_name, pnn, ret2);
/* Charge the node that failed to freeze one ban credit */
1609 state->ban_credits[pnn] += 1;
1611 D_ERR("control FREEZE_DB failed for db %s, ret=%d\n",
1612 state->db_name, ret);
1614 tevent_req_error(req, ret);
1618 ctdb_req_control_db_transaction_start(&request, &state->transdb);
1619 subreq = ctdb_client_control_multi_send(state, state->ev,
1621 state->pnn_list, state->count,
1622 TIMEOUT(), &request);
1623 if (tevent_req_nomem(subreq, req)) {
1626 tevent_req_set_callback(subreq, recover_db_transaction_started, req);
/*
 * TRANSACTION_START completion: create the recovery database, then collect
 * records from all nodes into it.
 */
1629 static void recover_db_transaction_started(struct tevent_req *subreq)
1631 struct tevent_req *req = tevent_req_callback_data(
1632 subreq, struct tevent_req);
1633 struct recover_db_state *state = tevent_req_data(
1634 req, struct recover_db_state);
1639 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1641 TALLOC_FREE(subreq);
1646 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1650 D_ERR("control TRANSACTION_DB failed for db=%s"
1651 " on node %u, ret=%d\n",
1652 state->db_name, pnn, ret2);
1654 D_ERR("control TRANSACTION_DB failed for db=%s,"
1655 " ret=%d\n", state->db_name, ret);
1657 tevent_req_error(req, ret);
/* Scratch database sized by the database_hash_size tunable */
1661 state->recdb = recdb_create(state, state->db_id, state->db_name,
1663 state->tun_list->database_hash_size,
1664 state->db_flags & CTDB_DB_FLAGS_PERSISTENT);
1665 if (tevent_req_nomem(state->recdb, req)) {
/*
 * Persistent and replicated databases use collect_highseqnum_db_*();
 * all other databases use collect_all_db_*().  recover_db_collect_done()
 * must make the same choice when collecting the result.
 */
1669 if ((state->db_flags & CTDB_DB_FLAGS_PERSISTENT) ||
1670 (state->db_flags & CTDB_DB_FLAGS_REPLICATED)) {
1671 subreq = collect_highseqnum_db_send(
1672 state, state->ev, state->client,
1673 state->pnn_list, state->count, state->caps,
1674 state->ban_credits, state->db_id,
1677 subreq = collect_all_db_send(
1678 state, state->ev, state->client,
1679 state->pnn_list, state->count, state->caps,
1680 state->ban_credits, state->db_id,
1683 if (tevent_req_nomem(subreq, req)) {
1686 tevent_req_set_callback(subreq, recover_db_collect_done, req);
/*
 * Collection completion: wipe the database on all nodes inside the open
 * transaction (state->transdb) so the collected records can be pushed back.
 */
1689 static void recover_db_collect_done(struct tevent_req *subreq)
1691 struct tevent_req *req = tevent_req_callback_data(
1692 subreq, struct tevent_req);
1693 struct recover_db_state *state = tevent_req_data(
1694 req, struct recover_db_state);
1695 struct ctdb_req_control request;
/* Same flag test as the send side, so the matching _recv() is used */
1699 if ((state->db_flags & CTDB_DB_FLAGS_PERSISTENT) ||
1700 (state->db_flags & CTDB_DB_FLAGS_REPLICATED)) {
1701 status = collect_highseqnum_db_recv(subreq, &ret);
1703 status = collect_all_db_recv(subreq, &ret);
1705 TALLOC_FREE(subreq);
1707 tevent_req_error(req, ret);
1711 ctdb_req_control_wipe_database(&request, &state->transdb);
1712 subreq = ctdb_client_control_multi_send(state, state->ev,
1714 state->pnn_list, state->count,
1715 TIMEOUT(), &request);
1716 if (tevent_req_nomem(subreq, req)) {
1719 tevent_req_set_callback(subreq, recover_db_wipedb_done, req);
/*
 * WIPE_DATABASE completion: push the contents of the recovery database to
 * every node in pnn_list via push_database_send().
 */
1722 static void recover_db_wipedb_done(struct tevent_req *subreq)
1724 struct tevent_req *req = tevent_req_callback_data(
1725 subreq, struct tevent_req);
1726 struct recover_db_state *state = tevent_req_data(
1727 req, struct recover_db_state);
1732 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1734 TALLOC_FREE(subreq);
1739 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1743 D_ERR("control WIPEDB failed for db %s on node %u,"
1744 " ret=%d\n", state->db_name, pnn, ret2);
1746 D_ERR("control WIPEDB failed for db %s, ret=%d\n",
1747 state->db_name, ret);
1749 tevent_req_error(req, ret);
1753 subreq = push_database_send(state, state->ev, state->client,
1754 state->pnn_list, state->count,
1755 state->caps, state->tun_list,
1757 if (tevent_req_nomem(subreq, req)) {
1760 tevent_req_set_callback(subreq, recover_db_pushdb_done, req);
/*
 * Push completion: release the recovery database, then commit the
 * transaction (state->transdb) on all nodes.
 */
1763 static void recover_db_pushdb_done(struct tevent_req *subreq)
1765 struct tevent_req *req = tevent_req_callback_data(
1766 subreq, struct tevent_req);
1767 struct recover_db_state *state = tevent_req_data(
1768 req, struct recover_db_state);
1769 struct ctdb_req_control request;
1773 status = push_database_recv(subreq, &ret);
1774 TALLOC_FREE(subreq);
1776 tevent_req_error(req, ret);
/* Records have been pushed; the scratch copy is no longer needed */
1780 TALLOC_FREE(state->recdb);
1782 ctdb_req_control_db_transaction_commit(&request, &state->transdb);
1783 subreq = ctdb_client_control_multi_send(state, state->ev,
1785 state->pnn_list, state->count,
1786 TIMEOUT(), &request);
1787 if (tevent_req_nomem(subreq, req)) {
1790 tevent_req_set_callback(subreq, recover_db_transaction_committed, req);
/*
 * TRANSACTION_COMMIT completion: thaw the database on all nodes.
 */
1793 static void recover_db_transaction_committed(struct tevent_req *subreq)
1795 struct tevent_req *req = tevent_req_callback_data(
1796 subreq, struct tevent_req);
1797 struct recover_db_state *state = tevent_req_data(
1798 req, struct recover_db_state);
1799 struct ctdb_req_control request;
1804 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1806 TALLOC_FREE(subreq);
1811 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1815 D_ERR("control DB_TRANSACTION_COMMIT failed for db %s"
1816 " on node %u, ret=%d\n",
1817 state->db_name, pnn, ret2);
1819 D_ERR("control DB_TRANSACTION_COMMIT failed for db %s,"
1820 " ret=%d\n", state->db_name, ret);
1822 tevent_req_error(req, ret);
1826 ctdb_req_control_db_thaw(&request, state->db_id);
1827 subreq = ctdb_client_control_multi_send(state, state->ev,
1829 state->pnn_list, state->count,
1830 TIMEOUT(), &request);
1831 if (tevent_req_nomem(subreq, req)) {
1834 tevent_req_set_callback(subreq, recover_db_thaw_done, req);
/*
 * THAW completion: last step of the single-database recovery; on success
 * the request is marked done.
 */
1837 static void recover_db_thaw_done(struct tevent_req *subreq)
1839 struct tevent_req *req = tevent_req_callback_data(
1840 subreq, struct tevent_req);
1841 struct recover_db_state *state = tevent_req_data(
1842 req, struct recover_db_state);
1847 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1849 TALLOC_FREE(subreq);
1854 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1858 D_ERR("control DB_THAW failed for db %s on node %u,"
1859 " ret=%d\n", state->db_name, pnn, ret2);
1861 D_ERR("control DB_THAW failed for db %s, ret=%d\n",
1862 state->db_name, ret);
1864 tevent_req_error(req, ret);
1868 tevent_req_done(req);
/*
 * Result of recover_db_send(): true on success.  The error code is
 * discarded (perr is NULL), so callers only learn success or failure.
 */
1871 static bool recover_db_recv(struct tevent_req *req)
1873 return generic_recv(req, NULL);
1878 * Start database recovery for each database
1880 * Try to recover each database up to NUM_RETRIES times before failing recovery.
/*
 * State for recovering every database in dbmap concurrently.
 * num_replies counts databases that have finished (recovered, or retries
 * exhausted); num_failed counts those that exhausted their retries.
 */
1883 struct db_recovery_state {
1884 struct tevent_context *ev;
1885 struct ctdb_dbid_map *dbmap;
/*
 * Per-database retry context: everything needed to re-issue
 * recover_db_send() for one database, plus its failure count (num_fails).
 */
1890 struct db_recovery_one_state {
/* Parent db_recovery request */
1891 struct tevent_req *req;
1892 struct ctdb_client_context *client;
1893 struct ctdb_dbid_map *dbmap;
1894 struct ctdb_tunable_list *tun_list;
1898 uint32_t *ban_credits;
1899 uint32_t generation;
1905 static void db_recovery_one_done(struct tevent_req *subreq);
/*
 * Recover all databases in dbmap in parallel: one recover_db_send() request
 * is started per database before any of them completes.  Each request gets
 * its own db_recovery_one_state so it can be retried independently.
 * Completes immediately when dbmap is empty.
 */
1907 static struct tevent_req *db_recovery_send(TALLOC_CTX *mem_ctx,
1908 struct tevent_context *ev,
1909 struct ctdb_client_context *client,
1910 struct ctdb_dbid_map *dbmap,
1911 struct ctdb_tunable_list *tun_list,
1912 uint32_t *pnn_list, int count,
1914 uint32_t *ban_credits,
1915 uint32_t generation)
1917 struct tevent_req *req, *subreq;
1918 struct db_recovery_state *state;
1921 req = tevent_req_create(mem_ctx, &state, struct db_recovery_state);
1927 state->dbmap = dbmap;
1928 state->num_replies = 0;
1929 state->num_failed = 0;
/* No databases: finish right away */
1931 if (dbmap->num == 0) {
1932 tevent_req_done(req);
1933 return tevent_req_post(req, ev);
1936 for (i=0; i<dbmap->num; i++) {
1937 struct db_recovery_one_state *substate;
1939 substate = talloc_zero(state, struct db_recovery_one_state);
1940 if (tevent_req_nomem(substate, req)) {
1941 return tevent_req_post(req, ev);
1944 substate->req = req;
1945 substate->client = client;
1946 substate->dbmap = dbmap;
1947 substate->tun_list = tun_list;
1948 substate->pnn_list = pnn_list;
1949 substate->count = count;
1950 substate->caps = caps;
1951 substate->ban_credits = ban_credits;
1952 substate->generation = generation;
1953 substate->db_id = dbmap->dbs[i].db_id;
1954 substate->db_flags = dbmap->dbs[i].flags;
/* Allocated on state, so outstanding sub-requests go away with this request */
1956 subreq = recover_db_send(state, ev, client, tun_list,
1957 pnn_list, count, caps, ban_credits,
1958 generation, substate->db_id,
1959 substate->db_flags);
1960 if (tevent_req_nomem(subreq, req)) {
1961 return tevent_req_post(req, ev);
1963 tevent_req_set_callback(subreq, db_recovery_one_done,
1965 D_NOTICE("recover database 0x%08x\n", substate->db_id);
1971 static void db_recovery_one_done(struct tevent_req *subreq)
1973 struct db_recovery_one_state *substate = tevent_req_callback_data(
1974 subreq, struct db_recovery_one_state);
1975 struct tevent_req *req = substate->req;
1976 struct db_recovery_state *state = tevent_req_data(
1977 req, struct db_recovery_state);
1980 status = recover_db_recv(subreq);
1981 TALLOC_FREE(subreq);
1984 talloc_free(substate);
1988 substate->num_fails += 1;
1989 if (substate->num_fails < NUM_RETRIES) {
1990 subreq = recover_db_send(state, state->ev, substate->client,
1992 substate->pnn_list, substate->count,
1993 substate->caps, substate->ban_credits,
1994 substate->generation, substate->db_id,
1995 substate->db_flags);
1996 if (tevent_req_nomem(subreq, req)) {
1999 tevent_req_set_callback(subreq, db_recovery_one_done, substate);
2000 D_NOTICE("recover database 0x%08x, attempt %d\n",
2001 substate->db_id, substate->num_fails+1);
2006 state->num_failed += 1;
2009 state->num_replies += 1;
2011 if (state->num_replies == state->dbmap->num) {
2012 tevent_req_done(req);
/*
 * Result of db_recovery_send().  *count receives the number of databases
 * recovered successfully (num_replies - num_failed).  A request that ended
 * in a unix error is handled first; the exact *count and return value on
 * that path, and on the num_failed > 0 path, are on lines not shown here -
 * presumably false in both cases, TODO confirm.
 */
2016 static bool db_recovery_recv(struct tevent_req *req, int *count)
2018 struct db_recovery_state *state = tevent_req_data(
2019 req, struct db_recovery_state);
2022 if (tevent_req_is_unix_error(req, &err)) {
2027 *count = state->num_replies - state->num_failed;
2029 if (state->num_failed > 0) {
2038 * Run the parallel database recovery
2043 * - Get capabilities from all nodes
2045 * - Set RECOVERY_ACTIVE
2046 * - Send START_RECOVERY
2047 * - Update vnnmap on all nodes
2048 * - Run database recovery
2049 * - Set RECOVERY_NORMAL
2050 * - Send END_RECOVERY
/*
 * State for one full recovery run, filled in step by step by
 * recovery_send() and the recovery_*_done() callbacks declared below.
 */
2053 struct recovery_state {
2054 struct tevent_context *ev;
2055 struct ctdb_client_context *client;
/* Recovery generation supplied on the command line */
2056 uint32_t generation;
2060 struct ctdb_node_map *nodemap;
/* Per-PNN ban credits, allocated with nodemap->num entries */
2062 uint32_t *ban_credits;
2063 struct ctdb_tunable_list *tun_list;
/* Read from the local node, then replaced by the map computed in recovery_active_done() */
2064 struct ctdb_vnn_map *vnnmap;
2065 struct ctdb_dbid_map *dbmap;
/* Callback chain, listed in the order the recovery steps run */
2068 static void recovery_tunables_done(struct tevent_req *subreq);
2069 static void recovery_nodemap_done(struct tevent_req *subreq);
2070 static void recovery_vnnmap_done(struct tevent_req *subreq);
2071 static void recovery_capabilities_done(struct tevent_req *subreq);
2072 static void recovery_dbmap_done(struct tevent_req *subreq);
2073 static void recovery_active_done(struct tevent_req *subreq);
2074 static void recovery_start_recovery_done(struct tevent_req *subreq);
2075 static void recovery_vnnmap_update_done(struct tevent_req *subreq);
2076 static void recovery_db_recovery_done(struct tevent_req *subreq);
2077 static void recovery_failed_done(struct tevent_req *subreq);
2078 static void recovery_normal_done(struct tevent_req *subreq);
2079 static void recovery_end_recovery_done(struct tevent_req *subreq);
/*
 * Start a full recovery run.  generation becomes the new vnnmap generation
 * and is passed on to the per-database recovery.  The first step fetches
 * all tunables from the local node (this client's PNN).
 */
2081 static struct tevent_req *recovery_send(TALLOC_CTX *mem_ctx,
2082 struct tevent_context *ev,
2083 struct ctdb_client_context *client,
2084 uint32_t generation)
2086 struct tevent_req *req, *subreq;
2087 struct recovery_state *state;
2088 struct ctdb_req_control request;
2090 req = tevent_req_create(mem_ctx, &state, struct recovery_state);
2096 state->client = client;
2097 state->generation = generation;
2098 state->destnode = ctdb_client_pnn(client);
2100 ctdb_req_control_get_all_tunables(&request);
2101 subreq = ctdb_client_control_send(state, state->ev, state->client,
2102 state->destnode, TIMEOUT(),
2104 if (tevent_req_nomem(subreq, req)) {
2105 return tevent_req_post(req, ev);
2107 tevent_req_set_callback(subreq, recovery_tunables_done, req);
/*
 * GET_ALL_TUNABLES reply handler: decode the tunables, adopt their
 * recover_timeout, then request the node map from the local node.
 */
2112 static void recovery_tunables_done(struct tevent_req *subreq)
2114 struct tevent_req *req = tevent_req_callback_data(
2115 subreq, struct tevent_req);
2116 struct recovery_state *state = tevent_req_data(
2117 req, struct recovery_state);
2118 struct ctdb_reply_control *reply;
2119 struct ctdb_req_control request;
2123 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2124 TALLOC_FREE(subreq);
2126 D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2127 tevent_req_error(req, ret);
2131 ret = ctdb_reply_control_get_all_tunables(reply, state,
2134 D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2135 tevent_req_error(req, EPROTO);
/* Updates the file-wide value used by TIMEOUT() for every later control */
2141 recover_timeout = state->tun_list->recover_timeout;
2143 ctdb_req_control_get_nodemap(&request);
2144 subreq = ctdb_client_control_send(state, state->ev, state->client,
2145 state->destnode, TIMEOUT(),
2147 if (tevent_req_nomem(subreq, req)) {
2150 tevent_req_set_callback(subreq, recovery_nodemap_done, req);
/*
 * GET_NODEMAP reply handler: decode the node map, build the list of active
 * node PNNs, allocate the per-PNN ban credit array, then request the
 * current vnnmap from the local node.
 * NOTE(review): an undecodable reply is reported with ret here, whereas
 * recovery_tunables_done() uses EPROTO for the same situation.
 */
2153 static void recovery_nodemap_done(struct tevent_req *subreq)
2155 struct tevent_req *req = tevent_req_callback_data(
2156 subreq, struct tevent_req);
2157 struct recovery_state *state = tevent_req_data(
2158 req, struct recovery_state);
2159 struct ctdb_reply_control *reply;
2160 struct ctdb_req_control request;
2164 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2165 TALLOC_FREE(subreq);
2167 D_ERR("control GET_NODEMAP failed to node %u, ret=%d\n",
2168 state->destnode, ret);
2169 tevent_req_error(req, ret);
2173 ret = ctdb_reply_control_get_nodemap(reply, state, &state->nodemap);
2175 D_ERR("control GET_NODEMAP failed, ret=%d\n", ret);
2176 tevent_req_error(req, ret);
2180 state->count = list_of_active_nodes(state->nodemap, CTDB_UNKNOWN_PNN,
2181 state, &state->pnn_list);
/* NOTE(review): a zero count is also reported as ENOMEM - confirm intent */
2182 if (state->count <= 0) {
2183 tevent_req_error(req, ENOMEM);
/* Indexed by PNN; sized to nodemap->num, which assumes every PNN < nodemap->num - TODO confirm */
2187 state->ban_credits = talloc_zero_array(state, uint32_t,
2188 state->nodemap->num);
2189 if (tevent_req_nomem(state->ban_credits, req)) {
2193 ctdb_req_control_getvnnmap(&request);
2194 subreq = ctdb_client_control_send(state, state->ev, state->client,
2195 state->destnode, TIMEOUT(),
2197 if (tevent_req_nomem(subreq, req)) {
2200 tevent_req_set_callback(subreq, recovery_vnnmap_done, req);
/*
 * GETVNNMAP reply handler: store the current vnnmap, then query
 * capabilities from every active node.
 */
2203 static void recovery_vnnmap_done(struct tevent_req *subreq)
2205 struct tevent_req *req = tevent_req_callback_data(
2206 subreq, struct tevent_req);
2207 struct recovery_state *state = tevent_req_data(
2208 req, struct recovery_state);
2209 struct ctdb_reply_control *reply;
2210 struct ctdb_req_control request;
2214 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2215 TALLOC_FREE(subreq);
2217 D_ERR("control GETVNNMAP failed to node %u, ret=%d\n",
2218 state->destnode, ret);
2219 tevent_req_error(req, ret);
2223 ret = ctdb_reply_control_getvnnmap(reply, state, &state->vnnmap);
2225 D_ERR("control GETVNNMAP failed, ret=%d\n", ret);
2226 tevent_req_error(req, ret);
2230 ctdb_req_control_get_capabilities(&request);
2231 subreq = ctdb_client_control_multi_send(state, state->ev,
2233 state->pnn_list, state->count,
2234 TIMEOUT(), &request);
2235 if (tevent_req_nomem(subreq, req)) {
2238 tevent_req_set_callback(subreq, recovery_capabilities_done, req);
/*
 * GET_CAPABILITIES completion: decode each node's reply (reply[i] belongs
 * to pnn_list[i]) into state->caps, then request the database map from
 * the local node.
 * NOTE(review): the caps[] slot written for each reply is on a line not
 * shown here; it is presumably indexed by PNN - confirm, because
 * recovery_active_done() reads caps[] by nodemap position.
 */
2241 static void recovery_capabilities_done(struct tevent_req *subreq)
2243 struct tevent_req *req = tevent_req_callback_data(
2244 subreq, struct tevent_req);
2245 struct recovery_state *state = tevent_req_data(
2246 req, struct recovery_state);
2247 struct ctdb_reply_control **reply;
2248 struct ctdb_req_control request;
2253 status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2255 TALLOC_FREE(subreq);
2260 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2264 D_ERR("control GET_CAPABILITIES failed on node %u,"
2265 " ret=%d\n", pnn, ret2);
2267 D_ERR("control GET_CAPABILITIES failed, ret=%d\n",
2270 tevent_req_error(req, ret);
2274 /* Make the array size same as nodemap */
2275 state->caps = talloc_zero_array(state, uint32_t,
2276 state->nodemap->num);
2277 if (tevent_req_nomem(state->caps, req)) {
2281 for (i=0; i<state->count; i++) {
2284 pnn = state->pnn_list[i];
2285 ret = ctdb_reply_control_get_capabilities(reply[i],
2288 D_ERR("control GET_CAPABILITIES failed on node %u\n",
2290 tevent_req_error(req, EPROTO);
2297 ctdb_req_control_get_dbmap(&request);
2298 subreq = ctdb_client_control_send(state, state->ev, state->client,
2299 state->destnode, TIMEOUT(),
2301 if (tevent_req_nomem(subreq, req)) {
2304 tevent_req_set_callback(subreq, recovery_dbmap_done, req);
/*
 * GET_DBMAP reply handler: store the database map, then set recovery mode
 * ACTIVE on all active nodes.
 */
2307 static void recovery_dbmap_done(struct tevent_req *subreq)
2309 struct tevent_req *req = tevent_req_callback_data(
2310 subreq, struct tevent_req);
2311 struct recovery_state *state = tevent_req_data(
2312 req, struct recovery_state);
2313 struct ctdb_reply_control *reply;
2314 struct ctdb_req_control request;
2318 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2319 TALLOC_FREE(subreq);
2321 D_ERR("control GET_DBMAP failed to node %u, ret=%d\n",
2322 state->destnode, ret);
2323 tevent_req_error(req, ret);
2327 ret = ctdb_reply_control_get_dbmap(reply, state, &state->dbmap);
2329 D_ERR("control GET_DBMAP failed, ret=%d\n", ret);
2330 tevent_req_error(req, ret);
2334 ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_ACTIVE);
2335 subreq = ctdb_client_control_multi_send(state, state->ev,
2337 state->pnn_list, state->count,
2338 TIMEOUT(), &request);
2339 if (tevent_req_nomem(subreq, req)) {
2342 tevent_req_set_callback(subreq, recovery_active_done, req);
/*
 * SET_RECMODE(ACTIVE) completion.  Builds a new vnnmap from the nodes that
 * are not NODE_FLAGS_INACTIVE and have CTDB_CAP_LMASTER; if none qualify,
 * the map holds only the local node (destnode).  The new map carries the
 * recovery generation and replaces state->vnnmap.  Then START_RECOVERY is
 * sent to all active nodes.
 */
2345 static void recovery_active_done(struct tevent_req *subreq)
2347 struct tevent_req *req = tevent_req_callback_data(
2348 subreq, struct tevent_req);
2349 struct recovery_state *state = tevent_req_data(
2350 req, struct recovery_state);
2351 struct ctdb_req_control request;
2352 struct ctdb_vnn_map *vnnmap;
2357 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2359 TALLOC_FREE(subreq);
2364 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2368 D_ERR("failed to set recovery mode ACTIVE on node %u,"
2369 " ret=%d\n", pnn, ret2);
2371 D_ERR("failed to set recovery mode ACTIVE, ret=%d\n",
2374 tevent_req_error(req, ret);
2378 D_ERR("Set recovery mode to ACTIVE\n");
2380 /* Calculate new VNNMAP */
/* NOTE(review): caps[] is indexed by nodemap position i here - only consistent with how it was filled if position == PNN; confirm */
2382 for (i=0; i<state->nodemap->num; i++) {
2383 if (state->nodemap->node[i].flags & NODE_FLAGS_INACTIVE) {
2386 if (!(state->caps[i] & CTDB_CAP_LMASTER)) {
2393 D_WARNING("No active lmasters found. Adding recmaster anyway\n");
2396 vnnmap = talloc_zero(state, struct ctdb_vnn_map);
2397 if (tevent_req_nomem(vnnmap, req)) {
/* Always at least one slot, for the local-node fallback */
2401 vnnmap->size = (count == 0 ? 1 : count);
2402 vnnmap->map = talloc_array(vnnmap, uint32_t, vnnmap->size);
2403 if (tevent_req_nomem(vnnmap->map, req)) {
/* No qualifying lmaster: fall back to the local node */
2408 vnnmap->map[0] = state->destnode;
2411 for (i=0; i<state->nodemap->num; i++) {
2412 if (state->nodemap->node[i].flags &
2413 NODE_FLAGS_INACTIVE) {
2416 if (!(state->caps[i] & CTDB_CAP_LMASTER)) {
2420 vnnmap->map[count] = state->nodemap->node[i].pnn;
2425 vnnmap->generation = state->generation;
2427 talloc_free(state->vnnmap);
2428 state->vnnmap = vnnmap;
2430 ctdb_req_control_start_recovery(&request);
2431 subreq = ctdb_client_control_multi_send(state, state->ev,
2433 state->pnn_list, state->count,
2434 TIMEOUT(), &request);
2435 if (tevent_req_nomem(subreq, req)) {
2438 tevent_req_set_callback(subreq, recovery_start_recovery_done, req);
/*
 * START_RECOVERY completion: push the newly computed vnnmap to all active
 * nodes (SETVNNMAP).
 */
2441 static void recovery_start_recovery_done(struct tevent_req *subreq)
2443 struct tevent_req *req = tevent_req_callback_data(
2444 subreq, struct tevent_req);
2445 struct recovery_state *state = tevent_req_data(
2446 req, struct recovery_state);
2447 struct ctdb_req_control request;
2452 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2454 TALLOC_FREE(subreq);
2459 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2463 D_ERR("failed to run start_recovery event on node %u,"
2464 " ret=%d\n", pnn, ret2);
2466 D_ERR("failed to run start_recovery event, ret=%d\n",
2469 tevent_req_error(req, ret);
2473 D_ERR("start_recovery event finished\n");
2475 ctdb_req_control_setvnnmap(&request, state->vnnmap);
2476 subreq = ctdb_client_control_multi_send(state, state->ev,
2478 state->pnn_list, state->count,
2479 TIMEOUT(), &request);
2480 if (tevent_req_nomem(subreq, req)) {
2483 tevent_req_set_callback(subreq, recovery_vnnmap_update_done, req);
/*
 * SETVNNMAP completion: start the parallel recovery of every database in
 * state->dbmap, using the new vnnmap's generation.
 */
2486 static void recovery_vnnmap_update_done(struct tevent_req *subreq)
2488 struct tevent_req *req = tevent_req_callback_data(
2489 subreq, struct tevent_req);
2490 struct recovery_state *state = tevent_req_data(
2491 req, struct recovery_state);
2496 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2498 TALLOC_FREE(subreq);
2503 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2507 D_ERR("failed to update VNNMAP on node %u, ret=%d\n",
2510 D_ERR("failed to update VNNMAP, ret=%d\n", ret);
2512 tevent_req_error(req, ret);
2516 D_NOTICE("updated VNNMAP\n");
2518 subreq = db_recovery_send(state, state->ev, state->client,
2519 state->dbmap, state->tun_list,
2520 state->pnn_list, state->count,
2521 state->caps, state->ban_credits,
2522 state->vnnmap->generation);
2523 if (tevent_req_nomem(subreq, req)) {
2526 tevent_req_set_callback(subreq, recovery_db_recovery_done, req);
/*
 * Database recovery completion.
 *
 * Failure path (presumably entered when db_recovery_recv() reports
 * failure; the branch condition is on a line not shown here):
 *  - if bans are disabled, the run fails with EIO;
 *  - otherwise the node with the most ban credits is found; if it has
 *    NUM_RETRIES or more, a CTDB_SRVID_BANNING message naming it is sent to
 *    the local node and recovery_failed_done() then ends the run with EIO;
 *  - otherwise the run fails with EIO.
 * Success path: set recovery mode NORMAL on all active nodes.
 */
2529 static void recovery_db_recovery_done(struct tevent_req *subreq)
2531 struct tevent_req *req = tevent_req_callback_data(
2532 subreq, struct tevent_req);
2533 struct recovery_state *state = tevent_req_data(
2534 req, struct recovery_state);
2535 struct ctdb_req_control request;
2539 status = db_recovery_recv(subreq, &count);
2540 TALLOC_FREE(subreq);
2542 D_ERR("%d of %d databases recovered\n", count, state->dbmap->num);
2545 uint32_t max_pnn = CTDB_UNKNOWN_PNN, max_credits = 0;
2548 /* Bans are not enabled */
2549 if (state->tun_list->enable_bans == 0) {
2550 tevent_req_error(req, EIO);
/* Find the active node that accumulated the most ban credits */
2554 for (i=0; i<state->count; i++) {
2556 pnn = state->pnn_list[i];
2557 if (state->ban_credits[pnn] > max_credits) {
2559 max_credits = state->ban_credits[pnn];
2563 /* If pulling database fails multiple times */
2564 if (max_credits >= NUM_RETRIES) {
2565 struct ctdb_req_message message;
2567 D_ERR("Assigning banning credits to node %u\n",
2570 message.srvid = CTDB_SRVID_BANNING;
2571 message.data.pnn = max_pnn;
2573 subreq = ctdb_client_message_send(
2574 state, state->ev, state->client,
2575 ctdb_client_pnn(state->client),
2577 if (tevent_req_nomem(subreq, req)) {
2580 tevent_req_set_callback(subreq, recovery_failed_done,
2583 tevent_req_error(req, EIO);
2588 ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_NORMAL);
2589 subreq = ctdb_client_control_multi_send(state, state->ev,
2591 state->pnn_list, state->count,
2592 TIMEOUT(), &request);
2593 if (tevent_req_nomem(subreq, req)) {
2596 tevent_req_set_callback(subreq, recovery_normal_done, req);
/*
 * Completion of the banning-credit message.  A send failure is only
 * logged; the recovery run ends with EIO in every case.
 */
2599 static void recovery_failed_done(struct tevent_req *subreq)
2601 struct tevent_req *req = tevent_req_callback_data(
2602 subreq, struct tevent_req);
2606 status = ctdb_client_message_recv(subreq, &ret);
2607 TALLOC_FREE(subreq);
2609 D_ERR("failed to assign banning credits, ret=%d\n", ret);
2612 tevent_req_error(req, EIO);
/*
 * SET_RECMODE(NORMAL) completion: send END_RECOVERY to all active nodes.
 */
2615 static void recovery_normal_done(struct tevent_req *subreq)
2617 struct tevent_req *req = tevent_req_callback_data(
2618 subreq, struct tevent_req);
2619 struct recovery_state *state = tevent_req_data(
2620 req, struct recovery_state);
2621 struct ctdb_req_control request;
2626 status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2628 TALLOC_FREE(subreq);
2633 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2637 D_ERR("failed to set recovery mode NORMAL on node %u,"
2638 " ret=%d\n", pnn, ret2);
2640 D_ERR("failed to set recovery mode NORMAL, ret=%d\n",
2643 tevent_req_error(req, ret);
2647 D_ERR("Set recovery mode to NORMAL\n");
2649 ctdb_req_control_end_recovery(&request);
2650 subreq = ctdb_client_control_multi_send(state, state->ev,
2652 state->pnn_list, state->count,
2653 TIMEOUT(), &request);
2654 if (tevent_req_nomem(subreq, req)) {
2657 tevent_req_set_callback(subreq, recovery_end_recovery_done, req);
/*
 * END_RECOVERY completion: final step of the recovery run; on success the
 * request is marked done.
 */
2660 static void recovery_end_recovery_done(struct tevent_req *subreq)
2662 struct tevent_req *req = tevent_req_callback_data(
2663 subreq, struct tevent_req);
2664 struct recovery_state *state = tevent_req_data(
2665 req, struct recovery_state);
2670 status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2672 TALLOC_FREE(subreq);
2677 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2681 D_ERR("failed to run recovered event on node %u,"
2682 " ret=%d\n", pnn, ret2);
2684 D_ERR("failed to run recovered event, ret=%d\n", ret);
2686 tevent_req_error(req, ret);
2690 D_ERR("recovered event finished\n");
2692 tevent_req_done(req);
/*
 * Collect the result of recovery_send().
 *
 * On failure *perr receives the request's unix error code (via
 * generic_recv()).  On success *perr is explicitly set to 0, so the caller
 * never reads a stale value left over from an earlier call.
 */
static void recovery_recv(struct tevent_req *req, int *perr)
{
	if (generic_recv(req, perr) && perr != NULL) {
		/* Do not rely on generic_recv() touching *perr on success */
		*perr = 0;
	}
}
/*
 * Print command-line usage for this helper to stderr.
 */
2700 static void usage(const char *progname)
2702 fprintf(stderr, "\nUsage: %s <output-fd> <ctdb-socket-path> <generation>\n",
2708 * Arguments - write fd, socket path, generation
2710 int main(int argc, char *argv[])
2713 const char *sockpath;
2714 TALLOC_CTX *mem_ctx;
2715 struct tevent_context *ev;
2716 struct ctdb_client_context *client;
2718 struct tevent_req *req;
2719 uint32_t generation;
2726 write_fd = atoi(argv[1]);
2728 generation = (uint32_t)strtoul(argv[3], NULL, 0);
2730 mem_ctx = talloc_new(NULL);
2731 if (mem_ctx == NULL) {
2732 fprintf(stderr, "recovery: talloc_new() failed\n");
2736 ret = logging_init(mem_ctx, NULL, NULL, "ctdb-recovery");
2738 fprintf(stderr, "recovery: Unable to initialize logging\n");
2742 ev = tevent_context_init(mem_ctx);
2744 D_ERR("tevent_context_init() failed\n");
2748 ret = ctdb_client_init(mem_ctx, ev, sockpath, &client);
2750 D_ERR("ctdb_client_init() failed, ret=%d\n", ret);
2754 req = recovery_send(mem_ctx, ev, client, generation);
2756 D_ERR("database_recover_send() failed\n");
2760 if (! tevent_req_poll(req, ev)) {
2761 D_ERR("tevent_req_poll() failed\n");
2765 recovery_recv(req, &ret);
2768 D_ERR("database recovery failed, ret=%d\n", ret);
2772 sys_write(write_fd, &ret, sizeof(ret));
2776 TALLOC_FREE(mem_ctx);