2 ctdb parallel database recovery
4 Copyright (C) Amitay Isaacs 2015
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
22 #include "system/filesys.h"
29 #include "lib/tdb_wrap/tdb_wrap.h"
30 #include "lib/util/time.h"
31 #include "lib/util/tevent_unix.h"
33 #include "protocol/protocol.h"
34 #include "protocol/protocol_api.h"
35 #include "client/client.h"
/*
 * Timeout value handed to timeval_current_ofs() by TIMEOUT(); starts at 120
 * (presumably seconds - confirm against timeval_current_ofs()).
 */
37 static int recover_timeout = 120;
/* Builds the timeout argument passed to the client control calls in this file. */
39 #define TIMEOUT() timeval_current_ofs(recover_timeout, 0)
/* printf-style logging helper: the formatted message is written to stderr. */
41 static void LOG(const char *fmt, ...)
46 vfprintf(stderr, fmt, ap);
/*
 * write() wrapper that repeats the call for as long as it fails with EINTR
 * or EAGAIN, and additionally EWOULDBLOCK on platforms that define it.
 */
54 static ssize_t sys_write(int fd, const void *buf, size_t count)
59 ret = write(fd, buf, count);
60 #if defined(EWOULDBLOCK)
61 } while (ret == -1 && (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK));
63 } while (ret == -1 && (errno == EINTR || errno == EAGAIN));
/*
 * Shared receive helper used by the *_recv() functions in this file. It
 * checks the request for a unix error with tevent_req_is_unix_error(); how
 * perr is filled in and what is returned is on lines not visible here.
 */
