*/
#include "includes.h"
-#include "lib/events/events.h"
+#include "lib/tevent/tevent.h"
#include "system/filesys.h"
#include "system/wait.h"
#include "db_wrap.h"
struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
struct ctdb_persistent_state *state;
int i;
+ struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
+ struct ctdb_db_context *ctdb_db;
+
+ ctdb_db = find_ctdb_db(ctdb, m->db_id);
+ if (ctdb_db == NULL) {
+ DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_commit: "
+ "Unknown database db_id[0x%08x]\n", m->db_id));
+ return -1;
+ }
if (client == NULL) {
DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store to a client. Returning error\n"));
return -1;
}
+ if (ctdb_db->unhealthy_reason) {
+ DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_trans2_commit: %s\n",
+ ctdb_db->db_name, ctdb_db->unhealthy_reason));
+ return -1;
+ }
+
/* handling num_persistent_updates is a bit strange -
there are 3 cases
1) very old clients, which never called CTDB_CONTROL_START_PERSISTENT_UPDATE
*/
switch (c->opcode) {
case CTDB_CONTROL_PERSISTENT_STORE:
+ if (ctdb_db->transaction_active) {
+ DEBUG(DEBUG_ERR, (__location__ " trans2_commit: a "
+ "transaction is active on database "
+ "db_id[0x%08x] - refusing persistent "
+ " store for client id[0x%08x]\n",
+ ctdb_db->db_id, client->client_id));
+ return -1;
+ }
if (client->num_persistent_updates > 0) {
client->num_persistent_updates--;
}
break;
case CTDB_CONTROL_TRANS2_COMMIT:
+ if (ctdb_db->transaction_active) {
+ DEBUG(DEBUG_ERR,(__location__ " trans2_commit: there is"
+ " already a transaction commit "
+ "active on db_id[0x%08x] - forbidding "
+ "client_id[0x%08x] to commit\n",
+ ctdb_db->db_id, client->client_id));
+ return -1;
+ }
+ if (client->db_id != 0) {
+ DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit: "
+ "client-db_id[0x%08x] != 0 "
+ "(client_id[0x%08x])\n",
+ client->db_id, client->client_id));
+ return -1;
+ }
client->num_persistent_updates++;
+ ctdb_db->transaction_active = true;
+ client->db_id = m->db_id;
+ DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started to"
+ " commit transaction on db id[0x%08x]\n",
+ client->client_id, client->db_id));
break;
case CTDB_CONTROL_TRANS2_COMMIT_RETRY:
/* already updated from the first commit */
+ if (client->db_id != m->db_id) {
+ DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit "
+ "retry: client-db_id[0x%08x] != "
+ "db_id[0x%08x] (client_id[0x%08x])\n",
+ client->db_id,
+ m->db_id, client->client_id));
+ return -1;
+ }
+ DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started "
+ "transaction commit retry on "
+ "db_id[0x%08x]\n",
+ client->client_id, client->db_id));
break;
}
+ if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
+ DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans2_commit when recovery active\n"));
+ return -1;
+ }
+
state = talloc_zero(ctdb, struct ctdb_persistent_state);
CTDB_NO_MEMORY(ctdb, state);
}
+/*
+ * Roll out a transaction: forward a set of persistent records to the
+ * cluster nodes.
+ *
+ * The marshalled records in recdata are sent unchanged, as a
+ * CTDB_CONTROL_UPDATE_RECORD control, to every node listed in the vnn
+ * map that does not have NODE_FLAGS_INACTIVE set.
+ *
+ * Returns -1 when recovery is active, when the db_id carried in recdata
+ * is unknown, when the requesting client cannot be found, or when one
+ * of the controls cannot be sent.  Otherwise returns 0; if at least one
+ * control went out, *async_reply is set to true, the request c is
+ * re-parented onto the tracking state, and a timer for
+ * ctdb->tunable.control_timeout is armed with
+ * ctdb_persistent_store_timeout as its handler.
+ */
+int32_t ctdb_control_trans3_commit(struct ctdb_context *ctdb,
+				   struct ctdb_req_control *c,
+				   TDB_DATA recdata, bool *async_reply)
+{
+	struct ctdb_marshall_buffer *marshall;
+	struct ctdb_client *requester;
+	struct ctdb_persistent_state *pstate;
+	int idx;
+
+	if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
+		DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans3_commit when recovery active\n"));
+		return -1;
+	}
+
+	/* the database must be known on this node */
+	marshall = (struct ctdb_marshall_buffer *)recdata.dptr;
+	if (find_ctdb_db(ctdb, marshall->db_id) == NULL) {
+		DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans3_commit: "
+				 "Unknown database db_id[0x%08x]\n",
+				 marshall->db_id));
+		return -1;
+	}
+
+	/* the request must come from a client we know about */
+	requester = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
+	if (requester == NULL) {
+		DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store "
+				 "to a client. Returning error\n"));
+		return -1;
+	}
+
+	pstate = talloc_zero(ctdb, struct ctdb_persistent_state);
+	CTDB_NO_MEMORY(ctdb, pstate);
+
+	pstate->ctdb = ctdb;
+	pstate->c = c;
+
+	for (idx = 0; idx < ctdb->vnn_map->size; idx += 1) {
+		struct ctdb_node *target = ctdb->nodes[ctdb->vnn_map->map[idx]];
+
+		/* inactive nodes do not get the update */
+		if (!(target->flags & NODE_FLAGS_INACTIVE)) {
+			if (ctdb_daemon_send_control(ctdb, target->pnn, 0,
+						     CTDB_CONTROL_UPDATE_RECORD,
+						     c->client_id, 0, recdata,
+						     ctdb_persistent_callback,
+						     pstate) == -1) {
+				DEBUG(DEBUG_ERR,("Unable to send "
+						 "CTDB_CONTROL_UPDATE_RECORD "
+						 "to pnn %u\n", target->pnn));
+				talloc_free(pstate);
+				return -1;
+			}
+
+			pstate->num_pending += 1;
+			pstate->num_sent += 1;
+		}
+	}
+
+	if (pstate->num_pending != 0) {
+		/* replies are outstanding: answer the client later */
+		*async_reply = true;
+
+		/* the control structure has to outlive this call */
+		talloc_steal(pstate, c);
+
+		/* give up waiting once the control timeout expires */
+		event_add_timed(ctdb->ev, pstate,
+				timeval_current_ofs(ctdb->tunable.control_timeout, 0),
+				ctdb_persistent_store_timeout, pstate);
+
+		return 0;
+	}
+
+	/* nothing was sent, so there is nothing to wait for */
+	talloc_free(pstate);
+	return 0;
+}
+
+
struct ctdb_persistent_write_state {
struct ctdb_db_context *ctdb_db;
struct ctdb_marshall_buffer *m;
if (ret != 0) {
DEBUG(DEBUG_CRIT,("Failed to store record for db_id 0x%08x in ctdb_persistent_store\n",
state->ctdb_db->db_id));
- return -1;
+ goto failed;
}
}
/*
 * Destructor for a childwrite handle (registered elsewhere in this file
 * with talloc_set_destructor).  It takes one off the
 * pending_childwrite_calls statistic and sends SIGKILL to the process
 * whose pid is stored in h->child.  Always returns 0.
 */
