4 Copyright (C) Andrew Tridgell 2006
6 This library is free software; you can redistribute it and/or
7 modify it under the terms of the GNU Lesser General Public
8 License as published by the Free Software Foundation; either
9 version 2 of the License, or (at your option) any later version.
11 This library is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Lesser General Public License for more details.
16 You should have received a copy of the GNU Lesser General Public
17 License along with this library; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/events/events.h"
25 #include "lib/util/dlinklist.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/wait.h"
29 #include "../include/ctdb.h"
30 #include "../include/ctdb_private.h"
/* Per-connection state for a local client of the daemon.
   NOTE(review): this listing appears to be missing lines here (the struct
   header, the closing brace and possibly further fields are not visible). */
33 structure describing a connected client in the daemon
/* the daemon's context; reached as client->ctdb by every request handler */
36 struct ctdb_context *ctdb;
/* packet queue on the client's socket: created with ctdb_queue_setup() in
   ctdb_accept_client() and written to by daemon_queue_send() */
38 struct ctdb_queue *queue;
/* forward declaration: daemon_incoming_packet() is passed as the requeue
   callback to ctdb_ltdb_lock_fetch_requeue() in
   daemon_request_call_from_client(), which appears before its definition */
43 static void daemon_incoming_packet(void *, uint8_t *, uint32_t );
/* Run the daemon side after ctdb_start(): initialise the transport, start
   it through ctdb->methods->start(), then run the event loop, which is not
   expected to return (a level-0 message is logged if it does).
   NOTE(review): the body of the ctdb_init_transport() failure branch and
   the code after the final DEBUG are not visible in this listing —
   presumably the process exits in both places; confirm. */
