4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This library is free software; you can redistribute it and/or
8 modify it under the terms of the GNU Lesser General Public
9 License as published by the Free Software Foundation; either
10 version 3 of the License, or (at your option) any later version.
12 This library is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 Lesser General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public
18 License along with this library; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
24 #include "lib/tdb/include/tdb.h"
25 #include "lib/events/events.h"
26 #include "lib/util/dlinklist.h"
27 #include "system/network.h"
28 #include "system/filesys.h"
29 #include "../include/ctdb.h"
30 #include "../include/ctdb_private.h"
33 queue a packet for sending from client to daemon
/* Hands hdr->length bytes, starting at hdr, to ctdb_queue_send() on the
   daemon connection queue (ctdb->daemon.queue) and returns its result. */
35 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
37 return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
42 handle a connect wait reply packet
/* Copies num_connected from a CTDB_REPLY_CONNECT_WAIT packet into
   ctdb->num_connected, the value ctdb_connect_wait() waits on.
   NOTE(review): ctdb_client_read_cb() only checks that the packet covers a
   struct ctdb_req_header, not a full struct ctdb_reply_connect_wait -
   confirm the daemon always sends the complete structure. */
44 static void ctdb_reply_connect_wait(struct ctdb_context *ctdb,
45 struct ctdb_req_header *hdr)
47 struct ctdb_reply_connect_wait *r = (struct ctdb_reply_connect_wait *)hdr;
48 ctdb->num_connected = r->num_connected;
52 state of an in-progress ctdb call in the client
/* Client-side state for one ctdb_call. 'state' starts as CTDB_CALL_WAIT
   (remote path) and becomes CTDB_CALL_DONE once the reply has been stored
   in 'call' (reply_data, status) by ctdb_client_reply_call(). */
54 struct ctdb_client_call_state {
55 enum call_state state;
57 struct ctdb_db_context *ctdb_db;
58 struct ctdb_call call;
62 called when a CTDB_REPLY_CALL packet arrives in the client
64 This packet comes in response to a CTDB_REQ_CALL request packet. It
65 contains any reply data from the call
/* Matches the reply to its pending ctdb_client_call_state through
   hdr->reqid and logs an unknown reqid. It exposes the reply payload and
   status through state->call, then marks the call CTDB_CALL_DONE so that
   ctdb_call_recv() stops waiting.
   NOTE(review): c->datalen comes from the wire and is not checked against
   hdr->length before it is used as the reply size. */
67 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
69 struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
70 struct ctdb_client_call_state *state;
72 state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_client_call_state);
74 DEBUG(0, ("reqid %d not found\n", hdr->reqid));
78 state->call.reply_data.dptr = c->data;
79 state->call.reply_data.dsize = c->datalen;
80 state->call.status = c->status;
/* reply_data.dptr points into the packet. Re-parent the packet under
   state; otherwise the talloc_free(tmp_ctx) in ctdb_client_read_cb() would
   release it while state still references it. */
82 talloc_steal(state, c);
84 state->state = CTDB_CALL_DONE;
/* Forward declaration: ctdb_client_read_cb() dispatches CTDB_REPLY_STATUS
   packets to this handler, which is defined further down. */
