4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This library is free software; you can redistribute it and/or
8 modify it under the terms of the GNU Lesser General Public
9 License as published by the Free Software Foundation; either
10 version 2 of the License, or (at your option) any later version.
12 This library is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 Lesser General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public
18 License along with this library; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
24 #include "lib/tdb/include/tdb.h"
25 #include "lib/events/events.h"
26 #include "lib/util/dlinklist.h"
27 #include "system/network.h"
28 #include "system/filesys.h"
29 #include "../include/ctdb.h"
30 #include "../include/ctdb_private.h"
33 queue a packet for sending from client to daemon
/* Queue hdr->length bytes starting at hdr on the daemon connection queue
   (ctdb->daemon.queue); the result of ctdb_queue_send() is returned as-is.
   NOTE(review): nothing here checks that the daemon queue has been created
   (ux_socket_connect() is what sets it up). */
35 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
37 return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
42 handle a connect wait reply packet
/* Copy the connected-node count carried by a CTDB_REPLY_CONNECT_WAIT packet
   into ctdb->num_connected; ctdb_connect_wait() relies on this update to
   finish its wait. */
44 static void ctdb_reply_connect_wait(struct ctdb_context *ctdb,
45 struct ctdb_req_header *hdr)
47 struct ctdb_reply_connect_wait *r = (struct ctdb_reply_connect_wait *)hdr;
48 ctdb->num_connected = r->num_connected;
/* Progress of a client-side fetch_lock request.
   CTDB_FETCH_LOCK_WAIT is set by ctdb_client_fetch_lock_send() and
   CTDB_FETCH_LOCK_DONE by ctdb_reply_fetch_lock(). The enumerator order is
   significant: ctdb_client_fetch_lock_recv() keeps waiting while
   state < CTDB_FETCH_LOCK_DONE, so any later value (including
   CTDB_FETCH_LOCK_ERROR) ends the wait. No visible code sets
   CTDB_FETCH_LOCK_ERROR. */
