4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This library is free software; you can redistribute it and/or
8 modify it under the terms of the GNU Lesser General Public
9 License as published by the Free Software Foundation; either
10 version 3 of the License, or (at your option) any later version.
12 This library is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 Lesser General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public
18 License along with this library; if not, see <http://www.gnu.org/licenses/>.
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/events/events.h"
25 #include "lib/util/dlinklist.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
32 queue a packet for sending from client to daemon
/* Queue one packet for the daemon: hands hdr->length bytes starting at
   hdr to ctdb_queue_send() on ctdb->daemon.queue and returns that
   call's result unchanged. */
34 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
36 return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
41 handle a connect wait reply packet
/* Reply handler for CTDB_REPLY_CONNECT_WAIT (dispatched from
   ctdb_client_read_cb): records the connected-node count carried by the
   reply in ctdb->num_connected. */
43 static void ctdb_reply_connect_wait(struct ctdb_context *ctdb,
44 struct ctdb_req_header *hdr)
/* the header is reinterpreted as the full reply structure */
46 struct ctdb_reply_connect_wait *r = (struct ctdb_reply_connect_wait *)hdr;
47 ctdb->num_connected = r->num_connected;
51 state of an in-progress ctdb call in the client
/* Per-call bookkeeping kept by the client while a ctdb call is
   outstanding.
   NOTE(review): other code in this file reads and writes state->reqid;
   that member is not visible in this listing. */
53 struct ctdb_client_call_state {
/* progress marker; ctdb_call_recv() loops until it reaches CTDB_CALL_DONE */
54 enum call_state state;
/* database the call was issued against */
56 struct ctdb_db_context *ctdb_db;
/* the call record; reply_data and status are filled in by
   ctdb_client_reply_call() when the reply arrives */
57 struct ctdb_call call;
61 called in the client when a CTDB_REPLY_CALL packet comes in
63 This packet comes in response to a CTDB_REQ_CALL request packet. It
64 contains any reply data from the call
/* Handle a CTDB_REPLY_CALL packet: find the pending call registered
   under hdr->reqid, attach the reply data and status to it, and mark it
   done. */
66 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
68 struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
69 struct ctdb_client_call_state *state;
/* look up the pending call by the request id carried in the header */
71 state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_client_call_state);
/* NOTE(review): the condition guarding this message is not visible in
   this listing; presumably it runs when the lookup above fails */
73 DEBUG(0, ("reqid %d not found\n", hdr->reqid));
/* reply_data points straight into the packet; nothing is copied here */
77 state->call.reply_data.dptr = c->data;
78 state->call.reply_data.dsize = c->datalen;
79 state->call.status = c->status;
/* re-parent the packet under state so the memory reply_data points at
   is not released while state is still alive */
81 talloc_steal(state, c);
/* completion flag that ctdb_call_recv() waits on */
83 state->state = CTDB_CALL_DONE;
/* forward declaration: ctdb_reply_status() is defined further down in
   this file but is called from the dispatch switch in
   ctdb_client_read_cb() */