87 static void ctdb_reply_status(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
90 this is called in the client, when data comes in from the daemon
/* Read callback for the daemon connection queue, installed by
   ctdb_socket_connect() with the ctdb_context as 'args'. Each packet is
   checked for size, hdr->length, CTDB_MAGIC and CTDB_VERSION. It is then
   dispatched on hdr->operation; unknown operations are logged. */
92 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
94 struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
95 struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
98 /* place the packet as a child of a tmp_ctx. We then use
99 talloc_free() below to free it. If any of the calls want
100 to keep it, then they will steal it somewhere else, and the
101 talloc_free() will then release only tmp_ctx itself */
102 tmp_ctx = talloc_new(ctdb);
103 talloc_steal(tmp_ctx, hdr);
106 DEBUG(2,("Daemon has exited - shutting down client\n"));
110 if (cnt < sizeof(*hdr)) {
/* NOTE(review): cnt is a size_t but is formatted with %d. If DEBUG is
   printf-style, this mismatch is undefined on LP64; use %zu or cast. */
111 DEBUG(0,("Bad packet length %d in client\n", cnt));
114 if (cnt != hdr->length) {
115 ctdb_set_error(ctdb, "Bad header length %d expected %d in client\n",
120 if (hdr->ctdb_magic != CTDB_MAGIC) {
121 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
125 if (hdr->ctdb_version != CTDB_VERSION) {
126 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
130 switch (hdr->operation) {
131 case CTDB_REPLY_CALL:
132 ctdb_client_reply_call(ctdb, hdr);
135 case CTDB_REQ_MESSAGE:
136 ctdb_request_message(ctdb, hdr);
139 case CTDB_REPLY_CONNECT_WAIT:
140 ctdb_reply_connect_wait(ctdb, hdr);
143 case CTDB_REPLY_STATUS:
144 ctdb_reply_status(ctdb, hdr);
148 DEBUG(0,("bogus operation code:%d\n",hdr->operation));
152 talloc_free(tmp_ctx);
156 connect to a unix domain socket
/* Opens a SOCK_STREAM unix socket to the path in ctdb->daemon.name and
   stores it in ctdb->daemon.sd. If connect() fails, the socket is closed
   and daemon.sd is reset to -1. On success, ctdb->daemon.queue is created
   with ctdb_client_read_cb() as the read handler. */
158 int ctdb_socket_connect(struct ctdb_context *ctdb)
160 struct sockaddr_un addr;
162 memset(&addr, 0, sizeof(addr));
163 addr.sun_family = AF_UNIX;
/* NOTE(review): if daemon.name is sizeof(addr.sun_path) bytes or longer,
   strncpy() leaves sun_path without a terminating NUL and silently
   truncates the path. Consider rejecting over-long names instead. */
164 strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
166 ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
167 if (ctdb->daemon.sd == -1) {
171 if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
172 close(ctdb->daemon.sd);
173 ctdb->daemon.sd = -1;
177 ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd,
179 ctdb_client_read_cb, ctdb);
/* Handle returned by ctdb_fetch_lock(). It holds the database, a private
   copy of the key (allocated on the handle) and the record's ltdb header.
   Freeing the handle runs fetch_lock_destructor(), which unlocks the
   record's chain. */
184 struct ctdb_record_handle {
185 struct ctdb_db_context *ctdb_db;
188 struct ctdb_ltdb_header header;
193 make a recv call to the local ctdb daemon - called from client context
195 This is called when the program wants to wait for a ctdb_call to complete and get the
196 results. This call will block unless the call has already completed.
/* Services events with event_loop_once() until state->state reaches
   CTDB_CALL_DONE or beyond, then copies the result into *call. A non-empty
   reply is duplicated onto state->ctdb_db with talloc_memdup(); an empty
   reply yields dptr NULL and dsize 0. The call status is copied as well.
   NOTE(review): state is dereferenced immediately, but ctdb_call() passes
   the ctdb_call_send() result through without a NULL check. The
   talloc_memdup() result is also not checked. */
198 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
200 while (state->state < CTDB_CALL_DONE) {
201 event_loop_once(state->ctdb_db->ctdb->ev);
203 if (state->state != CTDB_CALL_DONE) {
204 DEBUG(0,(__location__ " ctdb_call_recv failed\n"));
209 if (state->call.reply_data.dsize) {
210 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
211 state->call.reply_data.dptr,
212 state->call.reply_data.dsize);
213 call->reply_data.dsize = state->call.reply_data.dsize;
215 call->reply_data.dptr = NULL;
216 call->reply_data.dsize = 0;
218 call->status = state->call.status;
228 destroy a ctdb_call in client
/* talloc destructor for a remote call state (installed by
   ctdb_call_send()). It removes state->reqid from ctdb->idr, so a reply
   that arrives later is reported as "reqid not found" by
   ctdb_client_reply_call() instead of touching freed memory. */
230 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)
232 idr_remove(state->ctdb_db->ctdb->idr, state->reqid);
237 construct an event driven local ctdb_call
239 this is used so that locally processed ctdb_call requests are processed
240 in an event driven manner
/* Used by ctdb_call_send() when this node is dmaster for the record. It
   allocates a call state on ctdb_db that is already CTDB_CALL_DONE and runs
   the call immediately with ctdb_call_local(). The reply data is
   re-parented onto the state, so ctdb_call_recv() can collect the result
   without involving the daemon.
   NOTE(review): ctdb_call_send() talloc_free()s data.dptr right after this
   returns, so the talloc_steal(state, data->dptr) below does not keep the
   record data alive. Confirm nothing in state->call still points into it. */
242 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db,
243 struct ctdb_call *call,
244 struct ctdb_ltdb_header *header,
247 struct ctdb_client_call_state *state;
248 struct ctdb_context *ctdb = ctdb_db->ctdb;
251 state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
252 CTDB_NO_MEMORY_NULL(ctdb, state);
254 talloc_steal(state, data->dptr);
256 state->state = CTDB_CALL_DONE;
258 state->ctdb_db = ctdb_db;
260 ret = ctdb_call_local(ctdb_db, &state->call, header, data, ctdb->vnn);
261 talloc_steal(state, state->call.reply_data.dptr);
267 make a ctdb call to the local daemon - async send. Called from client context.
269 This constructs a ctdb_call request and queues it for processing.
270 This call never blocks.
/* Opens the daemon socket if needed, takes the chain lock and fetches the
   record header and data.
   - Local path: if this node is dmaster and CTDB_FLAG_SELF_CONNECT is not
     set, the call runs locally via ctdb_client_call_local_send().
   - Remote path: the lock is dropped and a CTDB_REQ_CALL packet carrying
     key + call_data is queued to the daemon. A CTDB_CALL_WAIT state is
     returned; it is registered in ctdb->idr under c->hdr.reqid. The packet
     is allocated on the state, so state->call.key/call_data, which point
     into it, stay valid while the state lives.
   NOTE(review): the results of ctdb_socket_connect(), idr_get_new() and
   ctdb_client_queue_pkt() are not checked here. Samba's idr_get_new()
   reports exhaustion with -1 - confirm for this idr. */
272 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db,
273 struct ctdb_call *call)
275 struct ctdb_client_call_state *state;
276 struct ctdb_context *ctdb = ctdb_db->ctdb;
277 struct ctdb_ltdb_header header;
281 struct ctdb_req_call *c;
283 /* if the domain socket is not yet open, open it */
284 if (ctdb->daemon.sd==-1) {
285 ctdb_socket_connect(ctdb);
288 ret = ctdb_ltdb_lock(ctdb_db, call->key);
290 DEBUG(0,(__location__ " Failed to get chainlock\n"));
294 ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
296 ctdb_ltdb_unlock(ctdb_db, call->key);
297 DEBUG(0,(__location__ " Failed to fetch record\n"));
301 if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) {
302 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
303 talloc_free(data.dptr);
304 ctdb_ltdb_unlock(ctdb_db, call->key);
308 ctdb_ltdb_unlock(ctdb_db, call->key);
309 talloc_free(data.dptr);
311 state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
313 DEBUG(0, (__location__ " failed to allocate state\n"));
317 len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
318 c = ctdbd_allocate_pkt(state, len);
320 DEBUG(0, (__location__ " failed to allocate packet\n"));
323 talloc_set_name_const(c, "ctdb client req_call packet");
324 memset(c, 0, offsetof(struct ctdb_req_call, data));
327 c->hdr.ctdb_magic = CTDB_MAGIC;
328 c->hdr.ctdb_version = CTDB_VERSION;
329 c->hdr.operation = CTDB_REQ_CALL;
330 /* this limits us to 0xFFFF (~64k) outstanding requests - not unreasonable */
331 c->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF);
332 c->flags = call->flags;
333 c->db_id = ctdb_db->db_id;
334 c->callid = call->call_id;
335 c->keylen = call->key.dsize;
336 c->calldatalen = call->call_data.dsize;
337 memcpy(&c->data[0], call->key.dptr, call->key.dsize);
338 memcpy(&c->data[call->key.dsize],
339 call->call_data.dptr, call->call_data.dsize);
341 state->call.call_data.dptr = &c->data[call->key.dsize];
342 state->call.key.dptr = &c->data[0];
344 state->state = CTDB_CALL_WAIT;
345 state->ctdb_db = ctdb_db;
346 state->reqid = c->hdr.reqid;
348 talloc_set_destructor(state, ctdb_client_call_destructor);
350 ctdb_client_queue_pkt(ctdb, &c->hdr);
357 full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
/* Synchronous wrapper: ctdb_call_send() followed by ctdb_call_recv(), with
   the result written back into *call.
   NOTE(review): the ctdb_call_send() result is passed on without a NULL
   check, and ctdb_call_recv() dereferences it immediately. A failed send
   (e.g. the "failed to allocate state" path) would therefore crash here
   instead of returning an error. */
359 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
361 struct ctdb_client_call_state *state;
363 state = ctdb_call_send(ctdb_db, call);
364 return ctdb_call_recv(state, call);
369 tell the daemon what messaging srvid we will use, and register the message
370 handler function in the client
/* Opens the daemon socket if needed and queues a CTDB_REQ_REGISTER packet
   (a stack struct ctdb_req_register, length sizeof(c)) to the daemon. It
   then registers handler/private_data locally with
   ctdb_register_message_handler() and returns that result.
   NOTE(review): the ctdb_socket_connect() return value is not checked. */
372 int ctdb_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid,
373 ctdb_message_fn_t handler,
377 struct ctdb_req_register c;
380 /* if the domain socket is not yet open, open it */
381 if (ctdb->daemon.sd==-1) {
382 ctdb_socket_connect(ctdb);
387 c.hdr.length = sizeof(c);
388 c.hdr.ctdb_magic = CTDB_MAGIC;
389 c.hdr.ctdb_version = CTDB_VERSION;
390 c.hdr.operation = CTDB_REQ_REGISTER;
393 res = ctdb_client_queue_pkt(ctdb, &c.hdr);
396 /* also need to register the handler with our ctdb structure */
397 return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
404 send a message - from client context
/* Builds a CTDB_REQ_MESSAGE packet from this node (ctdb->vnn) to node vnn,
   copies data.dsize bytes of payload after the header, and queues it to the
   daemon with ctdb_client_queue_pkt().
   NOTE(review): the packet comes from ctdb->methods->allocate_pkt, whereas
   ctdb_call_send() uses ctdbd_allocate_pkt(). Confirm the transport
   allocator is intended on the client side. */
406 int ctdb_send_message(struct ctdb_context *ctdb, uint32_t vnn,
407 uint32_t srvid, TDB_DATA data)
409 struct ctdb_req_message *r;
412 len = offsetof(struct ctdb_req_message, data) + data.dsize;
413 r = ctdb->methods->allocate_pkt(ctdb, len);
414 CTDB_NO_MEMORY(ctdb, r);
415 talloc_set_name_const(r, "req_message packet");
418 r->hdr.ctdb_magic = CTDB_MAGIC;
419 r->hdr.ctdb_version = CTDB_VERSION;
420 r->hdr.operation = CTDB_REQ_MESSAGE;
421 r->hdr.destnode = vnn;
422 r->hdr.srcnode = ctdb->vnn;
425 r->datalen = data.dsize;
426 memcpy(&r->data[0], data.dptr, data.dsize);
428 res = ctdb_client_queue_pkt(ctdb, &r->hdr);
438 wait for all nodes to be connected - from client
/* Sends a CTDB_REQ_CONNECT_WAIT request (opening the daemon socket first if
   needed), then blocks in ctdb_daemon_connect_wait(). The reply is handled
   by ctdb_reply_connect_wait(), which updates ctdb->num_connected.
   NOTE(review): this queues with ctdb_queue_send() directly instead of
   ctdb_client_queue_pkt(), which performs the same call. */
440 void ctdb_connect_wait(struct ctdb_context *ctdb)
442 struct ctdb_req_connect_wait r;
447 r.hdr.length = sizeof(r);
448 r.hdr.ctdb_magic = CTDB_MAGIC;
449 r.hdr.ctdb_version = CTDB_VERSION;
450 r.hdr.operation = CTDB_REQ_CONNECT_WAIT;
452 DEBUG(3,("ctdb_connect_wait: sending to ctdbd\n"));
454 /* if the domain socket is not yet open, open it */
455 if (ctdb->daemon.sd==-1) {
456 ctdb_socket_connect(ctdb);
459 res = ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)&r.hdr, r.hdr.length);
461 DEBUG(0,(__location__ " Failed to queue a connect wait request\n"));
465 DEBUG(3,("ctdb_connect_wait: waiting\n"));
467 /* now we can go into the normal wait routine, as the reply packet
468 will update the ctdb->num_connected variable */
469 ctdb_daemon_connect_wait(ctdb);
473 cancel a ctdb_fetch_lock operation, releasing the lock
/* talloc destructor for a ctdb_record_handle, installed by
   ctdb_fetch_lock(). It releases the chain lock held on h->key. */
475 static int fetch_lock_destructor(struct ctdb_record_handle *h)
477 ctdb_ltdb_unlock(h->ctdb_db, h->key);
482 force the migration of a record to this node
/* Issues a synchronous CTDB_NULL_FUNC call flagged
   CTDB_IMMEDIATE_MIGRATION through ctdb_call() and returns its result.
   ctdb_fetch_lock() uses this as "ask the daemon to make us dmaster". */
484 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
486 struct ctdb_call call;
488 call.call_id = CTDB_NULL_FUNC;
490 call.flags = CTDB_IMMEDIATE_MIGRATION;
491 return ctdb_call(ctdb_db, &call);
495 get a lock on a record, and return the record's data. Blocks until it gets the lock
/* The handle and its private key copy are allocated on mem_ctx, and the
   record data is fetched into *data with the handle as talloc parent. The
   chain lock is released by fetch_lock_destructor() when the handle is
   freed. If this node is not dmaster, the lock is dropped and
   ctdb_client_force_migration() is called before retrying (step 5 in the
   procedure comment).
   NOTE(review): the debug format "%*.*s" receives key.dsize (a size_t) as
   field width and precision. Those arguments must be int, so cast them.
   NOTE(review): the destructor is installed before the fetch, and the
   fetch-failure and not-dmaster paths also call ctdb_ltdb_unlock()
   explicitly. Confirm h is not freed (or the destructor is cleared) after
   those unlocks, otherwise the chain would be unlocked twice. */
497 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx,
498 TDB_DATA key, TDB_DATA *data)
501 struct ctdb_record_handle *h;
504 procedure is as follows:
506 1) get the chain lock.
507 2) check if we are dmaster
508 3) if we are the dmaster then return handle
509 4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
511 5) when we get the reply, goto (1)
514 h = talloc_zero(mem_ctx, struct ctdb_record_handle);
519 h->ctdb_db = ctdb_db;
521 h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
522 if (h->key.dptr == NULL) {
528 DEBUG(3,("ctdb_fetch_lock: key=%*.*s\n", key.dsize, key.dsize,
529 (const char *)key.dptr));
532 /* step 1 - get the chain lock */
533 ret = ctdb_ltdb_lock(ctdb_db, key);
535 DEBUG(0, (__location__ " failed to lock ltdb record\n"));
540 DEBUG(4,("ctdb_fetch_lock: got chain lock\n"));
542 talloc_set_destructor(h, fetch_lock_destructor);
544 ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
546 ctdb_ltdb_unlock(ctdb_db, key);
551 /* when torturing, ensure we test the remote path */
552 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
554 h->header.dmaster = (uint32_t)-1;
558 DEBUG(4,("ctdb_fetch_lock: done local fetch\n"));
560 if (h->header.dmaster != ctdb_db->ctdb->vnn) {
561 ctdb_ltdb_unlock(ctdb_db, key);
562 ret = ctdb_client_force_migration(ctdb_db, key);
564 DEBUG(4,("ctdb_fetch_lock: force_migration failed\n"));
571 DEBUG(4,("ctdb_fetch_lock: we are dmaster - done\n"));
576 store some data to the record that was locked with ctdb_fetch_lock()
/* Stores data under h->key using the header captured by ctdb_fetch_lock()
   (h->header) and returns the ctdb_ltdb_store() result. This is meant to be
   called while the handle still holds the chain lock. */
578 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
580 return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
584 wait until we're the only node left.
585 this function never returns
/* Opens the daemon socket if needed, queues a CTDB_REQ_SHUTDOWN request
   (a stack struct ctdb_req_shutdown), then services events with
   event_loop_once().
   NOTE(review): ctdb_client_read_cb() has no case for a shutdown reply.
   Presumably the loop ends on its "Daemon has exited" path when the daemon
   closes the connection - confirm. */
587 void ctdb_shutdown(struct ctdb_context *ctdb)
589 struct ctdb_req_shutdown r;
592 /* if the domain socket is not yet open, open it */
593 if (ctdb->daemon.sd==-1) {
594 ctdb_socket_connect(ctdb);
597 len = sizeof(struct ctdb_req_shutdown);
600 r.hdr.ctdb_magic = CTDB_MAGIC;
601 r.hdr.ctdb_version = CTDB_VERSION;
602 r.hdr.operation = CTDB_REQ_SHUTDOWN;
605 ctdb_client_queue_pkt(ctdb, &(r.hdr));
607 /* this event loop will terminate once we receive the reply */
609 event_loop_once(ctdb->ev);
/* Progress of a pending ctdb_status() request; set to CTDB_STATUS_DONE by
   ctdb_reply_status(). */
613 enum ctdb_status_states {CTDB_STATUS_WAIT, CTDB_STATUS_DONE};
/* Pending ctdb_status() request, registered in ctdb->idr. 'status' points
   at the caller's output buffer, which ctdb_reply_status() fills. */
615 struct ctdb_status_state {
617 struct ctdb_status *status;
618 enum ctdb_status_states state;
622 handle a ctdb_reply_status reply
/* Finds the pending ctdb_status_state by hdr->reqid and logs an unknown
   reqid. It copies r->status into the caller's buffer and marks the request
   CTDB_STATUS_DONE, which ends the wait loop in ctdb_status().
   NOTE(review): ctdb_client_read_cb() only guarantees a full
   ctdb_req_header, not a full struct ctdb_reply_status. */
624 static void ctdb_reply_status(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
626 struct ctdb_reply_status *r = (struct ctdb_reply_status *)hdr;
627 struct ctdb_status_state *state;
629 state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_status_state);
631 DEBUG(0, ("reqid %d not found\n", hdr->reqid));
635 *state->status = r->status;
636 state->state = CTDB_STATUS_DONE;
640 get the status of the local ctdb daemon: send a CTDB_REQ_STATUS request
641 and block until the reply has been copied into *status
643 int ctdb_status(struct ctdb_context *ctdb, struct ctdb_status *status)
645 struct ctdb_req_status r;
647 struct ctdb_status_state *state;
649 /* if the domain socket is not yet open, open it */
650 if (ctdb->daemon.sd==-1) {
651 ctdb_socket_connect(ctdb);
654 state = talloc(ctdb, struct ctdb_status_state);
655 CTDB_NO_MEMORY(ctdb, state);
657 state->reqid = idr_get_new(ctdb->idr, state, 0xFFFF);
658 state->status = status;
659 state->state = CTDB_STATUS_WAIT;
662 r.hdr.length = sizeof(r);
663 r.hdr.ctdb_magic = CTDB_MAGIC;
664 r.hdr.ctdb_version = CTDB_VERSION;
665 r.hdr.operation = CTDB_REQ_STATUS;
666 r.hdr.reqid = state->reqid;
668 ret = ctdb_client_queue_pkt(ctdb, &(r.hdr));
674 while (state->state == CTDB_STATUS_WAIT) {
675 event_loop_once(ctdb->ev);