51 enum fetch_lock_state { CTDB_FETCH_LOCK_WAIT, CTDB_FETCH_LOCK_DONE, CTDB_FETCH_LOCK_ERROR };
54 state of an in-progress ctdb fetch_lock request
56 struct ctdb_fetch_lock_state {
/* progress of the request; see enum fetch_lock_state */
57 enum fetch_lock_state state;
/* target database; also the route to ctdb->idr and ctdb->ev */
58 struct ctdb_db_context *ctdb_db;
/* reply packet, talloc_steal()ed onto this state by ctdb_reply_fetch_lock() */
59 struct ctdb_reply_fetch_lock *r;
/* request packet (a talloc child of this state); req->hdr.reqid is the key
   under which this state is registered in ctdb->idr */
60 struct ctdb_req_fetch_lock *req;
/* NOTE(review): not referenced by the visible code - confirm it is still needed */
61 struct ctdb_ltdb_header header;
67 called in the client when we receive a CTDB_REPLY_FETCH_LOCK from the daemon
69 This packet comes in response to a CTDB_REQ_FETCH_LOCK request packet. It
70 contains any reply data from the call
72 void ctdb_reply_fetch_lock(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
74 struct ctdb_reply_fetch_lock *r = (struct ctdb_reply_fetch_lock *)hdr;
75 struct ctdb_fetch_lock_state *state;
/* the reqid was allocated by ctdb_client_fetch_lock_send() */
77 state = idr_find(ctdb->idr, hdr->reqid);
79 DEBUG(0, ("reqid %d not found at %s\n", hdr->reqid,
/* ctdb->idr is shared with ctdb_call_send(), so confirm that this reqid really
   belongs to a fetch_lock request before using it as one */
84 if (!talloc_get_type(state, struct ctdb_fetch_lock_state)) {
85 DEBUG(0, ("ctdb idr type error at %s, it's a %s\n",
86 __location__, talloc_get_name(state)));
/* keep the reply alive past the talloc_free(tmp_ctx) in ctdb_client_read_cb();
   ctdb_client_fetch_lock_recv() reads it through state->r */
90 state->r = talloc_steal(state, r);
/* ends the wait loop in ctdb_client_fetch_lock_recv() */
92 state->state = CTDB_FETCH_LOCK_DONE;
96 this is called in the client, when data comes in from the daemon
98 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
100 struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
101 struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
104 /* place the packet as a child of a tmp_ctx. We then use
105 talloc_free() below to free it. If any of the calls want
106 to keep it, then they will steal it somewhere else, and the
107 talloc_free() will be a no-op */
108 tmp_ctx = talloc_new(ctdb);
109 talloc_steal(tmp_ctx, hdr);
112 DEBUG(2,("Daemon has exited - shutting down client\n"));
/* NOTE(review): cnt is a size_t but is printed with %d below; assuming DEBUG
   forwards to a printf-style formatter, that is a format/argument mismatch on
   LP64 platforms - cast to int/unsigned or use %zu */
116 if (cnt < sizeof(*hdr)) {
117 DEBUG(0,("Bad packet length %d in client\n", cnt));
120 if (cnt != hdr->length) {
121 ctdb_set_error(ctdb, "Bad header length %d expected %d in client\n",
/* reject packets that do not carry this build's ctdb magic and version */
126 if (hdr->ctdb_magic != CTDB_MAGIC) {
127 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
131 if (hdr->ctdb_version != CTDB_VERSION) {
132 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
/* dispatch on the operation code; a handler that needs the packet after this
   callback returns must talloc_steal() it away from tmp_ctx */
136 switch (hdr->operation) {
137 case CTDB_REPLY_CALL:
138 ctdb_reply_call(ctdb, hdr);
141 case CTDB_REQ_MESSAGE:
142 ctdb_request_message(ctdb, hdr);
145 case CTDB_REPLY_CONNECT_WAIT:
146 ctdb_reply_connect_wait(ctdb, hdr);
149 case CTDB_REPLY_FETCH_LOCK:
150 ctdb_reply_fetch_lock(ctdb, hdr);
154 DEBUG(0,("bogus operation code:%d\n",hdr->operation));
/* frees the packet unless a handler stole it */
158 talloc_free(tmp_ctx);
162 connect to a unix domain socket
/* Connect to the daemon's unix domain socket (path ctdb->daemon.name), storing
   the descriptor in ctdb->daemon.sd and creating ctdb->daemon.queue with
   ctdb_client_read_cb() as its read callback. If connect() fails, the socket
   is closed and ctdb->daemon.sd is reset to -1. */
164 static int ux_socket_connect(struct ctdb_context *ctdb)
166 struct sockaddr_un addr;
168 memset(&addr, 0, sizeof(addr));
169 addr.sun_family = AF_UNIX;
/* NOTE(review): strncpy() leaves sun_path without a terminating NUL when
   ctdb->daemon.name is sizeof(addr.sun_path) bytes or longer. Copying at most
   sizeof(addr.sun_path) - 1 bytes (the memset above supplies the NUL), or
   rejecting over-long names, would avoid that. */
170 strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
172 ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
173 if (ctdb->daemon.sd == -1) {
177 if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
178 close(ctdb->daemon.sd);
179 ctdb->daemon.sd = -1;
/* incoming daemon packets are delivered to ctdb_client_read_cb() with ctdb as
   its private argument */
183 ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd,
185 ctdb_client_read_cb, ctdb);
/* Handle returned by ctdb_fetch_lock(). ctdb_fetch_lock() takes the record's
   chain lock before installing fetch_lock_destructor() on the handle, and
   that destructor unlocks it, so freeing the handle releases the lock. */
190 struct ctdb_record_handle {
/* database that holds the record */
191 struct ctdb_db_context *ctdb_db;
/* record header as read/updated by ctdb_fetch_lock(); ctdb_record_store()
   writes it back together with the new data */
194 struct ctdb_ltdb_header header;
198 make a recv call to the local ctdb daemon - called from client context
200 This is called when the program wants to wait for a ctdb_call to complete and get the
201 results. This call will block unless the call has already completed.
203 int ctdb_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
205 struct ctdb_record_handle *rec;
/* run the event loop until the reply handler advances state->state */
207 while (state->state < CTDB_CALL_DONE) {
208 event_loop_once(state->node->ctdb->ev);
/* any other final state (presumably an error) is reported through
   ctdb_set_error() using state->errmsg */
210 if (state->state != CTDB_CALL_DONE) {
211 ctdb_set_error(state->node->ctdb, "%s", state->errmsg);
216 rec = state->fetch_private;
218 /* ugly hack to manage forced migration */
220 rec->data->dptr = talloc_steal(rec, state->call.reply_data.dptr);
221 rec->data->dsize = state->call.reply_data.dsize;
/* give the caller its own copy of the reply data; the copy is a talloc child
   of the ctdb context, not of state.
   NOTE(review): a talloc_memdup() failure is not detected - dptr would be
   NULL while dsize is non-zero */
226 if (state->call.reply_data.dsize) {
227 call->reply_data.dptr = talloc_memdup(state->node->ctdb,
228 state->call.reply_data.dptr,
229 state->call.reply_data.dsize);
230 call->reply_data.dsize = state->call.reply_data.dsize;
232 call->reply_data.dptr = NULL;
233 call->reply_data.dsize = 0;
235 call->status = state->call.status;
245 destroy a ctdb_call in client
/* talloc destructor for a client-side ctdb_call_state: removes its reqid from
   ctdb->idr so later idr_find() lookups no longer return this freed state */
247 static int ctdb_client_call_destructor(struct ctdb_call_state *state)
249 idr_remove(state->node->ctdb->idr, state->c->hdr.reqid);
256 make a ctdb call to the local daemon - async send. Called from client context.
258 This constructs a ctdb_call request and queues it for processing.
259 This call never blocks.
261 struct ctdb_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db,
262 struct ctdb_call *call)
264 struct ctdb_call_state *state;
265 struct ctdb_context *ctdb = ctdb_db->ctdb;
266 struct ctdb_ltdb_header header;
271 /* if the domain socket is not yet open, open it */
/* NOTE(review): the result of ux_socket_connect() is ignored; if it fails, the
   request is still queued on ctdb->daemon.queue further down */
272 if (ctdb->daemon.sd==-1) {
273 ux_socket_connect(ctdb);
/* read the local copy of the record to learn its current dmaster */
276 ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
/* this node is already dmaster (and self-connect is not forced): run the call
   locally without going through the daemon */
282 if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) {
283 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
284 talloc_free(data.dptr);
289 state = talloc_zero(ctdb_db, struct ctdb_call_state);
291 DEBUG(0, (__location__ " failed to allocate state\n"));
295 talloc_steal(state, data.dptr);
/* request layout: fixed header, then the key bytes, then the call_data bytes */
297 len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
298 state->c = ctdbd_allocate_pkt(ctdb, len);
299 if (state->c == NULL) {
300 DEBUG(0, (__location__ " failed to allocate packet\n"));
303 talloc_set_name_const(state->c, "ctdbd req_call packet");
304 talloc_steal(state, state->c);
306 state->c->hdr.length = len;
307 state->c->hdr.ctdb_magic = CTDB_MAGIC;
308 state->c->hdr.ctdb_version = CTDB_VERSION;
309 state->c->hdr.operation = CTDB_REQ_CALL;
310 state->c->hdr.destnode = header.dmaster;
311 state->c->hdr.srcnode = ctdb->vnn;
312 /* the 0xFFFF reqid limit caps us at about 64k outstanding messages - not unreasonable */
313 state->c->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF);
314 state->c->flags = call->flags;
315 state->c->db_id = ctdb_db->db_id;
316 state->c->callid = call->call_id;
317 state->c->keylen = call->key.dsize;
318 state->c->calldatalen = call->call_data.dsize;
319 memcpy(&state->c->data[0], call->key.dptr, call->key.dsize);
320 memcpy(&state->c->data[call->key.dsize],
321 call->call_data.dptr, call->call_data.dsize);
/* point state->call at the key/call_data copies held inside the packet */
323 state->call.call_data.dptr = &state->c->data[call->key.dsize];
324 state->call.key.dptr = &state->c->data[0];
/* NOTE(review): header.dmaster indexes ctdb->nodes without a visible bounds check */
326 state->node = ctdb->nodes[header.dmaster];
327 state->state = CTDB_CALL_WAIT;
328 state->header = header;
329 state->ctdb_db = ctdb_db;
331 talloc_set_destructor(state, ctdb_client_call_destructor);
/* NOTE(review): a queueing failure is not reported to the caller */
333 ctdb_client_queue_pkt(ctdb, &state->c->hdr);
335 /*XXX set up timeout to cleanup if server doesnt respond
336 event_add_timed(ctdb->ev, state, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0),
337 ctdb_call_timeout, state);
345 full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
347 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
349 struct ctdb_call_state *state;
351 state = ctdb_call_send(ctdb_db, call);
/* NOTE(review): ctdb_call_send() has failure paths (e.g. "failed to allocate
   state") that presumably return NULL, but state is passed on unchecked and
   ctdb_call_recv() dereferences it immediately */
352 return ctdb_call_recv(state, call);
357 tell the daemon what messaging srvid we will use, and register the message
358 handler function in the client
360 int ctdb_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid,
361 ctdb_message_fn_t handler,
365 struct ctdb_req_register c;
368 /* if the domain socket is not yet open, open it */
369 if (ctdb->daemon.sd==-1) {
370 ux_socket_connect(ctdb);
/* build a CTDB_REQ_REGISTER packet on the stack and queue it for the daemon */
375 c.hdr.length = sizeof(c);
376 c.hdr.ctdb_magic = CTDB_MAGIC;
377 c.hdr.ctdb_version = CTDB_VERSION;
378 c.hdr.operation = CTDB_REQ_REGISTER;
381 res = ctdb_client_queue_pkt(ctdb, &c.hdr);
386 /* also need to register the handler with our ctdb structure */
387 return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
392 send a message - from client context
394 int ctdb_send_message(struct ctdb_context *ctdb, uint32_t vnn,
395 uint32_t srvid, TDB_DATA data)
397 struct ctdb_req_message *r;
/* packet layout: fixed header followed by data.dsize payload bytes */
400 len = offsetof(struct ctdb_req_message, data) + data.dsize;
/* NOTE(review): this allocates through ctdb->methods->allocate_pkt, whereas the
   other client-side senders in this file use ctdbd_allocate_pkt() - confirm
   which allocator is intended for packets sent to the daemon */
401 r = ctdb->methods->allocate_pkt(ctdb, len);
402 CTDB_NO_MEMORY(ctdb, r);
403 talloc_set_name_const(r, "req_message packet");
406 r->hdr.ctdb_magic = CTDB_MAGIC;
407 r->hdr.ctdb_version = CTDB_VERSION;
408 r->hdr.operation = CTDB_REQ_MESSAGE;
409 r->hdr.destnode = vnn;
410 r->hdr.srcnode = ctdb->vnn;
413 r->datalen = data.dsize;
414 memcpy(&r->data[0], data.dptr, data.dsize);
416 res = ctdb_client_queue_pkt(ctdb, &r->hdr);
426 wait for all nodes to be connected - from client
428 void ctdb_connect_wait(struct ctdb_context *ctdb)
430 struct ctdb_req_connect_wait r;
435 r.hdr.length = sizeof(r);
436 r.hdr.ctdb_magic = CTDB_MAGIC;
437 r.hdr.ctdb_version = CTDB_VERSION;
438 r.hdr.operation = CTDB_REQ_CONNECT_WAIT;
440 DEBUG(3,("ctdb_connect_wait: sending to ctdbd\n"));
442 /* if the domain socket is not yet open, open it */
443 if (ctdb->daemon.sd==-1) {
444 ux_socket_connect(ctdb);
/* queue directly on the daemon queue (the same call ctdb_client_queue_pkt()
   makes) */
447 res = ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)&r.hdr, r.hdr.length);
449 DEBUG(0,(__location__ " Failed to queue a connect wait request\n"));
453 DEBUG(3,("ctdb_connect_wait: waiting\n"));
455 /* now we can go into the normal wait routine, as the reply packet
456 will update the ctdb->num_connected variable */
457 ctdb_daemon_connect_wait(ctdb);
/* talloc destructor for ctdb_fetch_lock_state: removes the request's reqid from
   ctdb->idr so ctdb_reply_fetch_lock() can no longer find this freed state */
460 static int ctdb_fetch_lock_destructor(struct ctdb_fetch_lock_state *state)
462 idr_remove(state->ctdb_db->ctdb->idr, state->req->hdr.reqid);
/* Queue a CTDB_REQ_FETCH_LOCK for key, carrying the caller's current ltdb
   header, and return the pending state; ctdb_client_fetch_lock_recv() then
   waits for the reply. */
466 static struct ctdb_fetch_lock_state *ctdb_client_fetch_lock_send(struct ctdb_db_context *ctdb_db,
469 struct ctdb_ltdb_header *header)
471 struct ctdb_fetch_lock_state *state;
472 struct ctdb_context *ctdb = ctdb_db->ctdb;
473 struct ctdb_req_fetch_lock *req;
476 /* if the domain socket is not yet open, open it */
477 if (ctdb->daemon.sd==-1) {
478 ux_socket_connect(ctdb);
/* NOTE(review): state is a talloc child of ctdb_db rather than of any
   caller-supplied context, so it persists until explicitly freed */
481 state = talloc_zero(ctdb_db, struct ctdb_fetch_lock_state);
483 DEBUG(0, (__location__ " failed to allocate state\n"));
486 state->state = CTDB_FETCH_LOCK_WAIT;
487 state->ctdb_db = ctdb_db;
/* request layout: fixed header followed by the key bytes */
488 len = offsetof(struct ctdb_req_fetch_lock, key) + key.dsize;
489 state->req = req = ctdbd_allocate_pkt(ctdb, len);
491 DEBUG(0, (__location__ " failed to allocate packet\n"));
495 talloc_set_name_const(req, "ctdbd req_fetch_lock packet");
496 talloc_steal(state, req);
498 req->hdr.length = len;
499 req->hdr.ctdb_magic = CTDB_MAGIC;
500 req->hdr.ctdb_version = CTDB_VERSION;
501 req->hdr.operation = CTDB_REQ_FETCH_LOCK;
/* register state so ctdb_reply_fetch_lock() can find it by reqid */
502 req->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF);
503 req->db_id = ctdb_db->db_id;
504 req->keylen = key.dsize;
505 req->header = *header;
506 memcpy(&req->key[0], key.dptr, key.dsize);
/* from here on, freeing state also releases its reqid */
508 talloc_set_destructor(state, ctdb_fetch_lock_destructor);
510 res = ctdb_client_queue_pkt(ctdb, &req->hdr);
520 make a recv call to the local ctdb daemon - called from client context
522 This is called when the program wants to wait for a ctdb_fetch_lock to complete and get the
523 results. This call will block unless the call has already completed.
/* NOTE(review): key is not referenced in the visible lines of this function */
525 int ctdb_client_fetch_lock_recv(struct ctdb_fetch_lock_state *state, TALLOC_CTX *mem_ctx,
526 TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA *data)
/* run the event loop until ctdb_reply_fetch_lock() marks the request done */
528 while (state->state < CTDB_FETCH_LOCK_DONE) {
529 event_loop_once(state->ctdb_db->ctdb->ev);
531 if (state->state != CTDB_FETCH_LOCK_DONE) {
/* copy the returned header and duplicate the record data onto mem_ctx.
   NOTE(review): a talloc_memdup() failure is not detected here */
536 *header = state->r->header;
537 data->dsize = state->r->datalen;
538 data->dptr = talloc_memdup(mem_ctx, state->r->data, data->dsize);
546 cancel a ctdb_fetch_lock() operation by releasing the record's chain lock (talloc destructor of the record handle)
/* talloc destructor for ctdb_record_handle: releases the chain lock that
   ctdb_fetch_lock() took on h->key */
548 static int fetch_lock_destructor(struct ctdb_record_handle *h)
550 ctdb_ltdb_unlock(h->ctdb_db, h->key);
555 get a lock on a record, and return the records data. Blocks until it gets the lock
557 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx,
558 TDB_DATA key, TDB_DATA *data)
561 struct ctdb_record_handle *h;
562 struct ctdb_fetch_lock_state *state;
565 procedure is as follows:
567 1) get the chain lock.
568 2) check if we are dmaster
569 3) if we are the dmaster then return handle
570 4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
572 5) when we get the reply, we are now dmaster, update vnn in header
576 h = talloc_zero(mem_ctx, struct ctdb_record_handle);
581 h->ctdb_db = ctdb_db;
583 h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
584 if (h->key.dptr == NULL) {
/* NOTE(review): the '*' width/precision arguments of %*.*s must be int, but
   key.dsize is a size_t - cast it (assuming DEBUG is printf-style) */
590 DEBUG(3,("ctdb_fetch_lock: key=%*.*s\n", key.dsize, key.dsize,
591 (const char *)key.dptr));
593 /* step 1 - get the chain lock */
594 ret = ctdb_ltdb_lock(ctdb_db, key);
596 DEBUG(0, (__location__ " failed to lock ltdb record\n"));
601 DEBUG(4,("ctdb_fetch_lock: got chain lock\n"));
/* the lock is held from here on; freeing h releases it */
603 talloc_set_destructor(h, fetch_lock_destructor);
/* read the local copy of the record while holding the chain lock */
605 ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
611 DEBUG(4,("ctdb_fetch_lock: done local fetch\n"));
613 /* step 2 - check if we are the dmaster */
614 if (h->header.dmaster == ctdb_db->ctdb->vnn) {
615 DEBUG(4,("ctdb_fetch_lock: we are dmaster - done\n"));
619 /* we're not the dmaster - ask the ctdb daemon to make us dmaster */
/* NOTE(review): a failed send (e.g. its allocation-failure paths, presumably
   returning NULL) is not checked before ctdb_client_fetch_lock_recv()
   dereferences state */
620 state = ctdb_client_fetch_lock_send(ctdb_db, mem_ctx, key, &h->header);
621 DEBUG(4,("ctdb_fetch_lock: done fetch_lock_send\n"));
/* the reply's header and data replace h->header and *data */
622 ret = ctdb_client_fetch_lock_recv(state, mem_ctx, key, &h->header, data);
624 DEBUG(4,("ctdb_fetch_lock: fetch_lock_recv failed\n"));
629 DEBUG(4,("ctdb_fetch_lock: record is now local\n"));
631 /* the record is now local, and locked. update the record on disk
632 to mark us as the dmaster*/
633 h->header.dmaster = ctdb_db->ctdb->vnn;
634 ret = ctdb_ltdb_store(ctdb_db, key, &h->header, *data);
636 DEBUG(0, (__location__" can't update record to mark us as dmaster\n"));
641 DEBUG(4,("ctdb_fetch_lock: done\n"));
643 /* give the caller a handle to be used for ctdb_record_store() or a cancel via
649 store some data to the record that was locked with ctdb_fetch_lock()
/* Write data under the handle's key, together with the header captured by
   ctdb_fetch_lock(); returns the result of ctdb_ltdb_store(). The record's
   chain lock is held for as long as the handle exists. */
651 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
653 return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
657 wait until we're the only node left.
658 this function never returns
660 void ctdb_shutdown(struct ctdb_context *ctdb)
662 struct ctdb_req_shutdown r;
665 /* if the domain socket is not yet open, open it */
666 if (ctdb->daemon.sd==-1) {
667 ux_socket_connect(ctdb);
670 len = sizeof(struct ctdb_req_shutdown);
673 r.hdr.ctdb_magic = CTDB_MAGIC;
674 r.hdr.ctdb_version = CTDB_VERSION;
675 r.hdr.operation = CTDB_REQ_SHUTDOWN;
678 ctdb_client_queue_pkt(ctdb, &(r.hdr));
680 /* this event loop will terminate once we receive the reply */
682 event_loop_once(ctdb->ev);