#include "common/logging.h"
#include "common/tunable.h"
#include "common/srvid.h"
+#include "common/system.h"
#include "ipalloc_read_known_ips.h"
}
+static int ltdb_store(struct database *db, TDB_DATA key,
+ struct ctdb_ltdb_header *header, TDB_DATA data)
+{
+ int ret;
+ bool db_volatile = true;
+ bool keep = false;
+
+ if (db->tdb == NULL) {
+ return EINVAL;
+ }
+
+ if ((db->flags & CTDB_DB_FLAGS_PERSISTENT) ||
+ (db->flags & CTDB_DB_FLAGS_REPLICATED)) {
+ db_volatile = false;
+ }
+
+ if (data.dsize > 0) {
+ keep = true;
+ } else {
+ if (db_volatile && header->rsn == 0) {
+ keep = true;
+ }
+ }
+
+ if (keep) {
+ TDB_DATA rec[2];
+
+ rec[0].dsize = ctdb_ltdb_header_len(header);
+ rec[0].dptr = (uint8_t *)header;
+
+ rec[1].dsize = data.dsize;
+ rec[1].dptr = data.dptr;
+
+ ret = tdb_storev(db->tdb, key, rec, 2, TDB_REPLACE);
+ } else {
+ if (header->rsn > 0) {
+ ret = tdb_delete(db->tdb, key);
+ } else {
+ ret = 0;
+ }
+ }
+
+ return ret;
+}
+
+static int ltdb_fetch(struct database *db, TDB_DATA key,
+ struct ctdb_ltdb_header *header,
+ TALLOC_CTX *mem_ctx, TDB_DATA *data)
+{
+ TDB_DATA rec;
+ size_t np;
+ int ret;
+
+ if (db->tdb == NULL) {
+ return EINVAL;
+ }
+
+ rec = tdb_fetch(db->tdb, key);
+ ret = ctdb_ltdb_header_pull(rec.dptr, rec.dsize, header, &np);
+ if (ret != 0) {
+ if (rec.dptr != NULL) {
+ free(rec.dptr);
+ }
+
+ *header = (struct ctdb_ltdb_header) {
+ .rsn = 0,
+ .dmaster = 0,
+ .flags = 0,
+ };
+
+ ret = ltdb_store(db, key, header, tdb_null);
+ if (ret != 0) {
+ return ret;
+ }
+
+ *data = tdb_null;
+ return 0;
+ }
+
+ data->dsize = rec.dsize - ctdb_ltdb_header_len(header);
+ data->dptr = talloc_memdup(mem_ctx,
+ rec.dptr + ctdb_ltdb_header_len(header),
+ data->dsize);
+
+ free(rec.dptr);
+
+ if (data->dptr == NULL) {
+ return ENOMEM;
+ }
+
+ return 0;
+}
+
+static int database_seqnum(struct database *db, uint64_t *seqnum)
+{
+ const char *keyname = CTDB_DB_SEQNUM_KEY;
+ TDB_DATA key, data;
+ struct ctdb_ltdb_header header;
+ size_t np;
+ int ret;
+
+ if (db->tdb == NULL) {
+ *seqnum = db->seq_num;
+ return 0;
+ }
+
+ key.dptr = discard_const(keyname);
+ key.dsize = strlen(keyname) + 1;
+
+ ret = ltdb_fetch(db, key, &header, db, &data);
+ if (ret != 0) {
+ return ret;
+ }
+
+ if (data.dsize == 0) {
+ *seqnum = 0;
+ return 0;
+ }
+
+ ret = ctdb_uint64_pull(data.dptr, data.dsize, seqnum, &np);
+ talloc_free(data.dptr);
+ if (ret != 0) {
+ *seqnum = 0;
+ }
+
+ return ret;
+}
+
+static int ltdb_transaction_update(uint32_t reqid,
+ struct ctdb_ltdb_header *no_header,
+ TDB_DATA key, TDB_DATA data,
+ void *private_data)
+{
+ struct database *db = (struct database *)private_data;
+ TALLOC_CTX *tmp_ctx = talloc_new(db);
+ struct ctdb_ltdb_header header = { 0 }, oldheader;
+ TDB_DATA olddata;
+ int ret;
+
+ if (db->tdb == NULL) {
+ return EINVAL;
+ }
+
+ ret = ctdb_ltdb_header_extract(&data, &header);
+ if (ret != 0) {
+ return ret;
+ }
+
+ ret = ltdb_fetch(db, key, &oldheader, tmp_ctx, &olddata);
+ if (ret != 0) {
+ return ret;
+ }
+
+ if (olddata.dsize > 0) {
+ if (oldheader.rsn > header.rsn ||
+ (oldheader.rsn == header.rsn &&
+ olddata.dsize != data.dsize)) {
+ return -1;
+ }
+ }
+
+ talloc_free(tmp_ctx);
+
+ ret = ltdb_store(db, key, &header, data);
+ return ret;
+}
+
+static int ltdb_transaction(struct database *db,
+ struct ctdb_rec_buffer *recbuf)
+{
+ int ret;
+
+ if (db->tdb == NULL) {
+ return EINVAL;
+ }
+
+ ret = tdb_transaction_start(db->tdb);
+ if (ret == -1) {
+ return ret;
+ }
+
+ ret = ctdb_rec_buffer_traverse(recbuf, ltdb_transaction_update, db);
+ if (ret != 0) {
+ tdb_transaction_cancel(db->tdb);
+ }
+
+ ret = tdb_transaction_commit(db->tdb);
+ return ret;
+}
+
static bool public_ips_parse(struct ctdbd_context *ctdb,
uint32_t numnodes)
{
static bool ctdbd_verify(struct ctdbd_context *ctdb)
{
struct node *node;
- int i;
+ unsigned int i;
if (ctdb->node_map->num_nodes == 0) {
return true;
struct ctdbd_context *ctdb = state->ctdb;
struct tevent_req *subreq;
bool recovery_disabled;
- int i;
+ unsigned int i;
recovery_disabled = false;
for (i=0; i<ctdb->node_map->num_nodes; i++) {
}
}
+static struct ctdb_req_header header_reply_call(
+ struct ctdb_req_header *header,
+ struct ctdbd_context *ctdb)
+{
+ struct ctdb_req_header reply_header;
+
+ reply_header = (struct ctdb_req_header) {
+ .ctdb_magic = CTDB_MAGIC,
+ .ctdb_version = CTDB_PROTOCOL,
+ .generation = ctdb->vnn_map->generation,
+ .operation = CTDB_REPLY_CALL,
+ .destnode = header->srcnode,
+ .srcnode = header->destnode,
+ .reqid = header->reqid,
+ };
+
+ return reply_header;
+}
+
static struct ctdb_req_header header_reply_control(
struct ctdb_req_header *header,
struct ctdbd_context *ctdb)
};
/*
- * Send replies to controls and messages
+ * Send replies to call, controls and messages
*/
static void client_reply_done(struct tevent_req *subreq);
+static void client_send_call(struct tevent_req *req,
+ struct ctdb_req_header *header,
+ struct ctdb_reply_call *reply)
+{
+ struct client_state *state = tevent_req_data(
+ req, struct client_state);
+ struct ctdbd_context *ctdb = state->ctdb;
+ struct tevent_req *subreq;
+ struct ctdb_req_header reply_header;
+ uint8_t *buf;
+ size_t datalen, buflen;
+ int ret;
+
+ reply_header = header_reply_call(header, ctdb);
+
+ datalen = ctdb_reply_call_len(&reply_header, reply);
+ ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
+ if (ret != 0) {
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ ret = ctdb_reply_call_push(&reply_header, reply, buf, &buflen);
+ if (ret != 0) {
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, client_reply_done, req);
+
+ talloc_steal(subreq, buf);
+}
+
static void client_send_message(struct tevent_req *req,
struct ctdb_req_header *header,
struct ctdb_req_message_data *message)
struct ctdb_reply_control reply;
struct ctdb_dbid_map *dbmap;
struct database *db;
- int i;
+ unsigned int i;
reply.rdata.opcode = request->opcode;
struct ctdb_reply_control reply;
struct ctdb_node_map *nodemap;
struct node_map *node_map = ctdb->node_map;
- int i;
+ unsigned int i;
reply.rdata.opcode = request->opcode;
struct ctdb_reply_control reply;
struct ctdb_public_ip_list *ips = NULL;
struct ctdb_public_ip *t = NULL;
- int i;
+ unsigned int i;
reply.rdata.opcode = request->opcode;
struct ctdb_reply_control reply;
struct ctdb_public_ip_list *ips = NULL;
struct ctdb_public_ip *t = NULL;
- int i;
+ unsigned int i;
reply.rdata.opcode = request->opcode;
* no available IPs. Don't worry about interface
* states here - we're not faking down to that level.
*/
- if (ctdb->runstate != CTDB_RUNSTATE_RUNNING) {
+ uint32_t flags = ctdb->node_map->node[header->destnode].flags;
+ if (ctdb->runstate != CTDB_RUNSTATE_RUNNING ||
+ ((flags & (NODE_FLAGS_INACTIVE|NODE_FLAGS_DISABLED)) != 0)) {
/* No available IPs: return dummy empty struct */
ips = talloc_zero(mem_ctx, struct ctdb_public_ip_list);;
if (ips == NULL) {
struct ctdb_reply_control reply;
struct ctdb_node_map *nodemap;
struct node *node;
- int i;
+ unsigned int i;
reply.rdata.opcode = request->opcode;
reply.errmsg = "Failed to ban node";
}
+static void control_trans3_commit(TALLOC_CTX *mem_ctx,
+ struct tevent_req *req,
+ struct ctdb_req_header *header,
+ struct ctdb_req_control *request)
+{
+ struct client_state *state = tevent_req_data(
+ req, struct client_state);
+ struct ctdbd_context *ctdb = state->ctdb;
+ struct ctdb_reply_control reply;
+ struct database *db;
+ int ret;
+
+ reply.rdata.opcode = request->opcode;
+
+ db = database_find(ctdb->db_map, request->rdata.data.recbuf->db_id);
+ if (db == NULL) {
+ reply.status = -1;
+ reply.errmsg = "Unknown database";
+ client_send_control(req, header, &reply);
+ return;
+ }
+
+ if (! (db->flags &
+ (CTDB_DB_FLAGS_PERSISTENT|CTDB_DB_FLAGS_REPLICATED))) {
+ reply.status = -1;
+ reply.errmsg = "Transactions on volatile database";
+ client_send_control(req, header, &reply);
+ return;
+ }
+
+ ret = ltdb_transaction(db, request->rdata.data.recbuf);
+ if (ret != 0) {
+ reply.status = -1;
+ reply.errmsg = "Transaction failed";
+ client_send_control(req, header, &reply);
+ return;
+ }
+
+ reply.status = 0;
+ reply.errmsg = NULL;
+ client_send_control(req, header, &reply);
+}
+
static void control_get_db_seqnum(TALLOC_CTX *mem_ctx,
struct tevent_req *req,
struct ctdb_req_header *header,
struct ctdbd_context *ctdb = state->ctdb;
struct ctdb_reply_control reply;
struct database *db;
+ int ret;
reply.rdata.opcode = request->opcode;
reply.status = ENOENT;
reply.errmsg = "Database not found";
} else {
- reply.rdata.data.seqnum = db->seq_num;
- reply.status = 0;
- reply.errmsg = NULL;
+ uint64_t seqnum;
+
+ ret = database_seqnum(db, &seqnum);
+ if (ret == 0) {
+ reply.rdata.data.seqnum = seqnum;
+ reply.status = 0;
+ reply.errmsg = NULL;
+ } else {
+ reply.status = ret;
+ reply.errmsg = "Failed to get seqnum";
+ }
}
client_send_control(req, header, &reply);
{
struct ctdb_iface_list *iface_list;
struct interface *iface;
- int i;
+ unsigned int i;
iface_list = talloc_zero(mem_ctx, struct ctdb_iface_list);
if (iface_list == NULL) {
client_send_control(req, header, &reply);
}
+struct traverse_start_ext_state {
+ struct tevent_req *req;
+ struct ctdb_req_header *header;
+ uint32_t reqid;
+ uint64_t srvid;
+ bool withemptyrecords;
+ int status;
+};
+
+static int traverse_start_ext_handler(struct tdb_context *tdb,
+ TDB_DATA key, TDB_DATA data,
+ void *private_data)
+{
+ struct traverse_start_ext_state *state =
+ (struct traverse_start_ext_state *)private_data;
+ struct ctdb_rec_data rec;
+ struct ctdb_req_message_data message;
+ size_t np;
+
+ if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
+ return 0;
+ }
+
+ if ((data.dsize == sizeof(struct ctdb_ltdb_header)) &&
+ (!state->withemptyrecords)) {
+ return 0;
+ }
+
+ rec = (struct ctdb_rec_data) {
+ .reqid = state->reqid,
+ .header = NULL,
+ .key = key,
+ .data = data,
+ };
+
+ message.srvid = state->srvid;
+ message.data.dsize = ctdb_rec_data_len(&rec);
+ message.data.dptr = talloc_size(state->req, message.data.dsize);
+ if (message.data.dptr == NULL) {
+ state->status = ENOMEM;
+ return 1;
+ }
+
+ ctdb_rec_data_push(&rec, message.data.dptr, &np);
+ client_send_message(state->req, state->header, &message);
+
+ talloc_free(message.data.dptr);
+
+ return 0;
+}
+
+static void control_traverse_start_ext(TALLOC_CTX *mem_ctx,
+ struct tevent_req *req,
+ struct ctdb_req_header *header,
+ struct ctdb_req_control *request)
+{
+ struct client_state *state = tevent_req_data(
+ req, struct client_state);
+ struct ctdbd_context *ctdb = state->ctdb;
+ struct ctdb_reply_control reply;
+ struct database *db;
+ struct ctdb_traverse_start_ext *ext;
+ struct traverse_start_ext_state t_state;
+ struct ctdb_rec_data rec;
+ struct ctdb_req_message_data message;
+ uint8_t buffer[32];
+ size_t np;
+ int ret;
+
+ reply.rdata.opcode = request->opcode;
+
+ ext = request->rdata.data.traverse_start_ext;
+
+ db = database_find(ctdb->db_map, ext->db_id);
+ if (db == NULL) {
+ reply.status = -1;
+ reply.errmsg = "Unknown database";
+ client_send_control(req, header, &reply);
+ return;
+ }
+
+ t_state = (struct traverse_start_ext_state) {
+ .req = req,
+ .header = header,
+ .reqid = ext->reqid,
+ .srvid = ext->srvid,
+ .withemptyrecords = ext->withemptyrecords,
+ };
+
+ ret = tdb_traverse_read(db->tdb, traverse_start_ext_handler, &t_state);
+ DEBUG(DEBUG_INFO, ("traversed %d records\n", ret));
+ if (t_state.status != 0) {
+ reply.status = -1;
+ reply.errmsg = "Memory error";
+ client_send_control(req, header, &reply);
+ }
+
+ reply.status = 0;
+ client_send_control(req, header, &reply);
+
+ rec = (struct ctdb_rec_data) {
+ .reqid = ext->reqid,
+ .header = NULL,
+ .key = tdb_null,
+ .data = tdb_null,
+ };
+
+ message.srvid = ext->srvid;
+ message.data.dsize = ctdb_rec_data_len(&rec);
+ ctdb_rec_data_push(&rec, buffer, &np);
+ message.data.dptr = buffer;
+ client_send_message(req, header, &message);
+}
+
static void control_set_db_sticky(TALLOC_CTX *mem_ctx,
struct tevent_req *req,
struct ctdb_req_header *header,
{
struct ctdb_reply_control reply;
+ D_DEBUG("Control %u not implemented\n", request->opcode);
+
reply.rdata.opcode = request->opcode;
reply.status = -1;
reply.errmsg = "Not implemented";
static void client_dead_handler(void *private_data);
static void client_process_packet(struct tevent_req *req,
uint8_t *buf, size_t buflen);
+static void client_process_call(struct tevent_req *req,
+ uint8_t *buf, size_t buflen);
static void client_process_message(struct tevent_req *req,
uint8_t *buf, size_t buflen);
static void client_process_control(struct tevent_req *req,
{
struct tevent_req *req;
struct client_state *state;
- struct ucred cr;
- socklen_t crl = sizeof(struct ucred);
int ret;
req = tevent_req_create(mem_ctx, &state, struct client_state);
state->ctdb = ctdb;
state->pnn = pnn;
- ret = getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl);
- if (ret != 0) {
- tevent_req_error(req, ret);
- return tevent_req_post(req, ev);
- }
- state->pid = cr.pid;
+ (void) ctdb_get_peer_pid(fd, &state->pid);
ret = comm_setup(state, ev, fd, client_read_handler, req,
client_dead_handler, req, &state->comm);
struct ctdbd_context *ctdb = state->ctdb;
struct ctdb_req_header header;
size_t np;
- int ret, i;
+ unsigned int i;
+ int ret;
ret = ctdb_req_header_pull(buf, buflen, &header, &np);
if (ret != 0) {
}
switch (header.operation) {
+ case CTDB_REQ_CALL:
+ client_process_call(req, buf, buflen);
+ break;
+
case CTDB_REQ_MESSAGE:
client_process_message(req, buf, buflen);
break;
}
}
+static void client_process_call(struct tevent_req *req,
+ uint8_t *buf, size_t buflen)
+{
+ struct client_state *state = tevent_req_data(
+ req, struct client_state);
+ struct ctdbd_context *ctdb = state->ctdb;
+ TALLOC_CTX *mem_ctx;
+ struct ctdb_req_header header;
+ struct ctdb_req_call request;
+ struct ctdb_reply_call reply;
+ struct database *db;
+ struct ctdb_ltdb_header hdr;
+ TDB_DATA data;
+ int ret;
+
+ mem_ctx = talloc_new(state);
+ if (tevent_req_nomem(mem_ctx, req)) {
+ return;
+ }
+
+ ret = ctdb_req_call_pull(buf, buflen, &header, mem_ctx, &request);
+ if (ret != 0) {
+ talloc_free(mem_ctx);
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ header_fix_pnn(&header, ctdb);
+
+ if (header.destnode >= ctdb->node_map->num_nodes) {
+ goto fail;
+ }
+
+ DEBUG(DEBUG_INFO, ("call db_id = %u\n", request.db_id));
+
+ db = database_find(ctdb->db_map, request.db_id);
+ if (db == NULL) {
+ goto fail;
+ }
+
+ ret = ltdb_fetch(db, request.key, &hdr, mem_ctx, &data);
+ if (ret != 0) {
+ goto fail;
+ }
+
+ /* Fake migration */
+ if (hdr.dmaster != ctdb->node_map->pnn) {
+ hdr.dmaster = ctdb->node_map->pnn;
+
+ ret = ltdb_store(db, request.key, &hdr, data);
+ if (ret != 0) {
+ goto fail;
+ }
+ }
+
+ talloc_free(mem_ctx);
+
+ reply.status = 0;
+ reply.data = tdb_null;
+
+ client_send_call(req, &header, &reply);
+ return;
+
+fail:
+ talloc_free(mem_ctx);
+ reply.status = -1;
+ reply.data = tdb_null;
+
+ client_send_call(req, &header, &reply);
+}
+
static void client_process_message(struct tevent_req *req,
uint8_t *buf, size_t buflen)
{
message_disable_recoveries(mem_ctx, req, &header, &request);
} else if (srvid == CTDB_SRVID_TAKEOVER_RUN) {
message_takeover_run(mem_ctx, req, &header, &request);
+ } else {
+ D_DEBUG("Message id 0x%"PRIx64" not implemented\n", srvid);
}
/* check srvid */
control_set_ban_state(mem_ctx, req, &header, &request);
break;
+ case CTDB_CONTROL_TRANS3_COMMIT:
+ control_trans3_commit(mem_ctx, req, &header, &request);
+ break;
+
case CTDB_CONTROL_GET_DB_SEQNUM:
control_get_db_seqnum(mem_ctx, req, &header, &request);
break;
control_set_db_readonly(mem_ctx, req, &header, &request);
break;
+ case CTDB_CONTROL_TRAVERSE_START_EXT:
+ control_traverse_start_ext(mem_ctx, req, &header, &request);
+ break;
+
case CTDB_CONTROL_SET_DB_STICKY:
control_set_db_sticky(mem_ctx, req, &header, &request);
break;