68 static bool generic_recv(struct tevent_req *req, int *perr)
72 if (tevent_req_is_unix_error(req, &err)) {
/* File-level srvid state, initialised to CTDB_SRVID_RECOVERY. */
82 static uint64_t rec_srvid = CTDB_SRVID_RECOVERY;
/*
 * Supplies the srvid that the pull and push requests store in state->srvid.
 * The body is not visible here; presumably it derives the value from
 * rec_srvid - TODO confirm.
 */
84 static uint64_t srvid_next(void)
91 * Recovery database functions
/*
 * Handle for a recovery database. recdb_create() fills in db_name, db_id,
 * db_path, db (the handle returned by tdb_wrap_open()) and persistent.
 */
94 struct recdb_context {
/*
 * Create the recovery database for one ctdb database.
 *
 * The file path is built as "<dir>/recdb.<...>"; the directory choice depends
 * on whether the CTDB_DBDIR_STATE environment variable is set, with
 * dirname(db_path) as the visible fallback. Any existing file at that path is
 * unlinked before the tdb is created with O_CREAT|O_EXCL and mode 0600.
 *
 * A failure to open the tdb is logged; the return values on the error paths
 * are on lines not visible here.
 */
102 static struct recdb_context *recdb_create(TALLOC_CTX *mem_ctx, uint32_t db_id,
105 uint32_t hash_size, bool persistent)
/* static: the environment lookup result is kept across calls */
107 static char *db_dir_state = NULL;
108 struct recdb_context *recdb;
109 unsigned int tdb_flags;
111 recdb = talloc(mem_ctx, struct recdb_context);
116 if (db_dir_state == NULL) {
117 db_dir_state = getenv("CTDB_DBDIR_STATE");
120 recdb->db_name = db_name;
121 recdb->db_id = db_id;
122 recdb->db_path = talloc_asprintf(recdb, "%s/recdb.%s",
123 db_dir_state != NULL ?
125 dirname(discard_const(db_path)),
127 if (recdb->db_path == NULL) {
/* drop any stale file so that the O_EXCL create below does not fail on it */
131 unlink(recdb->db_path);
133 tdb_flags = TDB_NOLOCK | TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING;
/*
 * NOTE(review): db_path is allocated on recdb but the tdb handle is opened
 * with mem_ctx as its first argument - confirm the intended talloc parent.
 */
134 recdb->db = tdb_wrap_open(mem_ctx, recdb->db_path, hash_size,
135 tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
136 if (recdb->db == NULL) {
138 LOG("failed to create recovery db %s\n", recdb->db_path);
141 recdb->persistent = persistent;
/*
 * Accessor returning a uint32_t id for the recovery db; callers use it as the
 * db_id in controls and record buffers. Body not visible in this view.
 */
146 static uint32_t recdb_id(struct recdb_context *recdb)
/* Accessor: the database name stored in the recovery db handle. */
151 static const char *recdb_name(struct recdb_context *recdb)
153 return recdb->db_name;
/* Accessor: the file path of the recovery db, as built by recdb_create(). */
156 static const char *recdb_path(struct recdb_context *recdb)
158 return recdb->db_path;
/* Accessor: the raw tdb context inside the wrapped handle. */
161 static struct tdb_context *recdb_tdb(struct recdb_context *recdb)
163 return recdb->db->tdb;
/* Accessor: the persistent flag given to recdb_create(). */
166 static bool recdb_persistent(struct recdb_context *recdb)
168 return recdb->persistent;
/*
 * Context handed to recdb_add_traverse(): the recovery db being filled, plus
 * mypnn (field not visible here) which the traverse compares with the
 * dmaster of an already stored record.
 */
171 struct recdb_add_traverse_state {
172 struct recdb_context *recdb;
/*
 * Per-record callback used by recdb_add() via ctdb_rec_buffer_traverse().
 *
 * The ctdb_ltdb_header is read from the front of data. If the recovery db
 * already holds a record for key, the incoming and stored headers are
 * compared before the incoming record is written with TDB_REPLACE.
 */
176 static int recdb_add_traverse(uint32_t reqid, struct ctdb_ltdb_header *header,
177 TDB_DATA key, TDB_DATA data,
180 struct recdb_add_traverse_state *state =
181 (struct recdb_add_traverse_state *)private_data;
182 struct ctdb_ltdb_header *hdr;
186 /* header is not marshalled separately in the pulldb control */
187 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
191 hdr = (struct ctdb_ltdb_header *)data.dptr;
193 /* fetch the existing record, if any */
194 prev_data = tdb_fetch(recdb_tdb(state->recdb), key);
196 if (prev_data.dptr != NULL) {
197 struct ctdb_ltdb_header prev_hdr;
/* copy the stored header out, then release the buffer tdb_fetch() returned */
199 prev_hdr = *(struct ctdb_ltdb_header *)prev_data.dptr;
200 free(prev_data.dptr);
/*
 * True when the incoming record has a lower RSN, or the same RSN while the
 * stored copy has a dmaster other than this node. The action taken is on
 * lines not visible here (presumably the stored copy is kept - confirm).
 */
201 if (hdr->rsn < prev_hdr.rsn ||
202 (hdr->rsn == prev_hdr.rsn &&
203 prev_hdr.dmaster != state->mypnn)) {
208 ret = tdb_store(recdb_tdb(state->recdb), key, data, TDB_REPLACE);
/*
 * Merge every record of recbuf into the recovery db by running
 * recdb_add_traverse() over the buffer. How the traverse result maps to the
 * bool return value is on lines not visible here.
 */
215 static bool recdb_add(struct recdb_context *recdb, int mypnn,
216 struct ctdb_rec_buffer *recbuf)
218 struct recdb_add_traverse_state state;
224 ret = ctdb_rec_buffer_traverse(recbuf, recdb_add_traverse, &state);
232 /* This function decides which records from recdb are retained */
233 static int recbuf_filter_add(struct ctdb_rec_buffer *recbuf, bool persistent,
234 uint32_t reqid, uint32_t dmaster,
235 TDB_DATA key, TDB_DATA data)
237 struct ctdb_ltdb_header *header;
241 * skip empty records - but NOT for persistent databases:
243 * The record-by-record mode of recovery deletes empty records.
244 * For persistent databases, this can lead to data corruption
245 * by deleting records that should be there:
247 * - Assume the cluster has been running for a while.
249 * - A record R in a persistent database has been created and
250 * deleted a couple of times, the last operation being deletion,
251 * leaving an empty record with a high RSN, say 10.
253 * - Now a node N is turned off.
255 * - This leaves the local database copy of D on N with the empty
256 * copy of R and RSN 10. On all other nodes, the recovery has deleted
257 * the copy of record R.
259 * - Now the record is created again while node N is turned off.
260 * This creates R with RSN = 1 on all nodes except for N.
262 * - Now node N is turned on again. The following recovery will choose
263 * the older empty copy of R due to RSN 10 > RSN 1.
265 * ==> Hence the record is gone after the recovery.
267 * On databases like Samba's registry, this can damage the higher-level
268 * data structures built from the various tdb-level records.
270 if (!persistent && data.dsize <= sizeof(struct ctdb_ltdb_header)) {
274 /* update the dmaster field to point to us */
275 header = (struct ctdb_ltdb_header *)data.dptr;
277 header->dmaster = dmaster;
278 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
281 ret = ctdb_rec_buffer_add(recbuf, recbuf, reqid, NULL, key, data);
/*
 * Context for recdb_records_traverse(): the output record buffer, plus the
 * dmaster, reqid, persistent and failed members (declared on lines not
 * visible here) that the traverse reads and updates.
 */
289 struct recdb_records_traverse_state {
290 struct ctdb_rec_buffer *recbuf;
/*
 * tdb traverse callback for recdb_records(): offers each record to
 * recbuf_filter_add() and sets state->failed on the error path.
 */
297 static int recdb_records_traverse(struct tdb_context *tdb,
298 TDB_DATA key, TDB_DATA data,
301 struct recdb_records_traverse_state *state =
302 (struct recdb_records_traverse_state *)private_data;
305 ret = recbuf_filter_add(state->recbuf, state->persistent,
306 state->reqid, state->dmaster, key, data);
308 state->failed = true;
/*
 * Marshal the recovery db into a single record buffer allocated on mem_ctx.
 *
 * The buffer is filled by a read-only tdb traverse. If the traverse returns
 * -1 or a callback flagged a failure, the problem is logged and the buffer
 * is freed.
 */
315 static struct ctdb_rec_buffer *recdb_records(struct recdb_context *recdb,
319 struct recdb_records_traverse_state state;
322 state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
323 if (state.recbuf == NULL) {
326 state.dmaster = dmaster;
328 state.persistent = recdb_persistent(recdb);
329 state.failed = false;
331 ret = tdb_traverse_read(recdb_tdb(recdb), recdb_records_traverse,
333 if (ret == -1 || state.failed) {
334 LOG("Failed to marshall recovery records for %s\n",
336 TALLOC_FREE(state.recbuf);
/*
 * Context for recdb_file_traverse(): the record buffer currently being
 * filled and the recovery db it comes from. The traverse also uses mem_ctx,
 * dmaster, reqid, persistent, failed, fd, max_size and num_buffers, which
 * are declared on lines not visible here.
 */
343 struct recdb_file_traverse_state {
344 struct ctdb_rec_buffer *recbuf;
345 struct recdb_context *recdb;
/*
 * tdb traverse callback for recdb_file(): adds each record to the current
 * buffer through recbuf_filter_add(). Once the buffer length exceeds
 * state->max_size the buffer is written to state->fd, counted in
 * state->num_buffers, freed and replaced by a fresh one. Failures set
 * state->failed.
 */
356 static int recdb_file_traverse(struct tdb_context *tdb,
357 TDB_DATA key, TDB_DATA data,
360 struct recdb_file_traverse_state *state =
361 (struct recdb_file_traverse_state *)private_data;
364 ret = recbuf_filter_add(state->recbuf, state->persistent,
365 state->reqid, state->dmaster, key, data);
367 state->failed = true;
/* the size check runs after the add, so a buffer can end up above max_size */
371 if (ctdb_rec_buffer_len(state->recbuf) > state->max_size) {
372 ret = ctdb_rec_buffer_write(state->recbuf, state->fd);
374 LOG("Failed to collect recovery records for %s\n",
375 recdb_name(state->recdb));
376 state->failed = true;
380 state->num_buffers += 1;
/* start a new, empty buffer for the records that follow */
382 TALLOC_FREE(state->recbuf);
383 state->recbuf = ctdb_rec_buffer_init(state->mem_ctx,
384 recdb_id(state->recdb));
385 if (state->recbuf == NULL) {
386 state->failed = true;
/*
 * Write the recovery db to fd as a sequence of record buffers.
 *
 * recdb_file_traverse() writes a buffer each time it grows past max_size;
 * the buffer still held after the traverse is written here. On success the
 * number of buffers written is logged and returned. The value returned on
 * the failure paths is on lines not visible here; the caller checks for -1.
 */
394 static int recdb_file(struct recdb_context *recdb, TALLOC_CTX *mem_ctx,
395 uint32_t dmaster, int fd, int max_size)
397 struct recdb_file_traverse_state state;
400 state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
401 if (state.recbuf == NULL) {
405 state.mem_ctx = mem_ctx;
406 state.dmaster = dmaster;
408 state.persistent = recdb_persistent(recdb);
409 state.failed = false;
411 state.max_size = max_size;
412 state.num_buffers = 0;
414 ret = tdb_traverse_read(recdb_tdb(recdb), recdb_file_traverse, &state);
415 if (ret == -1 || state.failed) {
416 TALLOC_FREE(state.recbuf);
/* write out the buffer that was still being filled when the traverse ended */
420 ret = ctdb_rec_buffer_write(state.recbuf, fd);
422 LOG("Failed to collect recovery records for %s\n",
424 TALLOC_FREE(state.recbuf);
427 state.num_buffers += 1;
429 LOG("Wrote %d buffers of recovery records for %s\n",
430 state.num_buffers, recdb_name(recdb));
432 return state.num_buffers;
436 * Pull database from a single node
/*
 * Request state for pulling one database from one node. Besides the members
 * shown, the functions below use pnn, srvid and num_records, which are
 * declared on lines not visible here.
 */
439 struct pull_database_state {
440 struct tevent_context *ev;
441 struct ctdb_client_context *client;
442 struct recdb_context *recdb;
/* Message handler and completion callbacks of the pull request. */
448 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
450 static void pull_database_register_done(struct tevent_req *subreq);
451 static void pull_database_old_done(struct tevent_req *subreq);
452 static void pull_database_unregister_done(struct tevent_req *subreq);
453 static void pull_database_new_done(struct tevent_req *subreq);
/*
 * Start pulling the records of recdb's database from node pnn into recdb.
 *
 * If caps has CTDB_CAP_FRAGMENTED_CONTROLS set, a message handler is
 * registered for a fresh srvid and the request continues in
 * pull_database_register_done(). Otherwise a PULL_DB control is sent and
 * the request continues in pull_database_old_done().
 */
455 static struct tevent_req *pull_database_send(
457 struct tevent_context *ev,
458 struct ctdb_client_context *client,
459 uint32_t pnn, uint32_t caps,
460 struct recdb_context *recdb)
462 struct tevent_req *req, *subreq;
463 struct pull_database_state *state;
464 struct ctdb_req_control request;
466 req = tevent_req_create(mem_ctx, &state, struct pull_database_state);
472 state->client = client;
473 state->recdb = recdb;
475 state->srvid = srvid_next();
477 if (caps & CTDB_CAP_FRAGMENTED_CONTROLS) {
478 subreq = ctdb_client_set_message_handler_send(
479 state, state->ev, state->client,
480 state->srvid, pull_database_handler,
482 if (tevent_req_nomem(subreq, req)) {
483 return tevent_req_post(req, ev);
486 tevent_req_set_callback(subreq, pull_database_register_done,
/* PULL_DB control path: the whole database is requested in one control */
490 struct ctdb_pulldb pulldb;
492 pulldb.db_id = recdb_id(recdb);
493 pulldb.lmaster = CTDB_LMASTER_ANY;
495 ctdb_req_control_pull_db(&request, &pulldb);
496 subreq = ctdb_client_control_send(state, state->ev,
500 if (tevent_req_nomem(subreq, req)) {
501 return tevent_req_post(req, ev);
503 tevent_req_set_callback(subreq, pull_database_old_done, req);
/*
 * Message handler for the DB_PULL data messages of one pull request.
 *
 * The message srvid is compared with state->srvid, the payload is unpacked
 * into a record buffer, the buffer db_id is checked against the recovery db,
 * and the records are merged with recdb_add(). recbuf->count is then added
 * to state->num_records. Invalid data, a db id mismatch and a failed merge
 * are logged; what else happens on those paths is not visible here.
 */
509 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
512 struct tevent_req *req = talloc_get_type_abort(
513 private_data, struct tevent_req);
514 struct pull_database_state *state = tevent_req_data(
515 req, struct pull_database_state);
516 struct ctdb_rec_buffer *recbuf;
520 if (srvid != state->srvid) {
524 ret = ctdb_rec_buffer_pull(data.dptr, data.dsize, state, &recbuf);
526 LOG("Invalid data received for DB_PULL messages\n");
530 if (recbuf->db_id != recdb_id(state->recdb)) {
532 LOG("Invalid dbid:%08x for DB_PULL messages for %s\n",
533 recbuf->db_id, recdb_name(state->recdb));
537 status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
541 LOG("Failed to add records to recdb for %s\n",
542 recdb_name(state->recdb));
546 state->num_records += recbuf->count;
/*
 * Completion of the message handler registration. A failure is logged and
 * ends the request with the reported error. Otherwise a DB_PULL control
 * carrying the db id, CTDB_LMASTER_ANY and state->srvid is sent to
 * state->pnn, and the request continues in pull_database_new_done().
 */
550 static void pull_database_register_done(struct tevent_req *subreq)
552 struct tevent_req *req = tevent_req_callback_data(
553 subreq, struct tevent_req);
554 struct pull_database_state *state = tevent_req_data(
555 req, struct pull_database_state);
556 struct ctdb_req_control request;
557 struct ctdb_pulldb_ext pulldb_ext;
561 status = ctdb_client_set_message_handler_recv(subreq, &ret);
564 LOG("failed to set message handler for DB_PULL for %s\n",
565 recdb_name(state->recdb));
566 tevent_req_error(req, ret);
570 pulldb_ext.db_id = recdb_id(state->recdb);
571 pulldb_ext.lmaster = CTDB_LMASTER_ANY;
572 pulldb_ext.srvid = state->srvid;
574 ctdb_req_control_db_pull(&request, &pulldb_ext);
575 subreq = ctdb_client_control_send(state, state->ev, state->client,
576 state->pnn, TIMEOUT(), &request);
577 if (tevent_req_nomem(subreq, req)) {
580 tevent_req_set_callback(subreq, pull_database_new_done, req);
/*
 * Completion of the PULL_DB control. The record buffer is extracted from
 * the reply and merged into the recovery db with recdb_add();
 * state->num_records is set to the buffer record count, the result is
 * logged and the request is marked done. A failed control is logged and
 * reported with its error; the EIO error follows the recdb_add() call.
 */
583 static void pull_database_old_done(struct tevent_req *subreq)
585 struct tevent_req *req = tevent_req_callback_data(
586 subreq, struct tevent_req);
587 struct pull_database_state *state = tevent_req_data(
588 req, struct pull_database_state);
589 struct ctdb_reply_control *reply;
590 struct ctdb_rec_buffer *recbuf;
594 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
597 LOG("control PULL_DB failed for %s on node %u, ret=%d\n",
598 recdb_name(state->recdb), state->pnn, ret);
599 tevent_req_error(req, ret);
603 ret = ctdb_reply_control_pull_db(reply, state, &recbuf);
606 tevent_req_error(req, ret);
610 status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
614 tevent_req_error(req, EIO);
618 state->num_records = recbuf->count;
621 LOG("Pulled %d records for db %s from node %d\n",
622 state->num_records, recdb_name(state->recdb), state->pnn);
624 tevent_req_done(req);
/*
 * Completion of the DB_PULL control. The record count taken from the reply
 * is compared with the count accumulated by pull_database_handler(); a
 * mismatch is logged and fails the request with EIO. On a match the result
 * is logged, removal of the message handler is started and the request
 * continues in pull_database_unregister_done().
 */
627 static void pull_database_new_done(struct tevent_req *subreq)
629 struct tevent_req *req = tevent_req_callback_data(
630 subreq, struct tevent_req);
631 struct pull_database_state *state = tevent_req_data(
632 req, struct pull_database_state);
633 struct ctdb_reply_control *reply;
634 uint32_t num_records;
638 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
641 LOG("control DB_PULL failed for %s on node %u, ret=%d\n",
642 recdb_name(state->recdb), state->pnn, ret);
643 tevent_req_error(req, ret);
647 ret = ctdb_reply_control_db_pull(reply, &num_records);
649 if (num_records != state->num_records) {
650 LOG("mismatch (%u != %u) in DB_PULL records for %s\n",
651 num_records, state->num_records, recdb_name(state->recdb));
652 tevent_req_error(req, EIO);
656 LOG("Pulled %d records for db %s from node %d\n",
657 state->num_records, recdb_name(state->recdb), state->pnn);
659 subreq = ctdb_client_remove_message_handler_send(
660 state, state->ev, state->client,
662 if (tevent_req_nomem(subreq, req)) {
665 tevent_req_set_callback(subreq, pull_database_unregister_done, req);
/*
 * Completion of the message handler removal; last step of the DB_PULL path.
 * A failure is logged and reported with its error, otherwise the request is
 * marked done.
 */
668 static void pull_database_unregister_done(struct tevent_req *subreq)
670 struct tevent_req *req = tevent_req_callback_data(
671 subreq, struct tevent_req);
672 struct pull_database_state *state = tevent_req_data(
673 req, struct pull_database_state);
677 status = ctdb_client_remove_message_handler_recv(subreq, &ret);
680 LOG("failed to remove message handler for DB_PULL for %s\n",
681 recdb_name(state->recdb));
682 tevent_req_error(req, ret);
686 tevent_req_done(req);
/* Receive the result of pull_database_send(); delegates to generic_recv(). */
689 static bool pull_database_recv(struct tevent_req *req, int *perr)
691 return generic_recv(req, perr);
695 * Push database to specified nodes (old style)
/*
 * Request state for pushing a database with PUSH_DB controls. recbuf holds
 * the marshalled records that are sent to every node. pnn_list, count and
 * index are also used below and are declared on lines not visible here.
 */
698 struct push_database_old_state {
699 struct tevent_context *ev;
700 struct ctdb_client_context *client;
701 struct recdb_context *recdb;
704 struct ctdb_rec_buffer *recbuf;
/* Completion callback of each PUSH_DB control. */
708 static void push_database_old_push_done(struct tevent_req *subreq);
/*
 * Start pushing the recovery db to the nodes in pnn_list using PUSH_DB
 * controls. All records are marshalled once with recdb_records(), using
 * this node's pnn as dmaster, and the first control goes to
 * pnn_list[state->index]. The remaining nodes are handled from
 * push_database_old_push_done().
 */
710 static struct tevent_req *push_database_old_send(
712 struct tevent_context *ev,
713 struct ctdb_client_context *client,
714 uint32_t *pnn_list, int count,
715 struct recdb_context *recdb)
717 struct tevent_req *req, *subreq;
718 struct push_database_old_state *state;
719 struct ctdb_req_control request;
722 req = tevent_req_create(mem_ctx, &state,
723 struct push_database_old_state);
729 state->client = client;
730 state->recdb = recdb;
731 state->pnn_list = pnn_list;
732 state->count = count;
735 state->recbuf = recdb_records(recdb, state,
736 ctdb_client_pnn(client));
737 if (tevent_req_nomem(state->recbuf, req)) {
738 return tevent_req_post(req, ev);
741 pnn = state->pnn_list[state->index];
743 ctdb_req_control_push_db(&request, state->recbuf);
744 subreq = ctdb_client_control_send(state, ev, client, pnn,
745 TIMEOUT(), &request);
746 if (tevent_req_nomem(subreq, req)) {
747 return tevent_req_post(req, ev);
749 tevent_req_set_callback(subreq, push_database_old_push_done, req);
/*
 * Completion of one PUSH_DB control. A failure is logged with the node it
 * was sent to and ends the request. When state->index has reached
 * state->count the record buffer is freed and the request is done;
 * otherwise the same buffer is sent to pnn_list[state->index]. The update of
 * state->index is on a line not visible here.
 */
754 static void push_database_old_push_done(struct tevent_req *subreq)
756 struct tevent_req *req = tevent_req_callback_data(
757 subreq, struct tevent_req);
758 struct push_database_old_state *state = tevent_req_data(
759 req, struct push_database_old_state);
760 struct ctdb_req_control request;
765 status = ctdb_client_control_recv(subreq, &ret, NULL, NULL);
768 LOG("control PUSH_DB failed for db %s on node %u, ret=%d\n",
769 recdb_name(state->recdb), state->pnn_list[state->index],
771 tevent_req_error(req, ret);
776 if (state->index == state->count) {
777 TALLOC_FREE(state->recbuf);
778 tevent_req_done(req);
782 pnn = state->pnn_list[state->index];
784 ctdb_req_control_push_db(&request, state->recbuf);
785 subreq = ctdb_client_control_send(state, state->ev, state->client,
786 pnn, TIMEOUT(), &request);
787 if (tevent_req_nomem(subreq, req)) {
790 tevent_req_set_callback(subreq, push_database_old_push_done, req);
/* Receive the result of push_database_old_send(); delegates to generic_recv(). */
793 static bool push_database_old_recv(struct tevent_req *req, int *perr)
795 return generic_recv(req, perr);
799 * Push database to specified nodes (new style)
/*
 * Request state for pushing a database as a series of messages.
 * num_buffers_sent counts the buffers already sent. pnn_list, count, srvid,
 * dmaster, fd, num_buffers and num_records are also used below and are
 * declared on lines not visible here.
 */
802 struct push_database_new_state {
803 struct tevent_context *ev;
804 struct ctdb_client_context *client;
805 struct recdb_context *recdb;
812 int num_buffers_sent;
/* Steps of the push: start control, per-buffer messages, confirm control. */
816 static void push_database_new_started(struct tevent_req *subreq);
817 static void push_database_new_send_msg(struct tevent_req *req);
818 static void push_database_new_send_done(struct tevent_req *subreq);
819 static void push_database_new_confirmed(struct tevent_req *subreq);
/*
 * Start pushing the recovery db to the nodes in pnn_list as a stream of
 * record buffer messages.
 *
 * The records are first written by recdb_file() to a file named
 * "<recdb path>.dat" in buffers bounded by max_size, and the file offset is
 * reset to the start. A DB_PUSH_START control carrying the db id and a fresh
 * srvid is then sent, and the request continues in
 * push_database_new_started().
 *
 * Errors visible here: open() failure reports errno, recdb_file() returning
 * -1 reports ENOMEM, and the error after lseek() is EIO.
 */
821 static struct tevent_req *push_database_new_send(
823 struct tevent_context *ev,
824 struct ctdb_client_context *client,
825 uint32_t *pnn_list, int count,
826 struct recdb_context *recdb,
829 struct tevent_req *req, *subreq;
830 struct push_database_new_state *state;
831 struct ctdb_req_control request;
832 struct ctdb_pulldb_ext pulldb_ext;
836 req = tevent_req_create(mem_ctx, &state,
837 struct push_database_new_state);
843 state->client = client;
844 state->recdb = recdb;
845 state->pnn_list = pnn_list;
846 state->count = count;
848 state->srvid = srvid_next();
849 state->dmaster = ctdb_client_pnn(client);
850 state->num_buffers_sent = 0;
851 state->num_records = 0;
853 filename = talloc_asprintf(state, "%s.dat", recdb_path(recdb));
854 if (tevent_req_nomem(filename, req)) {
855 return tevent_req_post(req, ev);
858 state->fd = open(filename, O_RDWR|O_CREAT, 0644);
859 if (state->fd == -1) {
860 tevent_req_error(req, errno);
861 return tevent_req_post(req, ev);
864 talloc_free(filename);
866 state->num_buffers = recdb_file(recdb, state, state->dmaster,
867 state->fd, max_size);
868 if (state->num_buffers == -1) {
869 tevent_req_error(req, ENOMEM);
870 return tevent_req_post(req, ev);
/* rewind so that push_database_new_send_msg() reads from the first buffer */
873 offset = lseek(state->fd, 0, SEEK_SET);
875 tevent_req_error(req, EIO);
876 return tevent_req_post(req, ev);
879 pulldb_ext.db_id = recdb_id(recdb);
880 pulldb_ext.srvid = state->srvid;
882 ctdb_req_control_db_push_start(&request, &pulldb_ext);
883 subreq = ctdb_client_control_multi_send(state, ev, client,
885 TIMEOUT(), &request);
886 if (tevent_req_nomem(subreq, req)) {
887 return tevent_req_post(req, ev);
889 tevent_req_set_callback(subreq, push_database_new_started, req);
/*
 * Completion of the DB_PUSH_START multi-node control. On failure one of two
 * messages is logged, either naming a node and its error (from
 * ctdb_client_control_multi_error()) or only the overall error, and the
 * request ends with ret. On success the first buffer is sent through
 * push_database_new_send_msg().
 */
894 static void push_database_new_started(struct tevent_req *subreq)
896 struct tevent_req *req = tevent_req_callback_data(
897 subreq, struct tevent_req);
898 struct push_database_new_state *state = tevent_req_data(
899 req, struct push_database_new_state);
904 status = ctdb_client_control_multi_recv(subreq, &ret, state,
911 ret2 = ctdb_client_control_multi_error(state->pnn_list,
915 LOG("control DB_PUSH_START failed for db %s "
916 "on node %u, ret=%d\n",
917 recdb_name(state->recdb), pnn, ret2);
919 LOG("control DB_PUSH_START failed for db %s, ret=%d\n",
920 recdb_name(state->recdb), ret);
922 talloc_free(err_list);
924 tevent_req_error(req, ret);
928 push_database_new_send_msg(req);
/*
 * Send the next record buffer, or finish the transfer.
 *
 * Once num_buffers_sent equals num_buffers a DB_PUSH_CONFIRM control is
 * sent and the request continues in push_database_new_confirmed().
 * Otherwise the next buffer is read from state->fd, marshalled into a
 * talloc'ed blob and sent as a message with state->srvid to all nodes in
 * pnn_list; push_database_new_send_done() runs when that send completes.
 */
931 static void push_database_new_send_msg(struct tevent_req *req)
933 struct push_database_new_state *state = tevent_req_data(
934 req, struct push_database_new_state);
935 struct tevent_req *subreq;
936 struct ctdb_rec_buffer *recbuf;
937 struct ctdb_req_message message;
941 if (state->num_buffers_sent == state->num_buffers) {
942 struct ctdb_req_control request;
944 ctdb_req_control_db_push_confirm(&request,
945 recdb_id(state->recdb));
946 subreq = ctdb_client_control_multi_send(state, state->ev,
950 TIMEOUT(), &request);
951 if (tevent_req_nomem(subreq, req)) {
954 tevent_req_set_callback(subreq, push_database_new_confirmed,
959 ret = ctdb_rec_buffer_read(state->fd, state, &recbuf);
961 tevent_req_error(req, ret);
965 data.dsize = ctdb_rec_buffer_len(recbuf);
966 data.dptr = talloc_size(state, data.dsize);
967 if (tevent_req_nomem(data.dptr, req)) {
971 ctdb_rec_buffer_push(recbuf, data.dptr);
973 message.srvid = state->srvid;
974 message.data.data = data;
976 LOG("Pushing buffer %d with %d records for %s\n",
977 state->num_buffers_sent, recbuf->count, recdb_name(state->recdb));
979 subreq = ctdb_client_message_multi_send(state, state->ev,
981 state->pnn_list, state->count,
983 if (tevent_req_nomem(subreq, req)) {
986 tevent_req_set_callback(subreq, push_database_new_send_done, req);
/* running total, later compared with the count each node confirms */
988 state->num_records += recbuf->count;
/*
 * NOTE(review): the blob is freed right after the send call is issued;
 * presumably ctdb_client_message_multi_send() has copied it - confirm.
 */
990 talloc_free(data.dptr);
/*
 * Completion of one buffer message send. A failure is logged and ends the
 * request; otherwise num_buffers_sent is incremented and
 * push_database_new_send_msg() is called for the next step.
 */
994 static void push_database_new_send_done(struct tevent_req *subreq)
996 struct tevent_req *req = tevent_req_callback_data(
997 subreq, struct tevent_req);
998 struct push_database_new_state *state = tevent_req_data(
999 req, struct push_database_new_state);
1003 status = ctdb_client_message_multi_recv(subreq, &ret, NULL, NULL);
1004 TALLOC_FREE(subreq);
1006 LOG("Sending recovery records failed for %s\n",
1007 recdb_name(state->recdb));
1008 tevent_req_error(req, ret);
1012 state->num_buffers_sent += 1;
1014 push_database_new_send_msg(req);
/*
 * Completion of the DB_PUSH_CONFIRM multi-node control. A failed control is
 * logged, per node or overall, and ends the request with ret. Otherwise the
 * record count confirmed in each reply is compared with state->num_records;
 * a mismatch is logged with the node and fails the request with EPROTO, the
 * same error that follows the reply parsing call. When every node matches,
 * the total is logged and the request is done.
 */
1017 static void push_database_new_confirmed(struct tevent_req *subreq)
1019 struct tevent_req *req = tevent_req_callback_data(
1020 subreq, struct tevent_req);
1021 struct push_database_new_state *state = tevent_req_data(
1022 req, struct push_database_new_state);
1023 struct ctdb_reply_control **reply;
1027 uint32_t num_records;
1029 status = ctdb_client_control_multi_recv(subreq, &ret, state,
1031 TALLOC_FREE(subreq);
1036 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1037 state->count, err_list,
1040 LOG("control DB_PUSH_CONFIRM failed for %s on node %u,"
1041 " ret=%d\n", recdb_name(state->recdb), pnn, ret2);
1043 LOG("control DB_PUSH_CONFIRM failed for %s, ret=%d\n",
1044 recdb_name(state->recdb), ret);
1046 tevent_req_error(req, ret);
/* reply[i] is matched with pnn_list[i] in the log message below */
1050 for (i=0; i<state->count; i++) {
1051 ret = ctdb_reply_control_db_push_confirm(reply[i],
1054 tevent_req_error(req, EPROTO);
1058 if (num_records != state->num_records) {
1059 LOG("Node %u received %d of %d records for %s\n",
1060 state->pnn_list[i], num_records,
1061 state->num_records, recdb_name(state->recdb));
1062 tevent_req_error(req, EPROTO);
1069 LOG("Pushed %d records for db %s\n",
1070 state->num_records, recdb_name(state->recdb));
1072 tevent_req_done(req);
/* Receive the result of push_database_new_send(); delegates to generic_recv(). */
1075 static bool push_database_new_recv(struct tevent_req *req, int *perr)
1077 return generic_recv(req, perr);
1081 * wrapper for push_database_old and push_database_new
/*
 * State of the combined push request: one completion flag for the PUSH_DB
 * control based push and one for the message based push.
 */
1084 struct push_database_state {
1085 bool old_done, new_done;
/* Completion callbacks of the two sub-requests. */
1088 static void push_database_old_done(struct tevent_req *subreq);
1089 static void push_database_new_done(struct tevent_req *subreq);
/*
 * Push the recovery db to all nodes in pnn_list, choosing the method per
 * node.
 *
 * Nodes whose caps[pnn] has CTDB_CAP_FRAGMENTED_CONTROLS set are collected
 * in new_list and served by push_database_new_send(), with
 * tun_list->rec_buffer_size_limit as the buffer size limit; all other nodes
 * go to old_list and push_database_old_send(). A sub-request is only started
 * for a non-empty list; old_done and new_done are also set to true here,
 * presumably for an empty list, since the else lines are not visible.
 *
 * caps is indexed by pnn, not by position in pnn_list.
 */
1091 static struct tevent_req *push_database_send(
1092 TALLOC_CTX *mem_ctx,
1093 struct tevent_context *ev,
1094 struct ctdb_client_context *client,
1095 uint32_t *pnn_list, int count, uint32_t *caps,
1096 struct ctdb_tunable_list *tun_list,
1097 struct recdb_context *recdb)
1099 struct tevent_req *req, *subreq;
1100 struct push_database_state *state;
1101 uint32_t *old_list, *new_list;
1102 int old_count, new_count;
1105 req = tevent_req_create(mem_ctx, &state, struct push_database_state);
1110 state->old_done = false;
1111 state->new_done = false;
/* both lists are sized for the worst case of every node landing in one list */
1115 old_list = talloc_array(state, uint32_t, count);
1116 new_list = talloc_array(state, uint32_t, count);
1117 if (tevent_req_nomem(old_list, req) ||
1118 tevent_req_nomem(new_list,req)) {
1119 return tevent_req_post(req, ev);
1122 for (i=0; i<count; i++) {
1123 uint32_t pnn = pnn_list[i];
1125 if (caps[pnn] & CTDB_CAP_FRAGMENTED_CONTROLS) {
1126 new_list[new_count] = pnn;
1129 old_list[old_count] = pnn;
1134 if (old_count > 0) {
1135 subreq = push_database_old_send(state, ev, client,
1136 old_list, old_count, recdb);
1137 if (tevent_req_nomem(subreq, req)) {
1138 return tevent_req_post(req, ev);
1140 tevent_req_set_callback(subreq, push_database_old_done, req);
1142 state->old_done = true;
1145 if (new_count > 0) {
1146 subreq = push_database_new_send(state, ev, client,
1147 new_list, new_count, recdb,
1148 tun_list->rec_buffer_size_limit);
1149 if (tevent_req_nomem(subreq, req)) {
1150 return tevent_req_post(req, ev);
1152 tevent_req_set_callback(subreq, push_database_new_done, req);
1154 state->new_done = true;
/*
 * Completion of the PUSH_DB control based sub-request. An error ends the
 * combined request; otherwise old_done is set and the combined request
 * finishes once new_done is set as well.
 */
1160 static void push_database_old_done(struct tevent_req *subreq)
1162 struct tevent_req *req = tevent_req_callback_data(
1163 subreq, struct tevent_req);
1164 struct push_database_state *state = tevent_req_data(
1165 req, struct push_database_state);
1169 status = push_database_old_recv(subreq, &ret);
1171 tevent_req_error(req, ret);
1175 state->old_done = true;
1177 if (state->old_done && state->new_done) {
1178 tevent_req_done(req);
/*
 * Completion of the message based sub-request. An error ends the combined
 * request; otherwise new_done is set and the combined request finishes once
 * old_done is set as well.
 */
1182 static void push_database_new_done(struct tevent_req *subreq)
1184 struct tevent_req *req = tevent_req_callback_data(
1185 subreq, struct tevent_req);
1186 struct push_database_state *state = tevent_req_data(
1187 req, struct push_database_state);
1191 status = push_database_new_recv(subreq, &ret);
1193 tevent_req_error(req, ret);
1197 state->new_done = true;
1199 if (state->old_done && state->new_done) {
1200 tevent_req_done(req);
/* Receive the result of push_database_send(); delegates to generic_recv(). */
1204 static bool push_database_recv(struct tevent_req *req, int *perr)
1206 return generic_recv(req, perr);
1210 * Collect databases using highest sequence number
/*
 * Request state for collecting a database from the node with the highest
 * sequence number. pnn_list, count, caps, db_id and max_pnn are also used
 * below and are declared on lines not visible here.
 */
1213 struct collect_highseqnum_db_state {
1214 struct tevent_context *ev;
1215 struct ctdb_client_context *client;
1220 struct recdb_context *recdb;
/* Steps: gather the sequence numbers, then pull from the selected node. */
1224 static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq);
1225 static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq);
/*
 * Start collecting database db_id into recdb from the single node that
 * reports the highest sequence number. A GET_DB_SEQNUM control is sent to
 * every node in pnn_list and the request continues in
 * collect_highseqnum_db_seqnum_done().
 */
1227 static struct tevent_req *collect_highseqnum_db_send(
1228 TALLOC_CTX *mem_ctx,
1229 struct tevent_context *ev,
1230 struct ctdb_client_context *client,
1231 uint32_t *pnn_list, int count, uint32_t *caps,
1232 uint32_t db_id, struct recdb_context *recdb)
1234 struct tevent_req *req, *subreq;
1235 struct collect_highseqnum_db_state *state;
1236 struct ctdb_req_control request;
1238 req = tevent_req_create(mem_ctx, &state,
1239 struct collect_highseqnum_db_state);
1245 state->client = client;
1246 state->pnn_list = pnn_list;
1247 state->count = count;
1249 state->db_id = db_id;
1250 state->recdb = recdb;
1252 ctdb_req_control_get_db_seqnum(&request, db_id);
/*
 * NOTE(review): this sub-request is allocated on mem_ctx, while the other
 * senders in this file allocate theirs on state. The callback frees it with
 * TALLOC_FREE(); confirm whether state was intended here.
 */
1253 subreq = ctdb_client_control_multi_send(mem_ctx, ev, client,
1254 state->pnn_list, state->count,
1255 TIMEOUT(), &request);
1256 if (tevent_req_nomem(subreq, req)) {
1257 return tevent_req_post(req, ev);
1259 tevent_req_set_callback(subreq, collect_highseqnum_db_seqnum_done,
/*
 * Completion of the GET_DB_SEQNUM multi-node control. A failed control is
 * logged, per node or overall, and ends the request with ret; the EPROTO
 * error follows the reply parsing call. Otherwise the replies are scanned
 * for the highest sequence number, the choice is logged, and the database
 * is pulled from state->max_pnn with pull_database_send(). The request
 * continues in collect_highseqnum_db_pulldb_done().
 *
 * The initial value of max_seqnum is set on a line not visible here.
 */
1265 static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq)
1267 struct tevent_req *req = tevent_req_callback_data(
1268 subreq, struct tevent_req);
1269 struct collect_highseqnum_db_state *state = tevent_req_data(
1270 req, struct collect_highseqnum_db_state);
1271 struct ctdb_reply_control **reply;
1275 uint64_t seqnum, max_seqnum;
1277 status = ctdb_client_control_multi_recv(subreq, &ret, state,
1279 TALLOC_FREE(subreq);
1284 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1285 state->count, err_list,
1288 LOG("control GET_DB_SEQNUM failed for %s on node %u,"
1289 " ret=%d\n", recdb_name(state->recdb), pnn, ret2);
1291 LOG("control GET_DB_SEQNUM failed for %s, ret=%d\n",
1292 recdb_name(state->recdb), ret);
1294 tevent_req_error(req, ret);
/* default to the first node; replaced only by a strictly higher seqnum */
1299 state->max_pnn = state->pnn_list[0];
1300 for (i=0; i<state->count; i++) {
1301 ret = ctdb_reply_control_get_db_seqnum(reply[i], &seqnum);
1303 tevent_req_error(req, EPROTO);
1307 if (max_seqnum < seqnum) {
1308 max_seqnum = seqnum;
1309 state->max_pnn = state->pnn_list[i];
1315 LOG("Pull persistent db %s from node %d with seqnum 0x%"PRIx64"\n",
1316 recdb_name(state->recdb), state->max_pnn, max_seqnum);
/* caps is indexed by pnn */
1318 subreq = pull_database_send(state, state->ev, state->client,
1320 state->caps[state->max_pnn],
1322 if (tevent_req_nomem(subreq, req)) {
1325 tevent_req_set_callback(subreq, collect_highseqnum_db_pulldb_done,
/*
 * Completion of the pull from the selected node: the pull error is passed
 * on through tevent_req_error(), otherwise the request is marked done.
 */
1329 static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq)
1331 struct tevent_req *req = tevent_req_callback_data(
1332 subreq, struct tevent_req);
1336 status = pull_database_recv(subreq, &ret);
1337 TALLOC_FREE(subreq);
1339 tevent_req_error(req, ret);
1343 tevent_req_done(req);
/* Receive the result of collect_highseqnum_db_send(); delegates to generic_recv(). */
1346 static bool collect_highseqnum_db_recv(struct tevent_req *req, int *perr)
1348 return generic_recv(req, perr);
1352 * Collect all databases
/*
 * Request state for collecting a database from every node in turn.
 * pnn_list, count, caps, db_id and index are also used below and are
 * declared on lines not visible here.
 */
1355 struct collect_all_db_state {
1356 struct tevent_context *ev;
1357 struct ctdb_client_context *client;
1362 struct recdb_context *recdb;
1363 struct ctdb_pulldb pulldb;
/* Completion callback of each per-node pull. */
1367 static void collect_all_db_pulldb_done(struct tevent_req *subreq);
/*
 * Start collecting database db_id into recdb from all nodes in pnn_list,
 * one node at a time. The first pull targets pnn_list[state->index]; the
 * remaining nodes are handled from collect_all_db_pulldb_done().
 * caps is indexed by pnn.
 */
1369 static struct tevent_req *collect_all_db_send(
1370 TALLOC_CTX *mem_ctx,
1371 struct tevent_context *ev,
1372 struct ctdb_client_context *client,
1373 uint32_t *pnn_list, int count, uint32_t *caps,
1374 uint32_t db_id, struct recdb_context *recdb)
1376 struct tevent_req *req, *subreq;
1377 struct collect_all_db_state *state;
1380 req = tevent_req_create(mem_ctx, &state,
1381 struct collect_all_db_state);
1387 state->client = client;
1388 state->pnn_list = pnn_list;
1389 state->count = count;
1391 state->db_id = db_id;
1392 state->recdb = recdb;
1395 pnn = state->pnn_list[state->index];
1397 subreq = pull_database_send(state, ev, client, pnn, caps[pnn], recdb);
1398 if (tevent_req_nomem(subreq, req)) {
1399 return tevent_req_post(req, ev);
1401 tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
/*
 * Completion of one per-node pull. A pull error ends the request. When
 * state->index has reached state->count the request is done; otherwise the
 * next pull is started for pnn_list[state->index]. The update of
 * state->index is on a line not visible here.
 */
1406 static void collect_all_db_pulldb_done(struct tevent_req *subreq)
1408 struct tevent_req *req = tevent_req_callback_data(
1409 subreq, struct tevent_req);
1410 struct collect_all_db_state *state = tevent_req_data(
1411 req, struct collect_all_db_state);
1416 status = pull_database_recv(subreq, &ret);
1417 TALLOC_FREE(subreq);
1419 tevent_req_error(req, ret);
1424 if (state->index == state->count) {
1425 tevent_req_done(req);
1429 pnn = state->pnn_list[state->index];
1430 subreq = pull_database_send(state, state->ev, state->client,
1431 pnn, state->caps[pnn], state->recdb);
1432 if (tevent_req_nomem(subreq, req)) {
1435 tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
/* Receive the result of collect_all_db_send(); delegates to generic_recv(). */
1438 static bool collect_all_db_recv(struct tevent_req *req, int *perr)
1440 return generic_recv(req, perr);
1445 * For each database do the following:
1448 * - Freeze database on all nodes
1449 * - Start transaction on all nodes
1450 * - Collect database from all nodes
1451 * - Wipe database on all nodes
1452 * - Push database to all nodes
1453 * - Commit transaction on all nodes
1454 * - Thaw database on all nodes
/*
 * Request state for recovering one database. transdb carries the db id and
 * the generation, which recover_db_send() stores as the transaction id.
 * pnn_list, count, db_id, persistent and destnode are also set there and
 * are declared on lines not visible here.
 */
1457 struct recover_db_state {
1458 struct tevent_context *ev;
1459 struct ctdb_client_context *client;
1460 struct ctdb_tunable_list *tun_list;
1468 struct ctdb_transdb transdb;
1470 const char *db_name, *db_path;
1471 struct recdb_context *recdb;
/* Callbacks of the per-database recovery request. */
1474 static void recover_db_name_done(struct tevent_req *subreq);
1475 static void recover_db_path_done(struct tevent_req *subreq);
1476 static void recover_db_freeze_done(struct tevent_req *subreq);
1477 static void recover_db_transaction_started(struct tevent_req *subreq);
1478 static void recover_db_collect_done(struct tevent_req *subreq);
1479 static void recover_db_wipedb_done(struct tevent_req *subreq);
1480 static void recover_db_pushdb_done(struct tevent_req *subreq);
1481 static void recover_db_transaction_committed(struct tevent_req *subreq);
1482 static void recover_db_thaw_done(struct tevent_req *subreq);
/*
 * Start the recovery of one database.
 *
 * Stores the arguments in a new recover_db_state, prepares the
 * ctdb_transdb (db_id and tid = generation) used by the later
 * transaction controls, and sends a GET_DBNAME control to the node
 * returned by ctdb_client_pnn().  recover_db_name_done() continues
 * the sequence.
 *
 * If the control request cannot be allocated, the request is posted
 * with the error set by tevent_req_nomem().
 */
1484 static struct tevent_req *recover_db_send(TALLOC_CTX *mem_ctx,
1485 struct tevent_context *ev,
1486 struct ctdb_client_context *client,
1487 struct ctdb_tunable_list *tun_list,
1488 uint32_t *pnn_list, int count,
1490 uint32_t generation,
1491 uint32_t db_id, bool persistent)
1493 struct tevent_req *req, *subreq;
1494 struct recover_db_state *state;
1495 struct ctdb_req_control request;
1497 req = tevent_req_create(mem_ctx, &state, struct recover_db_state);
1503 state->client = client;
1504 state->tun_list = tun_list;
1505 state->pnn_list = pnn_list;
1506 state->count = count;
1508 state->db_id = db_id;
1509 state->persistent = persistent;
/* Single-node controls (name, path) are sent to this node. */
1511 state->destnode = ctdb_client_pnn(client);
1512 state->transdb.db_id = db_id;
1513 state->transdb.tid = generation;
1515 ctdb_req_control_get_dbname(&request, db_id);
1516 subreq = ctdb_client_control_send(state, ev, client, state->destnode,
1517 TIMEOUT(), &request);
1518 if (tevent_req_nomem(subreq, req)) {
1519 return tevent_req_post(req, ev);
1521 tevent_req_set_callback(subreq, recover_db_name_done, req);
/*
 * Callback for the GET_DBNAME control.
 *
 * Stores the database name in state->db_name, then sends GETDBPATH
 * to state->destnode.  A failed control fails the request with the
 * control's error; a reply that cannot be parsed fails it with
 * EPROTO.
 */
1526 static void recover_db_name_done(struct tevent_req *subreq)
1528 struct tevent_req *req = tevent_req_callback_data(
1529 subreq, struct tevent_req);
1530 struct recover_db_state *state = tevent_req_data(
1531 req, struct recover_db_state);
1532 struct ctdb_reply_control *reply;
1533 struct ctdb_req_control request;
1537 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1538 TALLOC_FREE(subreq);
/* The name is not known yet, so failures are reported by db id. */
1540 LOG("control GET_DBNAME failed for db=0x%x, ret=%d\n",
1542 tevent_req_error(req, ret);
1546 ret = ctdb_reply_control_get_dbname(reply, state, &state->db_name);
1548 LOG("control GET_DBNAME failed for db=0x%x, ret=%d\n",
1550 tevent_req_error(req, EPROTO);
1556 ctdb_req_control_getdbpath(&request, state->db_id);
1557 subreq = ctdb_client_control_send(state, state->ev, state->client,
1558 state->destnode, TIMEOUT(),
1560 if (tevent_req_nomem(subreq, req)) {
1563 tevent_req_set_callback(subreq, recover_db_path_done, req);
/*
 * Callback for the GETDBPATH control.
 *
 * Stores the database path in state->db_path, then sends the
 * DB_FREEZE control to every node in state->pnn_list.  A failed
 * control fails the request with the control's error; a reply that
 * cannot be parsed fails it with EPROTO.
 */
1566 static void recover_db_path_done(struct tevent_req *subreq)
1568 struct tevent_req *req = tevent_req_callback_data(
1569 subreq, struct tevent_req);
1570 struct recover_db_state *state = tevent_req_data(
1571 req, struct recover_db_state);
1572 struct ctdb_reply_control *reply;
1573 struct ctdb_req_control request;
1577 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1578 TALLOC_FREE(subreq);
1580 LOG("control GETDBPATH failed for db %s, ret=%d\n",
1581 state->db_name, ret);
1582 tevent_req_error(req, ret);
1586 ret = ctdb_reply_control_getdbpath(reply, state, &state->db_path);
1588 LOG("control GETDBPATH failed for db %s, ret=%d\n",
1589 state->db_name, ret);
1590 tevent_req_error(req, EPROTO);
/* Next step: freeze the database on all listed nodes. */
1596 ctdb_req_control_db_freeze(&request, state->db_id);
1597 subreq = ctdb_client_control_multi_send(state, state->ev,
1599 state->pnn_list, state->count,
1600 TIMEOUT(), &request);
1601 if (tevent_req_nomem(subreq, req)) {
1604 tevent_req_set_callback(subreq, recover_db_freeze_done, req);
/*
 * Callback for the multi-node DB_FREEZE control.
 *
 * On failure, ctdb_client_control_multi_error() is consulted so the
 * log can name the failing node when one is identified; the request
 * then fails with the aggregate error.  On success the
 * DB_TRANSACTION_START control, carrying state->transdb, is sent to
 * every node in state->pnn_list.
 */
1607 static void recover_db_freeze_done(struct tevent_req *subreq)
1609 struct tevent_req *req = tevent_req_callback_data(
1610 subreq, struct tevent_req);
1611 struct recover_db_state *state = tevent_req_data(
1612 req, struct recover_db_state);
1613 struct ctdb_req_control request;
1618 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1620 TALLOC_FREE(subreq);
1625 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1626 state->count, err_list,
/* Per-node error message. */
1629 LOG("control FREEZE_DB failed for db %s on node %u,"
1630 " ret=%d\n", state->db_name, pnn, ret2);
/* Aggregate error message. */
1632 LOG("control FREEZE_DB failed for db %s, ret=%d\n",
1633 state->db_name, ret);
1635 tevent_req_error(req, ret);
1639 ctdb_req_control_db_transaction_start(&request, &state->transdb);
1640 subreq = ctdb_client_control_multi_send(state, state->ev,
1642 state->pnn_list, state->count,
1643 TIMEOUT(), &request);
1644 if (tevent_req_nomem(subreq, req)) {
1647 tevent_req_set_callback(subreq, recover_db_transaction_started, req);
/*
 * Callback for the multi-node DB_TRANSACTION_START control.
 *
 * On failure the error is logged (naming the failing node when
 * ctdb_client_control_multi_error() identifies one) and the request
 * fails with the aggregate error.  On success a recovery database is
 * created with recdb_create() and the records are collected from the
 * nodes: by highest sequence number for persistent databases when
 * the recover_pdb_by_seqnum tunable is non-zero, from all nodes
 * otherwise.  recover_db_collect_done() continues the sequence.
 */
1650 static void recover_db_transaction_started(struct tevent_req *subreq)
1652 struct tevent_req *req = tevent_req_callback_data(
1653 subreq, struct tevent_req);
1654 struct recover_db_state *state = tevent_req_data(
1655 req, struct recover_db_state);
1660 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1662 TALLOC_FREE(subreq);
1667 ret2 = ctdb_client_control_multi_error(state->pnn_list,
/*
 * Per-node error message.  The format must consume all three
 * arguments (name, node, error); without the node conversion the
 * node number was printed as the error code.
 */
1671 LOG("control TRANSACTION_DB failed for db=%s on node %u,"
1672 " ret=%d\n", state->db_name, pnn, ret2);
/* Aggregate error message. */
1674 LOG("control TRANSACTION_DB failed for db=%s,"
1675 " ret=%d\n", state->db_name, ret);
1677 tevent_req_error(req, ret);
1681 state->recdb = recdb_create(state, state->db_id, state->db_name,
1683 state->tun_list->database_hash_size,
1685 if (tevent_req_nomem(state->recdb, req)) {
/* The same condition selects the matching _recv in recover_db_collect_done(). */
1689 if (state->persistent && state->tun_list->recover_pdb_by_seqnum != 0) {
1690 subreq = collect_highseqnum_db_send(
1691 state, state->ev, state->client,
1692 state->pnn_list, state->count, state->caps,
1693 state->db_id, state->recdb);
1695 subreq = collect_all_db_send(
1696 state, state->ev, state->client,
1697 state->pnn_list, state->count, state->caps,
1698 state->db_id, state->recdb);
1700 if (tevent_req_nomem(subreq, req)) {
1703 tevent_req_set_callback(subreq, recover_db_collect_done, req);
/*
 * Callback run when the database records have been collected.
 *
 * Uses the _recv function matching the collector chosen in
 * recover_db_transaction_started().  On failure the request fails
 * with the collector's error; otherwise the WIPE_DATABASE control,
 * carrying state->transdb, is sent to every node in state->pnn_list.
 */
1706 static void recover_db_collect_done(struct tevent_req *subreq)
1708 struct tevent_req *req = tevent_req_callback_data(
1709 subreq, struct tevent_req);
1710 struct recover_db_state *state = tevent_req_data(
1711 req, struct recover_db_state);
1712 struct ctdb_req_control request;
/* Must mirror the condition used when the collector was started. */
1716 if (state->persistent && state->tun_list->recover_pdb_by_seqnum != 0) {
1717 status = collect_highseqnum_db_recv(subreq, &ret);
1719 status = collect_all_db_recv(subreq, &ret);
1721 TALLOC_FREE(subreq);
1723 tevent_req_error(req, ret);
1727 ctdb_req_control_wipe_database(&request, &state->transdb);
1728 subreq = ctdb_client_control_multi_send(state, state->ev,
1730 state->pnn_list, state->count,
1731 TIMEOUT(), &request);
1732 if (tevent_req_nomem(subreq, req)) {
1735 tevent_req_set_callback(subreq, recover_db_wipedb_done, req);
/*
 * Callback for the multi-node WIPE_DATABASE control.
 *
 * On failure the error is logged (naming the failing node when
 * ctdb_client_control_multi_error() identifies one) and the request
 * fails with the aggregate error.  On success the recovered records
 * are pushed to the nodes with push_database_send();
 * recover_db_pushdb_done() continues the sequence.
 */
1738 static void recover_db_wipedb_done(struct tevent_req *subreq)
1740 struct tevent_req *req = tevent_req_callback_data(
1741 subreq, struct tevent_req);
1742 struct recover_db_state *state = tevent_req_data(
1743 req, struct recover_db_state);
1748 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1750 TALLOC_FREE(subreq);
1755 ret2 = ctdb_client_control_multi_error(state->pnn_list,
/* Per-node error message. */
1759 LOG("control WIPEDB failed for db %s on node %u,"
1760 " ret=%d\n", state->db_name, pnn, ret2);
/*
 * Aggregate error message.  Only the name and the error are passed:
 * the format has no node conversion, and a stray node argument here
 * was printed in place of the error code.
 */
1762 LOG("control WIPEDB failed for db %s, ret=%d\n",
1763 state->db_name, ret);
1765 tevent_req_error(req, ret);
1769 subreq = push_database_send(state, state->ev, state->client,
1770 state->pnn_list, state->count,
1771 state->caps, state->tun_list,
1773 if (tevent_req_nomem(subreq, req)) {
1776 tevent_req_set_callback(subreq, recover_db_pushdb_done, req);
/*
 * Callback run when the recovered database has been pushed out.
 *
 * On failure the request fails with the error from
 * push_database_recv().  Otherwise the recovery database is
 * released and the DB_TRANSACTION_COMMIT control, carrying
 * state->transdb, is sent to every node in state->pnn_list.
 */
1779 static void recover_db_pushdb_done(struct tevent_req *subreq)
1781 struct tevent_req *req = tevent_req_callback_data(
1782 subreq, struct tevent_req);
1783 struct recover_db_state *state = tevent_req_data(
1784 req, struct recover_db_state);
1785 struct ctdb_req_control request;
1789 status = push_database_recv(subreq, &ret);
1790 TALLOC_FREE(subreq);
1792 tevent_req_error(req, ret);
/* The recovery database is not referenced by the remaining steps. */
1796 TALLOC_FREE(state->recdb);
1798 ctdb_req_control_db_transaction_commit(&request, &state->transdb);
1799 subreq = ctdb_client_control_multi_send(state, state->ev,
1801 state->pnn_list, state->count,
1802 TIMEOUT(), &request);
1803 if (tevent_req_nomem(subreq, req)) {
1806 tevent_req_set_callback(subreq, recover_db_transaction_committed, req);
/*
 * Callback for the multi-node DB_TRANSACTION_COMMIT control.
 *
 * On failure the error is logged (naming the failing node when
 * ctdb_client_control_multi_error() identifies one) and the request
 * fails with the aggregate error.  On success the DB_THAW control is
 * sent to every node in state->pnn_list.
 */
1809 static void recover_db_transaction_committed(struct tevent_req *subreq)
1811 struct tevent_req *req = tevent_req_callback_data(
1812 subreq, struct tevent_req);
1813 struct recover_db_state *state = tevent_req_data(
1814 req, struct recover_db_state);
1815 struct ctdb_req_control request;
1820 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1822 TALLOC_FREE(subreq);
1827 ret2 = ctdb_client_control_multi_error(state->pnn_list,
/* Per-node error message. */
1831 LOG("control DB_TRANSACTION_COMMIT failed for db %s"
1832 " on node %u, ret=%d\n", state->db_name, pnn, ret2);
/* Aggregate error message. */
1834 LOG("control DB_TRANSACTION_COMMIT failed for db %s,"
1835 " ret=%d\n", state->db_name, ret);
1837 tevent_req_error(req, ret);
1841 ctdb_req_control_db_thaw(&request, state->db_id);
1842 subreq = ctdb_client_control_multi_send(state, state->ev,
1844 state->pnn_list, state->count,
1845 TIMEOUT(), &request);
1846 if (tevent_req_nomem(subreq, req)) {
1849 tevent_req_set_callback(subreq, recover_db_thaw_done, req);
/*
 * Callback for the multi-node DB_THAW control, the last step of
 * recover_db.
 *
 * On failure the error is logged (naming the failing node when
 * ctdb_client_control_multi_error() identifies one) and the request
 * fails with the aggregate error; otherwise the request is marked
 * done.
 */
1852 static void recover_db_thaw_done(struct tevent_req *subreq)
1854 struct tevent_req *req = tevent_req_callback_data(
1855 subreq, struct tevent_req);
1856 struct recover_db_state *state = tevent_req_data(
1857 req, struct recover_db_state);
1862 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1864 TALLOC_FREE(subreq);
1869 ret2 = ctdb_client_control_multi_error(state->pnn_list,
/* Per-node error message. */
1873 LOG("control DB_THAW failed for db %s on node %u,"
1874 " ret=%d\n", state->db_name, pnn, ret2);
/* Aggregate error message. */
1876 LOG("control DB_THAW failed for db %s, ret=%d\n",
1877 state->db_name, ret);
1879 tevent_req_error(req, ret);
1883 tevent_req_done(req);
/*
 * Receive the result of a recover_db request.
 * Only success or failure is reported: NULL is passed to
 * generic_recv() in place of an error pointer.
 */
1886 static bool recover_db_recv(struct tevent_req *req)
1888 return generic_recv(req, NULL);
1893 * Start database recovery for each database
1895 * Try to recover each database 5 times before failing recovery.
/*
 * State of the request that recovers all databases.
 * The functions below also use the counters num_replies and
 * num_failed, compared against dbmap->num to detect completion.
 */
1898 struct db_recovery_state {
1899 struct tevent_context *ev;
1900 struct ctdb_dbid_map *dbmap; /* list of databases to recover */
/*
 * Per-database bookkeeping used as callback data for each
 * recover_db request.  It keeps the arguments needed to restart the
 * recovery of that database after a failure; the functions below
 * also use pnn_list, count, caps, db_id, persistent and num_fails.
 */
1905 struct db_recovery_one_state {
1906 struct tevent_req *req; /* the parent db_recovery request */
1907 struct ctdb_client_context *client;
1908 struct ctdb_dbid_map *dbmap;
1909 struct ctdb_tunable_list *tun_list;
1913 uint32_t generation;
/* Completion callback for each per-database recover_db request. */
1919 static void db_recovery_one_done(struct tevent_req *subreq);
/*
 * Start the recovery of every database listed in dbmap.
 *
 * If dbmap is empty the request completes immediately.  Otherwise
 * one recover_db request is started per database; they are all
 * issued from this loop without waiting for one another.  Each gets
 * a db_recovery_one_state holding the arguments needed to retry it,
 * and db_recovery_one_done() collects the results.
 */
1921 static struct tevent_req *db_recovery_send(TALLOC_CTX *mem_ctx,
1922 struct tevent_context *ev,
1923 struct ctdb_client_context *client,
1924 struct ctdb_dbid_map *dbmap,
1925 struct ctdb_tunable_list *tun_list,
1926 uint32_t *pnn_list, int count,
1928 uint32_t generation)
1930 struct tevent_req *req, *subreq;
1931 struct db_recovery_state *state;
1934 req = tevent_req_create(mem_ctx, &state, struct db_recovery_state);
1940 state->dbmap = dbmap;
1941 state->num_replies = 0;
1942 state->num_failed = 0;
/* Nothing to recover. */
1944 if (dbmap->num == 0) {
1945 tevent_req_done(req);
1946 return tevent_req_post(req, ev);
1949 for (i=0; i<dbmap->num; i++) {
1950 struct db_recovery_one_state *substate;
1952 substate = talloc_zero(state, struct db_recovery_one_state);
1953 if (tevent_req_nomem(substate, req)) {
1954 return tevent_req_post(req, ev);
1957 substate->req = req;
1958 substate->client = client;
1959 substate->dbmap = dbmap;
1960 substate->tun_list = tun_list;
1961 substate->pnn_list = pnn_list;
1962 substate->count = count;
1963 substate->caps = caps;
1964 substate->generation = generation;
1965 substate->db_id = dbmap->dbs[i].db_id;
1966 substate->persistent = dbmap->dbs[i].flags &
1967 CTDB_DB_FLAGS_PERSISTENT;
1969 subreq = recover_db_send(state, ev, client, tun_list,
1970 pnn_list, count, caps,
1971 generation, substate->db_id,
1972 substate->persistent);
1973 if (tevent_req_nomem(subreq, req)) {
1974 return tevent_req_post(req, ev);
1976 tevent_req_set_callback(subreq, db_recovery_one_done,
1978 LOG("recover database 0x%08x\n", substate->db_id);
/*
 * Callback run when the recovery of one database finishes.
 *
 * A failed recovery is restarted with the arguments saved in
 * substate while num_fails is below 5.  After that the database is
 * counted in state->num_failed.  Every finished database is counted
 * in state->num_replies, and the parent request completes once that
 * equals dbmap->num.
 *
 * NOTE(review): substate is freed after recover_db_recv(), yet the
 * retry code below reads it.  The free is presumably confined to
 * the success path by control flow not visible here - confirm.
 */
1984 static void db_recovery_one_done(struct tevent_req *subreq)
1986 struct db_recovery_one_state *substate = tevent_req_callback_data(
1987 subreq, struct db_recovery_one_state);
1988 struct tevent_req *req = substate->req;
1989 struct db_recovery_state *state = tevent_req_data(
1990 req, struct db_recovery_state);
1993 status = recover_db_recv(subreq);
1994 TALLOC_FREE(subreq);
1997 talloc_free(substate);
2001 substate->num_fails += 1;
/* Up to 5 attempts in total per database. */
2002 if (substate->num_fails < 5) {
2003 subreq = recover_db_send(state, state->ev, substate->client,
2005 substate->pnn_list, substate->count,
2007 substate->generation, substate->db_id,
2008 substate->persistent);
2009 if (tevent_req_nomem(subreq, req)) {
2012 tevent_req_set_callback(subreq, db_recovery_one_done, substate);
/* num_fails counts past failures, so the attempt now starting is num_fails+1. */
2013 LOG("recover database 0x%08x, attempt %d\n", substate->db_id,
2014 substate->num_fails+1);
2019 state->num_failed += 1;
2022 state->num_replies += 1;
2024 if (state->num_replies == state->dbmap->num) {
2025 tevent_req_done(req);
/*
 * Receive the result of a db_recovery request.
 *
 * When the request did not end in a tevent error, *count is set to
 * the number of databases recovered (replies minus failures).  The
 * return value also takes num_failed into account; the exact
 * returns are not visible here.
 */
2029 static bool db_recovery_recv(struct tevent_req *req, int *count)
2031 struct db_recovery_state *state = tevent_req_data(
2032 req, struct db_recovery_state);
2035 if (tevent_req_is_unix_error(req, &err)) {
2040 *count = state->num_replies - state->num_failed;
2042 if (state->num_failed > 0) {
2051 * Run the parallel database recovery
2056 * - Get capabilities from all nodes
2058 * - Set RECOVERY_ACTIVE
2059 * - Send START_RECOVERY
2060 * - Update vnnmap on all nodes
2061 * - Run database recovery
2062 * - Send END_RECOVERY
2063 * - Set RECOVERY_NORMAL
/*
 * State of the top-level recovery request.
 * The recovery_* functions also use destnode, count, pnn_list and
 * caps, which are not listed here.
 */
2066 struct recovery_state {
2067 struct tevent_context *ev;
2068 struct ctdb_client_context *client;
2069 uint32_t generation; /* becomes the generation of the new VNNMAP */
2073 struct ctdb_node_map *nodemap; /* from the GET_NODEMAP reply */
2075 struct ctdb_tunable_list *tun_list; /* from the GET_ALL_TUNABLES reply */
2076 struct ctdb_vnn_map *vnnmap; /* fetched with GETVNNMAP, later replaced by the recalculated map */
2077 struct ctdb_dbid_map *dbmap; /* from the GET_DBMAP reply */
/*
 * Completion callbacks for the steps of the top-level recovery.
 * Each callback starts the next step and installs the following
 * callback.  The chain runs in the order declared here, ending in
 * recovery_end_recovery_done().
 */
2080 static void recovery_tunables_done(struct tevent_req *subreq);
2081 static void recovery_nodemap_done(struct tevent_req *subreq);
2082 static void recovery_vnnmap_done(struct tevent_req *subreq);
2083 static void recovery_capabilities_done(struct tevent_req *subreq);
2084 static void recovery_dbmap_done(struct tevent_req *subreq);
2085 static void recovery_active_done(struct tevent_req *subreq);
2086 static void recovery_start_recovery_done(struct tevent_req *subreq);
2087 static void recovery_vnnmap_update_done(struct tevent_req *subreq);
2088 static void recovery_db_recovery_done(struct tevent_req *subreq);
2089 static void recovery_normal_done(struct tevent_req *subreq);
2090 static void recovery_end_recovery_done(struct tevent_req *subreq);
/*
 * Start the parallel database recovery.
 *
 * Records client and generation, takes the node returned by
 * ctdb_client_pnn() as destination for single-node controls, and
 * sends GET_ALL_TUNABLES to it.  recovery_tunables_done() continues
 * the sequence.
 */
2092 static struct tevent_req *recovery_send(TALLOC_CTX *mem_ctx,
2093 struct tevent_context *ev,
2094 struct ctdb_client_context *client,
2095 uint32_t generation)
2097 struct tevent_req *req, *subreq;
2098 struct recovery_state *state;
2099 struct ctdb_req_control request;
2101 req = tevent_req_create(mem_ctx, &state, struct recovery_state);
2107 state->client = client;
2108 state->generation = generation;
2109 state->destnode = ctdb_client_pnn(client);
2111 ctdb_req_control_get_all_tunables(&request);
2112 subreq = ctdb_client_control_send(state, state->ev, state->client,
2113 state->destnode, TIMEOUT(),
2115 if (tevent_req_nomem(subreq, req)) {
2116 return tevent_req_post(req, ev);
2118 tevent_req_set_callback(subreq, recovery_tunables_done, req);
/*
 * Callback for the GET_ALL_TUNABLES control.
 *
 * Parses the tunables, sets the file-level recover_timeout used by
 * TIMEOUT() from the recover_timeout tunable, and sends GET_NODEMAP
 * to state->destnode.  A failed control fails the request with the
 * control's error; a reply that cannot be parsed fails it with
 * EPROTO.
 */
2123 static void recovery_tunables_done(struct tevent_req *subreq)
2125 struct tevent_req *req = tevent_req_callback_data(
2126 subreq, struct tevent_req);
2127 struct recovery_state *state = tevent_req_data(
2128 req, struct recovery_state);
2129 struct ctdb_reply_control *reply;
2130 struct ctdb_req_control request;
2134 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2135 TALLOC_FREE(subreq);
2137 LOG("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2138 tevent_req_error(req, ret);
2142 ret = ctdb_reply_control_get_all_tunables(reply, state,
2145 LOG("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2146 tevent_req_error(req, EPROTO);
/* Every TIMEOUT() evaluated after this point uses the configured value. */
2152 recover_timeout = state->tun_list->recover_timeout;
2154 ctdb_req_control_get_nodemap(&request);
2155 subreq = ctdb_client_control_send(state, state->ev, state->client,
2156 state->destnode, TIMEOUT(),
2158 if (tevent_req_nomem(subreq, req)) {
2161 tevent_req_set_callback(subreq, recovery_nodemap_done, req);
/*
 * Callback for the GET_NODEMAP control.
 *
 * Stores the node map, builds the list of active nodes
 * (state->pnn_list / state->count) that later multi-node controls
 * target, and sends GETVNNMAP to state->destnode.  A control or
 * parse failure fails the request with the returned error; an
 * active-node count <= 0 fails it with ENOMEM.
 */
2164 static void recovery_nodemap_done(struct tevent_req *subreq)
2166 struct tevent_req *req = tevent_req_callback_data(
2167 subreq, struct tevent_req);
2168 struct recovery_state *state = tevent_req_data(
2169 req, struct recovery_state);
2170 struct ctdb_reply_control *reply;
2171 struct ctdb_req_control request;
2175 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2176 TALLOC_FREE(subreq);
2178 LOG("control GET_NODEMAP failed to node %u, ret=%d\n",
2179 state->destnode, ret);
2180 tevent_req_error(req, ret);
2184 ret = ctdb_reply_control_get_nodemap(reply, state, &state->nodemap);
2186 LOG("control GET_NODEMAP failed, ret=%d\n", ret);
2187 tevent_req_error(req, ret);
/* CTDB_UNKNOWN_PNN: presumably "exclude no node" - TODO confirm against list_of_active_nodes(). */
2191 state->count = list_of_active_nodes(state->nodemap, CTDB_UNKNOWN_PNN,
2192 state, &state->pnn_list);
2193 if (state->count <= 0) {
2194 tevent_req_error(req, ENOMEM);
2198 ctdb_req_control_getvnnmap(&request);
2199 subreq = ctdb_client_control_send(state, state->ev, state->client,
2200 state->destnode, TIMEOUT(),
2202 if (tevent_req_nomem(subreq, req)) {
2205 tevent_req_set_callback(subreq, recovery_vnnmap_done, req);
/*
 * Callback for the GETVNNMAP control.
 *
 * Stores the current VNN map in state->vnnmap and sends
 * GET_CAPABILITIES to every node in state->pnn_list.  A control or
 * parse failure fails the request with the returned error.
 */
2208 static void recovery_vnnmap_done(struct tevent_req *subreq)
2210 struct tevent_req *req = tevent_req_callback_data(
2211 subreq, struct tevent_req);
2212 struct recovery_state *state = tevent_req_data(
2213 req, struct recovery_state);
2214 struct ctdb_reply_control *reply;
2215 struct ctdb_req_control request;
2219 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2220 TALLOC_FREE(subreq);
2222 LOG("control GETVNNMAP failed to node %u, ret=%d\n",
2223 state->destnode, ret);
2224 tevent_req_error(req, ret);
2228 ret = ctdb_reply_control_getvnnmap(reply, state, &state->vnnmap);
2230 LOG("control GETVNNMAP failed, ret=%d\n", ret);
2231 tevent_req_error(req, ret);
2235 ctdb_req_control_get_capabilities(&request);
2236 subreq = ctdb_client_control_multi_send(state, state->ev,
2238 state->pnn_list, state->count,
2239 TIMEOUT(), &request);
2240 if (tevent_req_nomem(subreq, req)) {
2243 tevent_req_set_callback(subreq, recovery_capabilities_done, req);
/*
 * Callback for the multi-node GET_CAPABILITIES control.
 *
 * On failure the error is logged (naming the failing node when
 * ctdb_client_control_multi_error() identifies one) and the request
 * fails with the aggregate error.  On success state->caps is
 * allocated with one zeroed slot per nodemap entry and each node's
 * reply is parsed; a reply that cannot be parsed fails the request
 * with EPROTO.  GET_DBMAP is then sent to state->destnode.
 */
2246 static void recovery_capabilities_done(struct tevent_req *subreq)
2248 struct tevent_req *req = tevent_req_callback_data(
2249 subreq, struct tevent_req);
2250 struct recovery_state *state = tevent_req_data(
2251 req, struct recovery_state);
2252 struct ctdb_reply_control **reply;
2253 struct ctdb_req_control request;
2258 status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2260 TALLOC_FREE(subreq);
2265 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2269 LOG("control GET_CAPABILITIES failed on node %u,"
2270 " ret=%d\n", pnn, ret2);
2272 LOG("control GET_CAPABILITIES failed, ret=%d\n", ret);
2274 tevent_req_error(req, ret);
2278 /* Make the array size same as nodemap */
2279 state->caps = talloc_zero_array(state, uint32_t,
2280 state->nodemap->num);
2281 if (tevent_req_nomem(state->caps, req)) {
/* reply[i] belongs to the node pnn_list[i]. */
2285 for (i=0; i<state->count; i++) {
2288 pnn = state->pnn_list[i];
2289 ret = ctdb_reply_control_get_capabilities(reply[i],
2292 LOG("control GET_CAPABILITIES failed on node %u\n", pnn);
2293 tevent_req_error(req, EPROTO);
2300 ctdb_req_control_get_dbmap(&request);
2301 subreq = ctdb_client_control_send(state, state->ev, state->client,
2302 state->destnode, TIMEOUT(),
2304 if (tevent_req_nomem(subreq, req)) {
2307 tevent_req_set_callback(subreq, recovery_dbmap_done, req);
/*
 * Callback for the GET_DBMAP control.
 *
 * Stores the database map in state->dbmap and sends SET_RECMODE
 * with CTDB_RECOVERY_ACTIVE to every node in state->pnn_list.  A
 * control or parse failure fails the request with the returned
 * error.
 */
2310 static void recovery_dbmap_done(struct tevent_req *subreq)
2312 struct tevent_req *req = tevent_req_callback_data(
2313 subreq, struct tevent_req);
2314 struct recovery_state *state = tevent_req_data(
2315 req, struct recovery_state);
2316 struct ctdb_reply_control *reply;
2317 struct ctdb_req_control request;
2321 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2322 TALLOC_FREE(subreq);
2324 LOG("control GET_DBMAP failed to node %u, ret=%d\n",
2325 state->destnode, ret);
2326 tevent_req_error(req, ret);
2330 ret = ctdb_reply_control_get_dbmap(reply, state, &state->dbmap);
2332 LOG("control GET_DBMAP failed, ret=%d\n", ret);
2333 tevent_req_error(req, ret);
2337 ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_ACTIVE);
2338 subreq = ctdb_client_control_multi_send(state, state->ev,
2340 state->pnn_list, state->count,
2341 TIMEOUT(), &request);
2342 if (tevent_req_nomem(subreq, req)) {
2345 tevent_req_set_callback(subreq, recovery_active_done, req);
/*
 * Callback for the SET_RECMODE(ACTIVE) multi-node control.
 *
 * On failure the error is logged and the request fails with the
 * aggregate error.  On success a new VNN map is calculated from the
 * node map using two passes: the first looks at the nodes that are
 * not flagged inactive and have CTDB_CAP_LMASTER, the second stores
 * their PNNs in the map.  If no such node exists the map has a
 * single entry, state->destnode.  The new map takes
 * state->generation, replaces state->vnnmap, and START_RECOVERY is
 * sent to every node in state->pnn_list.
 */
2348 static void recovery_active_done(struct tevent_req *subreq)
2350 struct tevent_req *req = tevent_req_callback_data(
2351 subreq, struct tevent_req);
2352 struct recovery_state *state = tevent_req_data(
2353 req, struct recovery_state);
2354 struct ctdb_req_control request;
2355 struct ctdb_vnn_map *vnnmap;
2360 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2362 TALLOC_FREE(subreq);
2367 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2371 LOG("failed to set recovery mode to ACTIVE on node %u,"
2372 " ret=%d\n", pnn, ret2);
2374 LOG("failed to set recovery mode to ACTIVE, ret=%d\n",
2377 tevent_req_error(req, ret);
2381 LOG("set recovery mode to ACTIVE\n");
2383 /* Calculate new VNNMAP */
/* First pass; caps[] is indexed with the nodemap index here. */
2385 for (i=0; i<state->nodemap->num; i++) {
2386 if (state->nodemap->node[i].flags & NODE_FLAGS_INACTIVE) {
2389 if (!(state->caps[i] & CTDB_CAP_LMASTER)) {
2396 LOG("no active lmasters found. Adding recmaster anyway\n");
2399 vnnmap = talloc_zero(state, struct ctdb_vnn_map);
2400 if (tevent_req_nomem(vnnmap, req)) {
/* At least one slot, so the fallback entry below fits. */
2404 vnnmap->size = (count == 0 ? 1 : count);
2405 vnnmap->map = talloc_array(vnnmap, uint32_t, vnnmap->size);
2406 if (tevent_req_nomem(vnnmap->map, req)) {
2411 vnnmap->map[0] = state->destnode;
/* Second pass, with the same filters: fill in the PNNs. */
2414 for (i=0; i<state->nodemap->num; i++) {
2415 if (state->nodemap->node[i].flags &
2416 NODE_FLAGS_INACTIVE) {
2419 if (!(state->caps[i] & CTDB_CAP_LMASTER)) {
2423 vnnmap->map[count] = state->nodemap->node[i].pnn;
2428 vnnmap->generation = state->generation;
/* Discard the map fetched earlier with GETVNNMAP. */
2430 talloc_free(state->vnnmap);
2431 state->vnnmap = vnnmap;
2433 ctdb_req_control_start_recovery(&request);
2434 subreq = ctdb_client_control_multi_send(state, state->ev,
2436 state->pnn_list, state->count,
2437 TIMEOUT(), &request);
2438 if (tevent_req_nomem(subreq, req)) {
2441 tevent_req_set_callback(subreq, recovery_start_recovery_done, req);
/*
 * Callback for the multi-node START_RECOVERY control.
 *
 * On failure the error is logged (naming the failing node when
 * ctdb_client_control_multi_error() identifies one) and the request
 * fails with the aggregate error.  On success the recalculated VNN
 * map is sent to every node in state->pnn_list with SETVNNMAP.
 */
2444 static void recovery_start_recovery_done(struct tevent_req *subreq)
2446 struct tevent_req *req = tevent_req_callback_data(
2447 subreq, struct tevent_req);
2448 struct recovery_state *state = tevent_req_data(
2449 req, struct recovery_state);
2450 struct ctdb_req_control request;
2455 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2457 TALLOC_FREE(subreq);
2462 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2466 LOG("failed to run start_recovery event on node %u,"
2467 " ret=%d\n", pnn, ret2);
2469 LOG("failed to run start_recovery event, ret=%d\n",
2472 tevent_req_error(req, ret);
2476 LOG("start_recovery event finished\n");
2478 ctdb_req_control_setvnnmap(&request, state->vnnmap);
2479 subreq = ctdb_client_control_multi_send(state, state->ev,
2481 state->pnn_list, state->count,
2482 TIMEOUT(), &request);
2483 if (tevent_req_nomem(subreq, req)) {
2486 tevent_req_set_callback(subreq, recovery_vnnmap_update_done, req);
/*
 * Callback for the multi-node SETVNNMAP control.
 *
 * On failure the error is logged (naming the failing node when
 * ctdb_client_control_multi_error() identifies one) and the request
 * fails with the aggregate error.  On success the recovery of all
 * databases in state->dbmap is started with db_recovery_send(),
 * using the generation of the new VNN map.
 */
2489 static void recovery_vnnmap_update_done(struct tevent_req *subreq)
2491 struct tevent_req *req = tevent_req_callback_data(
2492 subreq, struct tevent_req);
2493 struct recovery_state *state = tevent_req_data(
2494 req, struct recovery_state);
2499 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2501 TALLOC_FREE(subreq);
2506 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2510 LOG("failed to update VNNMAP on node %u, ret=%d\n",
2513 LOG("failed to update VNNMAP, ret=%d\n", ret);
2515 tevent_req_error(req, ret);
2519 LOG("updated VNNMAP\n");
2521 subreq = db_recovery_send(state, state->ev, state->client,
2522 state->dbmap, state->tun_list,
2523 state->pnn_list, state->count,
2524 state->caps, state->vnnmap->generation);
2525 if (tevent_req_nomem(subreq, req)) {
2528 tevent_req_set_callback(subreq, recovery_db_recovery_done, req);
/*
 * Callback run when the recovery of all databases has finished.
 *
 * Logs the number of databases recovered.  A failed database
 * recovery fails the request with EIO; otherwise SET_RECMODE with
 * CTDB_RECOVERY_NORMAL is sent to every node in state->pnn_list.
 */
2531 static void recovery_db_recovery_done(struct tevent_req *subreq)
2533 struct tevent_req *req = tevent_req_callback_data(
2534 subreq, struct tevent_req);
2535 struct recovery_state *state = tevent_req_data(
2536 req, struct recovery_state);
2537 struct ctdb_req_control request;
2541 status = db_recovery_recv(subreq, &count);
2542 TALLOC_FREE(subreq);
2544 LOG("%d databases recovered\n", count);
2547 tevent_req_error(req, EIO);
2551 ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_NORMAL);
2552 subreq = ctdb_client_control_multi_send(state, state->ev,
2554 state->pnn_list, state->count,
2555 TIMEOUT(), &request);
2556 if (tevent_req_nomem(subreq, req)) {
2559 tevent_req_set_callback(subreq, recovery_normal_done, req);
/*
 * Callback for the SET_RECMODE(NORMAL) multi-node control.
 *
 * On failure the error is logged (naming the failing node when
 * ctdb_client_control_multi_error() identifies one) and the request
 * fails with the aggregate error.  On success END_RECOVERY is sent
 * to every node in state->pnn_list.
 */
2562 static void recovery_normal_done(struct tevent_req *subreq)
2564 struct tevent_req *req = tevent_req_callback_data(
2565 subreq, struct tevent_req);
2566 struct recovery_state *state = tevent_req_data(
2567 req, struct recovery_state);
2568 struct ctdb_req_control request;
2573 status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2575 TALLOC_FREE(subreq);
2580 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2584 LOG("failed to set recovery mode to NORMAL on node %u,"
2585 " ret=%d\n", pnn, ret2);
2587 LOG("failed to set recovery mode to NORMAL, ret=%d\n",
2590 tevent_req_error(req, ret);
2594 LOG("set recovery mode to NORMAL\n");
2596 ctdb_req_control_end_recovery(&request);
2597 subreq = ctdb_client_control_multi_send(state, state->ev,
2599 state->pnn_list, state->count,
2600 TIMEOUT(), &request);
2601 if (tevent_req_nomem(subreq, req)) {
2604 tevent_req_set_callback(subreq, recovery_end_recovery_done, req);
/*
 * Callback for the multi-node END_RECOVERY control, the last step of
 * the recovery.
 *
 * On failure the error is logged (naming the failing node when
 * ctdb_client_control_multi_error() identifies one) and the request
 * fails with the aggregate error; otherwise the request is marked
 * done.
 */
2607 static void recovery_end_recovery_done(struct tevent_req *subreq)
2609 struct tevent_req *req = tevent_req_callback_data(
2610 subreq, struct tevent_req);
2611 struct recovery_state *state = tevent_req_data(
2612 req, struct recovery_state);
2617 status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2619 TALLOC_FREE(subreq);
2624 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2628 LOG("failed to run recovered event on node %u,"
2629 " ret=%d\n", pnn, ret2);
2631 LOG("failed to run recovered event, ret=%d\n", ret);
2633 tevent_req_error(req, ret);
2637 LOG("recovered event finished\n");
2639 tevent_req_done(req);
/*
 * Receive the result of the recovery request.
 * perr is passed to generic_recv(); its boolean return value is
 * discarded.
 */
2642 static void recovery_recv(struct tevent_req *req, int *perr)
2644 generic_recv(req, perr);
/* Print the command line synopsis to stderr. */
2647 static void usage(const char *progname)
2649 fprintf(stderr, "\nUsage: %s <log-fd> <output-fd> <ctdb-socket-path> <generation>\n",
2655 * Arguments - log fd, write fd, socket path, generation
/*
 * Entry point of the recovery helper.
 *
 * argv[1] is the log fd, argv[2] the output fd and argv[4] the
 * generation (see usage()).  Unless the log fd already is stdout or
 * stderr, both are redirected to it.  A talloc context, a tevent
 * context and a ctdb client are then created, the recovery request
 * is started and polled to completion, and its result is written to
 * the output fd with sys_write().
 */
2657 int main(int argc, char *argv[])
2659 int log_fd, write_fd;
2660 const char *sockpath;
2661 TALLOC_CTX *mem_ctx;
2662 struct tevent_context *ev;
2663 struct ctdb_client_context *client;
2665 struct tevent_req *req;
2666 uint32_t generation;
2673 log_fd = atoi(argv[1]);
/* Send everything LOG() prints (it writes to stderr) to the log fd. */
2674 if (log_fd != STDOUT_FILENO && log_fd != STDERR_FILENO) {
2675 close(STDOUT_FILENO);
2676 close(STDERR_FILENO);
2677 dup2(log_fd, STDOUT_FILENO);
2678 dup2(log_fd, STDERR_FILENO);
2682 write_fd = atoi(argv[2]);
/* Base 0: the generation may be given in decimal, octal or hex. */
2684 generation = (uint32_t)strtoul(argv[4], NULL, 0);
2686 mem_ctx = talloc_new(NULL);
2687 if (mem_ctx == NULL) {
2688 LOG("talloc_new() failed\n");
2692 ev = tevent_context_init(mem_ctx);
2694 LOG("tevent_context_init() failed\n");
2698 ret = ctdb_client_init(mem_ctx, ev, sockpath, &client);
2700 LOG("ctdb_client_init() failed, ret=%d\n", ret);
2704 req = recovery_send(mem_ctx, ev, client, generation);
/* Name the function that was actually called. */
2706 LOG("recovery_send() failed\n");
2710 if (! tevent_req_poll(req, ev)) {
2711 LOG("tevent_req_poll() failed\n");
2715 recovery_recv(req, &ret);
2718 LOG("database recovery failed, ret=%d\n", ret);
/* The status is written to the output fd as a raw int. */
2722 sys_write(write_fd, &ret, sizeof(ret));
2726 talloc_free(mem_ctx);