From 03c49c052640673697684489c1fd3ac9096cede7 Mon Sep 17 00:00:00 2001 From: Ronnie sahlberg Date: Fri, 13 Apr 2007 09:41:15 +1000 Subject: [PATCH] add store_unlock pdu's for the domain socket. note that the store_unlock does not actually do anything yet apart from passing the pdu from client to daemon and daemon responds. next is to make sure the daemon actually stores the data in a database (This used to be ctdb commit 167d6993e78f6a1d0f6607ef66925a14993ae6a1) --- ctdb/common/ctdb_call.c | 5 ++ ctdb/common/ctdb_client.c | 117 ++++++++++++++++++++++++++++++++++++ ctdb/common/ctdb_daemon.c | 38 ++++++++++++ ctdb/include/ctdb_private.h | 21 ++++++- ctdb/tests/ctdb_fetch1.c | 7 ++- 5 files changed, 186 insertions(+), 2 deletions(-) diff --git a/ctdb/common/ctdb_call.c b/ctdb/common/ctdb_call.c index 5938cacab2a..0b7b3bc6c81 100644 --- a/ctdb/common/ctdb_call.c +++ b/ctdb/common/ctdb_call.c @@ -786,6 +786,11 @@ int ctdb_store_unlock(struct ctdb_record_handle *rec, TDB_DATA data) { int ret; struct ctdb_ltdb_header header; + struct ctdb_db_context *ctdb_db = talloc_get_type(rec->ctdb_db, struct ctdb_db_context); + + if (ctdb_db->ctdb->flags & CTDB_FLAG_DAEMON_MODE) { + return ctdb_client_store_unlock(rec, data); + } /* should be avoided if possible hang header off rec ? */ ret = ctdb_ltdb_fetch(rec->ctdb_db, rec->key, &header, NULL, NULL); diff --git a/ctdb/common/ctdb_client.c b/ctdb/common/ctdb_client.c index 74e41d0d5b3..fc6cefb7f9d 100644 --- a/ctdb/common/ctdb_client.c +++ b/ctdb/common/ctdb_client.c @@ -78,6 +78,33 @@ void ctdb_reply_fetch_lock(struct ctdb_context *ctdb, struct ctdb_req_header *hd } } +/* + called in the client when we receive a CTDB_REPLY_STORE_UNLOCK from the daemon + + This packet comes in response to a CTDB_REQ_STORE_UNLOCK request packet. It + contains any reply data from the call +*/ +void ctdb_reply_store_unlock(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +{ + struct ctdb_reply_store_unlock *c = (struct ctdb_reply_store_unlock *)hdr; + struct ctdb_call_state *state; + + state = idr_find(ctdb->idr, hdr->reqid); + if (state == NULL) return; + + state->call.status = c->state; + + talloc_steal(state, c); + + /* get an extra reference here - this prevents the free in ctdb_recv_pkt() + from freeing the data */ + (void)talloc_reference(state, c); + + state->state = CTDB_CALL_DONE; + if (state->async.fn) { + state->async.fn(state); + } +} /* this is called in the client, when data comes in from the daemon */ @@ -124,6 +151,10 @@ static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args) ctdb_reply_fetch_lock(ctdb, hdr); break; + case CTDB_REPLY_STORE_UNLOCK: + ctdb_reply_store_unlock(ctdb, hdr); + break; + default: printf("bogus operation code:%d\n",hdr->operation); } @@ -494,6 +525,61 @@ struct ctdb_call_state *ctdb_client_fetch_lock_send(struct ctdb_db_context *ctdb } +struct ctdb_call_state *ctdb_client_store_unlock_send( + struct ctdb_record_handle *rh, + TALLOC_CTX *mem_ctx, + TDB_DATA data) +{ + struct ctdb_call_state *state; + struct ctdb_db_context *ctdb_db = talloc_get_type(rh->ctdb_db, struct ctdb_db_context); + struct ctdb_context *ctdb = ctdb_db->ctdb; + struct ctdb_req_store_unlock *req; + int len, res; + + /* if the domain socket is not yet open, open it */ + if (ctdb->daemon.sd==-1) { + ux_socket_connect(ctdb); + } + + state = talloc_zero(ctdb_db, struct ctdb_call_state); + if (state == NULL) { + printf("failed to allocate state\n"); + return NULL; + } + state->state = CTDB_CALL_WAIT; + state->ctdb_db = ctdb_db; + len = offsetof(struct ctdb_req_store_unlock, data) + rh->key.dsize + data.dsize; + state->c = ctdbd_allocate_pkt(ctdb, len); + if (state->c == NULL) { + printf("failed to allocate packet\n"); + return NULL; + } + ZERO_STRUCT(*state->c); + talloc_set_name_const(state->c, "ctdbd req_store_unlock packet"); + talloc_steal(state, state->c); + + req = (struct ctdb_req_store_unlock *)state->c; + req->hdr.length = len; + req->hdr.ctdb_magic = CTDB_MAGIC; + req->hdr.ctdb_version = CTDB_VERSION; + req->hdr.operation = CTDB_REQ_STORE_UNLOCK; + req->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF); + req->db_id = ctdb_db->db_id; + req->keylen = rh->key.dsize; + req->datalen = data.dsize; + memcpy(&req->data[0], rh->key.dptr, rh->key.dsize); + memcpy(&req->data[req->keylen], data.dptr, data.dsize); + + res = ctdb_client_queue_pkt(ctdb, &req->hdr); + if (res != 0) { + return NULL; + } + + talloc_free(req); + + return state; +} + /* make a recv call to the local ctdb daemon - called from client context @@ -526,6 +612,25 @@ struct ctdb_record_handle *ctdb_client_fetch_lock_recv(struct ctdb_call_state *s return rec; } +/* + make a recv call to the local ctdb daemon - called from client context + + This is called when the program wants to wait for a ctdb_store_unlock to complete and get the + results. This call will block unless the call has already completed. +*/ +int ctdb_client_store_unlock_recv(struct ctdb_call_state *state, struct ctdb_record_handle *rec) +{ + while (state->state < CTDB_CALL_DONE) { + event_loop_once(state->ctdb_db->ctdb->ev); + } + if (state->state != CTDB_CALL_DONE) { + ctdb_set_error(state->node->ctdb, "%s", state->errmsg); + } + + talloc_free(state); + return state->state; +} + struct ctdb_record_handle *ctdb_client_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, TDB_DATA key, TDB_DATA *data) @@ -538,3 +643,15 @@ struct ctdb_record_handle *ctdb_client_fetch_lock(struct ctdb_db_context *ctdb_d return rec; } + +int ctdb_client_store_unlock(struct ctdb_record_handle *rec, TDB_DATA data) +{ + struct ctdb_call_state *state; + int res; + + state = ctdb_client_store_unlock_send(rec, rec, data); + res = ctdb_client_store_unlock_recv(state, rec); + + return res; + +} diff --git a/ctdb/common/ctdb_daemon.c b/ctdb/common/ctdb_daemon.c index 824cfed936c..b727bd62b23 100644 --- a/ctdb/common/ctdb_daemon.c +++ b/ctdb/common/ctdb_daemon.c @@ -204,6 +204,39 @@ static void daemon_request_fetch_lock(struct ctdb_client *client, state->async.private = fl_data; } +/* + called when the daemon gets a store unlock request from a client + + this would never block? + */ +static void daemon_request_store_unlock(struct ctdb_client *client, + struct ctdb_req_store_unlock *f) +{ + struct ctdb_db_context *ctdb_db; + struct ctdb_reply_store_unlock r; + int res; + + ctdb_db = find_ctdb_db(client->ctdb, f->db_id); + /* write the data to ltdb */ +/*XXX*/ + + /* now send the reply */ + ZERO_STRUCT(r); + + r.hdr.length = sizeof(r); + r.hdr.ctdb_magic = CTDB_MAGIC; + r.hdr.ctdb_version = CTDB_VERSION; + r.hdr.operation = CTDB_REPLY_STORE_UNLOCK; + r.hdr.reqid = f->hdr.reqid; + r.state = CTDB_CALL_DONE; + + res = ctdb_queue_send(client->queue, (uint8_t *)&r.hdr, r.hdr.length); + if (res != 0) { + printf("Failed to queue a store unlock response\n"); + return; + } +} + /* called when the daemon gets a connect wait request from a client */ @@ -364,6 +397,11 @@ static void client_incoming_packet(struct ctdb_client *client, void *data, size_ case CTDB_REQ_FETCH_LOCK: daemon_request_fetch_lock(client, (struct ctdb_req_fetch_lock *)hdr); break; + case CTDB_REQ_STORE_UNLOCK: + daemon_request_store_unlock(client, (struct ctdb_req_store_unlock *)hdr); + break; + default: + printf("daemon: unrecognized operation:%d\n",hdr->operation); } done: diff --git a/ctdb/include/ctdb_private.h b/ctdb/include/ctdb_private.h index 4d4fa0b562d..10f85e670f1 100644 --- a/ctdb/include/ctdb_private.h +++ b/ctdb/include/ctdb_private.h @@ -217,7 +217,9 @@ enum ctdb_operation { CTDB_REQ_CONNECT_WAIT = 1001, CTDB_REPLY_CONNECT_WAIT = 1002, CTDB_REQ_FETCH_LOCK = 1003, - CTDB_REPLY_FETCH_LOCK = 1004 + CTDB_REPLY_FETCH_LOCK = 1004, + CTDB_REQ_STORE_UNLOCK = 1005, + CTDB_REPLY_STORE_UNLOCK = 1006 }; #define CTDB_MAGIC 0x43544442 /* CTDB */ @@ -315,6 +317,18 @@ struct ctdb_reply_fetch_lock { uint32_t datalen; uint8_t data[1]; /* data[] */ }; +struct ctdb_req_store_unlock { + struct ctdb_req_header hdr; + uint32_t db_id; + uint32_t keylen; + uint32_t datalen; + uint8_t data[1]; /* key[] and data[] */ +}; + +struct ctdb_reply_store_unlock { + struct ctdb_req_header hdr; + uint32_t state; +}; /* internal prototypes */ void ctdb_set_error(struct ctdb_context *ctdb, const char *fmt, ...) PRINTF_ATTRIBUTE(2,3); @@ -430,4 +444,9 @@ struct ctdb_record_handle *ctdb_client_fetch_lock(struct ctdb_db_context *ctdb_d TALLOC_CTX *mem_ctx, TDB_DATA key, TDB_DATA *data); +/* + do a store unlock from a client to the local daemon +*/ +int ctdb_client_store_unlock(struct ctdb_record_handle *rec, TDB_DATA data); + #endif diff --git a/ctdb/tests/ctdb_fetch1.c b/ctdb/tests/ctdb_fetch1.c index 96b900ca43e..29a4fc9a62b 100644 --- a/ctdb/tests/ctdb_fetch1.c +++ b/ctdb/tests/ctdb_fetch1.c @@ -42,7 +42,7 @@ int main(int argc, const char *argv[]) const char *myaddress = NULL; int self_connect=0; int daemon_mode=0; - TDB_DATA key, *data; + TDB_DATA key, *data, store_data; struct ctdb_record_handle *rh; struct poptOption popt_options[] = { @@ -141,6 +141,11 @@ int main(int argc, const char *argv[]) data = NULL; rh = ctdb_fetch_lock(ctdb_db, ctdb_db, key, data); + store_data.dptr = "data to store"; + store_data.dsize = strlen(store_data.dptr)+1; + ret = ctdb_store_unlock(rh, store_data); + printf("ctdb_store_unlock ret:%d\n",ret); + while (1) { event_loop_once(ev); } -- 2.34.1