45 static void ctdb_main_loop(struct ctdb_context *ctdb)
47 /* we are the dispatcher process now, so start the protocol going */
48 if (ctdb_init_transport(ctdb)) {
52 ctdb->methods->start(ctdb);
54 /* go into a wait loop to allow other nodes to complete */
55 event_loop_wait(ctdb->ev);
57 DEBUG(0,("event_loop_wait() returned. this should not happen\n"));
/*
  put a file descriptor into non-blocking mode while preserving its other
  status flags.

  If the current flags cannot be read (fcntl() returns -1, e.g. for a bad
  descriptor) the descriptor is left untouched, instead of OR'ing -1 into
  F_SETFL and setting every flag bit.
 */
static void set_non_blocking(int fd)
{
	int flags;

	flags = fcntl(fd, F_GETFL, 0);
	if (flags == -1) {
		return;
	}
	fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
/* Set the disposition of signum to SIG_IGN via sigaction(). Despite the
   name, the signal is ignored rather than blocked; signum is also added to
   sa_mask. The sigaction() return value is not checked.
   NOTE(review): the declaration of the local 'act' is not visible in this
   listing. */
69 static void block_signal(int signum)
73 memset(&act, 0, sizeof(act));
75 act.sa_handler = SIG_IGN;
76 sigemptyset(&act.sa_mask);
77 sigaddset(&act.sa_mask, signum);
78 sigaction(signum, &act, NULL);
83 send a packet to a client
85 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
87 client->ctdb->status.client_packets_sent++;
88 return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
92 message handler for when we are in daemon mode. This redirects the message
95 static void daemon_message_handler(struct ctdb_context *ctdb, uint32_t srvid,
96 TDB_DATA data, void *private_data)
98 struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
99 struct ctdb_req_message *r;
102 /* construct a message to send to the client containing the data */
103 len = offsetof(struct ctdb_req_message, data) + data.dsize;
104 r = ctdbd_allocate_pkt(ctdb, len);
106 talloc_set_name_const(r, "req_message packet");
108 memset(r, 0, offsetof(struct ctdb_req_message, data));
111 r->hdr.ctdb_magic = CTDB_MAGIC;
112 r->hdr.ctdb_version = CTDB_VERSION;
113 r->hdr.operation = CTDB_REQ_MESSAGE;
115 r->datalen = data.dsize;
116 memcpy(&r->data[0], data.dptr, data.dsize);
118 daemon_queue_send(client, &r->hdr);
/* Handle CTDB_REQ_REGISTER: register daemon_message_handler() for c->srvid
   so that messages for that srvid are forwarded to this client, logging
   the outcome (level 0 on failure, level 2 on success).
   NOTE(review): the final argument to ctdb_register_message_handler(), the
   result check and the DEBUG arguments are not visible in this listing;
   the private data is presumably the client, since
   daemon_message_handler() expects a struct ctdb_client — confirm. */
125 this is called when the ctdb daemon received a ctdb request to
126 set the srvid from the client
128 static void daemon_request_register_message_handler(struct ctdb_client *client,
129 struct ctdb_req_register *c)
132 res = ctdb_register_message_handler(client->ctdb, client,
133 c->srvid, daemon_message_handler,
136 DEBUG(0,(__location__ " Failed to register handler %u in daemon\n",
139 DEBUG(2,(__location__ " Registered message handler for srvid=%u\n",
/* Handle CTDB_REQ_SHUTDOWN from a local client.
   Counts this daemon as finished, sends a CTDB_REQ_FINISHED packet to every
   other node, then runs the event loop one iteration at a time until
   num_finished equals num_nodes (CTDB_REQ_FINISHED packets from other nodes
   increment the counter via ctdb_request_finished()). Other events,
   including further client requests, are processed while it waits.
   NOTE(review): the wait condition uses '!='. If num_finished ever
   overshoots num_nodes (e.g. a second shutdown request dispatched from
   inside this wait loop counts this node again), the loop never
   terminates — consider '<'. What follows the final DEBUG is not visible
   in this listing. */
146 called when the daemon gets a shutdown request from a client
148 static void daemon_request_shutdown(struct ctdb_client *client,
149 struct ctdb_req_shutdown *f)
151 struct ctdb_context *ctdb = talloc_get_type(client->ctdb, struct ctdb_context);
155 /* we dont send to ourself so we can already count one daemon as
157 ctdb->num_finished++;
160 /* loop over all nodes of the cluster */
161 for (node=0; node<ctdb->num_nodes;node++) {
162 struct ctdb_req_finished *rf;
164 /* dont send a message to ourself */
165 if (ctdb->vnn == node) {
/* build a CTDB_REQ_FINISHED packet from this node to 'node';
   CTDB_NO_MEMORY_FATAL presumably aborts on allocation failure */
169 len = sizeof(struct ctdb_req_finished);
170 rf = ctdb->methods->allocate_pkt(ctdb, len);
171 CTDB_NO_MEMORY_FATAL(ctdb, rf);
172 talloc_set_name_const(rf, "ctdb_req_finished packet");
175 rf->hdr.length = len;
176 rf->hdr.ctdb_magic = CTDB_MAGIC;
177 rf->hdr.ctdb_version = CTDB_VERSION;
178 rf->hdr.operation = CTDB_REQ_FINISHED;
179 rf->hdr.destnode = node;
180 rf->hdr.srcnode = ctdb->vnn;
183 ctdb_queue_packet(ctdb, &(rf->hdr));
188 /* wait until all nodes are prepared to shutdown */
189 while (ctdb->num_finished != ctdb->num_nodes) {
190 event_loop_once(ctdb->ev);
193 /* all daemons have requested to finish - we now exit */
194 DEBUG(1,("All daemons finished - exiting\n"));
/* Handle CTDB_REQ_CONNECT_WAIT: call ctdb_daemon_connect_wait() first, then
   reply with CTDB_REPLY_CONNECT_WAIT carrying this node's vnn and the
   daemon's num_connected count. A queueing failure is logged at level 0.
   NOTE(review): the reply is a stack object of exactly sizeof(r) bytes,
   whereas other packets come from ctdbd_allocate_pkt(), which rounds the
   size up to CTDB_DS_ALIGNMENT — confirm ctdb_queue_send() never touches
   bytes past hdr.length. Whether r is zeroed first (reqid, destnode,
   srcnode) is not visible in this listing. */
201 called when the daemon gets a connect wait request from a client
203 static void daemon_request_connect_wait(struct ctdb_client *client,
204 struct ctdb_req_connect_wait *c)
206 struct ctdb_reply_connect_wait r;
209 /* first wait - in the daemon */
210 ctdb_daemon_connect_wait(client->ctdb);
212 /* now send the reply */
215 r.hdr.length = sizeof(r);
216 r.hdr.ctdb_magic = CTDB_MAGIC;
217 r.hdr.ctdb_version = CTDB_VERSION;
218 r.hdr.operation = CTDB_REPLY_CONNECT_WAIT;
219 r.vnn = ctdb_get_vnn(client->ctdb);
220 r.num_connected = client->ctdb->num_connected;
222 res = daemon_queue_send(client, &r.hdr);
224 DEBUG(0,(__location__ " Failed to queue a connect wait response\n"));
231 called when the daemon gets a getdbpath request from a client
233 static void daemon_request_getdbpath(struct ctdb_client *client,
234 struct ctdb_req_getdbpath *c)
236 struct ctdb_reply_getdbpath *r;
237 struct ctdb_db_context *ctdb_db;
241 ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
243 DEBUG(0, (__location__ " Unknown database in request. db_id==0x%08x",
245 ctdb_db->ctdb->status.pending_calls--;
249 path = talloc_asprintf(c, "%s/%s", ctdb_db->ctdb->db_directory, ctdb_db->db_name);
251 /* now send the reply */
252 len = offsetof(struct ctdb_reply_getdbpath, data) + strlen(path);
253 r = ctdbd_allocate_pkt(ctdb_db->ctdb, len);
255 talloc_set_name_const(r, "reply_getdbpath packet");
257 memset(r, 0, offsetof(struct ctdb_reply_getdbpath, data));
260 r->hdr.ctdb_magic = CTDB_MAGIC;
261 r->hdr.ctdb_version = CTDB_VERSION;
262 r->hdr.operation = CTDB_REPLY_GETDBPATH;
263 r->hdr.reqid = c->hdr.reqid;
264 r->datalen = strlen(path);
265 memcpy(&r->data[0], path, r->datalen);
267 res = daemon_queue_send(client, &(r->hdr));
269 DEBUG(0,(__location__ " Failed to queue a getdbpath response\n"));
/* talloc destructor for a ctdb_client, installed in ctdb_accept_client()
   via talloc_set_destructor(). Its body is not visible in this listing. */
276 destroy a ctdb_client
278 static int ctdb_client_destructor(struct ctdb_client *client)
/* Handle CTDB_REQ_MESSAGE from a local client. If the destination node is
   this node, the packet is dispatched locally with ctdb_request_message().
   Otherwise the payload (c->data, c->datalen) is sent to c->hdr.destnode
   with ctdb_daemon_send_message(), with a level-0 message on failure.
   NOTE(review): the remaining arguments, the result check and the return
   after local delivery are not visible in this listing. */
287 this is called when the ctdb daemon received a ctdb request message
288 from a local client over the unix domain socket
290 static void daemon_request_message_from_client(struct ctdb_client *client,
291 struct ctdb_req_message *c)
296 /* maybe the message is for another client on this node */
297 if (ctdb_get_vnn(client->ctdb)==c->hdr.destnode) {
298 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
302 /* it's for a remote node */
303 data.dptr = &c->data[0];
304 data.dsize = c->datalen;
305 res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
308 DEBUG(0,(__location__ " Failed to send message to remote node %u\n",
/* Per-call bookkeeping for a client CTDB_REQ_CALL. It is allocated in
   daemon_request_call_from_client() and reaches
   daemon_call_from_client_callback() as state->async.private_data.
   NOTE(review): the reqid member (used as dstate->reqid) and the closing
   brace are not visible in this listing. */
314 struct daemon_call_state {
/* client that issued the call; the CTDB_REPLY_CALL is queued to it */
315 struct ctdb_client *client;
/* the call being run; its reply_data is copied into the reply packet */
317 struct ctdb_call *call;
/* set from timeval_current() when the request arrives; passed to ctdb_latency() */
318 struct timeval start_time;
/* Async completion for a client call started in
   daemon_request_call_from_client(). The steps are:
   1. Re-parent dstate onto the client and the call onto dstate.
   2. Collect the result with ctdb_daemon_call_recv().
   3. Send a CTDB_REPLY_CALL, echoing dstate->reqid and carrying
      call->reply_data, back to the client.
   Every path visible here records latency and decrements
   status.pending_calls.
   NOTE(review): the reply packet is allocated on dstate; whether dstate is
   freed after the reply is not visible in this listing. The message on
   the queue-failure path lacks a space after __location__. */
322 complete a call from a client
324 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
326 struct daemon_call_state *dstate = talloc_get_type(state->async.private_data,
327 struct daemon_call_state);
328 struct ctdb_reply_call *r;
331 struct ctdb_client *client = dstate->client;
333 talloc_steal(client, dstate);
334 talloc_steal(dstate, dstate->call);
336 res = ctdb_daemon_call_recv(state, dstate->call);
338 DEBUG(0, (__location__ " ctdbd_call_recv() returned error\n"));
339 client->ctdb->status.pending_calls--;
340 ctdb_latency(&client->ctdb->status.max_call_latency, dstate->start_time);
/* reply = fixed header plus the call's reply data */
344 length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
345 r = ctdbd_allocate_pkt(dstate, length);
347 DEBUG(0, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
348 client->ctdb->status.pending_calls--;
349 ctdb_latency(&client->ctdb->status.max_call_latency, dstate->start_time);
352 memset(r, 0, offsetof(struct ctdb_reply_call, data));
353 r->hdr.length = length;
354 r->hdr.ctdb_magic = CTDB_MAGIC;
355 r->hdr.ctdb_version = CTDB_VERSION;
356 r->hdr.operation = CTDB_REPLY_CALL;
/* echo the client's request id so it can match the reply */
357 r->hdr.reqid = dstate->reqid;
358 r->datalen = dstate->call->reply_data.dsize;
359 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
361 res = daemon_queue_send(client, &r->hdr);
363 DEBUG(0, (__location__ "Failed to queue packet from daemon to client\n"));
365 ctdb_latency(&client->ctdb->status.max_call_latency, dstate->start_time);
367 client->ctdb->status.pending_calls--;
/* Handle CTDB_REQ_CALL from a local client. The steps are:
   1. Increment total_calls and pending_calls, and look up the database.
   2. Lock and fetch the record with ctdb_ltdb_lock_fetch_requeue(),
      passing daemon_incoming_packet so the request can be replayed later.
      The "will retry later" branch undoes the pending_calls increment.
   3. Build a daemon_call_state and a ctdb_call.
   4. Run the call locally when this node is dmaster and
      CTDB_FLAG_SELF_CONNECT is not set; otherwise send it with
      ctdb_daemon_call_send_remote().
   5. Unlock the record and arrange for daemon_call_from_client_callback()
      to complete the call.
   NOTE(review): several lines are not visible in this listing (result
   checks, return statements, key.dptr setup). On the failure paths after
   dstate is allocated, it is not visible whether dstate is freed. */
372 this is called when the ctdb daemon received a ctdb request call
373 from a local client over the unix domain socket
375 static void daemon_request_call_from_client(struct ctdb_client *client,
376 struct ctdb_req_call *c)
378 struct ctdb_call_state *state;
379 struct ctdb_db_context *ctdb_db;
380 struct daemon_call_state *dstate;
381 struct ctdb_call *call;
382 struct ctdb_ltdb_header header;
385 struct ctdb_context *ctdb = client->ctdb;
387 ctdb->status.total_calls++;
388 ctdb->status.pending_calls++;
390 ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
392 DEBUG(0, (__location__ " Unknown database in request. db_id==0x%08x",
394 ctdb->status.pending_calls--;
399 key.dsize = c->keylen;
401 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header,
402 (struct ctdb_req_header *)c, &data,
403 daemon_incoming_packet, client);
405 /* will retry later */
406 ctdb->status.pending_calls--;
411 DEBUG(0,(__location__ " Unable to fetch record\n"));
412 ctdb->status.pending_calls--;
416 dstate = talloc(client, struct daemon_call_state);
417 if (dstate == NULL) {
418 ctdb_ltdb_unlock(ctdb_db, key);
419 DEBUG(0,(__location__ " Unable to allocate dstate\n"));
420 ctdb->status.pending_calls--;
423 dstate->start_time = timeval_current();
424 dstate->client = client;
425 dstate->reqid = c->hdr.reqid;
/* keep the fetched record data alive as long as dstate */
426 talloc_steal(dstate, data.dptr);
428 call = dstate->call = talloc_zero(dstate, struct ctdb_call);
430 ctdb_ltdb_unlock(ctdb_db, key);
431 DEBUG(0,(__location__ " Unable to allocate call\n"));
432 ctdb->status.pending_calls--;
433 ctdb_latency(&ctdb->status.max_call_latency, dstate->start_time);
437 call->call_id = c->callid;
/* the call data follows the key in the request payload */
439 call->call_data.dptr = c->data + c->keylen;
440 call->call_data.dsize = c->calldatalen;
441 call->flags = c->flags;
443 if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) {
444 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
446 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
/* the record lock is released before the send result is checked */
449 ctdb_ltdb_unlock(ctdb_db, key);
452 DEBUG(0,(__location__ " Unable to setup call send\n"));
453 ctdb->status.pending_calls--;
454 ctdb_latency(&ctdb->status.max_call_latency, dstate->start_time);
/* dstate now lives as long as the call state, which lives under the client */
457 talloc_steal(state, dstate);
458 talloc_steal(client, state);
460 state->async.fn = daemon_call_from_client_callback;
461 state->async.private_data = dstate;
/* forward declaration: daemon_request_control_from_client() is defined
   after daemon_incoming_packet(), which dispatches CTDB_REQ_CONTROL to it */
465 static void daemon_request_control_from_client(struct ctdb_client *client,
466 struct ctdb_req_control *c);
/* Dispatch one packet from a local client by hdr->operation, incrementing
   the matching ctdb->status.client counter first. The packet is owned by a
   temporary talloc context that is freed at the end unless a handler
   steals it.
   This function is also passed as the requeue callback of
   ctdb_ltdb_lock_fetch_requeue(). It repeats the magic/version checks done
   in ctdb_daemon_read_cb().
   NOTE(review): CTDB_REQ_GETDBPATH is the only dispatched operation
   without a status counter, and the nread argument is not visibly used.
   The CTDB_REQ_CALL case label, the break statements and the error exits
   are not visible in this listing. */
468 /* data contains a packet from the client */
469 static void daemon_incoming_packet(void *p, uint8_t *data, uint32_t nread)
471 struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
472 struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
474 struct ctdb_context *ctdb = client->ctdb;
476 /* place the packet as a child of a tmp_ctx. We then use
477 talloc_free() below to free it. If any of the calls want
478 to keep it, then they will steal it somewhere else, and the
479 talloc_free() will be a no-op */
480 tmp_ctx = talloc_new(client);
481 talloc_steal(tmp_ctx, hdr);
483 if (hdr->ctdb_magic != CTDB_MAGIC) {
484 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
488 if (hdr->ctdb_version != CTDB_VERSION) {
489 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
493 switch (hdr->operation) {
495 ctdb->status.client.req_call++;
496 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
499 case CTDB_REQ_REGISTER:
500 ctdb->status.client.req_register++;
501 daemon_request_register_message_handler(client,
502 (struct ctdb_req_register *)hdr);
505 case CTDB_REQ_MESSAGE:
506 ctdb->status.client.req_message++;
507 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
510 case CTDB_REQ_CONNECT_WAIT:
511 ctdb->status.client.req_connect_wait++;
512 daemon_request_connect_wait(client, (struct ctdb_req_connect_wait *)hdr);
515 case CTDB_REQ_SHUTDOWN:
516 ctdb->status.client.req_shutdown++;
517 daemon_request_shutdown(client, (struct ctdb_req_shutdown *)hdr);
520 case CTDB_REQ_GETDBPATH:
521 daemon_request_getdbpath(client, (struct ctdb_req_getdbpath *)hdr);
524 case CTDB_REQ_CONTROL:
525 ctdb->status.client.req_control++;
526 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
530 DEBUG(0,(__location__ " daemon: unrecognized operation %d\n",
/* frees the packet unless a handler stole it */
535 talloc_free(tmp_ctx);
539 called when the daemon gets a incoming packet
541 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
543 struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
544 struct ctdb_req_header *hdr;
551 client->ctdb->status.client_packets_recv++;
553 if (cnt < sizeof(*hdr)) {
554 ctdb_set_error(client->ctdb, "Bad packet length %d in daemon\n", cnt);
557 hdr = (struct ctdb_req_header *)data;
558 if (cnt != hdr->length) {
559 ctdb_set_error(client->ctdb, "Bad header length %d expected %d\n in daemon",
564 if (hdr->ctdb_magic != CTDB_MAGIC) {
565 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
569 if (hdr->ctdb_version != CTDB_VERSION) {
570 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
574 DEBUG(3,(__location__ " client request %d of type %d length %d from "
575 "node %d to %d\n", hdr->reqid, hdr->operation, hdr->length,
576 hdr->srcnode, hdr->destnode));
578 /* it is the responsibility of the incoming packet function to free 'data' */
579 daemon_incoming_packet(client, data, cnt);
/* Event handler for the daemon's listening socket. It:
   - accepts a connection and makes it non-blocking;
   - allocates a zeroed ctdb_client under ctdb;
   - attaches a packet queue whose read callback is ctdb_daemon_read_cb();
   - installs ctdb_client_destructor().
   NOTE(review): the listening socket is AF_UNIX (see ux_socket_bind()),
   but the peer address buffer is a struct sockaddr_in; a struct
   sockaddr_un would match. The initialisation of len, the accept()
   failure check and a NULL check on client are not visible in this
   listing — confirm they exist. */
582 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde,
583 uint16_t flags, void *private_data)
585 struct sockaddr_in addr;
588 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
589 struct ctdb_client *client;
591 memset(&addr, 0, sizeof(addr));
593 fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
597 set_non_blocking(fd);
599 client = talloc_zero(ctdb, struct ctdb_client);
/* the queue delivers incoming client packets to ctdb_daemon_read_cb() */
603 client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT,
604 ctdb_daemon_read_cb, client);
606 talloc_set_destructor(client, ctdb_client_destructor);
/* Event handler for the read end of the pipe to the parent process,
   registered in ctdb_start() with &fd[0] as private data. It reads one
   byte. The visible messages distinguish the parent having exited
   (descriptor gone) from unexpected data.
   NOTE(review): the branch conditions and what follows each message are
   not visible in this listing. private_data points into ctdb_start()'s fd
   array, presumably a local, which stays valid only because
   ctdb_main_loop() is not expected to return. */
611 static void ctdb_read_from_parent(struct event_context *ev, struct fd_event *fde,
612 uint16_t flags, void *private_data)
614 int *fd = private_data;
618 /* XXX this is a good place to try doing some cleaning up before exiting */
619 cnt = read(*fd, &buf, 1);
621 DEBUG(2,(__location__ " parent process exited. filedescriptor dissappeared\n"));
624 DEBUG(0,(__location__ " ctdb: did not expect data from parent process\n"));
632 create a unix domain socket and bind it
633 return a file descriptor open on the socket
635 static int ux_socket_bind(struct ctdb_context *ctdb)
637 struct sockaddr_un addr;
639 ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
640 if (ctdb->daemon.sd == -1) {
641 ctdb->daemon.sd = -1;
645 set_non_blocking(ctdb->daemon.sd);
647 memset(&addr, 0, sizeof(addr));
648 addr.sun_family = AF_UNIX;
649 strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
651 if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
652 close(ctdb->daemon.sd);
653 ctdb->daemon.sd = -1;
656 listen(ctdb->daemon.sd, 1);
/* talloc destructor that deletes the daemon's socket file on exit.
   ctdb_start() attaches it to a copy of ctdb->daemon.name allocated on
   talloc_autofree_context(). Its body is not visible in this listing. */
662 delete the socket on exit - called on destruction of autofree context
664 static int unlink_destructor(const char *name)
/* Start the daemon. It first:
   - removes any stale socket file and binds the unix domain listening
     socket;
   - opens a pipe and forks.
   The visible code then closes the listening socket in one process. In the
   other process it:
   - ignores SIGPIPE;
   - arranges for the socket file to be unlinked at exit;
   - creates the event context;
   - watches fd[0] (ctdb_read_from_parent) and the listening socket
     (ctdb_accept_client);
   - enters ctdb_main_loop().
   NOTE(review): the fork branches, the fd declaration and the return
   statements are not visible in this listing. fde is assigned twice and
   never read. The block-scope prototype of ctdb_tcp_init() would be better
   placed in a header. */
671 start the protocol going
673 int ctdb_start(struct ctdb_context *ctdb)
678 struct fd_event *fde;
679 const char *domain_socket_name;
681 /* get rid of any old sockets */
682 unlink(ctdb->daemon.name);
683 /* create a unix domain stream socket to listen to */
685 res = ux_socket_bind(ctdb);
687 DEBUG(0,(__location__ " Failed to open CTDB unix domain socket\n"));
693 DEBUG(0,(__location__ " Failed to open pipe for CTDB\n"));
698 DEBUG(0,(__location__ " Failed to fork CTDB daemon\n"));
704 close(ctdb->daemon.sd);
705 ctdb->daemon.sd = -1;
707 /* Added because of ctdb->methods->allocate_pkt calls */
709 int ctdb_tcp_init(struct ctdb_context *ctdb);
715 block_signal(SIGPIPE);
717 /* ensure the socket is deleted on exit of the daemon */
718 domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
719 talloc_set_destructor(domain_socket_name, unlink_destructor);
723 ctdb->ev = event_context_init(NULL);
724 fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ, ctdb_read_from_parent, &fd[0]);
725 fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, EVENT_FD_READ, ctdb_accept_client, ctdb);
726 ctdb_main_loop(ctdb);
/* Allocate a client<->daemon packet of at least len bytes on mem_ctx. The
   size is rounded up to a multiple of CTDB_DS_ALIGNMENT; the mask
   arithmetic assumes it is a power of two — NOTE(review): confirm.
   Returns the talloc_size() result, which may be NULL on allocation
   failure, so callers should check it.
   NOTE(review): the declaration of 'size' is not visible in this listing;
   if it is an int, very large lengths would be truncated. */
732 allocate a packet for use in client<->daemon communication
734 void *ctdbd_allocate_pkt(TALLOC_CTX *mem_ctx, size_t len)
738 size = (len+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
739 return talloc_size(mem_ctx, size);
743 called when a CTDB_REQ_FINISHED packet comes in
745 void ctdb_request_finished(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
747 ctdb->num_finished++;
/* Context for a control forwarded on behalf of a local client; it is
   recovered in daemon_control_callback() from its private data.
   NOTE(review): the closing brace is not visible in this listing. */
751 struct daemon_control_state {
/* client to which the control reply is sent */
752 struct ctdb_client *client;
/* the original CTDB_REQ_CONTROL packet, stolen onto this state in
   daemon_request_control_from_client() */
753 struct ctdb_req_control *c;
757 callback when a control reply comes in
759 static void daemon_control_callback(struct ctdb_context *ctdb,
760 uint32_t status, TDB_DATA data,
763 struct daemon_control_state *state = talloc_get_type(private_data,
764 struct daemon_control_state);
765 struct ctdb_client *client = state->client;
766 struct ctdb_reply_control *r;
769 DEBUG(0,("callback: size=%u\n", data.dsize));
770 DEBUG(0,("callback: size=%u\n", data.dsize));
771 DEBUG(0,("callback: size=%u\n", data.dsize));
772 DEBUG(0,("callback: size=%u\n", data.dsize));
773 DEBUG(0,("callback: size=%u\n", data.dsize));
774 DEBUG(0,("callback: size=%u\n", data.dsize));
776 /* construct a message to send to the client containing the data */
777 len = offsetof(struct ctdb_req_control, data) + data.dsize;
778 r = ctdbd_allocate_pkt(client, len);
779 talloc_set_name_const(r, "reply_control packet");
781 memset(r, 0, offsetof(struct ctdb_req_message, data));
784 r->hdr.ctdb_magic = CTDB_MAGIC;
785 r->hdr.ctdb_version = CTDB_VERSION;
786 r->hdr.operation = CTDB_REPLY_CONTROL;
788 r->datalen = data.dsize;
789 memcpy(&r->data[0], data.dptr, data.dsize);
791 daemon_queue_send(client, &r->hdr);
797 this is called when the ctdb daemon received a ctdb request control
798 from a local client over the unix domain socket
800 static void daemon_request_control_from_client(struct ctdb_client *client,
801 struct ctdb_req_control *c)
805 struct daemon_control_state *state;
807 state = talloc(client, struct daemon_control_state);
808 CTDB_NO_MEMORY_VOID(client->ctdb, state);
810 state->client = client;
811 state->c = talloc_steal(state, c);
813 data.dptr = &c->data[0];
814 data.dsize = c->datalen;
815 res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
816 c->srvid, c->opcode, data, daemon_control_callback,
819 DEBUG(0,(__location__ " Failed to send control to remote node %u\n",