return 0;
}
+/*
+ send a fetch lock request from the client to the daemon. This just asks for
+ a record migration to the current node
+ */
static struct ctdb_fetch_lock_state *ctdb_client_fetch_lock_send(struct ctdb_db_context *ctdb_db,
TALLOC_CTX *mem_ctx,
- TDB_DATA key,
- struct ctdb_ltdb_header *header)
+ TDB_DATA key)
{
struct ctdb_fetch_lock_state *state;
struct ctdb_context *ctdb = ctdb_db->ctdb;
req->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF);
req->db_id = ctdb_db->db_id;
req->keylen = key.dsize;
- req->header = *header;
memcpy(&req->key[0], key.dptr, key.dsize);
talloc_set_destructor(state, ctdb_fetch_lock_destructor);
This is called when the program wants to wait for a ctdb_fetch_lock to complete and get the
results. This call will block unless the call has already completed.
*/
-int ctdb_client_fetch_lock_recv(struct ctdb_fetch_lock_state *state, TALLOC_CTX *mem_ctx,
- TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA *data)
+int ctdb_client_fetch_lock_recv(struct ctdb_fetch_lock_state *state)
{
while (state->state < CTDB_FETCH_LOCK_DONE) {
event_loop_once(state->ctdb_db->ctdb->ev);
talloc_free(state);
return -1;
}
-
- *header = state->r->header;
- data->dsize = state->r->datalen;
- data->dptr = talloc_memdup(mem_ctx, state->r->data, data->dsize);
-
talloc_free(state);
-
return 0;
}
3) if we are the dmaster then return handle
4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
reply from ctdbd
- 5) when we get the reply, we are now dmaster, update vnn in header
- 6) return handle
+ 5) when we get the reply, goto (1)
*/
h = talloc_zero(mem_ctx, struct ctdb_record_handle);
DEBUG(3,("ctdb_fetch_lock: key=%*.*s\n", key.dsize, key.dsize,
(const char *)key.dptr));
+again:
/* step 1 - get the chain lock */
ret = ctdb_ltdb_lock(ctdb_db, key);
if (ret != 0) {
ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
if (ret != 0) {
+ ctdb_ltdb_unlock(ctdb_db, key);
talloc_free(h);
return NULL;
}
DEBUG(4,("ctdb_fetch_lock: done local fetch\n"));
- /* step 2 - check if we are the dmaster */
- if (h->header.dmaster == ctdb_db->ctdb->vnn) {
- DEBUG(4,("ctdb_fetch_lock: we are dmaster - done\n"));
- return h;
- }
-
- /* we're not the dmaster - ask the ctdb daemon to make us dmaster */
- state = ctdb_client_fetch_lock_send(ctdb_db, mem_ctx, key, &h->header);
- DEBUG(4,("ctdb_fetch_lock: done fetch_lock_send\n"));
- ret = ctdb_client_fetch_lock_recv(state, mem_ctx, key, &h->header, data);
- if (ret != 0) {
- DEBUG(4,("ctdb_fetch_lock: fetch_lock_recv failed\n"));
- talloc_free(h);
- return NULL;
- }
-
- DEBUG(4,("ctdb_fetch_lock: record is now local\n"));
-
- /* the record is now local, and locked. update the record on disk
- to mark us as the dmaster*/
- h->header.dmaster = ctdb_db->ctdb->vnn;
- ret = ctdb_ltdb_store(ctdb_db, key, &h->header, *data);
- if (ret != 0) {
- DEBUG(0, (__location__" can't update record to mark us as dmaster\n"));
- talloc_free(h);
- return NULL;
- }
-
- DEBUG(4,("ctdb_fetch_lock: done\n"));
-
- /* give the caller a handle to be used for ctdb_record_store() or a cancel via
- a talloc_free() */
+ if (h->header.dmaster != ctdb_db->ctdb->vnn) {
+ /* we're not the dmaster - ask the ctdb daemon to make us dmaster */
+ state = ctdb_client_fetch_lock_send(ctdb_db, mem_ctx, key);
+ ctdb_ltdb_unlock(ctdb_db, key);
+ DEBUG(4,("ctdb_fetch_lock: done fetch_lock_send\n"));
+ ret = ctdb_client_fetch_lock_recv(state);
+ if (ret != 0) {
+ DEBUG(4,("ctdb_fetch_lock: fetch_lock_recv failed\n"));
+ talloc_free(h);
+ return NULL;
+ }
+ goto again;
+ }
+
+ DEBUG(4,("ctdb_fetch_lock: we are dmaster - done\n"));
return h;
}
}
+/*
+ send a fetch lock request (actually a ctdb_call()) to a remote node
+ */
static struct ctdb_call_state *ctdb_daemon_fetch_lock_send(struct ctdb_db_context *ctdb_db,
TALLOC_CTX *mem_ctx,
TDB_DATA key, struct ctdb_ltdb_header *header,
return state;
}
-struct client_fetch_lock_data {
- struct ctdb_client *client;
- uint32_t reqid;
-};
-
-static void daemon_fetch_lock_complete(struct ctdb_call_state *state)
-{
- struct ctdb_reply_fetch_lock *r;
- struct client_fetch_lock_data *data = talloc_get_type(state->async.private_data, struct client_fetch_lock_data);
- struct ctdb_client *client = talloc_get_type(data->client, struct ctdb_client);
- int length, res;
-
- length = offsetof(struct ctdb_reply_fetch_lock, data) + state->call.reply_data.dsize;
- r = ctdbd_allocate_pkt(client->ctdb, length);
- if (r == NULL) {
- DEBUG(0,(__location__ " Failed to allocate reply_call in ctdb daemon\n"));
- return;
- }
- memset(r, 0, offsetof(struct ctdb_reply_fetch_lock, data));
- r->hdr.length = length;
- r->hdr.ctdb_magic = CTDB_MAGIC;
- r->hdr.ctdb_version = CTDB_VERSION;
- r->hdr.operation = CTDB_REPLY_FETCH_LOCK;
- r->hdr.reqid = data->reqid;
- r->state = state->state;
- r->datalen = state->call.reply_data.dsize;
- memcpy(&r->data[0], state->call.reply_data.dptr, r->datalen);
-
- res = ctdb_queue_send(client->queue, (uint8_t *)&r->hdr, r->hdr.length);
- if (res != 0) {
- DEBUG(0,(__location__ " Failed to queue packet from daemon to client\n"));
- }
- talloc_free(r);
- talloc_free(state);
-}
-
/*
called when the daemon gets a shutdown request from a client
*/
}
+struct client_fetch_lock_data {
+ struct ctdb_client *client;
+ struct ctdb_req_fetch_lock *f;
+};
+
/*
send a fetch lock error reply to the client
*/
-static void daemon_fetch_lock_error(struct ctdb_client *client,
- struct ctdb_req_fetch_lock *f)
+static void daemon_fetch_lock_reply(struct ctdb_client *client,
+ struct ctdb_req_fetch_lock *f,
+ int state)
{
struct ctdb_reply_fetch_lock r;
r.hdr.ctdb_version = CTDB_VERSION;
r.hdr.operation = CTDB_REPLY_FETCH_LOCK;
r.hdr.reqid = f->hdr.reqid;
- r.state = -1;
+ r.state = state;
- /*
- * Ignore the result, there's not much we can do anyway.
- */
- ctdb_queue_send(client->queue, (uint8_t *)&r.hdr,
- r.hdr.length);
+ ctdb_queue_send(client->queue, (uint8_t *)&r.hdr, r.hdr.length);
}
+/*
+ called when a remote fetch lock finishes
+ */
+static void daemon_fetch_lock_complete(struct ctdb_call_state *state)
+{
+ struct client_fetch_lock_data *data = talloc_get_type(state->async.private_data,
+ struct client_fetch_lock_data);
+ struct ctdb_client *client = talloc_get_type(data->client, struct ctdb_client);
+
+ daemon_fetch_lock_reply(client, data->f, 0);
+ talloc_free(state);
+}
+
+
+
/*
called when the daemon gets a fetch lock request from a client
*/
TDB_DATA key, *data;
struct ctdb_db_context *ctdb_db;
struct client_fetch_lock_data *fl_data;
+ struct ctdb_ltdb_header header;
+ int ret;
ctdb_db = find_ctdb_db(client->ctdb, f->db_id);
if (ctdb_db == NULL) {
- daemon_fetch_lock_error(client, f);
+ daemon_fetch_lock_reply(client, f, -1);
return;
}
- if (!ctdb_validate_vnn(client->ctdb, f->header.dmaster)) {
- DEBUG(0,(__location__ " Invalid dmaster %u\n", f->header.dmaster));
- daemon_fetch_lock_error(client, f);
+ key.dsize = f->keylen;
+ key.dptr = &f->key[0];
+
+ /* XXX - needs non-blocking lock here */
+
+ ret = ctdb_ltdb_fetch(ctdb_db, key, &header, ctdb_db, NULL);
+ if (ret != 0) {
+ daemon_fetch_lock_reply(client, f, -1);
return;
}
- key.dsize = f->keylen;
- key.dptr = &f->key[0];
+ if (header.dmaster == ctdb_db->ctdb->vnn) {
+ /* we already are the dmaster */
+ daemon_fetch_lock_reply(client, f, 0);
+ return;
+ }
data = talloc(client, TDB_DATA);
data->dptr = NULL;
data->dsize = 0;
- state = ctdb_daemon_fetch_lock_send(ctdb_db, client, key, &f->header, data);
+ state = ctdb_daemon_fetch_lock_send(ctdb_db, client, key, &header, data);
talloc_steal(state, data);
fl_data = talloc(state, struct client_fetch_lock_data);
fl_data->client = client;
- fl_data->reqid = f->hdr.reqid;
+ fl_data->f = talloc_steal(fl_data, f);
+
state->async.fn = daemon_fetch_lock_complete;
state->async.private_data = fl_data;
}
case CTDB_REQ_CONNECT_WAIT:
daemon_request_connect_wait(client, (struct ctdb_req_connect_wait *)hdr);
break;
+
case CTDB_REQ_FETCH_LOCK:
daemon_request_fetch_lock(client, (struct ctdb_req_fetch_lock *)hdr);
break;
+
case CTDB_REQ_SHUTDOWN:
daemon_request_shutdown(client, (struct ctdb_req_shutdown *)hdr);
break;
+
default:
DEBUG(0,(__location__ " daemon: unrecognized operation %d\n",
hdr->operation));