2 ctdb parallel database recovery
4 Copyright (C) Amitay Isaacs 2015
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
22 #include "system/filesys.h"
29 #include "lib/tdb_wrap/tdb_wrap.h"
30 #include "lib/util/dlinklist.h"
31 #include "lib/util/sys_rw.h"
32 #include "lib/util/time.h"
33 #include "lib/util/tevent_unix.h"
34 #include "lib/util/util.h"
35 #include "lib/util/smb_strtox.h"
37 #include "protocol/protocol.h"
38 #include "protocol/protocol_api.h"
39 #include "client/client.h"
41 #include "common/logging.h"
43 static int recover_timeout = 30;
47 #define TIMEOUT() timeval_current_ofs(recover_timeout, 0)
53 static bool generic_recv(struct tevent_req *req, int *perr)
57 if (tevent_req_is_unix_error(req, &err)) {
67 static uint64_t rec_srvid = CTDB_SRVID_RECOVERY;
69 static uint64_t srvid_next(void)
76 * Node related functions
82 uint32_t *ban_credits;
87 static struct node_list *node_list_init(TALLOC_CTX *mem_ctx, unsigned int size)
89 struct node_list *nlist;
92 nlist = talloc_zero(mem_ctx, struct node_list);
97 nlist->pnn_list = talloc_array(nlist, uint32_t, size);
98 nlist->caps = talloc_zero_array(nlist, uint32_t, size);
99 nlist->ban_credits = talloc_zero_array(nlist, uint32_t, size);
101 if (nlist->pnn_list == NULL ||
102 nlist->caps == NULL ||
103 nlist->ban_credits == NULL) {
109 for (i=0; i<nlist->size; i++) {
110 nlist->pnn_list[i] = CTDB_UNKNOWN_PNN;
116 static bool node_list_add(struct node_list *nlist, uint32_t pnn)
120 if (nlist->count == nlist->size) {
124 for (i=0; i<nlist->count; i++) {
125 if (nlist->pnn_list[i] == pnn) {
130 nlist->pnn_list[nlist->count] = pnn;
136 static uint32_t *node_list_lmaster(struct node_list *nlist,
138 unsigned int *pnn_count)
141 unsigned int count, i;
143 pnn_list = talloc_zero_array(mem_ctx, uint32_t, nlist->count);
144 if (pnn_list == NULL) {
149 for (i=0; i<nlist->count; i++) {
150 if (!(nlist->caps[i] & CTDB_CAP_LMASTER)) {
154 pnn_list[count] = nlist->pnn_list[i];
162 static void node_list_ban_credits(struct node_list *nlist, uint32_t pnn)
166 for (i=0; i<nlist->count; i++) {
167 if (nlist->pnn_list[i] == pnn) {
168 nlist->ban_credits[i] += 1;
175 * Database list functions
177 * Simple, naive implementation that could be updated to a db_hash or similar
181 struct db *prev, *next;
186 unsigned int num_nodes;
190 unsigned int num_dbs;
192 unsigned int num_nodes;
195 static struct db_list *db_list_init(TALLOC_CTX *mem_ctx, unsigned int num_nodes)
199 l = talloc_zero(mem_ctx, struct db_list);
200 l->num_nodes = num_nodes;
205 static struct db *db_list_find(struct db_list *dblist, uint32_t db_id)
209 if (dblist == NULL) {
214 while (db != NULL && db->db_id != db_id) {
221 static int db_list_add(struct db_list *dblist,
226 struct db *db = NULL;
228 if (dblist == NULL) {
232 db = talloc_zero(dblist, struct db);
238 db->db_flags = db_flags;
239 db->pnn_list = talloc_zero_array(db, uint32_t, dblist->num_nodes);
240 if (db->pnn_list == NULL) {
244 db->pnn_list[0] = node;
247 DLIST_ADD_END(dblist->db, db);
253 static int db_list_check_and_add(struct db_list *dblist,
258 struct db *db = NULL;
262 * These flags are masked out because they are only set on a
263 * node when a client attaches to that node, so they might not
264 * be set yet. They can't be passed as part of the attach, so
265 * they're no use here.
267 db_flags &= ~(CTDB_DB_FLAGS_READONLY | CTDB_DB_FLAGS_STICKY);
269 if (dblist == NULL) {
273 db = db_list_find(dblist, db_id);
275 ret = db_list_add(dblist, db_id, db_flags, node);
279 if (db->db_flags != db_flags) {
280 D_ERR("Incompatible database flags for 0x%"PRIx32" "
281 "(0x%"PRIx32" != 0x%"PRIx32")\n",
288 if (db->num_nodes >= dblist->num_nodes) {
292 db->pnn_list[db->num_nodes] = node;
299 * Create database on nodes where it is missing
302 struct db_create_missing_state {
303 struct tevent_context *ev;
304 struct ctdb_client_context *client;
306 struct node_list *nlist;
309 uint32_t *missing_pnn_list;
310 int missing_num_nodes;
313 static void db_create_missing_done(struct tevent_req *subreq);
315 static struct tevent_req *db_create_missing_send(
317 struct tevent_context *ev,
318 struct ctdb_client_context *client,
319 struct node_list *nlist,
323 struct tevent_req *req, *subreq;
324 struct db_create_missing_state *state;
325 struct ctdb_req_control request;
328 req = tevent_req_create(mem_ctx,
330 struct db_create_missing_state);
336 state->client = client;
337 state->nlist = nlist;
338 state->db_name = db_name;
340 if (nlist->count == db->num_nodes) {
341 tevent_req_done(req);
342 return tevent_req_post(req, ev);
345 state->missing_pnn_list = talloc_array(mem_ctx, uint32_t, nlist->count);
346 if (tevent_req_nomem(state->missing_pnn_list, req)) {
347 return tevent_req_post(req, ev);
350 for (i = 0; i < nlist->count; i++) {
351 uint32_t pnn = nlist->pnn_list[i] ;
353 for (j = 0; j < db->num_nodes; j++) {
354 if (pnn == db->pnn_list[j]) {
359 if (j < db->num_nodes) {
363 DBG_INFO("Create database %s on node %u\n",
366 state->missing_pnn_list[state->missing_num_nodes] = pnn;
367 state->missing_num_nodes++;
370 if (db->db_flags & CTDB_DB_FLAGS_PERSISTENT) {
371 ctdb_req_control_db_attach_persistent(&request, db_name);
372 } else if (db->db_flags & CTDB_DB_FLAGS_REPLICATED) {
373 ctdb_req_control_db_attach_replicated(&request, db_name);
375 ctdb_req_control_db_attach(&request, db_name);
377 request.flags = CTDB_CTRL_FLAG_ATTACH_RECOVERY;
378 subreq = ctdb_client_control_multi_send(state,
381 state->missing_pnn_list,
382 state->missing_num_nodes,
385 if (tevent_req_nomem(subreq, req)) {
386 return tevent_req_post(req, ev);
388 tevent_req_set_callback(subreq, db_create_missing_done, req);
393 static void db_create_missing_done(struct tevent_req *subreq)
395 struct tevent_req *req = tevent_req_callback_data(
396 subreq, struct tevent_req);
397 struct db_create_missing_state *state = tevent_req_data(
398 req, struct db_create_missing_state);
403 status = ctdb_client_control_multi_recv(subreq,
413 ret2 = ctdb_client_control_multi_error(
414 state->missing_pnn_list,
415 state->missing_num_nodes,
419 D_ERR("control DB_ATTACH failed for db %s"
420 " on node %u, ret=%d\n",
424 node_list_ban_credits(state->nlist, pnn);
426 D_ERR("control DB_ATTACH failed for db %s, ret=%d\n",
430 tevent_req_error(req, ret);
434 tevent_req_done(req);
437 static bool db_create_missing_recv(struct tevent_req *req, int *perr)
439 return generic_recv(req, perr);
443 * Recovery database functions
446 struct recdb_context {
454 static struct recdb_context *recdb_create(TALLOC_CTX *mem_ctx, uint32_t db_id,
457 uint32_t hash_size, bool persistent)
459 static char *db_dir_state = NULL;
460 struct recdb_context *recdb;
461 unsigned int tdb_flags;
463 recdb = talloc(mem_ctx, struct recdb_context);
468 if (db_dir_state == NULL) {
469 db_dir_state = getenv("CTDB_DBDIR_STATE");
472 recdb->db_name = db_name;
473 recdb->db_id = db_id;
474 recdb->db_path = talloc_asprintf(recdb, "%s/recdb.%s",
475 db_dir_state != NULL ?
477 dirname(discard_const(db_path)),
479 if (recdb->db_path == NULL) {
483 unlink(recdb->db_path);
485 tdb_flags = TDB_NOLOCK | TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING;
486 recdb->db = tdb_wrap_open(mem_ctx, recdb->db_path, hash_size,
487 tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
488 if (recdb->db == NULL) {
490 D_ERR("failed to create recovery db %s\n", recdb->db_path);
494 recdb->persistent = persistent;
499 static uint32_t recdb_id(struct recdb_context *recdb)
504 static const char *recdb_name(struct recdb_context *recdb)
506 return recdb->db_name;
509 static const char *recdb_path(struct recdb_context *recdb)
511 return recdb->db_path;
514 static struct tdb_context *recdb_tdb(struct recdb_context *recdb)
516 return recdb->db->tdb;
519 static bool recdb_persistent(struct recdb_context *recdb)
521 return recdb->persistent;
524 struct recdb_add_traverse_state {
525 struct recdb_context *recdb;
529 static int recdb_add_traverse(uint32_t reqid, struct ctdb_ltdb_header *header,
530 TDB_DATA key, TDB_DATA data,
533 struct recdb_add_traverse_state *state =
534 (struct recdb_add_traverse_state *)private_data;
535 struct ctdb_ltdb_header *hdr;
539 /* header is not marshalled separately in the pulldb control */
540 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
544 hdr = (struct ctdb_ltdb_header *)data.dptr;
546 /* fetch the existing record, if any */
547 prev_data = tdb_fetch(recdb_tdb(state->recdb), key);
549 if (prev_data.dptr != NULL) {
550 struct ctdb_ltdb_header prev_hdr;
552 prev_hdr = *(struct ctdb_ltdb_header *)prev_data.dptr;
553 free(prev_data.dptr);
554 if (hdr->rsn < prev_hdr.rsn ||
555 (hdr->rsn == prev_hdr.rsn &&
556 prev_hdr.dmaster != state->mypnn)) {
561 ret = tdb_store(recdb_tdb(state->recdb), key, data, TDB_REPLACE);
568 static bool recdb_add(struct recdb_context *recdb, int mypnn,
569 struct ctdb_rec_buffer *recbuf)
571 struct recdb_add_traverse_state state;
577 ret = ctdb_rec_buffer_traverse(recbuf, recdb_add_traverse, &state);
585 /* This function decides which records from recdb are retained */
586 static int recbuf_filter_add(struct ctdb_rec_buffer *recbuf, bool persistent,
587 uint32_t reqid, uint32_t dmaster,
588 TDB_DATA key, TDB_DATA data)
590 struct ctdb_ltdb_header *header;
593 /* Skip empty records */
594 if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
598 /* update the dmaster field to point to us */
599 header = (struct ctdb_ltdb_header *)data.dptr;
601 header->dmaster = dmaster;
602 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
605 ret = ctdb_rec_buffer_add(recbuf, recbuf, reqid, NULL, key, data);
613 struct recdb_file_traverse_state {
614 struct ctdb_rec_buffer *recbuf;
615 struct recdb_context *recdb;
623 unsigned int num_buffers;
626 static int recdb_file_traverse(struct tdb_context *tdb,
627 TDB_DATA key, TDB_DATA data,
630 struct recdb_file_traverse_state *state =
631 (struct recdb_file_traverse_state *)private_data;
634 ret = recbuf_filter_add(state->recbuf, state->persistent,
635 state->reqid, state->dmaster, key, data);
637 state->failed = true;
641 if (ctdb_rec_buffer_len(state->recbuf) > state->max_size) {
642 ret = ctdb_rec_buffer_write(state->recbuf, state->fd);
644 D_ERR("Failed to collect recovery records for %s\n",
645 recdb_name(state->recdb));
646 state->failed = true;
650 state->num_buffers += 1;
652 TALLOC_FREE(state->recbuf);
653 state->recbuf = ctdb_rec_buffer_init(state->mem_ctx,
654 recdb_id(state->recdb));
655 if (state->recbuf == NULL) {
656 state->failed = true;
664 static int recdb_file(struct recdb_context *recdb, TALLOC_CTX *mem_ctx,
665 uint32_t dmaster, int fd, int max_size)
667 struct recdb_file_traverse_state state;
670 state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
671 if (state.recbuf == NULL) {
675 state.mem_ctx = mem_ctx;
676 state.dmaster = dmaster;
678 state.persistent = recdb_persistent(recdb);
679 state.failed = false;
681 state.max_size = max_size;
682 state.num_buffers = 0;
684 ret = tdb_traverse_read(recdb_tdb(recdb), recdb_file_traverse, &state);
685 if (ret == -1 || state.failed) {
686 TALLOC_FREE(state.recbuf);
690 ret = ctdb_rec_buffer_write(state.recbuf, fd);
692 D_ERR("Failed to collect recovery records for %s\n",
694 TALLOC_FREE(state.recbuf);
697 state.num_buffers += 1;
699 D_DEBUG("Wrote %d buffers of recovery records for %s\n",
700 state.num_buffers, recdb_name(recdb));
702 return state.num_buffers;
706 * Pull database from a single node
709 struct pull_database_state {
710 struct tevent_context *ev;
711 struct ctdb_client_context *client;
712 struct recdb_context *recdb;
715 unsigned int num_records;
719 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
721 static void pull_database_register_done(struct tevent_req *subreq);
722 static void pull_database_unregister_done(struct tevent_req *subreq);
723 static void pull_database_done(struct tevent_req *subreq);
725 static struct tevent_req *pull_database_send(
727 struct tevent_context *ev,
728 struct ctdb_client_context *client,
730 struct recdb_context *recdb)
732 struct tevent_req *req, *subreq;
733 struct pull_database_state *state;
735 req = tevent_req_create(mem_ctx, &state, struct pull_database_state);
741 state->client = client;
742 state->recdb = recdb;
744 state->srvid = srvid_next();
746 subreq = ctdb_client_set_message_handler_send(
747 state, state->ev, state->client,
748 state->srvid, pull_database_handler,
750 if (tevent_req_nomem(subreq, req)) {
751 return tevent_req_post(req, ev);
754 tevent_req_set_callback(subreq, pull_database_register_done, req);
759 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
762 struct tevent_req *req = talloc_get_type_abort(
763 private_data, struct tevent_req);
764 struct pull_database_state *state = tevent_req_data(
765 req, struct pull_database_state);
766 struct ctdb_rec_buffer *recbuf;
771 if (srvid != state->srvid) {
775 ret = ctdb_rec_buffer_pull(data.dptr, data.dsize, state, &recbuf, &np);
777 D_ERR("Invalid data received for DB_PULL messages\n");
781 if (recbuf->db_id != recdb_id(state->recdb)) {
783 D_ERR("Invalid dbid:%08x for DB_PULL messages for %s\n",
784 recbuf->db_id, recdb_name(state->recdb));
788 status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
792 D_ERR("Failed to add records to recdb for %s\n",
793 recdb_name(state->recdb));
797 state->num_records += recbuf->count;
801 static void pull_database_register_done(struct tevent_req *subreq)
803 struct tevent_req *req = tevent_req_callback_data(
804 subreq, struct tevent_req);
805 struct pull_database_state *state = tevent_req_data(
806 req, struct pull_database_state);
807 struct ctdb_req_control request;
808 struct ctdb_pulldb_ext pulldb_ext;
812 status = ctdb_client_set_message_handler_recv(subreq, &ret);
815 D_ERR("Failed to set message handler for DB_PULL for %s\n",
816 recdb_name(state->recdb));
817 tevent_req_error(req, ret);
821 pulldb_ext.db_id = recdb_id(state->recdb);
822 pulldb_ext.lmaster = CTDB_LMASTER_ANY;
823 pulldb_ext.srvid = state->srvid;
825 ctdb_req_control_db_pull(&request, &pulldb_ext);
826 subreq = ctdb_client_control_send(state, state->ev, state->client,
827 state->pnn, TIMEOUT(), &request);
828 if (tevent_req_nomem(subreq, req)) {
831 tevent_req_set_callback(subreq, pull_database_done, req);
834 static void pull_database_done(struct tevent_req *subreq)
836 struct tevent_req *req = tevent_req_callback_data(
837 subreq, struct tevent_req);
838 struct pull_database_state *state = tevent_req_data(
839 req, struct pull_database_state);
840 struct ctdb_reply_control *reply;
841 uint32_t num_records;
845 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
848 D_ERR("control DB_PULL failed for %s on node %u, ret=%d\n",
849 recdb_name(state->recdb), state->pnn, ret);
854 ret = ctdb_reply_control_db_pull(reply, &num_records);
856 if (num_records != state->num_records) {
857 D_ERR("mismatch (%u != %u) in DB_PULL records for db %s\n",
858 num_records, state->num_records,
859 recdb_name(state->recdb));
864 D_INFO("Pulled %d records for db %s from node %d\n",
865 state->num_records, recdb_name(state->recdb), state->pnn);
869 subreq = ctdb_client_remove_message_handler_send(
870 state, state->ev, state->client,
872 if (tevent_req_nomem(subreq, req)) {
875 tevent_req_set_callback(subreq, pull_database_unregister_done, req);
878 static void pull_database_unregister_done(struct tevent_req *subreq)
880 struct tevent_req *req = tevent_req_callback_data(
881 subreq, struct tevent_req);
882 struct pull_database_state *state = tevent_req_data(
883 req, struct pull_database_state);
887 status = ctdb_client_remove_message_handler_recv(subreq, &ret);
890 D_ERR("failed to remove message handler for DB_PULL for db %s\n",
891 recdb_name(state->recdb));
892 tevent_req_error(req, ret);
896 if (state->result != 0) {
897 tevent_req_error(req, state->result);
901 tevent_req_done(req);
904 static bool pull_database_recv(struct tevent_req *req, int *perr)
906 return generic_recv(req, perr);
910 * Push database to specified nodes (new style)
913 struct push_database_state {
914 struct tevent_context *ev;
915 struct ctdb_client_context *client;
916 struct recdb_context *recdb;
923 int num_buffers_sent;
924 unsigned int num_records;
927 static void push_database_started(struct tevent_req *subreq);
928 static void push_database_send_msg(struct tevent_req *req);
929 static void push_database_send_done(struct tevent_req *subreq);
930 static void push_database_confirmed(struct tevent_req *subreq);
932 static struct tevent_req *push_database_send(
934 struct tevent_context *ev,
935 struct ctdb_client_context *client,
938 struct recdb_context *recdb,
941 struct tevent_req *req, *subreq;
942 struct push_database_state *state;
943 struct ctdb_req_control request;
944 struct ctdb_pulldb_ext pulldb_ext;
948 req = tevent_req_create(mem_ctx, &state,
949 struct push_database_state);
955 state->client = client;
956 state->recdb = recdb;
957 state->pnn_list = pnn_list;
958 state->count = count;
960 state->srvid = srvid_next();
961 state->dmaster = ctdb_client_pnn(client);
962 state->num_buffers_sent = 0;
963 state->num_records = 0;
965 filename = talloc_asprintf(state, "%s.dat", recdb_path(recdb));
966 if (tevent_req_nomem(filename, req)) {
967 return tevent_req_post(req, ev);
970 state->fd = open(filename, O_RDWR|O_CREAT, 0644);
971 if (state->fd == -1) {
972 tevent_req_error(req, errno);
973 return tevent_req_post(req, ev);
976 talloc_free(filename);
978 state->num_buffers = recdb_file(recdb, state, state->dmaster,
979 state->fd, max_size);
980 if (state->num_buffers == -1) {
981 tevent_req_error(req, ENOMEM);
982 return tevent_req_post(req, ev);
985 offset = lseek(state->fd, 0, SEEK_SET);
987 tevent_req_error(req, EIO);
988 return tevent_req_post(req, ev);
991 pulldb_ext.db_id = recdb_id(recdb);
992 pulldb_ext.srvid = state->srvid;
994 ctdb_req_control_db_push_start(&request, &pulldb_ext);
995 subreq = ctdb_client_control_multi_send(state, ev, client,
997 TIMEOUT(), &request);
998 if (tevent_req_nomem(subreq, req)) {
999 return tevent_req_post(req, ev);
1001 tevent_req_set_callback(subreq, push_database_started, req);
1006 static void push_database_started(struct tevent_req *subreq)
1008 struct tevent_req *req = tevent_req_callback_data(
1009 subreq, struct tevent_req);
1010 struct push_database_state *state = tevent_req_data(
1011 req, struct push_database_state);
1016 status = ctdb_client_control_multi_recv(subreq, &ret, state,
1018 TALLOC_FREE(subreq);
1023 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1027 D_ERR("control DB_PUSH_START failed for db %s"
1028 " on node %u, ret=%d\n",
1029 recdb_name(state->recdb), pnn, ret2);
1031 D_ERR("control DB_PUSH_START failed for db %s,"
1033 recdb_name(state->recdb), ret);
1035 talloc_free(err_list);
1037 tevent_req_error(req, ret);
1041 push_database_send_msg(req);
1044 static void push_database_send_msg(struct tevent_req *req)
1046 struct push_database_state *state = tevent_req_data(
1047 req, struct push_database_state);
1048 struct tevent_req *subreq;
1049 struct ctdb_rec_buffer *recbuf;
1050 struct ctdb_req_message message;
1055 if (state->num_buffers_sent == state->num_buffers) {
1056 struct ctdb_req_control request;
1058 ctdb_req_control_db_push_confirm(&request,
1059 recdb_id(state->recdb));
1060 subreq = ctdb_client_control_multi_send(state, state->ev,
1064 TIMEOUT(), &request);
1065 if (tevent_req_nomem(subreq, req)) {
1068 tevent_req_set_callback(subreq, push_database_confirmed, req);
1072 ret = ctdb_rec_buffer_read(state->fd, state, &recbuf);
1074 tevent_req_error(req, ret);
1078 data.dsize = ctdb_rec_buffer_len(recbuf);
1079 data.dptr = talloc_size(state, data.dsize);
1080 if (tevent_req_nomem(data.dptr, req)) {
1084 ctdb_rec_buffer_push(recbuf, data.dptr, &np);
1086 message.srvid = state->srvid;
1087 message.data.data = data;
1089 D_DEBUG("Pushing buffer %d with %d records for db %s\n",
1090 state->num_buffers_sent, recbuf->count,
1091 recdb_name(state->recdb));
1093 subreq = ctdb_client_message_multi_send(state, state->ev,
1095 state->pnn_list, state->count,
1097 if (tevent_req_nomem(subreq, req)) {
1100 tevent_req_set_callback(subreq, push_database_send_done, req);
1102 state->num_records += recbuf->count;
1104 talloc_free(data.dptr);
1105 talloc_free(recbuf);
1108 static void push_database_send_done(struct tevent_req *subreq)
1110 struct tevent_req *req = tevent_req_callback_data(
1111 subreq, struct tevent_req);
1112 struct push_database_state *state = tevent_req_data(
1113 req, struct push_database_state);
1117 status = ctdb_client_message_multi_recv(subreq, &ret, NULL, NULL);
1118 TALLOC_FREE(subreq);
1120 D_ERR("Sending recovery records failed for %s\n",
1121 recdb_name(state->recdb));
1122 tevent_req_error(req, ret);
1126 state->num_buffers_sent += 1;
1128 push_database_send_msg(req);
1131 static void push_database_confirmed(struct tevent_req *subreq)
1133 struct tevent_req *req = tevent_req_callback_data(
1134 subreq, struct tevent_req);
1135 struct push_database_state *state = tevent_req_data(
1136 req, struct push_database_state);
1137 struct ctdb_reply_control **reply;
1142 uint32_t num_records;
1144 status = ctdb_client_control_multi_recv(subreq, &ret, state,
1146 TALLOC_FREE(subreq);
1151 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1152 state->count, err_list,
1155 D_ERR("control DB_PUSH_CONFIRM failed for db %s"
1156 " on node %u, ret=%d\n",
1157 recdb_name(state->recdb), pnn, ret2);
1159 D_ERR("control DB_PUSH_CONFIRM failed for db %s,"
1161 recdb_name(state->recdb), ret);
1163 tevent_req_error(req, ret);
1167 for (i=0; i<state->count; i++) {
1168 ret = ctdb_reply_control_db_push_confirm(reply[i],
1171 tevent_req_error(req, EPROTO);
1175 if (num_records != state->num_records) {
1176 D_ERR("Node %u received %d of %d records for %s\n",
1177 state->pnn_list[i], num_records,
1178 state->num_records, recdb_name(state->recdb));
1179 tevent_req_error(req, EPROTO);
1186 D_INFO("Pushed %d records for db %s\n",
1187 state->num_records, recdb_name(state->recdb));
1189 tevent_req_done(req);
1192 static bool push_database_recv(struct tevent_req *req, int *perr)
1194 return generic_recv(req, perr);
1198 * Collect databases using highest sequence number
1201 struct collect_highseqnum_db_state {
1202 struct tevent_context *ev;
1203 struct ctdb_client_context *client;
1204 struct node_list *nlist;
1206 struct recdb_context *recdb;
1211 static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq);
1212 static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq);
1214 static struct tevent_req *collect_highseqnum_db_send(
1215 TALLOC_CTX *mem_ctx,
1216 struct tevent_context *ev,
1217 struct ctdb_client_context *client,
1218 struct node_list *nlist,
1220 struct recdb_context *recdb)
1222 struct tevent_req *req, *subreq;
1223 struct collect_highseqnum_db_state *state;
1224 struct ctdb_req_control request;
1226 req = tevent_req_create(mem_ctx, &state,
1227 struct collect_highseqnum_db_state);
1233 state->client = client;
1234 state->nlist = nlist;
1235 state->db_id = db_id;
1236 state->recdb = recdb;
1238 ctdb_req_control_get_db_seqnum(&request, db_id);
1239 subreq = ctdb_client_control_multi_send(mem_ctx,
1246 if (tevent_req_nomem(subreq, req)) {
1247 return tevent_req_post(req, ev);
1249 tevent_req_set_callback(subreq, collect_highseqnum_db_seqnum_done,
1255 static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq)
1257 struct tevent_req *req = tevent_req_callback_data(
1258 subreq, struct tevent_req);
1259 struct collect_highseqnum_db_state *state = tevent_req_data(
1260 req, struct collect_highseqnum_db_state);
1261 struct ctdb_reply_control **reply;
1266 uint64_t seqnum, max_seqnum;
1268 status = ctdb_client_control_multi_recv(subreq, &ret, state,
1270 TALLOC_FREE(subreq);
1275 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
1276 state->nlist->count,
1280 D_ERR("control GET_DB_SEQNUM failed for db %s"
1281 " on node %u, ret=%d\n",
1282 recdb_name(state->recdb), pnn, ret2);
1284 D_ERR("control GET_DB_SEQNUM failed for db %s,"
1286 recdb_name(state->recdb), ret);
1288 tevent_req_error(req, ret);
1293 state->max_pnn = state->nlist->pnn_list[0];
1294 for (i=0; i<state->nlist->count; i++) {
1295 ret = ctdb_reply_control_get_db_seqnum(reply[i], &seqnum);
1297 tevent_req_error(req, EPROTO);
1301 if (max_seqnum < seqnum) {
1302 max_seqnum = seqnum;
1303 state->max_pnn = state->nlist->pnn_list[i];
1309 D_INFO("Pull persistent db %s from node %d with seqnum 0x%"PRIx64"\n",
1310 recdb_name(state->recdb), state->max_pnn, max_seqnum);
1312 subreq = pull_database_send(state,
1317 if (tevent_req_nomem(subreq, req)) {
1320 tevent_req_set_callback(subreq, collect_highseqnum_db_pulldb_done,
1324 static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq)
1326 struct tevent_req *req = tevent_req_callback_data(
1327 subreq, struct tevent_req);
1328 struct collect_highseqnum_db_state *state = tevent_req_data(
1329 req, struct collect_highseqnum_db_state);
1333 status = pull_database_recv(subreq, &ret);
1334 TALLOC_FREE(subreq);
1336 node_list_ban_credits(state->nlist, state->max_pnn);
1337 tevent_req_error(req, ret);
1341 tevent_req_done(req);
1344 static bool collect_highseqnum_db_recv(struct tevent_req *req, int *perr)
1346 return generic_recv(req, perr);
1350 * Collect all databases
1353 struct collect_all_db_state {
1354 struct tevent_context *ev;
1355 struct ctdb_client_context *client;
1356 struct node_list *nlist;
1358 struct recdb_context *recdb;
1360 struct ctdb_pulldb pulldb;
1364 static void collect_all_db_pulldb_done(struct tevent_req *subreq);
1366 static struct tevent_req *collect_all_db_send(
1367 TALLOC_CTX *mem_ctx,
1368 struct tevent_context *ev,
1369 struct ctdb_client_context *client,
1370 struct node_list *nlist,
1372 struct recdb_context *recdb)
1374 struct tevent_req *req, *subreq;
1375 struct collect_all_db_state *state;
1377 req = tevent_req_create(mem_ctx, &state,
1378 struct collect_all_db_state);
1384 state->client = client;
1385 state->nlist = nlist;
1386 state->db_id = db_id;
1387 state->recdb = recdb;
1390 subreq = pull_database_send(state,
1393 nlist->pnn_list[state->index],
1395 if (tevent_req_nomem(subreq, req)) {
1396 return tevent_req_post(req, ev);
1398 tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
1403 static void collect_all_db_pulldb_done(struct tevent_req *subreq)
1405 struct tevent_req *req = tevent_req_callback_data(
1406 subreq, struct tevent_req);
1407 struct collect_all_db_state *state = tevent_req_data(
1408 req, struct collect_all_db_state);
1412 status = pull_database_recv(subreq, &ret);
1413 TALLOC_FREE(subreq);
1415 node_list_ban_credits(state->nlist,
1416 state->nlist->pnn_list[state->index]);
1417 tevent_req_error(req, ret);
1422 if (state->index == state->nlist->count) {
1423 tevent_req_done(req);
1427 subreq = pull_database_send(state,
1430 state->nlist->pnn_list[state->index],
1432 if (tevent_req_nomem(subreq, req)) {
1435 tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
1438 static bool collect_all_db_recv(struct tevent_req *req, int *perr)
1440 return generic_recv(req, perr);
1445 * For each database do the following:
1446 * - Get DB name from all nodes
1447 * - Attach database on missing nodes
1449 * - Freeze database on all nodes
1450 * - Start transaction on all nodes
1451 * - Collect database from all nodes
1452 * - Wipe database on all nodes
1453 * - Push database to all nodes
1454 * - Commit transaction on all nodes
1455 * - Thaw database on all nodes
1458 struct recover_db_state {
1459 struct tevent_context *ev;
1460 struct ctdb_client_context *client;
1461 struct ctdb_tunable_list *tun_list;
1462 struct node_list *nlist;
1466 struct ctdb_transdb transdb;
1468 const char *db_name, *db_path;
1469 struct recdb_context *recdb;
1472 static void recover_db_name_done(struct tevent_req *subreq);
1473 static void recover_db_create_missing_done(struct tevent_req *subreq);
1474 static void recover_db_path_done(struct tevent_req *subreq);
1475 static void recover_db_freeze_done(struct tevent_req *subreq);
1476 static void recover_db_transaction_started(struct tevent_req *subreq);
1477 static void recover_db_collect_done(struct tevent_req *subreq);
1478 static void recover_db_wipedb_done(struct tevent_req *subreq);
1479 static void recover_db_pushdb_done(struct tevent_req *subreq);
1480 static void recover_db_transaction_committed(struct tevent_req *subreq);
1481 static void recover_db_thaw_done(struct tevent_req *subreq);
1483 static struct tevent_req *recover_db_send(TALLOC_CTX *mem_ctx,
1484 struct tevent_context *ev,
1485 struct ctdb_client_context *client,
1486 struct ctdb_tunable_list *tun_list,
1487 struct node_list *nlist,
1488 uint32_t generation,
1491 struct tevent_req *req, *subreq;
1492 struct recover_db_state *state;
1493 struct ctdb_req_control request;
1495 req = tevent_req_create(mem_ctx, &state, struct recover_db_state);
1501 state->client = client;
1502 state->tun_list = tun_list;
1503 state->nlist = nlist;
1506 state->destnode = ctdb_client_pnn(client);
1507 state->transdb.db_id = db->db_id;
1508 state->transdb.tid = generation;
1510 ctdb_req_control_get_dbname(&request, db->db_id);
1511 subreq = ctdb_client_control_multi_send(state,
1514 state->db->pnn_list,
1515 state->db->num_nodes,
1518 if (tevent_req_nomem(subreq, req)) {
1519 return tevent_req_post(req, ev);
1521 tevent_req_set_callback(subreq, recover_db_name_done, req);
1526 static void recover_db_name_done(struct tevent_req *subreq)
1528 struct tevent_req *req = tevent_req_callback_data(
1529 subreq, struct tevent_req);
1530 struct recover_db_state *state = tevent_req_data(
1531 req, struct recover_db_state);
1532 struct ctdb_reply_control **reply;
1538 status = ctdb_client_control_multi_recv(subreq,
1543 TALLOC_FREE(subreq);
1548 ret2 = ctdb_client_control_multi_error(state->db->pnn_list,
1549 state->db->num_nodes,
1553 D_ERR("control GET_DBNAME failed on node %u,"
1558 D_ERR("control GET_DBNAME failed, ret=%d\n",
1561 tevent_req_error(req, ret);
1565 for (i = 0; i < state->db->num_nodes; i++) {
1566 const char *db_name;
1569 pnn = state->nlist->pnn_list[i];
1571 ret = ctdb_reply_control_get_dbname(reply[i],
1575 D_ERR("control GET_DBNAME failed on node %u "
1576 "for db=0x%x, ret=%d\n",
1580 tevent_req_error(req, EPROTO);
1584 if (state->db_name == NULL) {
1585 state->db_name = db_name;
1589 if (strcmp(state->db_name, db_name) != 0) {
1590 D_ERR("Incompatible database name for 0x%"PRIx32" "
1591 "(%s != %s) on node %"PRIu32"\n",
1596 node_list_ban_credits(state->nlist, pnn);
1597 tevent_req_error(req, ret);
1604 subreq = db_create_missing_send(state,
1611 if (tevent_req_nomem(subreq, req)) {
1614 tevent_req_set_callback(subreq, recover_db_create_missing_done, req);
1617 static void recover_db_create_missing_done(struct tevent_req *subreq)
1619 struct tevent_req *req = tevent_req_callback_data(
1620 subreq, struct tevent_req);
1621 struct recover_db_state *state = tevent_req_data(
1622 req, struct recover_db_state);
1623 struct ctdb_req_control request;
1627 /* Could sanity check the db_id here */
1628 status = db_create_missing_recv(subreq, &ret);
1629 TALLOC_FREE(subreq);
1631 tevent_req_error(req, ret);
1635 ctdb_req_control_getdbpath(&request, state->db->db_id);
1636 subreq = ctdb_client_control_send(state, state->ev, state->client,
1637 state->destnode, TIMEOUT(),
1639 if (tevent_req_nomem(subreq, req)) {
1642 tevent_req_set_callback(subreq, recover_db_path_done, req);
1645 static void recover_db_path_done(struct tevent_req *subreq)
1647 struct tevent_req *req = tevent_req_callback_data(
1648 subreq, struct tevent_req);
1649 struct recover_db_state *state = tevent_req_data(
1650 req, struct recover_db_state);
1651 struct ctdb_reply_control *reply;
1652 struct ctdb_req_control request;
1656 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1657 TALLOC_FREE(subreq);
1659 D_ERR("control GETDBPATH failed for db %s, ret=%d\n",
1660 state->db_name, ret);
1661 tevent_req_error(req, ret);
1665 ret = ctdb_reply_control_getdbpath(reply, state, &state->db_path);
1667 D_ERR("control GETDBPATH failed for db %s, ret=%d\n",
1668 state->db_name, ret);
1669 tevent_req_error(req, EPROTO);
1675 ctdb_req_control_db_freeze(&request, state->db->db_id);
1676 subreq = ctdb_client_control_multi_send(state,
1679 state->nlist->pnn_list,
1680 state->nlist->count,
1683 if (tevent_req_nomem(subreq, req)) {
1686 tevent_req_set_callback(subreq, recover_db_freeze_done, req);
1689 static void recover_db_freeze_done(struct tevent_req *subreq)
1691 struct tevent_req *req = tevent_req_callback_data(
1692 subreq, struct tevent_req);
1693 struct recover_db_state *state = tevent_req_data(
1694 req, struct recover_db_state);
1695 struct ctdb_req_control request;
1700 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1702 TALLOC_FREE(subreq);
1707 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
1708 state->nlist->count,
1712 D_ERR("control FREEZE_DB failed for db %s"
1713 " on node %u, ret=%d\n",
1714 state->db_name, pnn, ret2);
1716 node_list_ban_credits(state->nlist, pnn);
1718 D_ERR("control FREEZE_DB failed for db %s, ret=%d\n",
1719 state->db_name, ret);
1721 tevent_req_error(req, ret);
1725 ctdb_req_control_db_transaction_start(&request, &state->transdb);
1726 subreq = ctdb_client_control_multi_send(state,
1729 state->nlist->pnn_list,
1730 state->nlist->count,
1733 if (tevent_req_nomem(subreq, req)) {
1736 tevent_req_set_callback(subreq, recover_db_transaction_started, req);
1739 static void recover_db_transaction_started(struct tevent_req *subreq)
1741 struct tevent_req *req = tevent_req_callback_data(
1742 subreq, struct tevent_req);
1743 struct recover_db_state *state = tevent_req_data(
1744 req, struct recover_db_state);
1750 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1752 TALLOC_FREE(subreq);
1757 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
1758 state->nlist->count,
1762 D_ERR("control TRANSACTION_DB failed for db=%s"
1763 " on node %u, ret=%d\n",
1764 state->db_name, pnn, ret2);
1766 D_ERR("control TRANSACTION_DB failed for db=%s,"
1767 " ret=%d\n", state->db_name, ret);
1769 tevent_req_error(req, ret);
1773 flags = state->db->db_flags;
1774 state->recdb = recdb_create(state,
1778 state->tun_list->database_hash_size,
1779 flags & CTDB_DB_FLAGS_PERSISTENT);
1780 if (tevent_req_nomem(state->recdb, req)) {
1784 if ((flags & CTDB_DB_FLAGS_PERSISTENT) ||
1785 (flags & CTDB_DB_FLAGS_REPLICATED)) {
1786 subreq = collect_highseqnum_db_send(state,
1793 subreq = collect_all_db_send(state,
1800 if (tevent_req_nomem(subreq, req)) {
1803 tevent_req_set_callback(subreq, recover_db_collect_done, req);
1806 static void recover_db_collect_done(struct tevent_req *subreq)
1808 struct tevent_req *req = tevent_req_callback_data(
1809 subreq, struct tevent_req);
1810 struct recover_db_state *state = tevent_req_data(
1811 req, struct recover_db_state);
1812 struct ctdb_req_control request;
1816 if ((state->db->db_flags & CTDB_DB_FLAGS_PERSISTENT) ||
1817 (state->db->db_flags & CTDB_DB_FLAGS_REPLICATED)) {
1818 status = collect_highseqnum_db_recv(subreq, &ret);
1820 status = collect_all_db_recv(subreq, &ret);
1822 TALLOC_FREE(subreq);
1824 tevent_req_error(req, ret);
1828 ctdb_req_control_wipe_database(&request, &state->transdb);
1829 subreq = ctdb_client_control_multi_send(state,
1832 state->nlist->pnn_list,
1833 state->nlist->count,
1836 if (tevent_req_nomem(subreq, req)) {
1839 tevent_req_set_callback(subreq, recover_db_wipedb_done, req);
1842 static void recover_db_wipedb_done(struct tevent_req *subreq)
1844 struct tevent_req *req = tevent_req_callback_data(
1845 subreq, struct tevent_req);
1846 struct recover_db_state *state = tevent_req_data(
1847 req, struct recover_db_state);
1852 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1854 TALLOC_FREE(subreq);
1859 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
1860 state->nlist->count,
1864 D_ERR("control WIPEDB failed for db %s on node %u,"
1865 " ret=%d\n", state->db_name, pnn, ret2);
1867 D_ERR("control WIPEDB failed for db %s, ret=%d\n",
1868 state->db_name, ret);
1870 tevent_req_error(req, ret);
1874 subreq = push_database_send(state,
1877 state->nlist->pnn_list,
1878 state->nlist->count,
1880 state->tun_list->rec_buffer_size_limit);
1881 if (tevent_req_nomem(subreq, req)) {
1884 tevent_req_set_callback(subreq, recover_db_pushdb_done, req);
1887 static void recover_db_pushdb_done(struct tevent_req *subreq)
1889 struct tevent_req *req = tevent_req_callback_data(
1890 subreq, struct tevent_req);
1891 struct recover_db_state *state = tevent_req_data(
1892 req, struct recover_db_state);
1893 struct ctdb_req_control request;
1897 status = push_database_recv(subreq, &ret);
1898 TALLOC_FREE(subreq);
1900 tevent_req_error(req, ret);
1904 TALLOC_FREE(state->recdb);
1906 ctdb_req_control_db_transaction_commit(&request, &state->transdb);
1907 subreq = ctdb_client_control_multi_send(state,
1910 state->nlist->pnn_list,
1911 state->nlist->count,
1914 if (tevent_req_nomem(subreq, req)) {
1917 tevent_req_set_callback(subreq, recover_db_transaction_committed, req);
1920 static void recover_db_transaction_committed(struct tevent_req *subreq)
1922 struct tevent_req *req = tevent_req_callback_data(
1923 subreq, struct tevent_req);
1924 struct recover_db_state *state = tevent_req_data(
1925 req, struct recover_db_state);
1926 struct ctdb_req_control request;
1931 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1933 TALLOC_FREE(subreq);
1938 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
1939 state->nlist->count,
1943 D_ERR("control DB_TRANSACTION_COMMIT failed for db %s"
1944 " on node %u, ret=%d\n",
1945 state->db_name, pnn, ret2);
1947 D_ERR("control DB_TRANSACTION_COMMIT failed for db %s,"
1948 " ret=%d\n", state->db_name, ret);
1950 tevent_req_error(req, ret);
1954 ctdb_req_control_db_thaw(&request, state->db->db_id);
1955 subreq = ctdb_client_control_multi_send(state,
1958 state->nlist->pnn_list,
1959 state->nlist->count,
1962 if (tevent_req_nomem(subreq, req)) {
1965 tevent_req_set_callback(subreq, recover_db_thaw_done, req);
1968 static void recover_db_thaw_done(struct tevent_req *subreq)
1970 struct tevent_req *req = tevent_req_callback_data(
1971 subreq, struct tevent_req);
1972 struct recover_db_state *state = tevent_req_data(
1973 req, struct recover_db_state);
1978 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1980 TALLOC_FREE(subreq);
1985 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
1986 state->nlist->count,
1990 D_ERR("control DB_THAW failed for db %s on node %u,"
1991 " ret=%d\n", state->db_name, pnn, ret2);
1993 D_ERR("control DB_THAW failed for db %s, ret=%d\n",
1994 state->db_name, ret);
1996 tevent_req_error(req, ret);
2000 tevent_req_done(req);
2003 static bool recover_db_recv(struct tevent_req *req)
2005 return generic_recv(req, NULL);
2010 * Start database recovery for each database
2012 * Try to recover each database 5 times before failing recovery.
2015 struct db_recovery_state {
2016 struct tevent_context *ev;
2017 struct db_list *dblist;
2018 unsigned int num_replies;
2019 unsigned int num_failed;
2022 struct db_recovery_one_state {
2023 struct tevent_req *req;
2024 struct ctdb_client_context *client;
2025 struct db_list *dblist;
2026 struct ctdb_tunable_list *tun_list;
2027 struct node_list *nlist;
2028 uint32_t generation;
2033 static void db_recovery_one_done(struct tevent_req *subreq);
2035 static struct tevent_req *db_recovery_send(TALLOC_CTX *mem_ctx,
2036 struct tevent_context *ev,
2037 struct ctdb_client_context *client,
2038 struct db_list *dblist,
2039 struct ctdb_tunable_list *tun_list,
2040 struct node_list *nlist,
2041 uint32_t generation)
2043 struct tevent_req *req, *subreq;
2044 struct db_recovery_state *state;
2047 req = tevent_req_create(mem_ctx, &state, struct db_recovery_state);
2053 state->dblist = dblist;
2054 state->num_replies = 0;
2055 state->num_failed = 0;
2057 if (dblist->num_dbs == 0) {
2058 tevent_req_done(req);
2059 return tevent_req_post(req, ev);
2062 for (db = dblist->db; db != NULL; db = db->next) {
2063 struct db_recovery_one_state *substate;
2065 substate = talloc_zero(state, struct db_recovery_one_state);
2066 if (tevent_req_nomem(substate, req)) {
2067 return tevent_req_post(req, ev);
2070 substate->req = req;
2071 substate->client = client;
2072 substate->dblist = dblist;
2073 substate->tun_list = tun_list;
2074 substate->nlist = nlist;
2075 substate->generation = generation;
2078 subreq = recover_db_send(state,
2085 if (tevent_req_nomem(subreq, req)) {
2086 return tevent_req_post(req, ev);
2088 tevent_req_set_callback(subreq, db_recovery_one_done,
2090 D_NOTICE("recover database 0x%08x\n", substate->db->db_id);
2096 static void db_recovery_one_done(struct tevent_req *subreq)
2098 struct db_recovery_one_state *substate = tevent_req_callback_data(
2099 subreq, struct db_recovery_one_state);
2100 struct tevent_req *req = substate->req;
2101 struct db_recovery_state *state = tevent_req_data(
2102 req, struct db_recovery_state);
2105 status = recover_db_recv(subreq);
2106 TALLOC_FREE(subreq);
2109 talloc_free(substate);
2113 substate->num_fails += 1;
2114 if (substate->num_fails < NUM_RETRIES) {
2115 subreq = recover_db_send(state,
2120 substate->generation,
2122 if (tevent_req_nomem(subreq, req)) {
2125 tevent_req_set_callback(subreq, db_recovery_one_done, substate);
2126 D_NOTICE("recover database 0x%08x, attempt %d\n",
2127 substate->db->db_id, substate->num_fails+1);
2132 state->num_failed += 1;
2135 state->num_replies += 1;
2137 if (state->num_replies == state->dblist->num_dbs) {
2138 tevent_req_done(req);
2142 static bool db_recovery_recv(struct tevent_req *req, unsigned int *count)
2144 struct db_recovery_state *state = tevent_req_data(
2145 req, struct db_recovery_state);
2148 if (tevent_req_is_unix_error(req, &err)) {
2153 *count = state->num_replies - state->num_failed;
2155 if (state->num_failed > 0) {
2162 struct ban_node_state {
2163 struct tevent_context *ev;
2164 struct ctdb_client_context *client;
2165 struct ctdb_tunable_list *tun_list;
2166 struct node_list *nlist;
2172 static bool ban_node_check(struct tevent_req *req);
2173 static void ban_node_check_done(struct tevent_req *subreq);
2174 static void ban_node_done(struct tevent_req *subreq);
2176 static struct tevent_req *ban_node_send(TALLOC_CTX *mem_ctx,
2177 struct tevent_context *ev,
2178 struct ctdb_client_context *client,
2179 struct ctdb_tunable_list *tun_list,
2180 struct node_list *nlist)
2182 struct tevent_req *req;
2183 struct ban_node_state *state;
2186 req = tevent_req_create(mem_ctx, &state, struct ban_node_state);
2192 state->client = client;
2193 state->tun_list = tun_list;
2194 state->nlist = nlist;
2195 state->destnode = ctdb_client_pnn(client);
2197 /* Bans are not enabled */
2198 if (state->tun_list->enable_bans == 0) {
2199 D_ERR("Bans are not enabled\n");
2200 tevent_req_done(req);
2201 return tevent_req_post(req, ev);
2204 ok = ban_node_check(req);
2206 return tevent_req_post(req, ev);
2212 static bool ban_node_check(struct tevent_req *req)
2214 struct tevent_req *subreq;
2215 struct ban_node_state *state = tevent_req_data(
2216 req, struct ban_node_state);
2217 struct ctdb_req_control request;
2218 unsigned max_credits = 0, i;
2220 for (i=0; i<state->nlist->count; i++) {
2221 if (state->nlist->ban_credits[i] > max_credits) {
2222 state->max_pnn = state->nlist->pnn_list[i];
2223 max_credits = state->nlist->ban_credits[i];
2227 if (max_credits < NUM_RETRIES) {
2228 tevent_req_done(req);
2232 ctdb_req_control_get_nodemap(&request);
2233 subreq = ctdb_client_control_send(state,
2239 if (tevent_req_nomem(subreq, req)) {
2242 tevent_req_set_callback(subreq, ban_node_check_done, req);
2247 static void ban_node_check_done(struct tevent_req *subreq)
2249 struct tevent_req *req = tevent_req_callback_data(
2250 subreq, struct tevent_req);
2251 struct ban_node_state *state = tevent_req_data(
2252 req, struct ban_node_state);
2253 struct ctdb_reply_control *reply;
2254 struct ctdb_node_map *nodemap;
2255 struct ctdb_req_control request;
2256 struct ctdb_ban_state ban;
2261 ok = ctdb_client_control_recv(subreq, &ret, state, &reply);
2262 TALLOC_FREE(subreq);
2264 D_ERR("control GET_NODEMAP failed to node %u, ret=%d\n",
2265 state->max_pnn, ret);
2266 tevent_req_error(req, ret);
2270 ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
2272 D_ERR("control GET_NODEMAP failed, ret=%d\n", ret);
2273 tevent_req_error(req, ret);
2277 for (i=0; i<nodemap->num; i++) {
2278 if (nodemap->node[i].pnn != state->max_pnn) {
2282 /* If the node became inactive, reset ban_credits */
2283 if (nodemap->node[i].flags & NODE_FLAGS_INACTIVE) {
2286 for (j=0; j<state->nlist->count; j++) {
2287 if (state->nlist->pnn_list[j] ==
2289 state->nlist->ban_credits[j] = 0;
2293 state->max_pnn = CTDB_UNKNOWN_PNN;
2297 talloc_free(nodemap);
2300 /* If node becomes inactive during recovery, pick next */
2301 if (state->max_pnn == CTDB_UNKNOWN_PNN) {
2302 (void) ban_node_check(req);
2306 ban = (struct ctdb_ban_state) {
2307 .pnn = state->max_pnn,
2308 .time = state->tun_list->recovery_ban_period,
2311 D_ERR("Banning node %u for %u seconds\n", ban.pnn, ban.time);
2313 ctdb_req_control_set_ban_state(&request, &ban);
2314 subreq = ctdb_client_control_send(state,
2320 if (tevent_req_nomem(subreq, req)) {
2323 tevent_req_set_callback(subreq, ban_node_done, req);
2326 static void ban_node_done(struct tevent_req *subreq)
2328 struct tevent_req *req = tevent_req_callback_data(
2329 subreq, struct tevent_req);
2330 struct ban_node_state *state = tevent_req_data(
2331 req, struct ban_node_state);
2332 struct ctdb_reply_control *reply;
2336 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2337 TALLOC_FREE(subreq);
2339 tevent_req_error(req, ret);
2343 ret = ctdb_reply_control_set_ban_state(reply);
2345 D_ERR("control SET_BAN_STATE failed, ret=%d\n", ret);
2346 tevent_req_error(req, ret);
2351 tevent_req_done(req);
2354 static bool ban_node_recv(struct tevent_req *req, int *perr)
2356 if (tevent_req_is_unix_error(req, perr)) {
2364 * Run the parallel database recovery
2367 * - Get nodemap from all nodes
2368 * - Get capabilities from all nodes
2370 * - Set RECOVERY_ACTIVE
2371 * - Send START_RECOVERY
2372 * - Update vnnmap on all nodes
2373 * - Run database recovery
2374 * - Set RECOVERY_NORMAL
2375 * - Send END_RECOVERY
2378 struct recovery_state {
2379 struct tevent_context *ev;
2380 struct ctdb_client_context *client;
2381 uint32_t generation;
2383 struct node_list *nlist;
2384 struct ctdb_tunable_list *tun_list;
2385 struct ctdb_vnn_map *vnnmap;
2386 struct db_list *dblist;
2389 static void recovery_tunables_done(struct tevent_req *subreq);
2390 static void recovery_nodemap_done(struct tevent_req *subreq);
2391 static void recovery_nodemap_verify(struct tevent_req *subreq);
2392 static void recovery_capabilities_done(struct tevent_req *subreq);
2393 static void recovery_dbmap_done(struct tevent_req *subreq);
2394 static void recovery_active_done(struct tevent_req *subreq);
2395 static void recovery_start_recovery_done(struct tevent_req *subreq);
2396 static void recovery_vnnmap_update_done(struct tevent_req *subreq);
2397 static void recovery_db_recovery_done(struct tevent_req *subreq);
2398 static void recovery_failed_done(struct tevent_req *subreq);
2399 static void recovery_normal_done(struct tevent_req *subreq);
2400 static void recovery_end_recovery_done(struct tevent_req *subreq);
2402 static struct tevent_req *recovery_send(TALLOC_CTX *mem_ctx,
2403 struct tevent_context *ev,
2404 struct ctdb_client_context *client,
2405 uint32_t generation)
2407 struct tevent_req *req, *subreq;
2408 struct recovery_state *state;
2409 struct ctdb_req_control request;
2411 req = tevent_req_create(mem_ctx, &state, struct recovery_state);
2417 state->client = client;
2418 state->generation = generation;
2419 state->destnode = ctdb_client_pnn(client);
2421 ctdb_req_control_get_all_tunables(&request);
2422 subreq = ctdb_client_control_send(state, state->ev, state->client,
2423 state->destnode, TIMEOUT(),
2425 if (tevent_req_nomem(subreq, req)) {
2426 return tevent_req_post(req, ev);
2428 tevent_req_set_callback(subreq, recovery_tunables_done, req);
2433 static void recovery_tunables_done(struct tevent_req *subreq)
2435 struct tevent_req *req = tevent_req_callback_data(
2436 subreq, struct tevent_req);
2437 struct recovery_state *state = tevent_req_data(
2438 req, struct recovery_state);
2439 struct ctdb_reply_control *reply;
2440 struct ctdb_req_control request;
2444 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2445 TALLOC_FREE(subreq);
2447 D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2448 tevent_req_error(req, ret);
2452 ret = ctdb_reply_control_get_all_tunables(reply, state,
2455 D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2456 tevent_req_error(req, EPROTO);
2462 recover_timeout = state->tun_list->recover_timeout;
2464 ctdb_req_control_get_nodemap(&request);
2465 subreq = ctdb_client_control_send(state, state->ev, state->client,
2466 state->destnode, TIMEOUT(),
2468 if (tevent_req_nomem(subreq, req)) {
2471 tevent_req_set_callback(subreq, recovery_nodemap_done, req);
2474 static void recovery_nodemap_done(struct tevent_req *subreq)
2476 struct tevent_req *req = tevent_req_callback_data(
2477 subreq, struct tevent_req);
2478 struct recovery_state *state = tevent_req_data(
2479 req, struct recovery_state);
2480 struct ctdb_reply_control *reply;
2481 struct ctdb_req_control request;
2482 struct ctdb_node_map *nodemap;
2487 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2488 TALLOC_FREE(subreq);
2490 D_ERR("control GET_NODEMAP failed to node %u, ret=%d\n",
2491 state->destnode, ret);
2492 tevent_req_error(req, ret);
2496 ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
2498 D_ERR("control GET_NODEMAP failed, ret=%d\n", ret);
2499 tevent_req_error(req, ret);
2503 state->nlist = node_list_init(state, nodemap->num);
2504 if (tevent_req_nomem(state->nlist, req)) {
2508 for (i=0; i<nodemap->num; i++) {
2511 if (nodemap->node[i].flags & NODE_FLAGS_DISCONNECTED) {
2515 ok = node_list_add(state->nlist, nodemap->node[i].pnn);
2517 tevent_req_error(req, EINVAL);
2522 talloc_free(nodemap);
2525 /* Verify flags by getting local node information from each node */
2526 ctdb_req_control_get_nodemap(&request);
2527 subreq = ctdb_client_control_multi_send(state,
2530 state->nlist->pnn_list,
2531 state->nlist->count,
2534 if (tevent_req_nomem(subreq, req)) {
2537 tevent_req_set_callback(subreq, recovery_nodemap_verify, req);
2540 static void recovery_nodemap_verify(struct tevent_req *subreq)
2542 struct tevent_req *req = tevent_req_callback_data(
2543 subreq, struct tevent_req);
2544 struct recovery_state *state = tevent_req_data(
2545 req, struct recovery_state);
2546 struct ctdb_req_control request;
2547 struct ctdb_reply_control **reply;
2548 struct node_list *nlist;
2554 status = ctdb_client_control_multi_recv(subreq,
2559 TALLOC_FREE(subreq);
2564 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2565 state->nlist->count,
2569 D_ERR("control GET_NODEMAP failed on node %u,"
2570 " ret=%d\n", pnn, ret2);
2572 D_ERR("control GET_NODEMAP failed, ret=%d\n", ret);
2574 tevent_req_error(req, ret);
2578 nlist = node_list_init(state, state->nlist->size);
2579 if (tevent_req_nomem(nlist, req)) {
2583 for (i=0; i<state->nlist->count; i++) {
2584 struct ctdb_node_map *nodemap = NULL;
2585 uint32_t pnn, flags;
2589 pnn = state->nlist->pnn_list[i];
2590 ret = ctdb_reply_control_get_nodemap(reply[i],
2594 D_ERR("control GET_NODEMAP failed on node %u\n", pnn);
2595 tevent_req_error(req, EPROTO);
2599 flags = NODE_FLAGS_DISCONNECTED;
2600 for (j=0; j<nodemap->num; j++) {
2601 if (nodemap->node[j].pnn == pnn) {
2602 flags = nodemap->node[j].flags;
2607 TALLOC_FREE(nodemap);
2609 if (flags & NODE_FLAGS_INACTIVE) {
2613 ok = node_list_add(nlist, pnn);
2615 tevent_req_error(req, EINVAL);
2622 talloc_free(state->nlist);
2623 state->nlist = nlist;
2625 ctdb_req_control_get_capabilities(&request);
2626 subreq = ctdb_client_control_multi_send(state,
2629 state->nlist->pnn_list,
2630 state->nlist->count,
2633 if (tevent_req_nomem(subreq, req)) {
2636 tevent_req_set_callback(subreq, recovery_capabilities_done, req);
2639 static void recovery_capabilities_done(struct tevent_req *subreq)
2641 struct tevent_req *req = tevent_req_callback_data(
2642 subreq, struct tevent_req);
2643 struct recovery_state *state = tevent_req_data(
2644 req, struct recovery_state);
2645 struct ctdb_reply_control **reply;
2646 struct ctdb_req_control request;
2652 status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2654 TALLOC_FREE(subreq);
2659 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2660 state->nlist->count,
2664 D_ERR("control GET_CAPABILITIES failed on node %u,"
2665 " ret=%d\n", pnn, ret2);
2667 D_ERR("control GET_CAPABILITIES failed, ret=%d\n",
2670 tevent_req_error(req, ret);
2674 for (i=0; i<state->nlist->count; i++) {
2677 ret = ctdb_reply_control_get_capabilities(reply[i], &caps);
2679 D_ERR("control GET_CAPABILITIES failed on node %u\n",
2680 state->nlist->pnn_list[i]);
2681 tevent_req_error(req, EPROTO);
2685 state->nlist->caps[i] = caps;
2690 ctdb_req_control_get_dbmap(&request);
2691 subreq = ctdb_client_control_multi_send(state,
2694 state->nlist->pnn_list,
2695 state->nlist->count,
2698 if (tevent_req_nomem(subreq, req)) {
2701 tevent_req_set_callback(subreq, recovery_dbmap_done, req);
2704 static void recovery_dbmap_done(struct tevent_req *subreq)
2706 struct tevent_req *req = tevent_req_callback_data(
2707 subreq, struct tevent_req);
2708 struct recovery_state *state = tevent_req_data(
2709 req, struct recovery_state);
2710 struct ctdb_reply_control **reply;
2711 struct ctdb_req_control request;
2717 status = ctdb_client_control_multi_recv(subreq,
2722 TALLOC_FREE(subreq);
2727 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2728 state->nlist->count,
2732 D_ERR("control GET_DBMAP failed on node %u,"
2733 " ret=%d\n", pnn, ret2);
2735 D_ERR("control GET_DBMAP failed, ret=%d\n",
2738 tevent_req_error(req, ret);
2742 state->dblist = db_list_init(state, state->nlist->count);
2743 if (tevent_req_nomem(state->dblist, req)) {
2744 D_ERR("memory allocation error\n");
2748 for (i = 0; i < state->nlist->count; i++) {
2749 struct ctdb_dbid_map *dbmap = NULL;
2752 pnn = state->nlist->pnn_list[i];
2754 ret = ctdb_reply_control_get_dbmap(reply[i], state, &dbmap);
2756 D_ERR("control GET_DBMAP failed on node %u\n",
2758 tevent_req_error(req, EPROTO);
2762 for (j = 0; j < dbmap->num; j++) {
2763 ret = db_list_check_and_add(state->dblist,
2764 dbmap->dbs[j].db_id,
2765 dbmap->dbs[j].flags,
2768 D_ERR("failed to add database list entry, "
2771 tevent_req_error(req, ret);
2779 ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_ACTIVE);
2780 subreq = ctdb_client_control_multi_send(state,
2783 state->nlist->pnn_list,
2784 state->nlist->count,
2787 if (tevent_req_nomem(subreq, req)) {
2790 tevent_req_set_callback(subreq, recovery_active_done, req);
2793 static void recovery_active_done(struct tevent_req *subreq)
2795 struct tevent_req *req = tevent_req_callback_data(
2796 subreq, struct tevent_req);
2797 struct recovery_state *state = tevent_req_data(
2798 req, struct recovery_state);
2799 struct ctdb_req_control request;
2800 struct ctdb_vnn_map *vnnmap;
2805 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2807 TALLOC_FREE(subreq);
2812 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2813 state->nlist->count,
2817 D_ERR("failed to set recovery mode ACTIVE on node %u,"
2818 " ret=%d\n", pnn, ret2);
2820 D_ERR("failed to set recovery mode ACTIVE, ret=%d\n",
2823 tevent_req_error(req, ret);
2827 D_ERR("Set recovery mode to ACTIVE\n");
2829 /* Calculate new VNNMAP */
2830 vnnmap = talloc_zero(state, struct ctdb_vnn_map);
2831 if (tevent_req_nomem(vnnmap, req)) {
2835 vnnmap->map = node_list_lmaster(state->nlist, vnnmap, &vnnmap->size);
2836 if (tevent_req_nomem(vnnmap->map, req)) {
2840 if (vnnmap->size == 0) {
2841 D_WARNING("No active lmasters found. Adding recmaster anyway\n");
2842 vnnmap->map[0] = state->destnode;
2846 vnnmap->generation = state->generation;
2848 state->vnnmap = vnnmap;
2850 ctdb_req_control_start_recovery(&request);
2851 subreq = ctdb_client_control_multi_send(state,
2854 state->nlist->pnn_list,
2855 state->nlist->count,
2858 if (tevent_req_nomem(subreq, req)) {
2861 tevent_req_set_callback(subreq, recovery_start_recovery_done, req);
2864 static void recovery_start_recovery_done(struct tevent_req *subreq)
2866 struct tevent_req *req = tevent_req_callback_data(
2867 subreq, struct tevent_req);
2868 struct recovery_state *state = tevent_req_data(
2869 req, struct recovery_state);
2870 struct ctdb_req_control request;
2875 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2877 TALLOC_FREE(subreq);
2882 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2883 state->nlist->count,
2887 D_ERR("failed to run start_recovery event on node %u,"
2888 " ret=%d\n", pnn, ret2);
2890 D_ERR("failed to run start_recovery event, ret=%d\n",
2893 tevent_req_error(req, ret);
2897 D_ERR("start_recovery event finished\n");
2899 ctdb_req_control_setvnnmap(&request, state->vnnmap);
2900 subreq = ctdb_client_control_multi_send(state,
2903 state->nlist->pnn_list,
2904 state->nlist->count,
2907 if (tevent_req_nomem(subreq, req)) {
2910 tevent_req_set_callback(subreq, recovery_vnnmap_update_done, req);
2913 static void recovery_vnnmap_update_done(struct tevent_req *subreq)
2915 struct tevent_req *req = tevent_req_callback_data(
2916 subreq, struct tevent_req);
2917 struct recovery_state *state = tevent_req_data(
2918 req, struct recovery_state);
2923 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2925 TALLOC_FREE(subreq);
2930 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2931 state->nlist->count,
2935 D_ERR("failed to update VNNMAP on node %u, ret=%d\n",
2938 D_ERR("failed to update VNNMAP, ret=%d\n", ret);
2940 tevent_req_error(req, ret);
2944 D_NOTICE("updated VNNMAP\n");
2946 subreq = db_recovery_send(state,
2952 state->vnnmap->generation);
2953 if (tevent_req_nomem(subreq, req)) {
2956 tevent_req_set_callback(subreq, recovery_db_recovery_done, req);
2959 static void recovery_db_recovery_done(struct tevent_req *subreq)
2961 struct tevent_req *req = tevent_req_callback_data(
2962 subreq, struct tevent_req);
2963 struct recovery_state *state = tevent_req_data(
2964 req, struct recovery_state);
2965 struct ctdb_req_control request;
2969 status = db_recovery_recv(subreq, &count);
2970 TALLOC_FREE(subreq);
2972 D_ERR("%d of %d databases recovered\n", count, state->dblist->num_dbs);
2975 subreq = ban_node_send(state,
2980 if (tevent_req_nomem(subreq, req)) {
2983 tevent_req_set_callback(subreq, recovery_failed_done, req);
2987 ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_NORMAL);
2988 subreq = ctdb_client_control_multi_send(state,
2991 state->nlist->pnn_list,
2992 state->nlist->count,
2995 if (tevent_req_nomem(subreq, req)) {
2998 tevent_req_set_callback(subreq, recovery_normal_done, req);
3001 static void recovery_failed_done(struct tevent_req *subreq)
3003 struct tevent_req *req = tevent_req_callback_data(
3004 subreq, struct tevent_req);
3008 status = ban_node_recv(subreq, &ret);
3009 TALLOC_FREE(subreq);
3011 D_ERR("failed to ban node, ret=%d\n", ret);
3014 tevent_req_error(req, EIO);
3017 static void recovery_normal_done(struct tevent_req *subreq)
3019 struct tevent_req *req = tevent_req_callback_data(
3020 subreq, struct tevent_req);
3021 struct recovery_state *state = tevent_req_data(
3022 req, struct recovery_state);
3023 struct ctdb_req_control request;
3028 status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
3030 TALLOC_FREE(subreq);
3035 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
3036 state->nlist->count,
3040 D_ERR("failed to set recovery mode NORMAL on node %u,"
3041 " ret=%d\n", pnn, ret2);
3043 D_ERR("failed to set recovery mode NORMAL, ret=%d\n",
3046 tevent_req_error(req, ret);
3050 D_ERR("Set recovery mode to NORMAL\n");
3052 ctdb_req_control_end_recovery(&request);
3053 subreq = ctdb_client_control_multi_send(state,
3056 state->nlist->pnn_list,
3057 state->nlist->count,
3060 if (tevent_req_nomem(subreq, req)) {
3063 tevent_req_set_callback(subreq, recovery_end_recovery_done, req);
3066 static void recovery_end_recovery_done(struct tevent_req *subreq)
3068 struct tevent_req *req = tevent_req_callback_data(
3069 subreq, struct tevent_req);
3070 struct recovery_state *state = tevent_req_data(
3071 req, struct recovery_state);
3076 status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
3078 TALLOC_FREE(subreq);
3083 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
3084 state->nlist->count,
3088 D_ERR("failed to run recovered event on node %u,"
3089 " ret=%d\n", pnn, ret2);
3091 D_ERR("failed to run recovered event, ret=%d\n", ret);
3093 tevent_req_error(req, ret);
3097 D_ERR("recovered event finished\n");
3099 tevent_req_done(req);
3102 static void recovery_recv(struct tevent_req *req, int *perr)
3104 generic_recv(req, perr);
3107 static void usage(const char *progname)
3109 fprintf(stderr, "\nUsage: %s <output-fd> <ctdb-socket-path> <generation>\n",
3115 * Arguments - log fd, write fd, socket path, generation
3117 int main(int argc, char *argv[])
3120 const char *sockpath;
3121 TALLOC_CTX *mem_ctx = NULL;
3122 struct tevent_context *ev;
3123 struct ctdb_client_context *client;
3126 struct tevent_req *req;
3127 uint32_t generation;
3134 write_fd = atoi(argv[1]);
3136 generation = (uint32_t)smb_strtoul(argv[3],
3142 fprintf(stderr, "recovery: unable to initialize generation\n");
3146 mem_ctx = talloc_new(NULL);
3147 if (mem_ctx == NULL) {
3148 fprintf(stderr, "recovery: talloc_new() failed\n");
3152 ret = logging_init(mem_ctx, NULL, NULL, "ctdb-recovery");
3154 fprintf(stderr, "recovery: Unable to initialize logging\n");
3158 ev = tevent_context_init(mem_ctx);
3160 D_ERR("tevent_context_init() failed\n");
3164 status = logging_setup_sighup_handler(ev, mem_ctx, NULL, NULL);
3166 D_ERR("logging_setup_sighup_handler() failed\n");
3170 ret = ctdb_client_init(mem_ctx, ev, sockpath, &client);
3172 D_ERR("ctdb_client_init() failed, ret=%d\n", ret);
3176 req = recovery_send(mem_ctx, ev, client, generation);
3178 D_ERR("database_recover_send() failed\n");
3182 if (! tevent_req_poll(req, ev)) {
3183 D_ERR("tevent_req_poll() failed\n");
3187 recovery_recv(req, &ret);
3190 D_ERR("database recovery failed, ret=%d\n", ret);
3194 sys_write(write_fd, &ret, sizeof(ret));
3198 TALLOC_FREE(mem_ctx);