#include "replace.h"
#include "system/network.h"
#include "system/time.h"
+#include "system/filesys.h"
#include <popt.h>
#include <talloc.h>
#include "protocol/protocol.h"
#include "protocol/protocol_api.h"
+#include "protocol/protocol_util.h"
+#include "protocol/protocol_private.h"
#include "common/comm.h"
-#include "common/system.h"
#include "common/logging.h"
#include "common/tunable.h"
+#include "common/srvid.h"
+#include "common/system.h"
#include "ipalloc_read_known_ips.h"
};
struct database {
+ struct database *prev, *next;
const char *name;
+ const char *path;
+ struct tdb_context *tdb;
uint32_t id;
uint8_t flags;
uint64_t seq_num;
};
struct database_map {
- int num_dbs;
struct database *db;
+ const char *dbdir;
+};
+
+struct fake_control_failure {
+ struct fake_control_failure *prev, *next;
+ enum ctdb_controls opcode;
+ uint32_t pnn;
+ const char *error;
+ const char *comment;
};
-struct srvid_register_state {
- struct srvid_register_state *prev, *next;
+struct ctdb_client {
+ struct ctdb_client *prev, *next;
struct ctdbd_context *ctdb;
- uint64_t srvid;
+ pid_t pid;
+ void *state;
};
struct ctdbd_context {
struct interface_map *iface_map;
struct vnn_map *vnn_map;
struct database_map *db_map;
- struct srvid_register_state *rstate;
+ struct srvid_context *srv;
int num_clients;
struct timeval start_time;
struct timeval recovery_start_time;
int log_level;
enum ctdb_runstate runstate;
struct ctdb_tunable_list tun_list;
- int monitoring_mode;
char *reclock;
struct ctdb_public_ip_list *known_ips;
+ struct fake_control_failure *control_failures;
+ struct ctdb_client *client_list;
};
/*
char *ip;
ctdb_sock_addr saddr;
struct node *node;
+ int ret;
if (line[0] == '\n') {
break;
fprintf(stderr, "bad line (%s) - missing IP\n", line);
continue;
}
- if (!parse_ip(tok, NULL, CTDB_PORT, &saddr)) {
+ ret = ctdb_sock_addr_from_string(tok, &saddr, false);
+ if (ret != 0) {
fprintf(stderr, "bad line (%s) - invalid IP\n", line);
continue;
}
+ ctdb_sock_addr_set_port(&saddr, CTDB_PORT);
ip = talloc_strdup(node_map, tok);
if (ip == NULL) {
goto fail;
}
node = &node_map->node[node_map->num_nodes];
- parse_ip(ip, NULL, CTDB_PORT, &node->addr);
+ ret = ctdb_sock_addr_from_string(ip, &node->addr, false);
+ if (ret != 0) {
+ fprintf(stderr, "bad line (%s) - invalid IP\n", line);
+ continue;
+ }
+ ctdb_sock_addr_set_port(&node->addr, CTDB_PORT);
node->pnn = pnn;
node->flags = flags;
node->capabilities = capabilities;
ctdb_sock_addr addr;
uint32_t num;
struct ctdb_node_and_flags *n;
+ int ret;
- if (! parse_ip(nstr, NULL, CTDB_PORT, &addr)) {
+ ret = ctdb_sock_addr_from_string(nstr, &addr, false);
+ if (ret != 0) {
fprintf(stderr, "Invalid IP address %s\n", nstr);
return false;
}
+ ctdb_sock_addr_set_port(&addr, CTDB_PORT);
num = nodemap->num;
nodemap->node = talloc_realloc(nodemap, nodemap->node,
uint32_t pnn)
{
struct ctdb_node_map *nodemap;
- char nodepath[PATH_MAX];
- const char *nodes_list;
-
- /* read the nodes file */
- sprintf(nodepath, "CTDB_NODES_%u", pnn);
- nodes_list = getenv(nodepath);
- if (nodes_list == NULL) {
- nodes_list = getenv("CTDB_NODES");
- if (nodes_list == NULL) {
- DEBUG(DEBUG_INFO, ("Nodes file not defined\n"));
+ char nodes_list[PATH_MAX];
+ const char *ctdb_base;
+ int num;
+
+ ctdb_base = getenv("CTDB_BASE");
+ if (ctdb_base == NULL) {
+ D_ERR("CTDB_BASE is not set\n");
+ return NULL;
+ }
+
+ /* read optional node-specific nodes file */
+ num = snprintf(nodes_list, sizeof(nodes_list),
+ "%s/nodes.%d", ctdb_base, pnn);
+ if (num == sizeof(nodes_list)) {
+ D_ERR("nodes file path too long\n");
+ return NULL;
+ }
+ nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list);
+ if (nodemap != NULL) {
+ /* Fake a load failure for an empty nodemap */
+ if (nodemap->num == 0) {
+ talloc_free(nodemap);
+
+ D_ERR("Failed to read nodes file \"%s\"\n", nodes_list);
return NULL;
}
+
+ return nodemap;
}
- nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list);
- if (nodemap == NULL) {
- DEBUG(DEBUG_INFO, ("Failed to read nodes file \"%s\"\n",
- nodes_list));
+ /* read normal nodes file */
+ num = snprintf(nodes_list, sizeof(nodes_list), "%s/nodes", ctdb_base);
+ if (num == sizeof(nodes_list)) {
+ D_ERR("nodes file path too long\n");
return NULL;
}
+ nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list);
+ if (nodemap != NULL) {
+ return nodemap;
+ }
- return nodemap;
+ DBG_ERR("Failed to read nodes file \"%s\"\n", nodes_list);
+ return NULL;
}
static struct interface_map *interfaces_init(TALLOC_CTX *mem_ctx)
return false;
}
-static struct database_map *dbmap_init(TALLOC_CTX *mem_ctx)
+static struct database_map *dbmap_init(TALLOC_CTX *mem_ctx,
+ const char *dbdir)
{
struct database_map *db_map;
return NULL;
}
+ db_map->dbdir = talloc_strdup(db_map, dbdir);
+ if (db_map->dbdir == NULL) {
+ talloc_free(db_map);
+ return NULL;
+ }
+
return db_map;
}
flags |= CTDB_DB_FLAGS_STICKY;
} else if (strcmp(tok, "READONLY") == 0) {
flags |= CTDB_DB_FLAGS_READONLY;
+ } else if (strcmp(tok, "REPLICATED") == 0) {
+ flags |= CTDB_DB_FLAGS_REPLICATED;
} else if (tok[0] >= '0'&& tok[0] <= '9') {
- if ((flags & CTDB_DB_FLAGS_PERSISTENT) == 0) {
+ uint8_t nv = CTDB_DB_FLAGS_PERSISTENT |
+ CTDB_DB_FLAGS_REPLICATED;
+
+ if ((flags & nv) == 0) {
fprintf(stderr,
"seq_num for volatile db\n");
goto fail;
tok = strtok(NULL, " \t");
}
- db_map->db = talloc_realloc(db_map, db_map->db,
- struct database,
- db_map->num_dbs + 1);
- if (db_map->db == NULL) {
+ db = talloc_zero(db_map, struct database);
+ if (db == NULL) {
goto fail;
}
- db = &db_map->db[db_map->num_dbs];
db->id = id;
- db->name = name;
+ db->name = talloc_steal(db, name);
+ db->path = talloc_asprintf(db, "%s/%s", db_map->dbdir, name);
+ if (db->path == NULL) {
+ talloc_free(db);
+ goto fail;
+ }
db->flags = flags;
db->seq_num = seq_num;
- db_map->num_dbs += 1;
+ DLIST_ADD_END(db_map->db, db);
}
DEBUG(DEBUG_INFO, ("Parsing dbmap done\n"));
}
-static struct database *database_find(struct database_map *map,
+static struct database *database_find(struct database_map *db_map,
uint32_t db_id)
{
- int i;
-
- for (i = 0; i < map->num_dbs; i++) {
- struct database *db = &map->db[i];
+ struct database *db;
+ for (db = db_map->db; db != NULL; db = db->next) {
if (db->id == db_id) {
return db;
}
return NULL;
}
+static int database_count(struct database_map *db_map)
+{
+ struct database *db;
+ int count = 0;
+
+ for (db = db_map->db; db != NULL; db = db->next) {
+ count += 1;
+ }
+
+ return count;
+}
+
+static int database_flags(uint8_t db_flags)
+{
+ int tdb_flags = 0;
+
+ if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
+ tdb_flags = TDB_DEFAULT;
+ } else {
+ /* volatile and replicated use the same flags */
+ tdb_flags = TDB_NOSYNC |
+ TDB_CLEAR_IF_FIRST |
+ TDB_INCOMPATIBLE_HASH;
+ }
+
+ tdb_flags |= TDB_DISALLOW_NESTING;
+
+ return tdb_flags;
+}
+
+static struct database *database_new(struct database_map *db_map,
+ const char *name, uint8_t flags)
+{
+ struct database *db;
+ TDB_DATA key;
+ int tdb_flags;
+
+ db = talloc_zero(db_map, struct database);
+ if (db == NULL) {
+ return NULL;
+ }
+
+ db->name = talloc_strdup(db, name);
+ if (db->name == NULL) {
+ goto fail;
+ }
+
+ db->path = talloc_asprintf(db, "%s/%s", db_map->dbdir, name);
+ if (db->path == NULL) {
+ goto fail;
+ }
+
+ key.dsize = strlen(db->name) + 1;
+ key.dptr = discard_const(db->name);
+
+ db->id = tdb_jenkins_hash(&key);
+ db->flags = flags;
+
+ tdb_flags = database_flags(flags);
+
+ db->tdb = tdb_open(db->path, 8192, tdb_flags, O_CREAT|O_RDWR, 0644);
+ if (db->tdb == NULL) {
+ DBG_ERR("tdb_open\n");
+ goto fail;
+ }
+
+ DLIST_ADD_END(db_map->db, db);
+ return db;
+
+fail:
+ DBG_ERR("Memory error\n");
+ talloc_free(db);
+ return NULL;
+
+}
+
+static int ltdb_store(struct database *db, TDB_DATA key,
+ struct ctdb_ltdb_header *header, TDB_DATA data)
+{
+ int ret;
+ bool db_volatile = true;
+ bool keep = false;
+
+ if (db->tdb == NULL) {
+ return EINVAL;
+ }
+
+ if ((db->flags & CTDB_DB_FLAGS_PERSISTENT) ||
+ (db->flags & CTDB_DB_FLAGS_REPLICATED)) {
+ db_volatile = false;
+ }
+
+ if (data.dsize > 0) {
+ keep = true;
+ } else {
+ if (db_volatile && header->rsn == 0) {
+ keep = true;
+ }
+ }
+
+ if (keep) {
+ TDB_DATA rec[2];
+
+ rec[0].dsize = ctdb_ltdb_header_len(header);
+ rec[0].dptr = (uint8_t *)header;
+
+ rec[1].dsize = data.dsize;
+ rec[1].dptr = data.dptr;
+
+ ret = tdb_storev(db->tdb, key, rec, 2, TDB_REPLACE);
+ } else {
+ if (header->rsn > 0) {
+ ret = tdb_delete(db->tdb, key);
+ } else {
+ ret = 0;
+ }
+ }
+
+ return ret;
+}
+
+static int ltdb_fetch(struct database *db, TDB_DATA key,
+ struct ctdb_ltdb_header *header,
+ TALLOC_CTX *mem_ctx, TDB_DATA *data)
+{
+ TDB_DATA rec;
+ size_t np;
+ int ret;
+
+ if (db->tdb == NULL) {
+ return EINVAL;
+ }
+
+ rec = tdb_fetch(db->tdb, key);
+ ret = ctdb_ltdb_header_pull(rec.dptr, rec.dsize, header, &np);
+ if (ret != 0) {
+ if (rec.dptr != NULL) {
+ free(rec.dptr);
+ }
+
+ *header = (struct ctdb_ltdb_header) {
+ .rsn = 0,
+ .dmaster = 0,
+ .flags = 0,
+ };
+
+ ret = ltdb_store(db, key, header, tdb_null);
+ if (ret != 0) {
+ return ret;
+ }
+
+ *data = tdb_null;
+ return 0;
+ }
+
+ data->dsize = rec.dsize - ctdb_ltdb_header_len(header);
+ data->dptr = talloc_memdup(mem_ctx,
+ rec.dptr + ctdb_ltdb_header_len(header),
+ data->dsize);
+
+ free(rec.dptr);
+
+ if (data->dptr == NULL) {
+ return ENOMEM;
+ }
+
+ return 0;
+}
+
+static int database_seqnum(struct database *db, uint64_t *seqnum)
+{
+ const char *keyname = CTDB_DB_SEQNUM_KEY;
+ TDB_DATA key, data;
+ struct ctdb_ltdb_header header;
+ size_t np;
+ int ret;
+
+ if (db->tdb == NULL) {
+ *seqnum = db->seq_num;
+ return 0;
+ }
+
+ key.dptr = discard_const(keyname);
+ key.dsize = strlen(keyname) + 1;
+
+ ret = ltdb_fetch(db, key, &header, db, &data);
+ if (ret != 0) {
+ return ret;
+ }
+
+ if (data.dsize == 0) {
+ *seqnum = 0;
+ return 0;
+ }
+
+ ret = ctdb_uint64_pull(data.dptr, data.dsize, seqnum, &np);
+ talloc_free(data.dptr);
+ if (ret != 0) {
+ *seqnum = 0;
+ }
+
+ return ret;
+}
+
+static int ltdb_transaction_update(uint32_t reqid,
+ struct ctdb_ltdb_header *no_header,
+ TDB_DATA key, TDB_DATA data,
+ void *private_data)
+{
+ struct database *db = (struct database *)private_data;
+ TALLOC_CTX *tmp_ctx = talloc_new(db);
+ struct ctdb_ltdb_header header = { 0 }, oldheader;
+ TDB_DATA olddata;
+ int ret;
+
+ if (db->tdb == NULL) {
+ return EINVAL;
+ }
+
+ ret = ctdb_ltdb_header_extract(&data, &header);
+ if (ret != 0) {
+ return ret;
+ }
+
+ ret = ltdb_fetch(db, key, &oldheader, tmp_ctx, &olddata);
+ if (ret != 0) {
+ return ret;
+ }
+
+ if (olddata.dsize > 0) {
+ if (oldheader.rsn > header.rsn ||
+ (oldheader.rsn == header.rsn &&
+ olddata.dsize != data.dsize)) {
+ return -1;
+ }
+ }
+
+ talloc_free(tmp_ctx);
+
+ ret = ltdb_store(db, key, &header, data);
+ return ret;
+}
+
+static int ltdb_transaction(struct database *db,
+ struct ctdb_rec_buffer *recbuf)
+{
+ int ret;
+
+ if (db->tdb == NULL) {
+ return EINVAL;
+ }
+
+ ret = tdb_transaction_start(db->tdb);
+ if (ret == -1) {
+ return ret;
+ }
+
+ ret = ctdb_rec_buffer_traverse(recbuf, ltdb_transaction_update, db);
+ if (ret != 0) {
+ tdb_transaction_cancel(db->tdb);
+ }
+
+ ret = tdb_transaction_commit(db->tdb);
+ return ret;
+}
+
static bool public_ips_parse(struct ctdbd_context *ctdb,
uint32_t numnodes)
{
+ bool status;
+
if (numnodes == 0) {
D_ERR("Must initialise nodemap before public IPs\n");
return false;
ctdb->known_ips = ipalloc_read_known_ips(ctdb, numnodes, false);
- return (ctdb->known_ips != NULL);
+ status = (ctdb->known_ips != NULL);
+
+ if (status) {
+ D_INFO("Parsing public IPs done\n");
+ } else {
+ D_INFO("Parsing public IPs failed\n");
+ }
+
+ return status;
+}
+
+/* Read information about controls to fail. Format is:
+ * <opcode> <pnn> {ERROR|TIMEOUT} <comment>
+ */
+static bool control_failures_parse(struct ctdbd_context *ctdb)
+{
+ char line[1024];
+
+ while ((fgets(line, sizeof(line), stdin) != NULL)) {
+ char *tok, *t;
+ enum ctdb_controls opcode;
+ uint32_t pnn;
+ const char *error;
+ const char *comment;
+ struct fake_control_failure *failure = NULL;
+
+ if (line[0] == '\n') {
+ break;
+ }
+
+ /* Get rid of pesky newline */
+ if ((t = strchr(line, '\n')) != NULL) {
+ *t = '\0';
+ }
+
+ /* Get opcode */
+ tok = strtok(line, " \t");
+ if (tok == NULL) {
+ D_ERR("bad line (%s) - missing opcode\n", line);
+ continue;
+ }
+ opcode = (enum ctdb_controls)strtoul(tok, NULL, 0);
+
+ /* Get PNN */
+ tok = strtok(NULL, " \t");
+ if (tok == NULL) {
+ D_ERR("bad line (%s) - missing PNN\n", line);
+ continue;
+ }
+ pnn = (uint32_t)strtoul(tok, NULL, 0);
+
+ /* Get error */
+ tok = strtok(NULL, " \t");
+ if (tok == NULL) {
+ D_ERR("bad line (%s) - missing errno\n", line);
+ continue;
+ }
+ error = talloc_strdup(ctdb, tok);
+ if (error == NULL) {
+ goto fail;
+ }
+ if (strcmp(error, "ERROR") != 0 &&
+ strcmp(error, "TIMEOUT") != 0) {
+ D_ERR("bad line (%s) "
+ "- error must be \"ERROR\" or \"TIMEOUT\"\n",
+ line);
+ goto fail;
+ }
+
+ /* Get comment */
+ tok = strtok(NULL, "\n"); /* rest of line */
+ if (tok == NULL) {
+ D_ERR("bad line (%s) - missing comment\n", line);
+ continue;
+ }
+ comment = talloc_strdup(ctdb, tok);
+ if (comment == NULL) {
+ goto fail;
+ }
+
+ failure = talloc_zero(ctdb, struct fake_control_failure);
+ if (failure == NULL) {
+ goto fail;
+ }
+
+ failure->opcode = opcode;
+ failure->pnn = pnn;
+ failure->error = error;
+ failure->comment = comment;
+
+ DLIST_ADD(ctdb->control_failures, failure);
+ }
+
+ D_INFO("Parsing fake control failures done\n");
+ return true;
+
+fail:
+ D_INFO("Parsing fake control failures failed\n");
+ return false;
+}
+
+/*
+ * Manage clients
+ */
+
+static int ctdb_client_destructor(struct ctdb_client *client)
+{
+ DLIST_REMOVE(client->ctdb->client_list, client);
+ return 0;
+}
+
+static int client_add(struct ctdbd_context *ctdb, pid_t client_pid,
+ void *client_state)
+{
+ struct ctdb_client *client;
+
+ client = talloc_zero(client_state, struct ctdb_client);
+ if (client == NULL) {
+ return ENOMEM;
+ }
+
+ client->ctdb = ctdb;
+ client->pid = client_pid;
+ client->state = client_state;
+
+ DLIST_ADD(ctdb->client_list, client);
+ talloc_set_destructor(client, ctdb_client_destructor);
+ return 0;
+}
+
+static void *client_find(struct ctdbd_context *ctdb, pid_t client_pid)
+{
+ struct ctdb_client *client;
+
+ for (client=ctdb->client_list; client != NULL; client=client->next) {
+ if (client->pid == client_pid) {
+ return client->state;
+ }
+ }
+
+ return NULL;
}
/*
return generation;
}
-static struct ctdbd_context *ctdbd_setup(TALLOC_CTX *mem_ctx)
+static struct ctdbd_context *ctdbd_setup(TALLOC_CTX *mem_ctx,
+ const char *dbdir)
{
struct ctdbd_context *ctdb;
char line[1024];
bool status;
+ int ret;
ctdb = talloc_zero(mem_ctx, struct ctdbd_context);
if (ctdb == NULL) {
goto fail;
}
- ctdb->db_map = dbmap_init(ctdb);
+ ctdb->db_map = dbmap_init(ctdb, dbdir);
if (ctdb->db_map == NULL) {
goto fail;
}
+ ret = srvid_init(ctdb, &ctdb->srv);
+ if (ret != 0) {
+ goto fail;
+ }
+
while (fgets(line, sizeof(line), stdin) != NULL) {
char *t;
ctdb->node_map->num_nodes);
} else if (strcmp(line, "RECLOCK") == 0) {
status = reclock_parse(ctdb);
+ } else if (strcmp(line, "CONTROLFAILS") == 0) {
+ status = control_failures_parse(ctdb);
} else {
fprintf(stderr, "Unknown line %s\n", line);
status = false;
ctdb_tunable_set_defaults(&ctdb->tun_list);
- ctdb->monitoring_mode = CTDB_MONITORING_ENABLED;
-
return ctdb;
fail:
static bool ctdbd_verify(struct ctdbd_context *ctdb)
{
struct node *node;
- int i;
+ unsigned int i;
if (ctdb->node_map->num_nodes == 0) {
return true;
struct ctdbd_context *ctdb = state->ctdb;
struct tevent_req *subreq;
bool recovery_disabled;
- int i;
+ unsigned int i;
recovery_disabled = false;
for (i=0; i<ctdb->node_map->num_nodes; i++) {
}
}
+static struct ctdb_req_header header_reply_call(
+ struct ctdb_req_header *header,
+ struct ctdbd_context *ctdb)
+{
+ struct ctdb_req_header reply_header;
+
+ reply_header = (struct ctdb_req_header) {
+ .ctdb_magic = CTDB_MAGIC,
+ .ctdb_version = CTDB_PROTOCOL,
+ .generation = ctdb->vnn_map->generation,
+ .operation = CTDB_REPLY_CALL,
+ .destnode = header->srcnode,
+ .srcnode = header->destnode,
+ .reqid = header->reqid,
+ };
+
+ return reply_header;
+}
+
static struct ctdb_req_header header_reply_control(
struct ctdb_req_header *header,
struct ctdbd_context *ctdb)
int fd;
struct ctdbd_context *ctdb;
int pnn;
+ pid_t pid;
struct comm_context *comm;
struct srvid_register_state *rstate;
int status;
};
/*
- * Send replies to controls and messages
+ * Send replies to call, controls and messages
*/
static void client_reply_done(struct tevent_req *subreq);
+static void client_send_call(struct tevent_req *req,
+ struct ctdb_req_header *header,
+ struct ctdb_reply_call *reply)
+{
+ struct client_state *state = tevent_req_data(
+ req, struct client_state);
+ struct ctdbd_context *ctdb = state->ctdb;
+ struct tevent_req *subreq;
+ struct ctdb_req_header reply_header;
+ uint8_t *buf;
+ size_t datalen, buflen;
+ int ret;
+
+ reply_header = header_reply_call(header, ctdb);
+
+ datalen = ctdb_reply_call_len(&reply_header, reply);
+ ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
+ if (ret != 0) {
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ ret = ctdb_reply_call_push(&reply_header, reply, buf, &buflen);
+ if (ret != 0) {
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, client_reply_done, req);
+
+ talloc_steal(subreq, buf);
+}
+
static void client_send_message(struct tevent_req *req,
struct ctdb_req_header *header,
struct ctdb_req_message_data *message)
struct ctdb_req_header *header,
struct ctdb_req_control *request)
{
+ struct client_state *state = tevent_req_data(
+ req, struct client_state);
+ struct ctdbd_context *ctdb = state->ctdb;
+ struct client_state *cstate;
struct ctdb_reply_control reply;
reply.rdata.opcode = request->opcode;
- reply.status = kill(request->rdata.data.pid, 0);
- reply.errmsg = NULL;
+
+ cstate = client_find(ctdb, request->rdata.data.pid);
+ if (cstate == NULL) {
+ reply.status = -1;
+ reply.errmsg = "No client for PID";
+ } else {
+ reply.status = kill(request->rdata.data.pid, 0);
+ reply.errmsg = NULL;
+ }
client_send_control(req, header, &reply);
}
reply.status = ENOENT;
reply.errmsg = "Database not found";
} else {
- const char *base;
- if (db->flags & CTDB_DB_FLAGS_PERSISTENT) {
- base = "/var/lib/ctdb/persistent";
- } else {
- base = "/var/run/ctdb/DB_DIR";
- }
reply.rdata.data.db_path =
- talloc_asprintf(mem_ctx, "%s/%s.%u",
- base, db->name, header->destnode);
+ talloc_strdup(mem_ctx, db->path);
if (reply.rdata.data.db_path == NULL) {
reply.status = ENOMEM;
reply.errmsg = "Memory error";
struct ctdbd_context *ctdb = state->ctdb;
struct ctdb_reply_control reply;
struct ctdb_dbid_map *dbmap;
- int i;
+ struct database *db;
+ unsigned int i;
reply.rdata.opcode = request->opcode;
goto fail;
}
- dbmap->num = ctdb->db_map->num_dbs;
+ dbmap->num = database_count(ctdb->db_map);
dbmap->dbs = talloc_zero_array(dbmap, struct ctdb_dbid, dbmap->num);
if (dbmap->dbs == NULL) {
goto fail;
}
+ db = ctdb->db_map->db;
for (i = 0; i < dbmap->num; i++) {
- struct database *db = &ctdb->db_map->db[i];
dbmap->dbs[i] = (struct ctdb_dbid) {
.db_id = db->id,
.flags = db->flags,
};
+
+ db = db->next;
}
reply.rdata.data.dbmap = dbmap;
}
-static int srvid_register_state_destructor(struct srvid_register_state *rstate)
+static void control_db_attach(TALLOC_CTX *mem_ctx,
+ struct tevent_req *req,
+ struct ctdb_req_header *header,
+ struct ctdb_req_control *request)
{
- DLIST_REMOVE(rstate->ctdb->rstate, rstate);
- return 0;
+ struct client_state *state = tevent_req_data(
+ req, struct client_state);
+ struct ctdbd_context *ctdb = state->ctdb;
+ struct ctdb_reply_control reply;
+ struct database *db;
+
+ reply.rdata.opcode = request->opcode;
+
+ for (db = ctdb->db_map->db; db != NULL; db = db->next) {
+ if (strcmp(db->name, request->rdata.data.db_name) == 0) {
+ goto done;
+ }
+ }
+
+ db = database_new(ctdb->db_map, request->rdata.data.db_name, 0);
+ if (db == NULL) {
+ reply.status = -1;
+ reply.errmsg = "Failed to attach database";
+ client_send_control(req, header, &reply);
+ return;
+ }
+
+done:
+ reply.rdata.data.db_id = db->id;
+ reply.status = 0;
+ reply.errmsg = NULL;
+ client_send_control(req, header, &reply);
+}
+
+static void srvid_handler(uint64_t srvid, TDB_DATA data, void *private_data)
+{
+ printf("Received a message for SRVID 0x%"PRIx64"\n", srvid);
}
static void control_register_srvid(TALLOC_CTX *mem_ctx,
req, struct client_state);
struct ctdbd_context *ctdb = state->ctdb;
struct ctdb_reply_control reply;
- struct srvid_register_state *rstate;
+ int ret;
reply.rdata.opcode = request->opcode;
- rstate = talloc_zero(ctdb, struct srvid_register_state);
- if (rstate == NULL) {
+ ret = srvid_register(ctdb->srv, state, request->srvid,
+ srvid_handler, state);
+ if (ret != 0) {
reply.status = -1;
reply.errmsg = "Memory error";
goto fail;
}
- rstate->ctdb = ctdb;
- rstate->srvid = request->srvid;
-
- talloc_set_destructor(rstate, srvid_register_state_destructor);
- DLIST_ADD_END(ctdb->rstate, rstate);
-
- DEBUG(DEBUG_INFO, ("Register srvid 0x%"PRIx64"\n", rstate->srvid));
+ DEBUG(DEBUG_INFO, ("Register srvid 0x%"PRIx64"\n", request->srvid));
reply.status = 0;
reply.errmsg = NULL;
req, struct client_state);
struct ctdbd_context *ctdb = state->ctdb;
struct ctdb_reply_control reply;
- struct srvid_register_state *rstate = NULL;
+ int ret;
reply.rdata.opcode = request->opcode;
- for (rstate = ctdb->rstate; rstate != NULL; rstate = rstate->next) {
- if (rstate->srvid == request->srvid) {
- break;
- }
- }
-
- if (rstate == NULL) {
+ ret = srvid_deregister(ctdb->srv, request->srvid, state);
+ if (ret != 0) {
reply.status = -1;
reply.errmsg = "srvid not registered";
goto fail;
}
- DEBUG(DEBUG_INFO, ("Deregister srvid 0x%"PRIx64"\n", rstate->srvid));
- talloc_free(rstate);
+ DEBUG(DEBUG_INFO, ("Deregister srvid 0x%"PRIx64"\n", request->srvid));
reply.status = 0;
reply.errmsg = NULL;
- client_send_control(req, header, &reply);
- return;
-
fail:
- TALLOC_FREE(rstate);
client_send_control(req, header, &reply);
}
state->status = 99;
}
-static void control_get_monmode(TALLOC_CTX *mem_ctx,
- struct tevent_req *req,
- struct ctdb_req_header *header,
- struct ctdb_req_control *request)
-{
- struct client_state *state = tevent_req_data(
- req, struct client_state);
- struct ctdbd_context *ctdb = state->ctdb;
- struct ctdb_reply_control reply;
-
- reply.rdata.opcode = request->opcode;
- reply.status = ctdb->monitoring_mode;
- reply.errmsg = NULL;
-
- client_send_control(req, header, &reply);
-}
-
static void control_set_tunable(TALLOC_CTX *mem_ctx,
struct tevent_req *req,
struct ctdb_req_header *header,
client_send_control(req, header, &reply);
}
+static void control_db_attach_persistent(TALLOC_CTX *mem_ctx,
+ struct tevent_req *req,
+ struct ctdb_req_header *header,
+ struct ctdb_req_control *request)
+{
+ struct client_state *state = tevent_req_data(
+ req, struct client_state);
+ struct ctdbd_context *ctdb = state->ctdb;
+ struct ctdb_reply_control reply;
+ struct database *db;
+
+ reply.rdata.opcode = request->opcode;
+
+ for (db = ctdb->db_map->db; db != NULL; db = db->next) {
+ if (strcmp(db->name, request->rdata.data.db_name) == 0) {
+ goto done;
+ }
+ }
+
+ db = database_new(ctdb->db_map, request->rdata.data.db_name,
+ CTDB_DB_FLAGS_PERSISTENT);
+ if (db == NULL) {
+ reply.status = -1;
+ reply.errmsg = "Failed to attach database";
+ client_send_control(req, header, &reply);
+ return;
+ }
+
+done:
+ reply.rdata.data.db_id = db->id;
+ reply.status = 0;
+ reply.errmsg = NULL;
+ client_send_control(req, header, &reply);
+}
+
static void control_uptime(TALLOC_CTX *mem_ctx,
struct tevent_req *req,
struct ctdb_req_header *header,
client_send_control(req, header, &reply);
}
-static void control_enable_monitor(TALLOC_CTX *mem_ctx,
- struct tevent_req *req,
- struct ctdb_req_header *header,
- struct ctdb_req_control *request)
-{
- struct client_state *state = tevent_req_data(
- req, struct client_state);
- struct ctdbd_context *ctdb = state->ctdb;
- struct ctdb_reply_control reply;
-
- ctdb->monitoring_mode = CTDB_MONITORING_ENABLED;
-
- reply.rdata.opcode = request->opcode;
- reply.status = 0;
- reply.errmsg = NULL;
- client_send_control(req, header, &reply);
-}
-
-static void control_disable_monitor(TALLOC_CTX *mem_ctx,
- struct tevent_req *req,
- struct ctdb_req_header *header,
- struct ctdb_req_control *request)
-{
- struct client_state *state = tevent_req_data(
- req, struct client_state);
- struct ctdbd_context *ctdb = state->ctdb;
- struct ctdb_reply_control reply;
-
- ctdb->monitoring_mode = CTDB_MONITORING_DISABLED;
-
- reply.rdata.opcode = request->opcode;
- reply.status = 0;
- reply.errmsg = NULL;
- client_send_control(req, header, &reply);
-}
-
static void control_reload_nodes_file(TALLOC_CTX *mem_ctx,
struct tevent_req *req,
struct ctdb_req_header *header,
struct ctdb_reply_control reply;
struct ctdb_node_map *nodemap;
struct node_map *node_map = ctdb->node_map;
- int i;
+ unsigned int i;
reply.rdata.opcode = request->opcode;
}
if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
+ int ret;
+
node = &node_map->node[i];
node->flags |= NODE_FLAGS_DELETED;
- parse_ip("0.0.0.0", NULL, 0, &node->addr);
+ ret = ctdb_sock_addr_from_string("0.0.0.0", &node->addr,
+ false);
+ if (ret != 0) {
+ /* Can't happen, but Coverity... */
+ goto fail;
+ }
continue;
}
struct ctdb_reply_control reply;
struct ctdb_public_ip_list *ips = NULL;
struct ctdb_public_ip *t = NULL;
- int i;
+ unsigned int i;
reply.rdata.opcode = request->opcode;
if (ctdb->known_ips == NULL) {
D_INFO("RELEASE_IP %s - not a public IP\n",
- ctdb_sock_addr_to_string(mem_ctx, &ip->addr));
+ ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
goto done;
}
}
if (t == NULL) {
D_INFO("RELEASE_IP %s - not a public IP\n",
- ctdb_sock_addr_to_string(mem_ctx, &ip->addr));
+ ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
goto done;
}
if (t->pnn != header->destnode) {
if (header->destnode == ip->pnn) {
D_ERR("error: RELEASE_IP %s - to TAKE_IP node %d\n",
- ctdb_sock_addr_to_string(mem_ctx, &ip->addr),
+ ctdb_sock_addr_to_string(mem_ctx,
+ &ip->addr, false),
ip->pnn);
reply.status = -1;
reply.errmsg = "RELEASE_IP to TAKE_IP node";
}
D_INFO("RELEASE_IP %s - to node %d - redundant\n",
- ctdb_sock_addr_to_string(mem_ctx, &ip->addr),
+ ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false),
ip->pnn);
t->pnn = ip->pnn;
} else {
D_NOTICE("RELEASE_IP %s - to node %d\n",
- ctdb_sock_addr_to_string(mem_ctx, &ip->addr),
+ ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false),
ip->pnn);
t->pnn = ip->pnn;
}
struct ctdb_reply_control reply;
struct ctdb_public_ip_list *ips = NULL;
struct ctdb_public_ip *t = NULL;
- int i;
+ unsigned int i;
reply.rdata.opcode = request->opcode;
if (ctdb->known_ips == NULL) {
D_INFO("TAKEOVER_IP %s - not a public IP\n",
- ctdb_sock_addr_to_string(mem_ctx, &ip->addr));
+ ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
goto done;
}
}
if (t == NULL) {
D_INFO("TAKEOVER_IP %s - not a public IP\n",
- ctdb_sock_addr_to_string(mem_ctx, &ip->addr));
+ ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
goto done;
}
if (t->pnn == header->destnode) {
D_INFO("TAKEOVER_IP %s - redundant\n",
- ctdb_sock_addr_to_string(mem_ctx, &ip->addr));
+ ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
} else {
D_NOTICE("TAKEOVER_IP %s\n",
- ctdb_sock_addr_to_string(mem_ctx, &ip->addr));
+ ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
t->pnn = ip->pnn;
}
* no available IPs. Don't worry about interface
* states here - we're not faking down to that level.
*/
- if (ctdb->runstate != CTDB_RUNSTATE_RUNNING) {
+ uint32_t flags = ctdb->node_map->node[header->destnode].flags;
+ if (ctdb->runstate != CTDB_RUNSTATE_RUNNING ||
+ ((flags & (NODE_FLAGS_INACTIVE|NODE_FLAGS_DISABLED)) != 0)) {
/* No available IPs: return dummy empty struct */
ips = talloc_zero(mem_ctx, struct ctdb_public_ip_list);;
if (ips == NULL) {
struct ctdb_reply_control reply;
struct ctdb_node_map *nodemap;
struct node *node;
- int i;
+ unsigned int i;
reply.rdata.opcode = request->opcode;
reply.rdata.opcode = request->opcode;
DEBUG(DEBUG_INFO, ("Stopping node\n"));
- ctdb->monitoring_mode = CTDB_MONITORING_DISABLED;
ctdb->node_map->node[header->destnode].flags |= NODE_FLAGS_STOPPED;
reply.status = 0;
reply.errmsg = "Failed to ban node";
}
+static void control_trans3_commit(TALLOC_CTX *mem_ctx,
+ struct tevent_req *req,
+ struct ctdb_req_header *header,
+ struct ctdb_req_control *request)
+{
+ struct client_state *state = tevent_req_data(
+ req, struct client_state);
+ struct ctdbd_context *ctdb = state->ctdb;
+ struct ctdb_reply_control reply;
+ struct database *db;
+ int ret;
+
+ reply.rdata.opcode = request->opcode;
+
+ db = database_find(ctdb->db_map, request->rdata.data.recbuf->db_id);
+ if (db == NULL) {
+ reply.status = -1;
+ reply.errmsg = "Unknown database";
+ client_send_control(req, header, &reply);
+ return;
+ }
+
+ if (! (db->flags &
+ (CTDB_DB_FLAGS_PERSISTENT|CTDB_DB_FLAGS_REPLICATED))) {
+ reply.status = -1;
+ reply.errmsg = "Transactions on volatile database";
+ client_send_control(req, header, &reply);
+ return;
+ }
+
+ ret = ltdb_transaction(db, request->rdata.data.recbuf);
+ if (ret != 0) {
+ reply.status = -1;
+ reply.errmsg = "Transaction failed";
+ client_send_control(req, header, &reply);
+ return;
+ }
+
+ reply.status = 0;
+ reply.errmsg = NULL;
+ client_send_control(req, header, &reply);
+}
+
static void control_get_db_seqnum(TALLOC_CTX *mem_ctx,
struct tevent_req *req,
struct ctdb_req_header *header,
struct ctdbd_context *ctdb = state->ctdb;
struct ctdb_reply_control reply;
struct database *db;
+ int ret;
reply.rdata.opcode = request->opcode;
reply.status = ENOENT;
reply.errmsg = "Database not found";
} else {
- reply.rdata.data.seqnum = db->seq_num;
- reply.status = 0;
- reply.errmsg = NULL;
+ uint64_t seqnum;
+
+ ret = database_seqnum(db, &seqnum);
+ if (ret == 0) {
+ reply.rdata.data.seqnum = seqnum;
+ reply.status = 0;
+ reply.errmsg = NULL;
+ } else {
+ reply.status = ret;
+ reply.errmsg = "Failed to get seqnum";
+ }
}
client_send_control(req, header, &reply);
{
struct ctdb_iface_list *iface_list;
struct interface *iface;
- int i;
+ unsigned int i;
iface_list = talloc_zero(mem_ctx, struct ctdb_iface_list);
if (iface_list == NULL) {
if (i == known->num) {
D_ERR("GET_PUBLIC_IP_INFO: not known public IP %s\n",
- ctdb_sock_addr_to_string(mem_ctx, addr));
+ ctdb_sock_addr_to_string(mem_ctx, addr, false));
reply.status = -1;
reply.errmsg = "Unknown address";
goto done;
client_send_control(req, header, &reply);
}
+struct traverse_start_ext_state {
+ struct tevent_req *req;
+ struct ctdb_req_header *header;
+ uint32_t reqid;
+ uint64_t srvid;
+ bool withemptyrecords;
+ int status;
+};
+
+static int traverse_start_ext_handler(struct tdb_context *tdb,
+ TDB_DATA key, TDB_DATA data,
+ void *private_data)
+{
+ struct traverse_start_ext_state *state =
+ (struct traverse_start_ext_state *)private_data;
+ struct ctdb_rec_data rec;
+ struct ctdb_req_message_data message;
+ size_t np;
+
+ if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
+ return 0;
+ }
+
+ if ((data.dsize == sizeof(struct ctdb_ltdb_header)) &&
+ (!state->withemptyrecords)) {
+ return 0;
+ }
+
+ rec = (struct ctdb_rec_data) {
+ .reqid = state->reqid,
+ .header = NULL,
+ .key = key,
+ .data = data,
+ };
+
+ message.srvid = state->srvid;
+ message.data.dsize = ctdb_rec_data_len(&rec);
+ message.data.dptr = talloc_size(state->req, message.data.dsize);
+ if (message.data.dptr == NULL) {
+ state->status = ENOMEM;
+ return 1;
+ }
+
+ ctdb_rec_data_push(&rec, message.data.dptr, &np);
+ client_send_message(state->req, state->header, &message);
+
+ talloc_free(message.data.dptr);
+
+ return 0;
+}
+
+static void control_traverse_start_ext(TALLOC_CTX *mem_ctx,
+ struct tevent_req *req,
+ struct ctdb_req_header *header,
+ struct ctdb_req_control *request)
+{
+ struct client_state *state = tevent_req_data(
+ req, struct client_state);
+ struct ctdbd_context *ctdb = state->ctdb;
+ struct ctdb_reply_control reply;
+ struct database *db;
+ struct ctdb_traverse_start_ext *ext;
+ struct traverse_start_ext_state t_state;
+ struct ctdb_rec_data rec;
+ struct ctdb_req_message_data message;
+ uint8_t buffer[32];
+ size_t np;
+ int ret;
+
+ reply.rdata.opcode = request->opcode;
+
+ ext = request->rdata.data.traverse_start_ext;
+
+ db = database_find(ctdb->db_map, ext->db_id);
+ if (db == NULL) {
+ reply.status = -1;
+ reply.errmsg = "Unknown database";
+ client_send_control(req, header, &reply);
+ return;
+ }
+
+ t_state = (struct traverse_start_ext_state) {
+ .req = req,
+ .header = header,
+ .reqid = ext->reqid,
+ .srvid = ext->srvid,
+ .withemptyrecords = ext->withemptyrecords,
+ };
+
+ ret = tdb_traverse_read(db->tdb, traverse_start_ext_handler, &t_state);
+ DEBUG(DEBUG_INFO, ("traversed %d records\n", ret));
+ if (t_state.status != 0) {
+ reply.status = -1;
+ reply.errmsg = "Memory error";
+ client_send_control(req, header, &reply);
+ }
+
+ reply.status = 0;
+ client_send_control(req, header, &reply);
+
+ rec = (struct ctdb_rec_data) {
+ .reqid = ext->reqid,
+ .header = NULL,
+ .key = tdb_null,
+ .data = tdb_null,
+ };
+
+ message.srvid = ext->srvid;
+ message.data.dsize = ctdb_rec_data_len(&rec);
+ ctdb_rec_data_push(&rec, buffer, &np);
+ message.data.dptr = buffer;
+ client_send_message(req, header, &message);
+}
+
static void control_set_db_sticky(TALLOC_CTX *mem_ctx,
struct tevent_req *req,
struct ctdb_req_header *header,
client_send_control(req, header, &reply);
}
+static void control_ipreallocated(TALLOC_CTX *mem_ctx,
+ struct tevent_req *req,
+ struct ctdb_req_header *header,
+ struct ctdb_req_control *request)
+{
+ struct ctdb_reply_control reply;
+
+ /* Always succeed */
+ reply.rdata.opcode = request->opcode;
+ reply.status = 0;
+ reply.errmsg = NULL;
+
+ client_send_control(req, header, &reply);
+}
+
static void control_get_runstate(TALLOC_CTX *mem_ctx,
struct tevent_req *req,
struct ctdb_req_header *header,
client_send_control(req, header, &reply);
}
+static void control_db_open_flags(TALLOC_CTX *mem_ctx,
+ struct tevent_req *req,
+ struct ctdb_req_header *header,
+ struct ctdb_req_control *request)
+{
+ struct client_state *state = tevent_req_data(
+ req, struct client_state);
+ struct ctdbd_context *ctdb = state->ctdb;
+ struct ctdb_reply_control reply;
+ struct database *db;
+
+ reply.rdata.opcode = request->opcode;
+
+ db = database_find(ctdb->db_map, request->rdata.data.db_id);
+ if (db == NULL) {
+ reply.status = ENOENT;
+ reply.errmsg = "Database not found";
+ } else {
+ reply.rdata.data.tdb_flags = database_flags(db->flags);
+ reply.status = 0;
+ reply.errmsg = NULL;
+ }
+
+ client_send_control(req, header, &reply);
+}
+
+static void control_db_attach_replicated(TALLOC_CTX *mem_ctx,
+ struct tevent_req *req,
+ struct ctdb_req_header *header,
+ struct ctdb_req_control *request)
+{
+ struct client_state *state = tevent_req_data(
+ req, struct client_state);
+ struct ctdbd_context *ctdb = state->ctdb;
+ struct ctdb_reply_control reply;
+ struct database *db;
+
+ reply.rdata.opcode = request->opcode;
+
+ for (db = ctdb->db_map->db; db != NULL; db = db->next) {
+ if (strcmp(db->name, request->rdata.data.db_name) == 0) {
+ goto done;
+ }
+ }
+
+ db = database_new(ctdb->db_map, request->rdata.data.db_name,
+ CTDB_DB_FLAGS_REPLICATED);
+ if (db == NULL) {
+ reply.status = -1;
+ reply.errmsg = "Failed to attach database";
+ client_send_control(req, header, &reply);
+ return;
+ }
+
+done:
+ reply.rdata.data.db_id = db->id;
+ reply.status = 0;
+ reply.errmsg = NULL;
+ client_send_control(req, header, &reply);
+}
+
+static void control_check_pid_srvid(TALLOC_CTX *mem_ctx,
+ struct tevent_req *req,
+ struct ctdb_req_header *header,
+ struct ctdb_req_control *request)
+{
+ struct client_state *state = tevent_req_data(
+ req, struct client_state);
+ struct ctdbd_context *ctdb = state->ctdb;
+ struct ctdb_client *client;
+ struct client_state *cstate;
+ struct ctdb_reply_control reply;
+ bool pid_found, srvid_found;
+ int ret;
+
+ reply.rdata.opcode = request->opcode;
+
+ pid_found = false;
+ srvid_found = false;
+
+ for (client=ctdb->client_list; client != NULL; client=client->next) {
+ if (client->pid == request->rdata.data.pid_srvid->pid) {
+ pid_found = true;
+ cstate = (struct client_state *)client->state;
+ ret = srvid_exists(ctdb->srv,
+ request->rdata.data.pid_srvid->srvid,
+ cstate);
+ if (ret == 0) {
+ srvid_found = true;
+ ret = kill(cstate->pid, 0);
+ if (ret != 0) {
+ reply.status = ret;
+ reply.errmsg = strerror(errno);
+ } else {
+ reply.status = 0;
+ reply.errmsg = NULL;
+ }
+ }
+ }
+ }
+
+ if (! pid_found) {
+ reply.status = -1;
+ reply.errmsg = "No client for PID";
+ } else if (! srvid_found) {
+ reply.status = -1;
+ reply.errmsg = "No client for PID and SRVID";
+ }
+
+ client_send_control(req, header, &reply);
+}
+
+static bool fake_control_failure(TALLOC_CTX *mem_ctx,
+ struct tevent_req *req,
+ struct ctdb_req_header *header,
+ struct ctdb_req_control *request)
+{
+ struct client_state *state = tevent_req_data(
+ req, struct client_state);
+ struct ctdbd_context *ctdb = state->ctdb;
+ struct ctdb_reply_control reply;
+ struct fake_control_failure *f = NULL;
+
+ D_DEBUG("Checking fake control failure for control %u on node %u\n",
+ request->opcode, header->destnode);
+ for (f = ctdb->control_failures; f != NULL; f = f->next) {
+ if (f->opcode == request->opcode &&
+ (f->pnn == header->destnode ||
+ f->pnn == CTDB_UNKNOWN_PNN)) {
+
+ reply.rdata.opcode = request->opcode;
+ if (strcmp(f->error, "TIMEOUT") == 0) {
+ /* Causes no reply */
+ D_ERR("Control %u fake timeout on node %u\n",
+ request->opcode, header->destnode);
+ return true;
+ } else if (strcmp(f->error, "ERROR") == 0) {
+ D_ERR("Control %u fake error on node %u\n",
+ request->opcode, header->destnode);
+ reply.status = -1;
+ reply.errmsg = f->comment;
+ client_send_control(req, header, &reply);
+ return true;
+ }
+ }
+ }
+
+ return false;
+}
+
static void control_error(TALLOC_CTX *mem_ctx,
struct tevent_req *req,
struct ctdb_req_header *header,
{
struct ctdb_reply_control reply;
+ D_DEBUG("Control %u not implemented\n", request->opcode);
+
reply.rdata.opcode = request->opcode;
reply.status = -1;
reply.errmsg = "Not implemented";
static void client_dead_handler(void *private_data);
static void client_process_packet(struct tevent_req *req,
uint8_t *buf, size_t buflen);
+static void client_process_call(struct tevent_req *req,
+ uint8_t *buf, size_t buflen);
static void client_process_message(struct tevent_req *req,
uint8_t *buf, size_t buflen);
static void client_process_control(struct tevent_req *req,
state->ctdb = ctdb;
state->pnn = pnn;
+ (void) ctdb_get_peer_pid(fd, &state->pid);
+
ret = comm_setup(state, ev, fd, client_read_handler, req,
client_dead_handler, req, &state->comm);
if (ret != 0) {
return tevent_req_post(req, ev);
}
+ ret = client_add(ctdb, state->pid, state);
+ if (ret != 0) {
+ tevent_req_error(req, ret);
+ return tevent_req_post(req, ev);
+ }
+
DEBUG(DEBUG_INFO, ("New client fd=%d\n", fd));
return req;
req, struct client_state);
struct ctdbd_context *ctdb = state->ctdb;
struct ctdb_req_header header;
- int ret, i;
+ size_t np;
+ unsigned int i;
+ int ret;
- ret = ctdb_req_header_pull(buf, buflen, &header);
+ ret = ctdb_req_header_pull(buf, buflen, &header, &np);
if (ret != 0) {
return;
}
for (i=0; i<ctdb->node_map->num_nodes; i++) {
header.destnode = i;
- ctdb_req_header_push(&header, buf);
+ ctdb_req_header_push(&header, buf, &np);
client_process_packet(req, buf, buflen);
}
return;
header.destnode = i;
- ctdb_req_header_push(&header, buf);
+ ctdb_req_header_push(&header, buf, &np);
client_process_packet(req, buf, buflen);
}
return;
return;
}
- ctdb_req_header_push(&header, buf);
+ ctdb_req_header_push(&header, buf, &np);
client_process_packet(req, buf, buflen);
}
uint8_t *buf, size_t buflen)
{
struct ctdb_req_header header;
+ size_t np;
int ret;
- ret = ctdb_req_header_pull(buf, buflen, &header);
+ ret = ctdb_req_header_pull(buf, buflen, &header, &np);
if (ret != 0) {
return;
}
switch (header.operation) {
+ case CTDB_REQ_CALL:
+ client_process_call(req, buf, buflen);
+ break;
+
case CTDB_REQ_MESSAGE:
client_process_message(req, buf, buflen);
break;
}
}
+static void client_process_call(struct tevent_req *req,
+ uint8_t *buf, size_t buflen)
+{
+ struct client_state *state = tevent_req_data(
+ req, struct client_state);
+ struct ctdbd_context *ctdb = state->ctdb;
+ TALLOC_CTX *mem_ctx;
+ struct ctdb_req_header header;
+ struct ctdb_req_call request;
+ struct ctdb_reply_call reply;
+ struct database *db;
+ struct ctdb_ltdb_header hdr;
+ TDB_DATA data;
+ int ret;
+
+ mem_ctx = talloc_new(state);
+ if (tevent_req_nomem(mem_ctx, req)) {
+ return;
+ }
+
+ ret = ctdb_req_call_pull(buf, buflen, &header, mem_ctx, &request);
+ if (ret != 0) {
+ talloc_free(mem_ctx);
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ header_fix_pnn(&header, ctdb);
+
+ if (header.destnode >= ctdb->node_map->num_nodes) {
+ goto fail;
+ }
+
+ DEBUG(DEBUG_INFO, ("call db_id = %u\n", request.db_id));
+
+ db = database_find(ctdb->db_map, request.db_id);
+ if (db == NULL) {
+ goto fail;
+ }
+
+ ret = ltdb_fetch(db, request.key, &hdr, mem_ctx, &data);
+ if (ret != 0) {
+ goto fail;
+ }
+
+ /* Fake migration */
+ if (hdr.dmaster != ctdb->node_map->pnn) {
+ hdr.dmaster = ctdb->node_map->pnn;
+
+ ret = ltdb_store(db, request.key, &hdr, data);
+ if (ret != 0) {
+ goto fail;
+ }
+ }
+
+ talloc_free(mem_ctx);
+
+ reply.status = 0;
+ reply.data = tdb_null;
+
+ client_send_call(req, &header, &reply);
+ return;
+
+fail:
+ talloc_free(mem_ctx);
+ reply.status = -1;
+ reply.data = tdb_null;
+
+ client_send_call(req, &header, &reply);
+}
+
static void client_process_message(struct tevent_req *req,
uint8_t *buf, size_t buflen)
{
message_disable_recoveries(mem_ctx, req, &header, &request);
} else if (srvid == CTDB_SRVID_TAKEOVER_RUN) {
message_takeover_run(mem_ctx, req, &header, &request);
+ } else {
+ D_DEBUG("Message id 0x%"PRIx64" not implemented\n", srvid);
}
/* check srvid */
DEBUG(DEBUG_INFO, ("request opcode = %u, reqid = %u\n",
request.opcode, header.reqid));
+ if (fake_control_failure(mem_ctx, req, &header, &request)) {
+ goto done;
+ }
+
switch (request.opcode) {
case CTDB_CONTROL_PROCESS_EXISTS:
control_process_exists(mem_ctx, req, &header, &request);
control_set_recmode(mem_ctx, req, &header, &request);
break;
+ case CTDB_CONTROL_DB_ATTACH:
+ control_db_attach(mem_ctx, req, &header, &request);
+ break;
+
case CTDB_CONTROL_REGISTER_SRVID:
control_register_srvid(mem_ctx, req, &header, &request);
break;
control_shutdown(mem_ctx, req, &header, &request);
break;
- case CTDB_CONTROL_GET_MONMODE:
- control_get_monmode(mem_ctx, req, &header, &request);
- break;
-
case CTDB_CONTROL_SET_TUNABLE:
control_set_tunable(mem_ctx, req, &header, &request);
break;
control_get_all_tunables(mem_ctx, req, &header, &request);
break;
- case CTDB_CONTROL_UPTIME:
- control_uptime(mem_ctx, req, &header, &request);
- break;
-
- case CTDB_CONTROL_ENABLE_MONITOR:
- control_enable_monitor(mem_ctx, req, &header, &request);
+ case CTDB_CONTROL_DB_ATTACH_PERSISTENT:
+ control_db_attach_persistent(mem_ctx, req, &header, &request);
break;
- case CTDB_CONTROL_DISABLE_MONITOR:
- control_disable_monitor(mem_ctx, req, &header, &request);
+ case CTDB_CONTROL_UPTIME:
+ control_uptime(mem_ctx, req, &header, &request);
break;
case CTDB_CONTROL_RELOAD_NODES_FILE:
control_set_ban_state(mem_ctx, req, &header, &request);
break;
+ case CTDB_CONTROL_TRANS3_COMMIT:
+ control_trans3_commit(mem_ctx, req, &header, &request);
+ break;
+
case CTDB_CONTROL_GET_DB_SEQNUM:
control_get_db_seqnum(mem_ctx, req, &header, &request);
break;
control_set_db_readonly(mem_ctx, req, &header, &request);
break;
+ case CTDB_CONTROL_TRAVERSE_START_EXT:
+ control_traverse_start_ext(mem_ctx, req, &header, &request);
+ break;
+
case CTDB_CONTROL_SET_DB_STICKY:
control_set_db_sticky(mem_ctx, req, &header, &request);
break;
+ case CTDB_CONTROL_IPREALLOCATED:
+ control_ipreallocated(mem_ctx, req, &header, &request);
+ break;
+
case CTDB_CONTROL_GET_RUNSTATE:
control_get_runstate(mem_ctx, req, &header, &request);
break;
control_get_nodes_file(mem_ctx, req, &header, &request);
break;
+ case CTDB_CONTROL_DB_OPEN_FLAGS:
+ control_db_open_flags(mem_ctx, req, &header, &request);
+ break;
+
+ case CTDB_CONTROL_DB_ATTACH_REPLICATED:
+ control_db_attach_replicated(mem_ctx, req, &header, &request);
+ break;
+
+ case CTDB_CONTROL_CHECK_PID_SRVID:
+ control_check_pid_srvid(mem_ctx, req, &header, &request);
+ break;
+
default:
if (! (request.flags & CTDB_CTRL_FLAG_NOREPLY)) {
control_error(mem_ctx, req, &header, &request);
break;
}
+done:
talloc_free(mem_ctx);
}
}
static struct options {
+ const char *dbdir;
const char *sockpath;
const char *pidfile;
const char *debuglevel;
} options;
static struct poptOption cmdline_options[] = {
+ POPT_AUTOHELP
+ { "dbdir", 'D', POPT_ARG_STRING, &options.dbdir, 0,
+ "Database directory", "directory" },
{ "socket", 's', POPT_ARG_STRING, &options.sockpath, 0,
"Unix domain socket path", "filename" },
{ "pidfile", 'p', POPT_ARG_STRING, &options.pidfile, 0,
"pid file", "filename" } ,
{ "debug", 'd', POPT_ARG_STRING, &options.debuglevel, 0,
"debug level", "ERR|WARNING|NOTICE|INFO|DEBUG" } ,
+ POPT_TABLEEND
};
static void cleanup(void)
exit(1);
}
+ if (options.dbdir == NULL) {
+ fprintf(stderr, "Please specify database directory\n");
+ poptPrintHelp(pc, stdout, 0);
+ exit(1);
+ }
+
if (options.sockpath == NULL) {
fprintf(stderr, "Please specify socket path\n");
poptPrintHelp(pc, stdout, 0);
exit(1);
}
- ctdb = ctdbd_setup(mem_ctx);
+ ctdb = ctdbd_setup(mem_ctx, options.dbdir);
if (ctdb == NULL) {
exit(1);
}