*/
#include "includes.h"
-#include "lib/events/events.h"
+#include "lib/tevent/tevent.h"
#include "system/filesys.h"
#include "system/wait.h"
#include "db_wrap.h"
#include "lib/tdb/include/tdb.h"
#include "../include/ctdb_private.h"
+#include "lib/util/dlinklist.h"
/*
  callback type used by the traverse code in this file. It receives the
  private_data registered with the traverse plus one key/data pair.
  Callers in this file also invoke it with an empty key and empty data
  (tdb_null, tdb_null) to signal a timeout or the end of a traverse.
*/
typedef void (*ctdb_traverse_fn_t)(void *private_data, TDB_DATA key, TDB_DATA data);
terminate the traverse
*/
struct ctdb_traverse_local_handle {
+ struct ctdb_traverse_local_handle *next, *prev;
struct ctdb_db_context *ctdb_db;
int fd[2];
pid_t child;
+ uint64_t srvid;
+ uint32_t client_reqid;
void *private_data;
ctdb_traverse_fn_t callback;
struct timeval start_time;
*/
static int traverse_local_destructor(struct ctdb_traverse_local_handle *h)
{
+ DLIST_REMOVE(h->ctdb_db->traverse, h); /* unlink h from the database's list of active local traverses */
kill(h->child, SIGKILL); /* stop the traverse child process; result of kill() is not checked */
- waitpid(h->child, NULL, 0);
return 0; /* always 0 */
}
struct ctdb_rec_data *d;
struct ctdb_ltdb_header *hdr;
- /* filter out non-authoritative and zero-length records */
hdr = (struct ctdb_ltdb_header *)data.dptr;
- if (data.dsize <= sizeof(struct ctdb_ltdb_header) ||
- hdr->dmaster != h->ctdb_db->ctdb->vnn) {
- return 0;
+
+ if (h->ctdb_db->persistent == 0) {
+ /* filter out zero-length records */
+ if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
+ return 0;
+ }
+
+ /* filter out non-authoritative records */
+ if (hdr->dmaster != h->ctdb_db->ctdb->pnn) {
+ return 0;
+ }
}
- d = ctdb_marshall_record(h, 0, key, data);
+ d = ctdb_marshall_record(h, 0, key, NULL, data);
if (d == NULL) {
/* error handling is tricky in this child code .... */
return -1;
return 0;
}
+struct traverse_all_state { /* state kept while serving one CTDB_CONTROL_TRAVERSE_ALL request */
+ struct ctdb_context *ctdb;
+ struct ctdb_traverse_local_handle *h; /* local traverse started for this request */
+ uint32_t reqid; /* reqid from the request; passed to ctdb_marshall_record for each record sent back */
+ uint32_t srcnode; /* pnn from the request; CTDB_CONTROL_TRAVERSE_DATA is sent to this node */
+ uint32_t client_reqid; /* client_reqid from the request; copied into the local traverse handle */
+ uint64_t srvid; /* srvid from the request; copied into the local traverse handle */
+};
/*
setup a non-blocking traverse of a local ltdb. The callback function
*/
static struct ctdb_traverse_local_handle *ctdb_traverse_local(struct ctdb_db_context *ctdb_db,
ctdb_traverse_fn_t callback,
- void *private_data)
+ struct traverse_all_state *all_state)
{
struct ctdb_traverse_local_handle *h;
int ret;
- h = talloc_zero(ctdb_db, struct ctdb_traverse_local_handle);
+ h = talloc_zero(all_state, struct ctdb_traverse_local_handle);
if (h == NULL) {
return NULL;
}
}
h->callback = callback;
- h->private_data = private_data;
+ h->private_data = all_state;
h->ctdb_db = ctdb_db;
+ h->client_reqid = all_state->client_reqid;
+ h->srvid = all_state->srvid;
if (h->child == 0) {
/* start the traverse in the child */
close(h->fd[0]);
+ debug_extra = talloc_asprintf(NULL, "traverse_local-%s:",
+ ctdb_db->db_name);
tdb_traverse_read(ctdb_db->ltdb->tdb, ctdb_traverse_local_fn, h);
_exit(0);
}
close(h->fd[1]);
+ set_close_on_exec(h->fd[0]);
+
talloc_set_destructor(h, traverse_local_destructor);
+ DLIST_ADD(ctdb_db->traverse, h);
+
/*
setup a packet queue between the child and the parent. This
copes with all the async and packet boundary issues
*/
- h->queue = ctdb_queue_setup(ctdb_db->ctdb, h, h->fd[0], 0, ctdb_traverse_local_handler, h);
+ DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to child traverse\n", h->fd[0]));
+
+ h->queue = ctdb_queue_setup(ctdb_db->ctdb, h, h->fd[0], 0, ctdb_traverse_local_handler, h,
+ "to-ctdbd");
if (h->queue == NULL) {
talloc_free(h);
return NULL;
struct ctdb_traverse_all_handle {
struct ctdb_context *ctdb;
+ struct ctdb_db_context *ctdb_db;
uint32_t reqid;
ctdb_traverse_fn_t callback;
void *private_data;
struct ctdb_traverse_all { /* data payload sent with CTDB_CONTROL_TRAVERSE_ALL */
uint32_t db_id; /* id of the database to traverse */
uint32_t reqid; /* reqid of the sender's traverse-all handle */
- uint32_t vnn;
+ uint32_t pnn; /* pnn of the sending node; the receiver sends its records back to it */
+ uint32_t client_reqid; /* reqid of the traverse-start request that triggered this traverse */
+ uint64_t srvid; /* srvid of the traverse-start request that triggered this traverse */
};
/* called when a traverse times out */
{
struct ctdb_traverse_all_handle *state = talloc_get_type(private_data, struct ctdb_traverse_all_handle);
- state->ctdb->statistics.timeouts.traverse++;
+ DEBUG(DEBUG_ERR,(__location__ " Traverse all timeout on database:%s\n", state->ctdb_db->db_name));
+ CTDB_INCREMENT_STAT(state->ctdb, timeouts.traverse);
state->callback(state->private_data, tdb_null, tdb_null);
- talloc_free(state);
}
+
+struct traverse_start_state { /* state for one traverse started via ctdb_control_traverse_start */
+ struct ctdb_context *ctdb;
+ struct ctdb_traverse_all_handle *h; /* handle returned by ctdb_daemon_traverse_all() */
+ uint32_t srcnode; /* srcnode argument of the traverse-start control */
+ uint32_t reqid; /* reqid taken from the request's struct ctdb_traverse_start */
+ uint32_t db_id; /* db_id from the request; sent with CTDB_CONTROL_TRAVERSE_KILL by the destructor */
+ uint64_t srvid; /* srvid from the request; records are dispatched as messages to it */
+};
+
+
/*
setup a cluster-wide non-blocking traverse of a ctdb. The
callback function will be called on every record in the local
*/
static struct ctdb_traverse_all_handle *ctdb_daemon_traverse_all(struct ctdb_db_context *ctdb_db,
ctdb_traverse_fn_t callback,
- void *private_data)
+ struct traverse_start_state *start_state)
{
struct ctdb_traverse_all_handle *state;
struct ctdb_context *ctdb = ctdb_db->ctdb;
int ret;
TDB_DATA data;
struct ctdb_traverse_all r;
+ uint32_t destination;
- state = talloc(ctdb_db, struct ctdb_traverse_all_handle);
+ state = talloc(start_state, struct ctdb_traverse_all_handle);
if (state == NULL) {
return NULL;
}
- state->ctdb = ctdb;
- state->reqid = ctdb_reqid_new(ctdb_db->ctdb, state);
- state->callback = callback;
- state->private_data = private_data;
- state->null_count = 0;
+ state->ctdb = ctdb;
+ state->ctdb_db = ctdb_db;
+ state->reqid = ctdb_reqid_new(ctdb_db->ctdb, state);
+ state->callback = callback;
+ state->private_data = start_state;
+ state->null_count = 0;
talloc_set_destructor(state, ctdb_traverse_all_destructor);
r.db_id = ctdb_db->db_id;
r.reqid = state->reqid;
- r.vnn = ctdb->vnn;
+ r.pnn = ctdb->pnn;
+ r.client_reqid = start_state->reqid;
+ r.srvid = start_state->srvid;
data.dptr = (uint8_t *)&r;
data.dsize = sizeof(r);
- /* tell all the nodes in the cluster to start sending records to this node */
- ret = ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
- CTDB_CONTROL_TRAVERSE_ALL,
- 0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
+ if (ctdb_db->persistent == 0) {
+ /* normal database, traverse all nodes */
+ destination = CTDB_BROADCAST_VNNMAP;
+ } else {
+ int i;
+ /* persistent database, traverse one node, preferably
+ * the local one
+ */
+ destination = ctdb->pnn;
+ /* check we are in the vnnmap */
+ for (i=0; i < ctdb->vnn_map->size; i++) {
+ if (ctdb->vnn_map->map[i] == ctdb->pnn) {
+ break;
+ }
+ }
+ /* if we are not in the vnn map we just pick the first
+ * node instead
+ */
+ if (i == ctdb->vnn_map->size) {
+ destination = ctdb->vnn_map->map[0];
+ }
+ }
+
+ /* tell all the nodes in the cluster to start sending records to this
+ * node, or if it is a persistent database, just tell the local
+ * node
+ */
+ ret = ctdb_daemon_send_control(ctdb, destination, 0,
+ CTDB_CONTROL_TRAVERSE_ALL,
+ 0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
+
if (ret != 0) {
talloc_free(state);
return NULL;
return state;
}
-struct traverse_all_state {
- struct ctdb_context *ctdb;
- struct ctdb_traverse_local_handle *h;
- uint32_t reqid;
- uint32_t srcnode;
-};
-
/*
called for each record during a traverse all
*/
struct ctdb_rec_data *d;
TDB_DATA cdata;
- d = ctdb_marshall_record(state, state->reqid, key, data);
+ d = ctdb_marshall_record(state, state->reqid, key, NULL, data);
if (d == NULL) {
/* darn .... */
- DEBUG(0,("Out of memory in traverse_all_callback\n"));
+ DEBUG(DEBUG_ERR,("Out of memory in traverse_all_callback\n"));
return;
}
ret = ctdb_daemon_send_control(state->ctdb, state->srcnode, 0, CTDB_CONTROL_TRAVERSE_DATA,
0, CTDB_CTRL_FLAG_NOREPLY, cdata, NULL, NULL);
if (ret != 0) {
- DEBUG(0,("Failed to send traverse data\n"));
+ DEBUG(DEBUG_ERR,("Failed to send traverse data\n"));
}
if (key.dsize == 0 && data.dsize == 0) {
struct ctdb_db_context *ctdb_db;
if (data.dsize != sizeof(struct ctdb_traverse_all)) {
- DEBUG(0,("Invalid size in ctdb_control_traverse_all\n"));
+ DEBUG(DEBUG_ERR,(__location__ " Invalid size in ctdb_control_traverse_all\n"));
return -1;
}
return -1;
}
+ if (ctdb_db->unhealthy_reason) {
+ if (ctdb->tunable.allow_unhealthy_db_read == 0) {
+ DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_traverse_all: %s\n",
+ ctdb_db->db_name, ctdb_db->unhealthy_reason));
+ return -1;
+ }
+ DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in ctdb_control_traverse_all: %s\n",
+ ctdb_db->db_name, ctdb_db->unhealthy_reason));
+ }
+
state = talloc(ctdb_db, struct traverse_all_state);
if (state == NULL) {
return -1;
}
state->reqid = c->reqid;
- state->srcnode = c->vnn;
+ state->srcnode = c->pnn;
state->ctdb = ctdb;
+ state->client_reqid = c->client_reqid;
+ state->srvid = c->srvid;
state->h = ctdb_traverse_local(ctdb_db, traverse_all_callback, state);
if (state->h == NULL) {
void *private_data;
if (data.dsize < sizeof(uint32_t) || data.dsize != d->length) {
- DEBUG(0,("Bad record size in ctdb_control_traverse_data\n"));
+ DEBUG(DEBUG_ERR,("Bad record size in ctdb_control_traverse_data\n"));
return -1;
}
if (key.dsize == 0 && data.dsize == 0) {
state->null_count++;
- if (state->null_count != ctdb_get_num_active_nodes(ctdb)) {
- return 0;
+ /* Persistent databases are only scanned on one node (the local
+ * node)
+ */
+ if (state->ctdb_db->persistent == 0) {
+ if (state->null_count != ctdb_get_num_active_nodes(ctdb)) {
+ return 0;
+ }
}
}
private_data = state->private_data;
callback(private_data, key, data);
- if (key.dsize == 0 && data.dsize == 0) {
- /* we've received all of the null replies, so all
- nodes are finished */
- talloc_free(state);
- }
return 0;
}
-struct traverse_start_state {
- struct ctdb_context *ctdb;
- struct ctdb_traverse_all_handle *h;
- uint32_t srcnode;
- uint32_t reqid;
- uint64_t srvid;
-};
+/*
+  kill an in-progress traverse, used when a client disconnects
+
+  data is interpreted as a struct ctdb_traverse_start naming the
+  database (db_id) and the client request (reqid, srvid). The first
+  local traverse handle on that database whose client_reqid and srvid
+  both match is freed. Finding no match is not treated as an error.
+
+  returns 0, or -1 if db_id does not name a known database.
+  outdata and srcnode are not used.
+
+  NOTE(review): data.dsize is not checked here before data.dptr is
+  dereferenced, unlike ctdb_control_traverse_start - confirm that the
+  caller validates the size.
+ */
+int32_t ctdb_control_traverse_kill(struct ctdb_context *ctdb, TDB_DATA data,
+ TDB_DATA *outdata, uint32_t srcnode)
+{
+ struct ctdb_traverse_start *d = (struct ctdb_traverse_start *)data.dptr;
+ struct ctdb_db_context *ctdb_db;
+ struct ctdb_traverse_local_handle *t;
+
+ ctdb_db = find_ctdb_db(ctdb, d->db_id);
+ if (ctdb_db == NULL) {
+ return -1;
+ }
+
+ for (t=ctdb_db->traverse; t; t=t->next) { /* walk the database's list of active local traverses */
+ if (t->client_reqid == d->reqid &&
+ t->srvid == d->srvid) {
+ talloc_free(t); /* runs traverse_local_destructor: unlinks t and kills its child */
+ break; /* t has been freed, so t->next must not be read */
+ }
+ }
+
+ return 0;
+}
+
+
+/*
+  this is called when a client disconnects during a traverse
+  we need to notify all the nodes taking part in the search that they
+  should kill their traverse children
+
+  talloc destructor for struct traverse_start_state. It sends
+  CTDB_CONTROL_TRAVERSE_KILL to CTDB_BROADCAST_CONNECTED, carrying the
+  db_id, reqid and srvid of the traverse in a struct
+  ctdb_traverse_start. traverse_start_callback clears this destructor
+  before freeing the state at the normal end of a traverse, so no kill
+  is sent in that case.
+
+  always returns 0.
+ */
+static int ctdb_traverse_start_destructor(struct traverse_start_state *state)
+{
+ struct ctdb_traverse_start r;
+ TDB_DATA data;
+
+ DEBUG(DEBUG_ERR,(__location__ " Traverse cancelled by client disconnect for database:0x%08x\n", state->db_id));
+ r.db_id = state->db_id;
+ r.reqid = state->reqid;
+ r.srvid = state->srvid;
+
+ data.dptr = (uint8_t *)&r; /* r is a local; it is handed to the send call below as the control payload */
+ data.dsize = sizeof(r);
+
+ ctdb_daemon_send_control(state->ctdb, CTDB_BROADCAST_CONNECTED, 0, /* return value is ignored */
+ CTDB_CONTROL_TRAVERSE_KILL,
+ 0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
+ return 0;
+}
/*
callback which sends records as messages to the client
state = talloc_get_type(p, struct traverse_start_state);
- d = ctdb_marshall_record(state, state->reqid, key, data);
+ d = ctdb_marshall_record(state, state->reqid, key, NULL, data);
if (d == NULL) {
return;
}
ctdb_dispatch_message(state->ctdb, state->srvid, cdata);
if (key.dsize == 0 && data.dsize == 0) {
/* end of traverse */
+ talloc_set_destructor(state, NULL);
talloc_free(state);
}
}
+
/*
start a traverse_all - called as a control from a client
*/
int32_t ctdb_control_traverse_start(struct ctdb_context *ctdb, TDB_DATA data,
- TDB_DATA *outdata, uint32_t srcnode)
+ TDB_DATA *outdata, uint32_t srcnode, uint32_t client_id)
{
struct ctdb_traverse_start *d = (struct ctdb_traverse_start *)data.dptr;
struct traverse_start_state *state;
struct ctdb_db_context *ctdb_db;
+ struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
+
+ if (client == NULL) {
+ DEBUG(DEBUG_ERR,(__location__ " No client found\n"));
+ return -1;
+ }
if (data.dsize != sizeof(*d)) {
- DEBUG(0,("Bad record size in ctdb_control_traverse_start\n"));
+ DEBUG(DEBUG_ERR,("Bad record size in ctdb_control_traverse_start\n"));
return -1;
}
return -1;
}
- state = talloc(ctdb_db, struct traverse_start_state);
+ if (ctdb_db->unhealthy_reason) {
+ if (ctdb->tunable.allow_unhealthy_db_read == 0) {
+ DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_traverse_start: %s\n",
+ ctdb_db->db_name, ctdb_db->unhealthy_reason));
+ return -1;
+ }
+ DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in ctdb_control_traverse_start: %s\n",
+ ctdb_db->db_name, ctdb_db->unhealthy_reason));
+ }
+
+ state = talloc(client, struct traverse_start_state);
if (state == NULL) {
return -1;
}
state->srcnode = srcnode;
state->reqid = d->reqid;
state->srvid = d->srvid;
+ state->db_id = d->db_id;
state->ctdb = ctdb;
state->h = ctdb_daemon_traverse_all(ctdb_db, traverse_start_callback, state);
return -1;
}
+ talloc_set_destructor(state, ctdb_traverse_start_destructor);
+
return 0;
}