}
state.num_buffers += 1;
+ LOG("Wrote %d buffers of recovery records for %s\n",
+ state.num_buffers, recdb_name(recdb));
+
return state.num_buffers;
}
return generic_recv(req, perr);
}
+/*
+ * Push database to specified nodes (old style)
+ */
+
+struct push_database_old_state {
+ struct tevent_context *ev;
+ struct ctdb_client_context *client;
+ struct recdb_context *recdb;
+ uint32_t *pnn_list;
+ int count;
+ struct ctdb_rec_buffer *recbuf;
+ int index;
+};
+
+static void push_database_old_push_done(struct tevent_req *subreq);
+
+static struct tevent_req *push_database_old_send(
+ TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct ctdb_client_context *client,
+ uint32_t *pnn_list, int count,
+ struct recdb_context *recdb)
+{
+ struct tevent_req *req, *subreq;
+ struct push_database_old_state *state;
+ struct ctdb_req_control request;
+ uint32_t pnn;
+
+ req = tevent_req_create(mem_ctx, &state,
+ struct push_database_old_state);
+ if (req == NULL) {
+ return NULL;
+ }
+
+ state->ev = ev;
+ state->client = client;
+ state->recdb = recdb;
+ state->pnn_list = pnn_list;
+ state->count = count;
+ state->index = 0;
+
+ state->recbuf = recdb_records(recdb, state,
+ ctdb_client_pnn(client));
+ if (tevent_req_nomem(state->recbuf, req)) {
+ return tevent_req_post(req, ev);
+ }
+
+ pnn = state->pnn_list[state->index];
+
+ ctdb_req_control_push_db(&request, state->recbuf);
+ subreq = ctdb_client_control_send(state, ev, client, pnn,
+ TIMEOUT(), &request);
+ if (tevent_req_nomem(subreq, req)) {
+ return tevent_req_post(req, ev);
+ }
+ tevent_req_set_callback(subreq, push_database_old_push_done, req);
+
+ return req;
+}
+
+static void push_database_old_push_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct push_database_old_state *state = tevent_req_data(
+ req, struct push_database_old_state);
+ struct ctdb_req_control request;
+ uint32_t pnn;
+ int ret;
+ bool status;
+
+ status = ctdb_client_control_recv(subreq, &ret, NULL, NULL);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ LOG("control PUSH_DB failed for db %s on node %u, ret=%d\n",
+ recdb_name(state->recdb), state->pnn_list[state->index],
+ ret);
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ state->index += 1;
+ if (state->index == state->count) {
+ TALLOC_FREE(state->recbuf);
+ tevent_req_done(req);
+ return;
+ }
+
+ pnn = state->pnn_list[state->index];
+
+ ctdb_req_control_push_db(&request, state->recbuf);
+ subreq = ctdb_client_control_send(state, state->ev, state->client,
+ pnn, TIMEOUT(), &request);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, push_database_old_push_done, req);
+}
+
+static bool push_database_old_recv(struct tevent_req *req, int *perr)
+{
+ return generic_recv(req, perr);
+}
+
+/*
+ * Push database to specified nodes (new style)
+ */
+
+struct push_database_new_state {
+ struct tevent_context *ev;
+ struct ctdb_client_context *client;
+ struct recdb_context *recdb;
+ uint32_t *pnn_list;
+ int count;
+ uint64_t srvid;
+ uint32_t dmaster;
+ int fd;
+ int num_buffers;
+ int num_buffers_sent;
+ int num_records;
+};
+
+static void push_database_new_started(struct tevent_req *subreq);
+static void push_database_new_send_msg(struct tevent_req *req);
+static void push_database_new_send_done(struct tevent_req *subreq);
+static void push_database_new_confirmed(struct tevent_req *subreq);
+
+static struct tevent_req *push_database_new_send(
+ TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct ctdb_client_context *client,
+ uint32_t *pnn_list, int count,
+ struct recdb_context *recdb,
+ int max_size)
+{
+ struct tevent_req *req, *subreq;
+ struct push_database_new_state *state;
+ struct ctdb_req_control request;
+ struct ctdb_pulldb_ext pulldb_ext;
+ char *filename;
+ off_t offset;
+
+ req = tevent_req_create(mem_ctx, &state,
+ struct push_database_new_state);
+ if (req == NULL) {
+ return NULL;
+ }
+
+ state->ev = ev;
+ state->client = client;
+ state->recdb = recdb;
+ state->pnn_list = pnn_list;
+ state->count = count;
+
+ state->srvid = srvid_next();
+ state->dmaster = ctdb_client_pnn(client);
+ state->num_buffers_sent = 0;
+ state->num_records = 0;
+
+ filename = talloc_asprintf(state, "%s.dat", recdb_path(recdb));
+ if (tevent_req_nomem(filename, req)) {
+ return tevent_req_post(req, ev);
+ }
+
+ state->fd = open(filename, O_RDWR|O_CREAT, 0644);
+ if (state->fd == -1) {
+ tevent_req_error(req, errno);
+ return tevent_req_post(req, ev);
+ }
+ unlink(filename);
+ talloc_free(filename);
+
+ state->num_buffers = recdb_file(recdb, state, state->dmaster,
+ state->fd, max_size);
+ if (state->num_buffers == -1) {
+ tevent_req_error(req, ENOMEM);
+ return tevent_req_post(req, ev);
+ }
+
+ offset = lseek(state->fd, 0, SEEK_SET);
+ if (offset != 0) {
+ tevent_req_error(req, EIO);
+ return tevent_req_post(req, ev);
+ }
+
+ pulldb_ext.db_id = recdb_id(recdb);
+ pulldb_ext.srvid = state->srvid;
+
+ ctdb_req_control_db_push_start(&request, &pulldb_ext);
+ subreq = ctdb_client_control_multi_send(state, ev, client,
+ pnn_list, count,
+ TIMEOUT(), &request);
+ if (tevent_req_nomem(subreq, req)) {
+ return tevent_req_post(req, ev);
+ }
+ tevent_req_set_callback(subreq, push_database_new_started, req);
+
+ return req;
+}
+
+static void push_database_new_started(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct push_database_new_state *state = tevent_req_data(
+ req, struct push_database_new_state);
+ int *err_list;
+ int ret;
+ bool status;
+
+ status = ctdb_client_control_multi_recv(subreq, &ret, state,
+ &err_list, NULL);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ int ret2;
+ uint32_t pnn;
+
+ ret2 = ctdb_client_control_multi_error(state->pnn_list,
+ state->count,
+ err_list, &pnn);
+ if (ret2 != 0) {
+ LOG("control DB_PUSH_START failed for db %s "
+ "on node %u, ret=%d\n",
+ recdb_name(state->recdb), pnn, ret2);
+ } else {
+ LOG("control DB_PUSH_START failed for db %s, ret=%d\n",
+ recdb_name(state->recdb), ret);
+ }
+ talloc_free(err_list);
+
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ push_database_new_send_msg(req);
+}
+
+static void push_database_new_send_msg(struct tevent_req *req)
+{
+ struct push_database_new_state *state = tevent_req_data(
+ req, struct push_database_new_state);
+ struct tevent_req *subreq;
+ struct ctdb_rec_buffer *recbuf;
+ struct ctdb_req_message message;
+ TDB_DATA data;
+ int ret;
+
+ if (state->num_buffers_sent == state->num_buffers) {
+ struct ctdb_req_control request;
+
+ ctdb_req_control_db_push_confirm(&request,
+ recdb_id(state->recdb));
+ subreq = ctdb_client_control_multi_send(state, state->ev,
+ state->client,
+ state->pnn_list,
+ state->count,
+ TIMEOUT(), &request);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, push_database_new_confirmed,
+ req);
+ return;
+ }
+
+ ret = ctdb_rec_buffer_read(state->fd, state, &recbuf);
+ if (ret != 0) {
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ data.dsize = ctdb_rec_buffer_len(recbuf);
+ data.dptr = talloc_size(state, data.dsize);
+ if (tevent_req_nomem(data.dptr, req)) {
+ return;
+ }
+
+ ctdb_rec_buffer_push(recbuf, data.dptr);
+
+ message.srvid = state->srvid;
+ message.data.data = data;
+
+ LOG("Pushing buffer %d with %d records for %s\n",
+ state->num_buffers_sent, recbuf->count, recdb_name(state->recdb));
+
+ subreq = ctdb_client_message_multi_send(state, state->ev,
+ state->client,
+ state->pnn_list, state->count,
+ &message);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, push_database_new_send_done, req);
+
+ state->num_records += recbuf->count;
+
+ talloc_free(data.dptr);
+ talloc_free(recbuf);
+}
+
+static void push_database_new_send_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct push_database_new_state *state = tevent_req_data(
+ req, struct push_database_new_state);
+ bool status;
+ int ret;
+
+ status = ctdb_client_message_multi_recv(subreq, &ret, NULL, NULL);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ LOG("Sending recovery records failed for %s\n",
+ recdb_name(state->recdb));
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ state->num_buffers_sent += 1;
+
+ push_database_new_send_msg(req);
+}
+
+static void push_database_new_confirmed(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct push_database_new_state *state = tevent_req_data(
+ req, struct push_database_new_state);
+ struct ctdb_reply_control **reply;
+ int *err_list;
+ bool status;
+ int ret, i;
+ uint32_t num_records;
+
+ status = ctdb_client_control_multi_recv(subreq, &ret, state,
+ &err_list, &reply);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ int ret2;
+ uint32_t pnn;
+
+ ret2 = ctdb_client_control_multi_error(state->pnn_list,
+ state->count, err_list,
+ &pnn);
+ if (ret2 != 0) {
+ LOG("control DB_PUSH_CONFIRM failed for %s on node %u,"
+ " ret=%d\n", recdb_name(state->recdb), pnn, ret2);
+ } else {
+ LOG("control DB_PUSH_CONFIRM failed for %s, ret=%d\n",
+ recdb_name(state->recdb), ret);
+ }
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ for (i=0; i<state->count; i++) {
+ ret = ctdb_reply_control_db_push_confirm(reply[i],
+ &num_records);
+ if (ret != 0) {
+ tevent_req_error(req, EPROTO);
+ return;
+ }
+
+ if (num_records != state->num_records) {
+ LOG("Node %u received %d of %d records for %s\n",
+ state->pnn_list[i], num_records,
+ state->num_records, recdb_name(state->recdb));
+ tevent_req_error(req, EPROTO);
+ return;
+ }
+ }
+
+ talloc_free(reply);
+
+ LOG("Pushed %d records for db %s\n",
+ state->num_records, recdb_name(state->recdb));
+
+ tevent_req_done(req);
+}
+
+static bool push_database_new_recv(struct tevent_req *req, int *perr)
+{
+ return generic_recv(req, perr);
+}
+
+/*
+ * wrapper for push_database_old and push_database_new
+ */
+
+struct push_database_state {
+ bool old_done, new_done;
+};
+
+static void push_database_old_done(struct tevent_req *subreq);
+static void push_database_new_done(struct tevent_req *subreq);
+
+static struct tevent_req *push_database_send(
+ TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct ctdb_client_context *client,
+ uint32_t *pnn_list, int count, uint32_t *caps,
+ struct ctdb_tunable_list *tun_list,
+ struct recdb_context *recdb)
+{
+ struct tevent_req *req, *subreq;
+ struct push_database_state *state;
+ uint32_t *old_list, *new_list;
+ int old_count, new_count;
+ int i;
+
+ req = tevent_req_create(mem_ctx, &state, struct push_database_state);
+ if (req == NULL) {
+ return NULL;
+ }
+
+ state->old_done = false;
+ state->new_done = false;
+
+ old_count = 0;
+ new_count = 0;
+ old_list = talloc_array(state, uint32_t, count);
+ new_list = talloc_array(state, uint32_t, count);
+ if (tevent_req_nomem(old_list, req) ||
+ tevent_req_nomem(new_list,req)) {
+ return tevent_req_post(req, ev);
+ }
+
+ for (i=0; i<count; i++) {
+ uint32_t pnn = pnn_list[i];
+
+ if (caps[pnn] & CTDB_CAP_FRAGMENTED_CONTROLS) {
+ new_list[new_count] = pnn;
+ new_count += 1;
+ } else {
+ old_list[old_count] = pnn;
+ old_count += 1;
+ }
+ }
+
+ if (old_count > 0) {
+ subreq = push_database_old_send(state, ev, client,
+ old_list, old_count, recdb);
+ if (tevent_req_nomem(subreq, req)) {
+ return tevent_req_post(req, ev);
+ }
+ tevent_req_set_callback(subreq, push_database_old_done, req);
+ } else {
+ state->old_done = true;
+ }
+
+ if (new_count > 0) {
+ subreq = push_database_new_send(state, ev, client,
+ new_list, new_count, recdb,
+ tun_list->rec_buffer_size_limit);
+ if (tevent_req_nomem(subreq, req)) {
+ return tevent_req_post(req, ev);
+ }
+ tevent_req_set_callback(subreq, push_database_new_done, req);
+ } else {
+ state->new_done = true;
+ }
+
+ return req;
+}
+
+static void push_database_old_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct push_database_state *state = tevent_req_data(
+ req, struct push_database_state);
+ bool status;
+ int ret;
+
+ status = push_database_old_recv(subreq, &ret);
+ if (! status) {
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ state->old_done = true;
+
+ if (state->old_done && state->new_done) {
+ tevent_req_done(req);
+ }
+}
+
+static void push_database_new_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct push_database_state *state = tevent_req_data(
+ req, struct push_database_state);
+ bool status;
+ int ret;
+
+ status = push_database_new_recv(subreq, &ret);
+ if (! status) {
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ state->new_done = true;
+
+ if (state->old_done && state->new_done) {
+ tevent_req_done(req);
+ }
+}
+
+static bool push_database_recv(struct tevent_req *req, int *perr)
+{
+ return generic_recv(req, perr);
+}
+
/*
* Collect databases using highest sequence number
*/
const char *db_name, *db_path;
struct recdb_context *recdb;
- struct ctdb_rec_buffer *recbuf;
-
};
static void recover_db_name_done(struct tevent_req *subreq);
subreq, struct tevent_req);
struct recover_db_state *state = tevent_req_data(
req, struct recover_db_state);
- struct ctdb_req_control request;
int *err_list;
int ret;
bool status;
return;
}
- state->recbuf = recdb_records(state->recdb, state, state->destnode);
- if (tevent_req_nomem(state->recbuf, req)) {
- return;
- }
-
- TALLOC_FREE(state->recdb);
-
- ctdb_req_control_push_db(&request, state->recbuf);
- subreq = ctdb_client_control_multi_send(state, state->ev,
- state->client,
- state->pnn_list, state->count,
- TIMEOUT(), &request);
+ subreq = push_database_send(state, state->ev, state->client,
+ state->pnn_list, state->count,
+ state->caps, state->tun_list,
+ state->recdb);
if (tevent_req_nomem(subreq, req)) {
return;
}
struct recover_db_state *state = tevent_req_data(
req, struct recover_db_state);
struct ctdb_req_control request;
- int *err_list;
int ret;
bool status;
- status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
- NULL);
+ status = push_database_recv(subreq, &ret);
TALLOC_FREE(subreq);
if (! status) {
- int ret2;
- uint32_t pnn;
-
- ret2 = ctdb_client_control_multi_error(state->pnn_list,
- state->count,
- err_list, &pnn);
- if (ret2 != 0) {
- LOG("control PUSHDB failed for db %s on node %u,"
- " ret=%d\n", state->db_name, pnn, ret2);
- } else {
- LOG("control PUSHDB failed for db %s, ret=%d\n",
- state->db_name, ret);
- }
tevent_req_error(req, ret);
return;
}
- TALLOC_FREE(state->recbuf);
+ TALLOC_FREE(state->recdb);
ctdb_req_control_db_transaction_commit(&request, &state->transdb);
subreq = ctdb_client_control_multi_send(state, state->ev,