static int childwrite_destructor(struct childwrite_handle *h)
{
- h->ctdb->statistics.pending_childwrite_calls--;
+ CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
kill(h->child, SIGKILL);
return 0;
}
int ret;
char c;
- ctdb_latency(&h->ctdb->statistics.max_childwrite_latency, h->start_time);
- h->ctdb->statistics.pending_childwrite_calls--;
+ CTDB_UPDATE_LATENCY(h->ctdb, h->ctdb_db, "persistent", childwrite_latency, h->start_time);
+ CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
/* the handle needs to go away when the context is gone - when
the handle goes away this implicitly closes the pipe, which
int ret;
pid_t parent = getpid();
- ctdb_db->ctdb->statistics.childwrite_calls++;
- ctdb_db->ctdb->statistics.pending_childwrite_calls++;
+ CTDB_INCREMENT_STAT(ctdb_db->ctdb, childwrite_calls);
+ CTDB_INCREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
if (!(result = talloc_zero(state, struct childwrite_handle))) {
- ctdb_db->ctdb->statistics.pending_childwrite_calls--;
+ CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
return NULL;
}
if (ret != 0) {
talloc_free(result);
- ctdb_db->ctdb->statistics.pending_childwrite_calls--;
+ CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
return NULL;
}
close(result->fd[0]);
close(result->fd[1]);
talloc_free(result);
- ctdb_db->ctdb->statistics.pending_childwrite_calls--;
+ CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
return NULL;
}
char c = 0;
close(result->fd[0]);
+ debug_extra = talloc_asprintf(NULL, "childwrite-%s:", ctdb_db->db_name);
ret = ctdb_persistent_store(state);
if (ret != 0) {
DEBUG(DEBUG_ERR, (__location__ " Failed to write persistent data\n"));
}
close(result->fd[1]);
+ set_close_on_exec(result->fd[0]);
+
talloc_set_destructor(result, childwrite_destructor);
+ DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for ctdb_childwrite\n", result->fd[0]));
+
result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
- EVENT_FD_READ|EVENT_FD_AUTOCLOSE, childwrite_handler,
+ EVENT_FD_READ, childwrite_handler,
(void *)result);
if (result->fde == NULL) {
talloc_free(result);
- ctdb_db->ctdb->statistics.pending_childwrite_calls--;
+ CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
return NULL;
}
+ tevent_fd_set_auto_close(result->fde);
result->start_time = timeval_current();
return -1;
}
+ if (ctdb_db->unhealthy_reason) {
+ DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_update_record: %s\n",
+ ctdb_db->db_name, ctdb_db->unhealthy_reason));
+ return -1;
+ }
+
state = talloc(ctdb, struct ctdb_persistent_write_state);
CTDB_NO_MEMORY(ctdb, state);
struct ctdb_req_control *c)
{
struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
+ struct ctdb_db_context *ctdb_db;
+
+ ctdb_db = find_ctdb_db(ctdb, client->db_id);
+ if (ctdb_db == NULL) {
+ DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish "
+ "Unknown database 0x%08x\n", client->db_id));
+ return -1;
+ }
+ if (!ctdb_db->transaction_active) {
+ DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish: "
+ "Database 0x%08x has no transaction commit "
+ "started\n", client->db_id));
+ return -1;
+ }
+
+ ctdb_db->transaction_active = false;
+ client->db_id = 0;
if (client->num_persistent_updates == 0) {
DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
}
client->num_persistent_updates--;
+ DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] finished "
+ "transaction commit db_id[0x%08x]\n",
+ client->client_id, ctdb_db->db_id));
+
return 0;
}
struct ctdb_req_control *c)
{
struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
-
+ struct ctdb_db_context *ctdb_db;
+
+ ctdb_db = find_ctdb_db(ctdb, client->db_id);
+ if (ctdb_db == NULL) {
+ DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
+ "Unknown database 0x%08x\n", client->db_id));
+ return -1;
+ }
+ if (!ctdb_db->transaction_active) {
+ DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
+ "Database 0x%08x has no transaction commit "
+ "started\n", client->db_id));
+ return -1;
+ }
+
+ ctdb_db->transaction_active = false;
+ client->db_id = 0;
+
if (client->num_persistent_updates == 0) {
DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
} else {
client->num_persistent_updates--;
}
- DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
+ DEBUG(DEBUG_ERR,(__location__ " An error occurred during transaction on"
+ " db_id[0x%08x] - forcing recovery\n",
+ ctdb_db->db_id));
client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
return 0;
}
+/**
+ * Tell whether a transaction is active on this node on the given DB.
+ *
+ * Returns -1 if db_id does not name a known database or if the request
+ * cannot be matched to a client.  Returns 0 if the requesting client's
+ * own db_id equals db_id, or if no transaction is marked active on the
+ * database.  Returns 1 if a transaction is marked active on it.
+ */
+int32_t ctdb_control_trans2_active(struct ctdb_context *ctdb,
+				   struct ctdb_req_control *c,
+				   uint32_t db_id)
+{
+	struct ctdb_db_context *ctdb_db;
+	struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
+
+	ctdb_db = find_ctdb_db(ctdb, db_id);
+	if (!ctdb_db) {
+		DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
+		return -1;
+	}
+
+	/*
+	 * The lookup by client_id can come back empty; the other controls
+	 * in this file treat that as an error, so do the same here rather
+	 * than dereferencing a NULL client below.
+	 */
+	if (client == NULL) {
+		DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_active: "
+				 "can not match request to a client. "
+				 "Returning error\n"));
+		return -1;
+	}
+
+	/* the client's own db_id matching is not reported as "active" */
+	if (client->db_id == db_id) {
+		return 0;
+	}
+
+	if (ctdb_db->transaction_active) {
+		return 1;
+	} else {
+		return 0;
+	}
+}
/*
backwards compatibility:
return ctdb_control_trans2_commit(ctdb, c, ctdb_marshall_finish(m), async_reply);
}
+/*
+ * Read the sequence number record of database db_id.
+ *
+ * The record is fetched with ctdb_ltdb_fetch() under the key
+ * CTDB_DB_SEQNUM_KEY (the key includes the terminating NUL byte).
+ *
+ * Returns -1 if the temporary talloc context cannot be created or if
+ * db_id is unknown; otherwise returns whatever ctdb_ltdb_fetch()
+ * returned.  On a successful fetch *seqnum receives the stored value,
+ * or 0 when the record is not exactly sizeof(uint64_t) bytes long.
+ * *seqnum is left untouched on failure.
+ */
+static int32_t ctdb_get_db_seqnum(struct ctdb_context *ctdb,
+				  uint32_t db_id,
+				  uint64_t *seqnum)
+{
+	int32_t ret;
+	struct ctdb_db_context *ctdb_db;
+	const char *keyname = CTDB_DB_SEQNUM_KEY;
+	TDB_DATA key;
+	TDB_DATA data;
+	TALLOC_CTX *mem_ctx = talloc_new(ctdb);
+
+	/* without a context the fetched data could not be released below */
+	if (mem_ctx == NULL) {
+		DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
+		return -1;
+	}
+
+	ctdb_db = find_ctdb_db(ctdb, db_id);
+	if (!ctdb_db) {
+		DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
+		ret = -1;
+		goto done;
+	}
+
+	key.dptr = (uint8_t *)discard_const(keyname);
+	key.dsize = strlen(keyname) + 1;
+	ret = (int32_t)ctdb_ltdb_fetch(ctdb_db, key, NULL, mem_ctx, &data);
+	if (ret != 0) {
+		goto done;
+	}
+
+	/* a record of any other size is reported as sequence number 0 */
+	if (data.dsize != sizeof(uint64_t)) {
+		*seqnum = 0;
+		goto done;
+	}
+
+	/*
+	 * Copy the bytes instead of reading through a uint64_t pointer:
+	 * this makes no assumption about the alignment of data.dptr.
+	 */
+	memcpy(seqnum, data.dptr, sizeof(*seqnum));
+
+done:
+	talloc_free(mem_ctx);
+	return ret;
+}
+
+/**
+ * Get the sequence number of a persistent database.
+ *
+ * indata must carry the uint32_t db_id of the database.  On success
+ * outdata->dptr points at a uint64_t allocated as a talloc child of
+ * outdata that holds the sequence number, and outdata->dsize is
+ * sizeof(uint64_t).
+ *
+ * Returns 0 on success, -1 if indata is too short or the allocation
+ * fails, and otherwise the non-zero result of ctdb_get_db_seqnum().
+ * outdata is not modified on failure.
+ */
+int32_t ctdb_control_get_db_seqnum(struct ctdb_context *ctdb,
+				   TDB_DATA indata,
+				   TDB_DATA *outdata)
+{
+	uint32_t db_id;
+	int32_t ret;
+	uint64_t seqnum = 0;
+	uint64_t *result;
+
+	/* do not read a db_id out of a buffer that cannot hold one */
+	if (indata.dptr == NULL || indata.dsize < sizeof(uint32_t)) {
+		DEBUG(DEBUG_ERR,(__location__ " Invalid data size %u\n",
+				 (unsigned)indata.dsize));
+		return -1;
+	}
+
+	db_id = *(uint32_t *)indata.dptr;
+	ret = ctdb_get_db_seqnum(ctdb, db_id, &seqnum);
+	if (ret != 0) {
+		goto done;
+	}
+
+	result = talloc_zero(outdata, uint64_t);
+	if (result == NULL) {
+		ret = -1;
+		goto done;
+	}
+
+	/*
+	 * Store all 64 bits.  Assigning through outdata->dptr, which is a
+	 * uint8_t pointer, would keep only the lowest byte of seqnum.
+	 */
+	*result = seqnum;
+
+	outdata->dptr = (uint8_t *)result;
+	outdata->dsize = sizeof(uint64_t);
+
+done:
+	return ret;
+}