*/
#include "includes.h"
-#include "lib/events/events.h"
+#include "lib/tevent/tevent.h"
#include "system/filesys.h"
#include "system/wait.h"
#include "db_wrap.h"
struct ctdb_persistent_state {
struct ctdb_context *ctdb;
+ struct ctdb_db_context *ctdb_db; /* used by trans3_commit */
+ struct ctdb_client *client; /* used by trans3_commit */
struct ctdb_req_control *c;
const char *errormsg;
uint32_t num_pending;
{
struct ctdb_persistent_state *state = talloc_get_type(private_data,
struct ctdb_persistent_state);
+ enum ctdb_trans2_commit_error etype;
+
+ if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
+ DEBUG(DEBUG_INFO, ("ctdb_persistent_callback: ignoring reply "
+ "during recovery\n"));
+ return;
+ }
if (status != 0) {
DEBUG(DEBUG_ERR,("ctdb_persistent_callback failed with status %d (%s)\n",
- status, errormsg));
+ status, errormsg?errormsg:"no error message given"));
state->status = status;
state->errormsg = errormsg;
state->num_failed++;
+
+ /*
+ * If a node failed to complete the update_record control,
+ * then either a recovery is already running or something
+ * bad is going on. So trigger a recovery and let the
+ * recovery finish the transaction, sending back the reply
+ * for the trans3_commit control to the client.
+ */
+ ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
+ return;
}
+
state->num_pending--;
- if (state->num_pending == 0) {
- enum ctdb_trans2_commit_error etype;
- if (state->num_failed == state->num_sent) {
- etype = CTDB_TRANS2_COMMIT_ALLFAIL;
- } else if (state->num_failed != 0) {
- etype = CTDB_TRANS2_COMMIT_SOMEFAIL;
- } else {
- etype = CTDB_TRANS2_COMMIT_SUCCESS;
- }
- ctdb_request_control_reply(state->ctdb, state->c, NULL, etype, state->errormsg);
- talloc_free(state);
+
+ if (state->num_pending != 0) {
+ return;
+ }
+
+ if (state->num_failed == state->num_sent) {
+ etype = CTDB_TRANS2_COMMIT_ALLFAIL;
+ } else if (state->num_failed != 0) {
+ etype = CTDB_TRANS2_COMMIT_SOMEFAIL;
+ } else {
+ etype = CTDB_TRANS2_COMMIT_SUCCESS;
}
+
+ ctdb_request_control_reply(state->ctdb, state->c, NULL, etype, state->errormsg);
+ talloc_free(state);
}
/*
struct timeval t, void *private_data)
{
struct ctdb_persistent_state *state = talloc_get_type(private_data, struct ctdb_persistent_state);
-
+
+ if (state->ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
+ DEBUG(DEBUG_INFO, ("ctdb_persistent_store_timeout: ignoring "
+ "timeout during recovery\n"));
+ return;
+ }
+
ctdb_request_control_reply(state->ctdb, state->c, NULL, CTDB_TRANS2_COMMIT_TIMEOUT,
"timeout in ctdb_persistent_state");
talloc_free(state);
}
+/**
+ * Finish pending trans3 commit controls, i.e. send
+ * reply to the client. This is called by the end-recovery
+ * control to fix the situation when a recovery interrupts
+ * the usual porgress of a transaction.
+ */
+void ctdb_persistent_finish_trans3_commits(struct ctdb_context *ctdb)
+{
+ struct ctdb_db_context *ctdb_db;
+
+ if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
+ DEBUG(DEBUG_INFO, ("ctdb_persistent_store_timeout: ignoring "
+ "timeout during recovery\n"));
+ return;
+ }
+
+ for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
+ struct ctdb_persistent_state *state;
+
+ if (ctdb_db->persistent_state == NULL) {
+ continue;
+ }
+
+ state = ctdb_db->persistent_state;
+
+ ctdb_request_control_reply(ctdb, state->c, NULL,
+ CTDB_TRANS2_COMMIT_SOMEFAIL,
+ "trans3 commit ended by recovery");
+
+ /* The destructor sets ctdb_db->persistent_state to NULL. */
+ talloc_free(state);
+ }
+}
+
/*
store a set of persistent records - called from a ctdb client when it has updated
some records in a persistent database. The client will have the record
struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
struct ctdb_db_context *ctdb_db;
- if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
- DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans2_commit when recovery active\n"));
- return -1;
- }
-
ctdb_db = find_ctdb_db(ctdb, m->db_id);
if (ctdb_db == NULL) {
DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_commit: "
return -1;
}
+ if (ctdb_db->unhealthy_reason) {
+ DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_trans2_commit: %s\n",
+ ctdb_db->db_name, ctdb_db->unhealthy_reason));
+ return -1;
+ }
+
/* handling num_persistent_updates is a bit strange -
there are 3 cases
1) very old clients, which never called CTDB_CONTROL_START_PERSISTENT_UPDATE
break;
}
+ if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
+ DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans2_commit when recovery active\n"));
+ return -1;
+ }
+
state = talloc_zero(ctdb, struct ctdb_persistent_state);
CTDB_NO_MEMORY(ctdb, state);
return 0;
}
+static int ctdb_persistent_state_destructor(struct ctdb_persistent_state *state)
+{
+ if (state->client != NULL) {
+ state->client->db_id = 0;
+ }
+
+ if (state->ctdb_db != NULL) {
+ state->ctdb_db->persistent_state = NULL;
+ }
+
+ return 0;
+}
+
+/*
+ * Store a set of persistent records.
+ * This is used to roll out a transaction to all nodes.
+ */
+int32_t ctdb_control_trans3_commit(struct ctdb_context *ctdb,
+ struct ctdb_req_control *c,
+ TDB_DATA recdata, bool *async_reply)
+{
+ struct ctdb_client *client;
+ struct ctdb_persistent_state *state;
+ int i;
+ struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
+ struct ctdb_db_context *ctdb_db;
+
+ if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
+ DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans3_commit when recovery active\n"));
+ return -1;
+ }
+
+ client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
+ if (client == NULL) {
+ DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store "
+ "to a client. Returning error\n"));
+ return -1;
+ }
+
+ if (client->db_id != 0) {
+ DEBUG(DEBUG_ERR,(__location__ " ERROR: trans3_commit: "
+ "client-db_id[0x%08x] != 0 "
+ "(client_id[0x%08x]): trans3_commit active?\n",
+ client->db_id, client->client_id));
+ return -1;
+ }
+
+ ctdb_db = find_ctdb_db(ctdb, m->db_id);
+ if (ctdb_db == NULL) {
+ DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans3_commit: "
+ "Unknown database db_id[0x%08x]\n", m->db_id));
+ return -1;
+ }
+
+ if (ctdb_db->persistent_state != NULL) {
+ DEBUG(DEBUG_ERR, (__location__ " Error: "
+ "ctdb_control_trans3_commit "
+ "called while a transaction commit is "
+ "active. db_id[0x%08x]\n", m->db_id));
+ return -1;
+ }
+
+ ctdb_db->persistent_state = talloc_zero(ctdb_db,
+ struct ctdb_persistent_state);
+ CTDB_NO_MEMORY(ctdb, ctdb_db->persistent_state);
+
+ client->db_id = m->db_id;
+
+ state = ctdb_db->persistent_state;
+ state->ctdb = ctdb;
+ state->ctdb_db = ctdb_db;
+ state->c = c;
+ state->client = client;
+
+ talloc_set_destructor(state, ctdb_persistent_state_destructor);
+
+ for (i = 0; i < ctdb->vnn_map->size; i++) {
+ struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
+ int ret;
+
+ /* only send to active nodes */
+ if (node->flags & NODE_FLAGS_INACTIVE) {
+ continue;
+ }
+
+ ret = ctdb_daemon_send_control(ctdb, node->pnn, 0,
+ CTDB_CONTROL_UPDATE_RECORD,
+ c->client_id, 0, recdata,
+ ctdb_persistent_callback,
+ state);
+ if (ret == -1) {
+ DEBUG(DEBUG_ERR,("Unable to send "
+ "CTDB_CONTROL_UPDATE_RECORD "
+ "to pnn %u\n", node->pnn));
+ talloc_free(state);
+ return -1;
+ }
+
+ state->num_pending++;
+ state->num_sent++;
+ }
+
+ if (state->num_pending == 0) {
+ talloc_free(state);
+ return 0;
+ }
+
+ /* we need to wait for the replies */
+ *async_reply = true;
+
+ /* need to keep the control structure around */
+ talloc_steal(state, c);
+
+ /* but we won't wait forever */
+ event_add_timed(ctdb->ev, state,
+ timeval_current_ofs(ctdb->tunable.control_timeout, 0),
+ ctdb_persistent_store_timeout, state);
+
+ return 0;
+}
+
struct ctdb_persistent_write_state {
struct ctdb_db_context *ctdb_db;
static int childwrite_destructor(struct childwrite_handle *h)
{
- h->ctdb->statistics.pending_childwrite_calls--;
+ CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
kill(h->child, SIGKILL);
return 0;
}
int ret;
char c;
- ctdb_latency(h->ctdb_db, "persistent", &h->ctdb->statistics.max_childwrite_latency, h->start_time);
- h->ctdb->statistics.pending_childwrite_calls--;
+ CTDB_UPDATE_LATENCY(h->ctdb, h->ctdb_db, "persistent", childwrite_latency, h->start_time);
+ CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
/* the handle needs to go away when the context is gone - when
the handle goes away this implicitly closes the pipe, which
int ret;
pid_t parent = getpid();
- ctdb_db->ctdb->statistics.childwrite_calls++;
- ctdb_db->ctdb->statistics.pending_childwrite_calls++;
+ CTDB_INCREMENT_STAT(ctdb_db->ctdb, childwrite_calls);
+ CTDB_INCREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
if (!(result = talloc_zero(state, struct childwrite_handle))) {
- ctdb_db->ctdb->statistics.pending_childwrite_calls--;
+ CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
return NULL;
}
if (ret != 0) {
talloc_free(result);
- ctdb_db->ctdb->statistics.pending_childwrite_calls--;
+ CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
return NULL;
}
- result->child = fork();
+ result->child = ctdb_fork(ctdb_db->ctdb);
if (result->child == (pid_t)-1) {
close(result->fd[0]);
close(result->fd[1]);
talloc_free(result);
- ctdb_db->ctdb->statistics.pending_childwrite_calls--;
+ CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
return NULL;
}
char c = 0;
close(result->fd[0]);
+ debug_extra = talloc_asprintf(NULL, "childwrite-%s:", ctdb_db->db_name);
ret = ctdb_persistent_store(state);
if (ret != 0) {
DEBUG(DEBUG_ERR, (__location__ " Failed to write persistent data\n"));
talloc_set_destructor(result, childwrite_destructor);
- DEBUG(DEBUG_NOTICE, (__location__ " Created PIPE FD:%d for ctdb_childwrite\n", result->fd[0]));
+ DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for ctdb_childwrite\n", result->fd[0]));
result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
- EVENT_FD_READ|EVENT_FD_AUTOCLOSE, childwrite_handler,
+ EVENT_FD_READ, childwrite_handler,
(void *)result);
if (result->fde == NULL) {
talloc_free(result);
- ctdb_db->ctdb->statistics.pending_childwrite_calls--;
+ CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
return NULL;
}
+ tevent_fd_set_auto_close(result->fde);
result->start_time = timeval_current();
return -1;
}
+ if (ctdb_db->unhealthy_reason) {
+ DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_update_record: %s\n",
+ ctdb_db->db_name, ctdb_db->unhealthy_reason));
+ return -1;
+ }
+
state = talloc(ctdb, struct ctdb_persistent_write_state);
CTDB_NO_MEMORY(ctdb, state);
return ctdb_control_trans2_commit(ctdb, c, ctdb_marshall_finish(m), async_reply);
}
+static int32_t ctdb_get_db_seqnum(struct ctdb_context *ctdb,
+ uint32_t db_id,
+ uint64_t *seqnum)
+{
+ int32_t ret;
+ struct ctdb_db_context *ctdb_db;
+ const char *keyname = CTDB_DB_SEQNUM_KEY;
+ TDB_DATA key;
+ TDB_DATA data;
+ TALLOC_CTX *mem_ctx = talloc_new(ctdb);
+
+ ctdb_db = find_ctdb_db(ctdb, db_id);
+ if (!ctdb_db) {
+ DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
+ ret = -1;
+ goto done;
+ }
+
+ key.dptr = (uint8_t *)discard_const(keyname);
+ key.dsize = strlen(keyname) + 1;
+ ret = (int32_t)ctdb_ltdb_fetch(ctdb_db, key, NULL, mem_ctx, &data);
+ if (ret != 0) {
+ goto done;
+ }
+
+ if (data.dsize != sizeof(uint64_t)) {
+ *seqnum = 0;
+ goto done;
+ }
+
+ *seqnum = *(uint64_t *)data.dptr;
+
+done:
+ talloc_free(mem_ctx);
+ return ret;
+}
+
+/**
+ * Get the sequence number of a persistent database.
+ */
+int32_t ctdb_control_get_db_seqnum(struct ctdb_context *ctdb,
+ TDB_DATA indata,
+ TDB_DATA *outdata)
+{
+ uint32_t db_id;
+ int32_t ret;
+ uint64_t seqnum;
+
+ db_id = *(uint32_t *)indata.dptr;
+ ret = ctdb_get_db_seqnum(ctdb, db_id, &seqnum);
+ if (ret != 0) {
+ goto done;
+ }
+
+ outdata->dsize = sizeof(uint64_t);
+ outdata->dptr = (uint8_t *)talloc_zero(outdata, uint64_t);
+ if (outdata->dptr == NULL) {
+ ret = -1;
+ goto done;
+ }
+
+ *(outdata->dptr) = seqnum;
+
+done:
+ return ret;
+}