+
+/**
+ * check whether a transaction is active on a given db on a given node
+ */
+int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
+ uint32_t destnode,
+ uint32_t db_id)
+{
+ int32_t status;
+ int ret;
+ TDB_DATA indata;
+
+ indata.dptr = (uint8_t *)&db_id;
+ indata.dsize = sizeof(db_id);
+
+ ret = ctdb_control(ctdb, destnode, 0,
+ CTDB_CONTROL_TRANS2_ACTIVE,
+ 0, indata, NULL, NULL, &status,
+ NULL, NULL);
+
+ if (ret != 0) {
+ DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
+ return -1;
+ }
+
+ return status;
+}
+
+
+struct ctdb_transaction_handle {
+ struct ctdb_db_context *ctdb_db;
+ bool in_replay;
+ /*
+ * we store the reads and writes done under a transaction:
+ * - one list stores both reads and writes (m_all),
+ * - the other just writes (m_write)
+ */
+ struct ctdb_marshall_buffer *m_all;
+ struct ctdb_marshall_buffer *m_write;
+};
+
+/* start a transaction on a database */
+static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
+{
+ tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
+ return 0;
+}
+
+/* start a transaction on a database */
+static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
+{
+ struct ctdb_record_handle *rh;
+ TDB_DATA key;
+ TDB_DATA data;
+ struct ctdb_ltdb_header header;
+ TALLOC_CTX *tmp_ctx;
+ const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
+ int ret;
+ struct ctdb_db_context *ctdb_db = h->ctdb_db;
+ pid_t pid;
+ int32_t status;
+
+ key.dptr = discard_const(keyname);
+ key.dsize = strlen(keyname);
+
+ if (!ctdb_db->persistent) {
+ DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
+ return -1;
+ }
+
+again:
+ tmp_ctx = talloc_new(h);
+
+ rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
+ if (rh == NULL) {
+ DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
+ talloc_free(tmp_ctx);
+ return -1;
+ }
+
+ status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
+ CTDB_CURRENT_NODE,
+ ctdb_db->db_id);
+ if (status == 1) {
+ unsigned long int usec = (1000 + random()) % 100000;
+ DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
+ "on db_id[0x%08x]. waiting for %lu "
+ "microseconds\n",
+ ctdb_db->db_id, usec));
+ talloc_free(tmp_ctx);
+ usleep(usec);
+ goto again;
+ }
+
+ /*
+ * store the pid in the database:
+ * it is not enough that the node is dmaster...
+ */
+ pid = getpid();
+ data.dptr = (unsigned char *)&pid;
+ data.dsize = sizeof(pid_t);
+ rh->header.rsn++;
+ rh->header.dmaster = ctdb_db->ctdb->pnn;
+ ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
+ if (ret != 0) {
+ DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
+ "transaction record\n"));
+ talloc_free(tmp_ctx);
+ return -1;
+ }
+
+ talloc_free(rh);
+
+ ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
+ if (ret != 0) {
+ DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
+ talloc_free(tmp_ctx);
+ return -1;
+ }
+
+ ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
+ if (ret != 0) {
+ DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
+ "lock record inside transaction\n"));
+ tdb_transaction_cancel(ctdb_db->ltdb->tdb);
+ talloc_free(tmp_ctx);
+ goto again;
+ }
+
+ if (header.dmaster != ctdb_db->ctdb->pnn) {
+ DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
+ "transaction lock record\n"));
+ tdb_transaction_cancel(ctdb_db->ltdb->tdb);
+ talloc_free(tmp_ctx);
+ goto again;
+ }
+
+ if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
+ DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
+ "the transaction lock record\n"));
+ tdb_transaction_cancel(ctdb_db->ltdb->tdb);
+ talloc_free(tmp_ctx);
+ goto again;
+ }
+
+ talloc_free(tmp_ctx);
+
+ return 0;
+}
+
+
+/* start a transaction on a database */
+struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
+ TALLOC_CTX *mem_ctx)
+{
+ struct ctdb_transaction_handle *h;
+ int ret;
+
+ h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
+ if (h == NULL) {
+ DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));
+ return NULL;
+ }
+
+ h->ctdb_db = ctdb_db;
+
+ ret = ctdb_transaction_fetch_start(h);
+ if (ret != 0) {
+ talloc_free(h);
+ return NULL;
+ }
+
+ talloc_set_destructor(h, ctdb_transaction_destructor);
+
+ return h;
+}
+
+
+
+/*
+ fetch a record inside a transaction
+ */
+int ctdb_transaction_fetch(struct ctdb_transaction_handle *h,
+ TALLOC_CTX *mem_ctx,
+ TDB_DATA key, TDB_DATA *data)
+{
+ struct ctdb_ltdb_header header;
+ int ret;
+
+ ZERO_STRUCT(header);
+
+ ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
+ if (ret == -1 && header.dmaster == (uint32_t)-1) {
+ /* record doesn't exist yet */
+ *data = tdb_null;
+ ret = 0;
+ }
+
+ if (ret != 0) {
+ return ret;
+ }
+
+ if (!h->in_replay) {
+ h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
+ if (h->m_all == NULL) {
+ DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+/*
+ stores a record inside a transaction
+ */
+int ctdb_transaction_store(struct ctdb_transaction_handle *h,
+ TDB_DATA key, TDB_DATA data)
+{
+ TALLOC_CTX *tmp_ctx = talloc_new(h);
+ struct ctdb_ltdb_header header;
+ TDB_DATA olddata;
+ int ret;
+
+ ZERO_STRUCT(header);
+
+ /* we need the header so we can update the RSN */
+ ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
+ if (ret == -1 && header.dmaster == (uint32_t)-1) {
+ /* the record doesn't exist - create one with us as dmaster.
+ This is only safe because we are in a transaction and this
+ is a persistent database */
+ ZERO_STRUCT(header);
+ } else if (ret != 0) {
+ DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
+ talloc_free(tmp_ctx);
+ return ret;
+ }
+
+ if (data.dsize == olddata.dsize &&
+ memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
+ /* save writing the same data */
+ talloc_free(tmp_ctx);
+ return 0;
+ }
+
+ header.dmaster = h->ctdb_db->ctdb->pnn;
+ header.rsn++;
+
+ if (!h->in_replay) {
+ h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
+ if (h->m_all == NULL) {
+ DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
+ talloc_free(tmp_ctx);
+ return -1;
+ }
+ }
+
+ h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
+ if (h->m_write == NULL) {
+ DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
+ talloc_free(tmp_ctx);
+ return -1;
+ }
+
+ ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
+
+ talloc_free(tmp_ctx);
+
+ return ret;
+}
+
+/*
+ replay a transaction
+ */
+static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
+{
+ int ret, i;
+ struct ctdb_rec_data *rec = NULL;
+
+ h->in_replay = true;
+ talloc_free(h->m_write);
+ h->m_write = NULL;
+
+ ret = ctdb_transaction_fetch_start(h);
+ if (ret != 0) {
+ return ret;
+ }
+
+ for (i=0;i<h->m_all->count;i++) {
+ TDB_DATA key, data;
+
+ rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
+ if (rec == NULL) {
+ DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
+ goto failed;
+ }
+
+ if (rec->reqid == 0) {
+ /* its a store */
+ if (ctdb_transaction_store(h, key, data) != 0) {
+ goto failed;
+ }
+ } else {
+ TDB_DATA data2;
+ TALLOC_CTX *tmp_ctx = talloc_new(h);
+
+ if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
+ talloc_free(tmp_ctx);
+ goto failed;
+ }
+ if (data2.dsize != data.dsize ||
+ memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
+ /* the record has changed on us - we have to give up */
+ talloc_free(tmp_ctx);
+ goto failed;
+ }
+ talloc_free(tmp_ctx);
+ }
+ }
+
+ return 0;
+
+failed:
+ tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
+ return -1;
+}
+
+
+/*
+ commit a transaction
+ */
+int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
+{
+ int ret, retries=0;
+ int32_t status;
+ struct ctdb_context *ctdb = h->ctdb_db->ctdb;
+ struct timeval timeout;
+ enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
+
+ talloc_set_destructor(h, NULL);
+
+ /* our commit strategy is quite complex.
+
+ - we first try to commit the changes to all other nodes
+
+ - if that works, then we commit locally and we are done
+
+ - if a commit on another node fails, then we need to cancel
+ the transaction, then restart the transaction (thus
+ opening a window of time for a pending recovery to
+ complete), then replay the transaction, checking all the
+ reads and writes (checking that reads give the same data,
+ and writes succeed). Then we retry the transaction to the
+ other nodes
+ */
+
+again:
+ if (h->m_write == NULL) {
+ /* no changes were made */
+ tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
+ talloc_free(h);
+ return 0;
+ }
+
+ /* tell ctdbd to commit to the other nodes */
+ timeout = timeval_current_ofs(1, 0);
+ ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
+ retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0,
+ ctdb_marshall_finish(h->m_write), NULL, NULL, &status,
+ &timeout, NULL);
+ if (ret != 0 || status != 0) {
+ tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
+ DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
+ ", retrying after 1 second...\n",
+ (retries==0)?"":"retry "));
+ sleep(1);
+
+ if (ret != 0) {
+ failure_control = CTDB_CONTROL_TRANS2_ERROR;
+ } else {
+ /* work out what error code we will give if we
+ have to fail the operation */
+ switch ((enum ctdb_trans2_commit_error)status) {
+ case CTDB_TRANS2_COMMIT_SUCCESS:
+ case CTDB_TRANS2_COMMIT_SOMEFAIL:
+ case CTDB_TRANS2_COMMIT_TIMEOUT:
+ failure_control = CTDB_CONTROL_TRANS2_ERROR;
+ break;
+ case CTDB_TRANS2_COMMIT_ALLFAIL:
+ failure_control = CTDB_CONTROL_TRANS2_FINISHED;
+ break;
+ }
+ }
+
+ if (++retries == 100) {
+ DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n",
+ h->ctdb_db->db_id, retries, (unsigned)failure_control));
+ ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
+ failure_control, CTDB_CTRL_FLAG_NOREPLY,
+ tdb_null, NULL, NULL, NULL, NULL, NULL);
+ talloc_free(h);
+ return -1;
+ }
+
+ if (ctdb_replay_transaction(h) != 0) {
+ DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
+ "transaction on db 0x%08x, "
+ "failure control =%u\n",
+ h->ctdb_db->db_id,
+ (unsigned)failure_control));
+ ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
+ failure_control, CTDB_CTRL_FLAG_NOREPLY,
+ tdb_null, NULL, NULL, NULL, NULL, NULL);
+ talloc_free(h);
+ return -1;
+ }
+ goto again;
+ } else {
+ failure_control = CTDB_CONTROL_TRANS2_ERROR;
+ }
+
+ /* do the real commit locally */
+ ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
+ if (ret != 0) {
+ DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
+ "on db id 0x%08x locally, "
+ "failure_control=%u\n",
+ h->ctdb_db->db_id,
+ (unsigned)failure_control));
+ ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
+ failure_control, CTDB_CTRL_FLAG_NOREPLY,
+ tdb_null, NULL, NULL, NULL, NULL, NULL);
+ talloc_free(h);
+ return ret;
+ }
+
+ /* tell ctdbd that we are finished with our local commit */
+ ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
+ CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY,
+ tdb_null, NULL, NULL, NULL, NULL, NULL);
+ talloc_free(h);
+ return 0;
+}
+
+/*
+ recovery daemon ping to main daemon
+ */
+int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
+{
+ int ret;
+ int32_t res;
+
+ ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null,
+ ctdb, NULL, &res, NULL, NULL);
+ if (ret != 0 || res != 0) {
+ DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
+ return -1;
+ }
+
+ return 0;
+}
+
+/* when forking the main daemon and the child process needs to connect back
+ * to the daemon as a client process, this function can be used to change
+ * the ctdb context from daemon into client mode
+ */
+int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
+{
+ int ret;
+ va_list ap;
+
+ /* Add extra information so we can identify this in the logs */
+ va_start(ap, fmt);
+ debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
+ va_end(ap);
+
+ /* shutdown the transport */
+ if (ctdb->methods) {
+ ctdb->methods->shutdown(ctdb);
+ }
+
+ /* get a new event context */
+ talloc_free(ctdb->ev);
+ ctdb->ev = event_context_init(ctdb);
+ tevent_loop_allow_nesting(ctdb->ev);
+
+ close(ctdb->daemon.sd);
+ ctdb->daemon.sd = -1;
+
+ /* the client does not need to be realtime */
+ if (ctdb->do_setsched) {
+ ctdb_restore_scheduler(ctdb);
+ }
+
+ /* initialise ctdb */
+ ret = ctdb_socket_connect(ctdb);
+ if (ret != 0) {
+ DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
+ return -1;
+ }
+
+ return 0;
+}
+
+/*
+ get the status of running the monitor eventscripts: NULL means never run.
+ */
+int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb,
+ struct timeval timeout, uint32_t destnode,
+ TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
+ struct ctdb_scripts_wire **script_status)
+{
+ int ret;
+ TDB_DATA outdata, indata;
+ int32_t res;
+ uint32_t uinttype = type;
+
+ indata.dptr = (uint8_t *)&uinttype;
+ indata.dsize = sizeof(uinttype);
+
+ ret = ctdb_control(ctdb, destnode, 0,
+ CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
+ mem_ctx, &outdata, &res, &timeout, NULL);
+ if (ret != 0 || res != 0) {
+ DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
+ return -1;
+ }
+
+ if (outdata.dsize == 0) {
+ *script_status = NULL;
+ } else {
+ *script_status = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
+ talloc_free(outdata.dptr);
+ }
+
+ return 0;
+}
+
+/*
+ tell the main daemon how long it took to lock the reclock file
+ */
+int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
+{
+ int ret;
+ int32_t res;
+ TDB_DATA data;
+
+ data.dptr = (uint8_t *)&latency;
+ data.dsize = sizeof(latency);
+
+ ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data,
+ ctdb, NULL, &res, NULL, NULL);
+ if (ret != 0 || res != 0) {
+ DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
+ return -1;
+ }
+
+ return 0;
+}
+
+/*
+ get the name of the reclock file
+ */
+int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
+ uint32_t destnode, TALLOC_CTX *mem_ctx,
+ const char **name)
+{
+ int ret;
+ int32_t res;
+ TDB_DATA data;
+
+ ret = ctdb_control(ctdb, destnode, 0,
+ CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null,
+ mem_ctx, &data, &res, &timeout, NULL);
+ if (ret != 0 || res != 0) {
+ return -1;
+ }
+
+ if (data.dsize == 0) {
+ *name = NULL;
+ } else {
+ *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
+ }
+ talloc_free(data.dptr);
+
+ return 0;
+}
+
+/*
+ set the reclock filename for a node
+ */
+int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
+{
+ int ret;
+ TDB_DATA data;
+ int32_t res;
+
+ if (reclock == NULL) {
+ data.dsize = 0;
+ data.dptr = NULL;
+ } else {
+ data.dsize = strlen(reclock) + 1;
+ data.dptr = discard_const(reclock);
+ }
+
+ ret = ctdb_control(ctdb, destnode, 0,
+ CTDB_CONTROL_SET_RECLOCK_FILE, 0, data,
+ NULL, NULL, &res, &timeout, NULL);
+ if (ret != 0 || res != 0) {
+ DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
+ return -1;
+ }
+
+ return 0;
+}
+
+/*
+ stop a node
+ */
+int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
+{
+ int ret;
+ int32_t res;
+
+ ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null,
+ ctdb, NULL, &res, &timeout, NULL);
+ if (ret != 0 || res != 0) {
+ DEBUG(DEBUG_ERR,("Failed to stop node\n"));
+ return -1;
+ }
+
+ return 0;
+}
+
+/*
+ continue a node
+ */
+int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
+{
+ int ret;
+
+ ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null,
+ ctdb, NULL, NULL, &timeout, NULL);
+ if (ret != 0) {
+ DEBUG(DEBUG_ERR,("Failed to continue node\n"));
+ return -1;
+ }
+
+ return 0;
+}
+
+/*
+ set the natgw state for a node
+ */
+int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
+{
+ int ret;
+ TDB_DATA data;
+ int32_t res;
+
+ data.dsize = sizeof(natgwstate);
+ data.dptr = (uint8_t *)&natgwstate;
+
+ ret = ctdb_control(ctdb, destnode, 0,
+ CTDB_CONTROL_SET_NATGWSTATE, 0, data,
+ NULL, NULL, &res, &timeout, NULL);
+ if (ret != 0 || res != 0) {
+ DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
+ return -1;
+ }
+
+ return 0;
+}
+
+/*
+ set the lmaster role for a node
+ */
+int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
+{
+ int ret;
+ TDB_DATA data;
+ int32_t res;
+
+ data.dsize = sizeof(lmasterrole);
+ data.dptr = (uint8_t *)&lmasterrole;
+
+ ret = ctdb_control(ctdb, destnode, 0,
+ CTDB_CONTROL_SET_LMASTERROLE, 0, data,
+ NULL, NULL, &res, &timeout, NULL);
+ if (ret != 0 || res != 0) {
+ DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
+ return -1;
+ }
+
+ return 0;
+}
+
+/*
+ set the recmaster role for a node
+ */
+int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
+{
+ int ret;
+ TDB_DATA data;
+ int32_t res;
+
+ data.dsize = sizeof(recmasterrole);
+ data.dptr = (uint8_t *)&recmasterrole;
+
+ ret = ctdb_control(ctdb, destnode, 0,
+ CTDB_CONTROL_SET_RECMASTERROLE, 0, data,
+ NULL, NULL, &res, &timeout, NULL);
+ if (ret != 0 || res != 0) {
+ DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
+ return -1;
+ }
+
+ return 0;
+}
+
+/* enable an eventscript
+ */
+int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
+{
+ int ret;
+ TDB_DATA data;
+ int32_t res;
+
+ data.dsize = strlen(script) + 1;
+ data.dptr = discard_const(script);
+
+ ret = ctdb_control(ctdb, destnode, 0,
+ CTDB_CONTROL_ENABLE_SCRIPT, 0, data,
+ NULL, NULL, &res, &timeout, NULL);
+ if (ret != 0 || res != 0) {
+ DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
+ return -1;
+ }
+
+ return 0;
+}
+
+/* disable an eventscript
+ */
+int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
+{
+ int ret;
+ TDB_DATA data;
+ int32_t res;
+
+ data.dsize = strlen(script) + 1;
+ data.dptr = discard_const(script);
+
+ ret = ctdb_control(ctdb, destnode, 0,
+ CTDB_CONTROL_DISABLE_SCRIPT, 0, data,
+ NULL, NULL, &res, &timeout, NULL);
+ if (ret != 0 || res != 0) {
+ DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
+ return -1;
+ }
+
+ return 0;
+}
+
+
+int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
+{
+ int ret;
+ TDB_DATA data;
+ int32_t res;
+
+ data.dsize = sizeof(*bantime);
+ data.dptr = (uint8_t *)bantime;
+
+ ret = ctdb_control(ctdb, destnode, 0,
+ CTDB_CONTROL_SET_BAN_STATE, 0, data,
+ NULL, NULL, &res, &timeout, NULL);
+ if (ret != 0 || res != 0) {
+ DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
+ return -1;
+ }
+
+ return 0;
+}
+
+
+int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
+{
+ int ret;
+ TDB_DATA outdata;
+ int32_t res;
+ TALLOC_CTX *tmp_ctx = talloc_new(NULL);
+
+ ret = ctdb_control(ctdb, destnode, 0,
+ CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
+ tmp_ctx, &outdata, &res, &timeout, NULL);
+ if (ret != 0 || res != 0) {
+ DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
+ talloc_free(tmp_ctx);
+ return -1;
+ }
+
+ *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
+ talloc_free(tmp_ctx);
+
+ return 0;
+}
+
+
+int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
+{
+ int ret;
+ int32_t res;
+ TDB_DATA data;
+ TALLOC_CTX *tmp_ctx = talloc_new(NULL);
+
+ data.dptr = (uint8_t*)db_prio;
+ data.dsize = sizeof(*db_prio);
+
+ ret = ctdb_control(ctdb, destnode, 0,
+ CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
+ tmp_ctx, NULL, &res, &timeout, NULL);
+ if (ret != 0 || res != 0) {
+ DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
+ talloc_free(tmp_ctx);
+ return -1;
+ }
+
+ talloc_free(tmp_ctx);
+
+ return 0;
+}
+
+int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
+{
+ int ret;
+ int32_t res;
+ TDB_DATA data;
+ TALLOC_CTX *tmp_ctx = talloc_new(NULL);
+
+ data.dptr = (uint8_t*)&db_id;
+ data.dsize = sizeof(db_id);
+
+ ret = ctdb_control(ctdb, destnode, 0,
+ CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
+ tmp_ctx, NULL, &res, &timeout, NULL);
+ if (ret != 0 || res < 0) {
+ DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
+ talloc_free(tmp_ctx);
+ return -1;
+ }
+
+ if (priority) {
+ *priority = res;
+ }
+
+ talloc_free(tmp_ctx);
+
+ return 0;
+}
+
+int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_statistics_wire **stats)
+{
+ int ret;
+ TDB_DATA outdata;
+ int32_t res;
+
+ ret = ctdb_control(ctdb, destnode, 0,
+ CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null,
+ mem_ctx, &outdata, &res, &timeout, NULL);
+ if (ret != 0 || res != 0 || outdata.dsize == 0) {
+ DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
+ return -1;
+ }
+
+ *stats = (struct ctdb_statistics_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
+ talloc_free(outdata.dptr);
+
+ return 0;
+}
+
+struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)
+{
+ if (h == NULL) {
+ return NULL;
+ }
+
+ return &h->header;
+}
+
+
+struct ctdb_client_control_state *
+ctdb_ctrl_updaterecord_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
+{
+ struct ctdb_client_control_state *handle;
+ struct ctdb_marshall_buffer *m;
+ struct ctdb_rec_data *rec;
+ TDB_DATA outdata;
+
+ m = talloc_zero(mem_ctx, struct ctdb_marshall_buffer);
+ if (m == NULL) {
+ DEBUG(DEBUG_ERR, ("Failed to allocate marshall buffer for update record\n"));
+ return NULL;
+ }
+
+ m->db_id = ctdb_db->db_id;
+
+ rec = ctdb_marshall_record(m, 0, key, header, data);
+ if (rec == NULL) {
+ DEBUG(DEBUG_ERR,("Failed to marshall record for update record\n"));
+ talloc_free(m);
+ return NULL;
+ }
+ m = talloc_realloc_size(mem_ctx, m, rec->length + offsetof(struct ctdb_marshall_buffer, data));
+ if (m == NULL) {
+ DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata\n"));
+ talloc_free(m);
+ return NULL;
+ }
+ m->count++;
+ memcpy((uint8_t *)m + offsetof(struct ctdb_marshall_buffer, data), rec, rec->length);
+
+
+ outdata.dptr = (uint8_t *)m;
+ outdata.dsize = talloc_get_size(m);
+
+ handle = ctdb_control_send(ctdb, destnode, 0,
+ CTDB_CONTROL_UPDATE_RECORD, 0, outdata,
+ mem_ctx, &timeout, NULL);
+ talloc_free(m);
+ return handle;
+}
+
+int ctdb_ctrl_updaterecord_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
+{
+ int ret;
+ int32_t res;
+
+ ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL);
+ if ( (ret != 0) || (res != 0) ){
+ DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_update_record_recv failed\n"));
+ return -1;
+ }
+
+ return 0;
+}
+
+int
+ctdb_ctrl_updaterecord(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
+{
+ struct ctdb_client_control_state *state;
+
+ state = ctdb_ctrl_updaterecord_send(ctdb, mem_ctx, timeout, destnode, ctdb_db, key, header, data);
+ return ctdb_ctrl_updaterecord_recv(ctdb, state);
+}
+
+
+
+
+
+
+/*
+ set a database to be readonly
+ */
+struct ctdb_client_control_state *
+ctdb_ctrl_set_db_readonly_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
+{
+ TDB_DATA data;
+
+ data.dptr = (uint8_t *)&dbid;
+ data.dsize = sizeof(dbid);
+
+ return ctdb_control_send(ctdb, destnode, 0,
+ CTDB_CONTROL_SET_DB_READONLY, 0, data,
+ ctdb, NULL, NULL);
+}
+
+int ctdb_ctrl_set_db_readonly_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
+{
+ int ret;
+ int32_t res;
+
+ ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
+ if (ret != 0 || res != 0) {
+ DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_readonly_recv failed ret:%d res:%d\n", ret, res));
+ return -1;
+ }
+
+ return 0;
+}
+
+int ctdb_ctrl_set_db_readonly(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
+{
+ struct ctdb_client_control_state *state;
+
+ state = ctdb_ctrl_set_db_readonly_send(ctdb, destnode, dbid);
+ return ctdb_ctrl_set_db_readonly_recv(ctdb, state);
+}