86 static void ctdb_reply_status(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
89 this is called in the client, when data comes in from the daemon
/* Read callback installed on the daemon queue by ctdb_socket_connect().
   data/cnt describe one received packet; args is the ctdb_context.
   The packet header is validated (minimum size, length field, magic,
   version) and the packet is then dispatched on hdr->operation. */
91 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
93 struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
94 struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
97 /* place the packet as a child of a tmp_ctx. We then use
98 talloc_free() below to free it. If any of the calls want
99 to keep it, then they will steal it somewhere else, and the
100 talloc_free() will be a no-op */
101 tmp_ctx = talloc_new(ctdb);
102 talloc_steal(tmp_ctx, hdr);
/* NOTE(review): the condition guarding this message is not visible in
   this listing */
105 DEBUG(2,("Daemon has exited - shutting down client\n"));
/* a packet must contain at least a complete header */
109 if (cnt < sizeof(*hdr)) {
/* NOTE(review): cnt is a size_t but is printed with %d; the specifier
   does not match the argument type */
110 DEBUG(0,("Bad packet length %d in client\n", cnt));
/* the length recorded in the header must equal the byte count delivered */
113 if (cnt != hdr->length) {
114 ctdb_set_error(ctdb, "Bad header length %d expected %d in client\n",
119 if (hdr->ctdb_magic != CTDB_MAGIC) {
120 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
124 if (hdr->ctdb_version != CTDB_VERSION) {
125 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
/* header accepted: hand the packet to the handler for its operation */
129 switch (hdr->operation) {
130 case CTDB_REPLY_CALL:
131 ctdb_client_reply_call(ctdb, hdr);
134 case CTDB_REQ_MESSAGE:
135 ctdb_request_message(ctdb, hdr);
138 case CTDB_REPLY_CONNECT_WAIT:
139 ctdb_reply_connect_wait(ctdb, hdr);
142 case CTDB_REPLY_STATUS:
143 ctdb_reply_status(ctdb, hdr);
/* NOTE(review): presumably the default branch of the switch; its label
   is not visible in this listing */
147 DEBUG(0,("bogus operation code:%d\n",hdr->operation));
/* releases the temporary context created above */
151 talloc_free(tmp_ctx);
155 connect to a unix domain socket
/* Connect to the daemon's unix domain socket at ctdb->daemon.name,
   store the descriptor in ctdb->daemon.sd, and create
   ctdb->daemon.queue on it with ctdb_client_read_cb as the read
   callback. The return statements are not visible in this listing. */
157 int ctdb_socket_connect(struct ctdb_context *ctdb)
159 struct sockaddr_un addr;
161 memset(&addr, 0, sizeof(addr));
162 addr.sun_family = AF_UNIX;
/* NOTE(review): strncpy() with the full size of sun_path leaves the
   path without a terminating NUL when daemon.name is that long or
   longer; copying at most sizeof(addr.sun_path)-1 bytes would keep the
   terminator written by the memset above */
163 strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
165 ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
166 if (ctdb->daemon.sd == -1) {
170 if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
/* connect failed: close the descriptor and reset sd to -1, the value
   callers test before calling this function */
171 close(ctdb->daemon.sd);
172 ctdb->daemon.sd = -1;
/* the queue passes each received packet to ctdb_client_read_cb, with
   ctdb as the callback argument */
176 ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd,
178 ctdb_client_read_cb, ctdb);
/* Handle returned by ctdb_fetch_lock() for a locked record.
   NOTE(review): code in this file also uses h->key; that member is not
   visible in this listing. */
183 struct ctdb_record_handle {
/* database the record belongs to */
184 struct ctdb_db_context *ctdb_db;
/* ltdb header of the record; filled by ctdb_ltdb_fetch() in
   ctdb_fetch_lock() and passed to ctdb_ltdb_store() in
   ctdb_record_store() */
187 struct ctdb_ltdb_header header;
192 make a recv call to the local ctdb daemon - called from client context
194 This is called when the program wants to wait for a ctdb_call to complete and get the
195 results. This call will block unless the call has already completed.
/* Wait for the call described by state to finish and copy its result
   into *call (reply_data and status).
   NOTE(review): state is dereferenced immediately; ctdb_call() passes
   the result of ctdb_call_send() here without checking it. */
197 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
/* run the event loop until the call leaves the waiting states */
199 while (state->state < CTDB_CALL_DONE) {
200 event_loop_once(state->ctdb_db->ctdb->ev);
/* any state other than CTDB_CALL_DONE at this point is a failure */
202 if (state->state != CTDB_CALL_DONE) {
203 DEBUG(0,(__location__ " ctdb_call_recv failed\n"));
208 if (state->call.reply_data.dsize) {
/* the caller gets its own copy of the reply, parented to the database
   context rather than to state.
   NOTE(review): the talloc_memdup() result is not checked in the lines
   visible here */
209 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
210 state->call.reply_data.dptr,
211 state->call.reply_data.dsize);
212 call->reply_data.dsize = state->call.reply_data.dsize;
/* no reply payload: hand back an empty TDB_DATA */
214 call->reply_data.dptr = NULL;
215 call->reply_data.dsize = 0;
217 call->status = state->call.status;
227 destroy a ctdb_call in client
/* talloc destructor installed on the call state by ctdb_call_send():
   removes the state's request id from ctdb->idr. */
229 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)
231 idr_remove(state->ctdb_db->ctdb->idr, state->reqid);
236 construct an event driven local ctdb_call
238 this is used so that locally processed ctdb_call requests are processed
239 in an event driven manner
/* Run a call locally through ctdb_call_local() and wrap the result in a
   call state that is already marked CTDB_CALL_DONE.
   NOTE(review): the last parameter (used below as `data`) and the
   handling of `ret` are on lines not visible in this listing. */
241 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db,
242 struct ctdb_call *call,
243 struct ctdb_ltdb_header *header,
246 struct ctdb_client_call_state *state;
247 struct ctdb_context *ctdb = ctdb_db->ctdb;
250 state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
251 CTDB_NO_MEMORY_NULL(ctdb, state);
/* the record data becomes a talloc child of state */
253 talloc_steal(state, data->dptr);
/* marked done up front: the call is executed synchronously below */
255 state->state = CTDB_CALL_DONE;
257 state->ctdb_db = ctdb_db;
259 ret = ctdb_call_local(ctdb_db, &state->call, header, data, ctdb->vnn);
/* the reply buffer is re-parented under state as well */
260 talloc_steal(state, state->call.reply_data.dptr);
266 make a ctdb call to the local daemon - async send. Called from client context.
268 This constructs a ctdb_call request and queues it for processing.
269 This call never blocks.
/* Start a ctdb call without waiting for its result.
   Handled locally via ctdb_client_call_local_send() when this node is
   the record's dmaster and CTDB_FLAG_SELF_CONNECT is not set; otherwise
   a CTDB_REQ_CALL packet is built and queued to the daemon.
   The returned state is meant to be passed to ctdb_call_recv().
   The return statements are not visible in this listing. */
271 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db,
272 struct ctdb_call *call)
274 struct ctdb_client_call_state *state;
275 struct ctdb_context *ctdb = ctdb_db->ctdb;
276 struct ctdb_ltdb_header header;
280 struct ctdb_req_call *c;
282 /* if the domain socket is not yet open, open it */
283 if (ctdb->daemon.sd==-1) {
/* NOTE(review): the result of ctdb_socket_connect() is not used here */
284 ctdb_socket_connect(ctdb);
/* take the chain lock for the key before reading the record */
287 ret = ctdb_ltdb_lock(ctdb_db, call->key);
289 DEBUG(0,(__location__ " Failed to get chainlock\n"));
293 ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
295 ctdb_ltdb_unlock(ctdb_db, call->key);
296 DEBUG(0,(__location__ " Failed to fetch record\n"));
/* local path: this node is dmaster and self-connect is not requested */
300 if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) {
301 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
/* NOTE(review): ctdb_client_call_local_send() re-parents data.dptr
   under state before this free; confirm nothing in state still refers
   to that buffer */
302 talloc_free(data.dptr);
303 ctdb_ltdb_unlock(ctdb_db, call->key);
/* remote path: the local record copy is not needed, so the lock is
   dropped and the data freed before talking to the daemon */
307 ctdb_ltdb_unlock(ctdb_db, call->key);
308 talloc_free(data.dptr);
310 state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
312 DEBUG(0, (__location__ " failed to allocate state\n"));
/* packet layout: fixed ctdb_req_call fields, then the key bytes, then
   the call data bytes */
316 len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
317 c = ctdbd_allocate_pkt(state, len);
319 DEBUG(0, (__location__ " failed to allocate packet\n"));
322 talloc_set_name_const(c, "ctdb client req_call packet");
/* only the fixed part is zeroed; the payload is overwritten by the
   memcpy calls below */
323 memset(c, 0, offsetof(struct ctdb_req_call, data));
326 c->hdr.ctdb_magic = CTDB_MAGIC;
327 c->hdr.ctdb_version = CTDB_VERSION;
328 c->hdr.operation = CTDB_REQ_CALL;
/* NOTE(review): the comment below says 16k, but the limit passed to
   idr_get_new() is 0xFFFF (65535); the result is also not checked for
   failure in the lines visible here */
329 /* this limits us to 16k outstanding messages - not unreasonable */
330 c->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF);
331 c->flags = call->flags;
332 c->db_id = ctdb_db->db_id;
333 c->callid = call->call_id;
334 c->keylen = call->key.dsize;
335 c->calldatalen = call->call_data.dsize;
336 memcpy(&c->data[0], call->key.dptr, call->key.dsize);
337 memcpy(&c->data[call->key.dsize],
338 call->call_data.dptr, call->call_data.dsize);
/* the state's call record points at the copies inside the packet, not
   at the caller's buffers */
340 state->call.call_data.dptr = &c->data[call->key.dsize];
341 state->call.key.dptr = &c->data[0];
343 state->state = CTDB_CALL_WAIT;
344 state->ctdb_db = ctdb_db;
345 state->reqid = c->hdr.reqid;
/* the destructor removes the reqid from the idr when state is freed */
347 talloc_set_destructor(state, ctdb_client_call_destructor);
/* NOTE(review): the return value of the queue call is ignored */
349 ctdb_client_queue_pkt(ctdb, &c->hdr);
356 full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
/* Synchronous call: ctdb_call_send() followed by ctdb_call_recv();
   returns whatever ctdb_call_recv() returns.
   NOTE(review): the state from ctdb_call_send() is passed on unchecked,
   and ctdb_call_recv() dereferences it at once. ctdb_call_send() logs
   several failure paths; if any of them returns NULL (its return
   statements are not visible in this listing), this would dereference
   a NULL pointer. */
358 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
360 struct ctdb_client_call_state *state;
362 state = ctdb_call_send(ctdb_db, call);
363 return ctdb_call_recv(state, call);
368 tell the daemon what messaging srvid we will use, and register the message
369 handler function in the client
/* Send a CTDB_REQ_REGISTER packet to the daemon and register `handler`
   for `srvid` in the client's own ctdb structure.
   NOTE(review): the private_data parameter used in the final call, the
   handling of `res`, and any assignment of srvid into the request are
   on lines not visible in this listing. */
371 int ctdb_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid,
372 ctdb_message_fn_t handler,
376 struct ctdb_req_register c;
379 /* if the domain socket is not yet open, open it */
380 if (ctdb->daemon.sd==-1) {
381 ctdb_socket_connect(ctdb);
/* fixed-size request built on the stack */
386 c.hdr.length = sizeof(c);
387 c.hdr.ctdb_magic = CTDB_MAGIC;
388 c.hdr.ctdb_version = CTDB_VERSION;
389 c.hdr.operation = CTDB_REQ_REGISTER;
392 res = ctdb_client_queue_pkt(ctdb, &c.hdr);
397 /* also need to register the handler with our ctdb structure */
398 return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
403 send a message - from client context
/* Build a CTDB_REQ_MESSAGE packet addressed to node `vnn` that carries
   `data`, and queue it to the daemon.
   NOTE(review): the handling of `res`, the return statements, and any
   assignment of srvid into the packet are on lines not visible in this
   listing. */
405 int ctdb_send_message(struct ctdb_context *ctdb, uint32_t vnn,
406 uint32_t srvid, TDB_DATA data)
408 struct ctdb_req_message *r;
/* fixed fields of the request followed by the payload bytes */
411 len = offsetof(struct ctdb_req_message, data) + data.dsize;
/* NOTE(review): this allocates through ctdb->methods->allocate_pkt,
   whereas ctdb_call_send() uses ctdbd_allocate_pkt(); confirm the
   difference is intended */
412 r = ctdb->methods->allocate_pkt(ctdb, len);
413 CTDB_NO_MEMORY(ctdb, r);
414 talloc_set_name_const(r, "req_message packet");
417 r->hdr.ctdb_magic = CTDB_MAGIC;
418 r->hdr.ctdb_version = CTDB_VERSION;
419 r->hdr.operation = CTDB_REQ_MESSAGE;
420 r->hdr.destnode = vnn;
421 r->hdr.srcnode = ctdb->vnn;
424 r->datalen = data.dsize;
425 memcpy(&r->data[0], data.dptr, data.dsize);
427 res = ctdb_client_queue_pkt(ctdb, &r->hdr);
437 wait for all nodes to be connected - from client
/* Send a CTDB_REQ_CONNECT_WAIT request to the daemon and then call
   ctdb_daemon_connect_wait(). The reply is handled by
   ctdb_reply_connect_wait(), which updates ctdb->num_connected. */
439 void ctdb_connect_wait(struct ctdb_context *ctdb)
441 struct ctdb_req_connect_wait r;
446 r.hdr.length = sizeof(r);
447 r.hdr.ctdb_magic = CTDB_MAGIC;
448 r.hdr.ctdb_version = CTDB_VERSION;
449 r.hdr.operation = CTDB_REQ_CONNECT_WAIT;
451 DEBUG(3,("ctdb_connect_wait: sending to ctdbd\n"));
453 /* if the domain socket is not yet open, open it */
454 if (ctdb->daemon.sd==-1) {
455 ctdb_socket_connect(ctdb);
/* same call that ctdb_client_queue_pkt() wraps, made directly here */
458 res = ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)&r.hdr, r.hdr.length);
460 DEBUG(0,(__location__ " Failed to queue a connect wait request\n"));
464 DEBUG(3,("ctdb_connect_wait: waiting\n"));
466 /* now we can go into the normal wait routine, as the reply packet
467 will update the ctdb->num_connected variable */
468 ctdb_daemon_connect_wait(ctdb);
472 cancel a ctdb_fetch_lock operation, releasing the lock
/* talloc destructor installed on the record handle by
   ctdb_fetch_lock(): releases the chain lock for the handle's key. */
474 static int fetch_lock_destructor(struct ctdb_record_handle *h)
476 ctdb_ltdb_unlock(h->ctdb_db, h->key);
481 force the migration of a record to this node
/* Issue a blocking ctdb_call() with call id CTDB_NULL_FUNC and the
   CTDB_IMMEDIATE_MIGRATION flag, and return its result.
   NOTE(review): the assignment of `key` into the call is not visible in
   this listing; presumably it is on the elided line between the two
   assignments below. */
483 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
485 struct ctdb_call call;
487 call.call_id = CTDB_NULL_FUNC;
489 call.flags = CTDB_IMMEDIATE_MIGRATION;
490 return ctdb_call(ctdb_db, &call);
494 get a lock on a record, and return the record's data. Blocks until it gets the lock
/* Lock the record for `key` and fetch its contents into *data.
   A handle is allocated under mem_ctx; its destructor
   (fetch_lock_destructor) releases the chain lock. If this node is not
   the record's dmaster, the lock is dropped and
   ctdb_client_force_migration() is called.
   The return statements and the retry jump are not visible in this
   listing. */
496 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx,
497 TDB_DATA key, TDB_DATA *data)
500 struct ctdb_record_handle *h;
503 procedure is as follows:
505 1) get the chain lock.
506 2) check if we are dmaster
507 3) if we are the dmaster then return handle
508 4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
510 5) when we get the reply, goto (1)
513 h = talloc_zero(mem_ctx, struct ctdb_record_handle);
518 h->ctdb_db = ctdb_db;
/* the handle keeps its own copy of the key bytes */
520 h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
521 if (h->key.dptr == NULL) {
/* NOTE(review): the `*` width and precision arguments of a printf-style
   format must be int; confirm the type of key.dsize */
527 DEBUG(3,("ctdb_fetch_lock: key=%*.*s\n", key.dsize, key.dsize,
528 (const char *)key.dptr));
531 /* step 1 - get the chain lock */
532 ret = ctdb_ltdb_lock(ctdb_db, key);
534 DEBUG(0, (__location__ " failed to lock ltdb record\n"));
539 DEBUG(4,("ctdb_fetch_lock: got chain lock\n"));
/* from here on, freeing h releases the chain lock.
   NOTE(review): the explicit ctdb_ltdb_unlock() calls below would
   unlock a second time if h is freed afterwards with this destructor
   still installed; the elided lines may clear it - confirm */
541 talloc_set_destructor(h, fetch_lock_destructor);
543 ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
545 ctdb_ltdb_unlock(ctdb_db, key);
550 /* when torturing, ensure we test the remote path */
551 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
/* overwriting dmaster makes the dmaster comparison below take the
   migration branch (second half of the condition is not visible) */
553 h->header.dmaster = (uint32_t)-1;
557 DEBUG(4,("ctdb_fetch_lock: done local fetch\n"));
/* not dmaster: drop the chain lock and request a migration */
559 if (h->header.dmaster != ctdb_db->ctdb->vnn) {
560 ctdb_ltdb_unlock(ctdb_db, key);
561 ret = ctdb_client_force_migration(ctdb_db, key);
563 DEBUG(4,("ctdb_fetch_lock: force_migration failed\n"));
570 DEBUG(4,("ctdb_fetch_lock: we are dmaster - done\n"));
575 store some data to the record that was locked with ctdb_fetch_lock()
/* Write `data` to the record identified by handle h: passes the
   handle's database, key and ltdb header to ctdb_ltdb_store() and
   returns that call's result. */
577 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
579 return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
583 wait until we're the only node left.
584 this function never returns
/* Send a CTDB_REQ_SHUTDOWN request to the daemon and then run the
   event loop.
   NOTE(review): the loop construct around event_loop_once() and the
   assignment of r.hdr.length are not visible in this listing. */
586 void ctdb_shutdown(struct ctdb_context *ctdb)
588 struct ctdb_req_shutdown r;
591 /* if the domain socket is not yet open, open it */
592 if (ctdb->daemon.sd==-1) {
593 ctdb_socket_connect(ctdb);
596 len = sizeof(struct ctdb_req_shutdown);
599 r.hdr.ctdb_magic = CTDB_MAGIC;
600 r.hdr.ctdb_version = CTDB_VERSION;
601 r.hdr.operation = CTDB_REQ_SHUTDOWN;
/* NOTE(review): the return value of the queue call is ignored */
604 ctdb_client_queue_pkt(ctdb, &(r.hdr));
606 /* this event loop will terminate once we receive the reply */
608 event_loop_once(ctdb->ev);
/* progress of a status request: WAIT is set when the request is created
   in ctdb_status(), DONE is set by ctdb_reply_status() */
612 enum ctdb_status_states {CTDB_STATUS_WAIT, CTDB_STATUS_DONE};
/* Bookkeeping for one outstanding status request.
   NOTE(review): code in this file also uses state->reqid; that member
   is not visible in this listing. */
614 struct ctdb_status_state {
/* caller-supplied buffer that ctdb_reply_status() copies the reply into */
616 struct ctdb_status *status;
/* CTDB_STATUS_WAIT until the reply has been processed */
617 enum ctdb_status_states state;
621 handle a ctdb_reply_status reply
/* Handle a CTDB_REPLY_STATUS packet: find the pending status request by
   hdr->reqid, copy the status structure from the reply into the
   caller's buffer, and mark the request done. */
623 static void ctdb_reply_status(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
625 struct ctdb_reply_status *r = (struct ctdb_reply_status *)hdr;
626 struct ctdb_status_state *state;
628 state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_status_state);
/* NOTE(review): the condition guarding this message is not visible in
   this listing; presumably it runs when the lookup above fails */
630 DEBUG(0, ("reqid %d not found\n", hdr->reqid));
/* structure copy into the buffer supplied to ctdb_status() */
634 *state->status = r->status;
/* ends the wait loop in ctdb_status() */
635 state->state = CTDB_STATUS_DONE;
639 fetch the status structure from the daemon into *status.
640 runs the event loop until the CTDB_REPLY_STATUS reply has been processed
642 int ctdb_status(struct ctdb_context *ctdb, struct ctdb_status *status)
644 struct ctdb_req_status r;
646 struct ctdb_status_state *state;
648 /* if the domain socket is not yet open, open it */
649 if (ctdb->daemon.sd==-1) {
650 ctdb_socket_connect(ctdb);
653 state = talloc(ctdb, struct ctdb_status_state);
654 CTDB_NO_MEMORY(ctdb, state);
656 state->reqid = idr_get_new(ctdb->idr, state, 0xFFFF);
657 state->status = status;
658 state->state = CTDB_STATUS_WAIT;
661 r.hdr.length = sizeof(r);
662 r.hdr.ctdb_magic = CTDB_MAGIC;
663 r.hdr.ctdb_version = CTDB_VERSION;
664 r.hdr.operation = CTDB_REQ_STATUS;
665 r.hdr.reqid = state->reqid;
667 ret = ctdb_client_queue_pkt(ctdb, &(r.hdr));
673 while (state->state == CTDB_STATUS_WAIT) {
674 event_loop_once(ctdb->ev);