4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/tevent/tevent.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/wait.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include <sys/socket.h>
/*
 * List node associating a connected client with the ctdb context.
 * next/prev are the links manipulated by DLIST_ADD/DLIST_REMOVE on
 * ctdb->client_pids.
 * NOTE(review): other code in this file also reads and writes a `pid`
 * member of this struct; its declaration is not visible in this excerpt.
 */
32 struct ctdb_client_pid_list {
33 struct ctdb_client_pid_list *next, *prev;
34 struct ctdb_context *ctdb;
36 struct ctdb_client *client;
/* Forward declaration: the definition appears later in this file and is
   called by daemon_incoming_packet_wrap() and ctdb_daemon_read_cb(). */
39 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
/* Log a notice that the daemon is shutting down. Registered with
   atexit() in ctdb_start_transport(). */
41 static void print_exit_message(void)
43 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
/*
 * Bring the node's services up: start the transport, start the recovery
 * daemon, register print_exit_message() with atexit() and start the
 * keepalive, node-health monitoring, tcp tickle update and recovery
 * daemon ping machinery. A missing transport (ctdb->methods == NULL) or
 * a transport that fails to start is reported through ctdb_fatal().
 */
46 /* called when the "startup" event script has finished */
47 static void ctdb_start_transport(struct ctdb_context *ctdb)
49 if (ctdb->methods == NULL) {
50 DEBUG(DEBUG_ALERT,(__location__ " startup event finished but transport is DOWN.\n"));
51 ctdb_fatal(ctdb, "transport is not initialized but startup completed");
54 /* start the transport running */
55 if (ctdb->methods->start(ctdb) != 0) {
56 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
57 ctdb_fatal(ctdb, "transport failed to start");
60 /* start the recovery daemon process */
61 if (ctdb_start_recoverd(ctdb) != 0) {
62 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
/* NOTE(review): what follows this log line on the failure path is not
   visible in this excerpt. */
66 /* Make sure we log something when the daemon terminates */
67 atexit(print_exit_message);
69 /* start monitoring for connected/disconnected nodes */
70 ctdb_start_keepalive(ctdb);
72 /* start monitoring for node health */
73 ctdb_start_monitoring(ctdb);
75 /* start periodic update of tcp tickle lists */
76 ctdb_start_tcp_tickle_update(ctdb);
78 /* start listening for recovery daemon pings */
79 ctdb_control_recd_ping(ctdb);
/*
 * Make the process ignore `signum`: a zeroed struct sigaction gets
 * SIG_IGN as its handler and `signum` in its mask, and is installed
 * with sigaction(). Despite the name the signal is ignored, not blocked.
 * The previous disposition is not saved and the sigaction() result is
 * not checked.
 */
82 static void block_signal(int signum)
86 memset(&act, 0, sizeof(act));
88 act.sa_handler = SIG_IGN;
89 sigemptyset(&act.sa_mask);
90 sigaddset(&act.sa_mask, signum);
91 sigaction(signum, &act, NULL);
/*
 * Queue the packet `hdr` (hdr->length bytes) on the client's queue and
 * return ctdb_queue_send()'s result. The client_packets_sent statistic
 * is incremented on every call. For CTDB_REQ_MESSAGE packets the
 * current queue length is first compared with the
 * max_queue_depth_drop_msg tunable.
 * NOTE(review): the log text says the client connection is killed when
 * the queue is too deep; the statements doing so are not visible here.
 */
96 send a packet to a client
98 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
100 client->ctdb->statistics.client_packets_sent++;
101 if (hdr->operation == CTDB_REQ_MESSAGE) {
102 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
103 DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
108 return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
/*
 * Build a CTDB_REQ_MESSAGE packet carrying `data` and queue it to the
 * client passed in private_data. The packet is allocated with `ctdb` as
 * its talloc parent and named "req_message packet". The result of
 * daemon_queue_send() is not checked here.
 */
112 message handler for when we are in daemon mode. This redirects the message
115 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
116 TDB_DATA data, void *private_data)
118 struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
119 struct ctdb_req_message *r;
122 /* construct a message to send to the client containing the data */
/* packet size: everything up to the data[] member plus the payload */
123 len = offsetof(struct ctdb_req_message, data) + data.dsize;
124 r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE,
125 len, struct ctdb_req_message);
126 CTDB_NO_MEMORY_VOID(ctdb, r);
128 talloc_set_name_const(r, "req_message packet");
131 r->datalen = data.dsize;
132 memcpy(&r->data[0], data.dptr, data.dsize);
134 daemon_queue_send(client, &r->hdr);
/*
 * Resolve client_id to its ctdb_client via ctdb_reqid_find() and
 * register daemon_message_handler() for srvid, passing the client both
 * as the second argument and as the handler's private data. An unknown
 * client_id is logged as an error; the registration outcome is logged
 * as well (error on failure, info on success).
 */
140 this is called when the ctdb daemon received a ctdb request to
141 set the srvid from the client
143 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
145 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
147 if (client == NULL) {
148 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
151 res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
153 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n",
154 (unsigned long long)srvid));
156 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n",
157 (unsigned long long)srvid));
/*
 * Resolve client_id to its ctdb_client and remove the message handler
 * registered for srvid on behalf of that client. Returns the result of
 * ctdb_deregister_message_handler(); an unknown client_id is logged as
 * an error.
 */
164 this is called when the ctdb daemon received a ctdb request to
165 remove a srvid from the client
167 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
169 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
170 if (client == NULL) {
171 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
174 return ctdb_deregister_message_handler(ctdb, srvid, client);
/*
 * talloc destructor for a ctdb_client (installed in
 * ctdb_accept_client()). Runs the takeover hook, releases the client's
 * request id and decrements the num_clients statistic (never below
 * zero). If the client still has persistent updates in flight the node
 * is put into CTDB_RECOVERY_ACTIVE mode.
 */
179 destroy a ctdb_client
181 static int ctdb_client_destructor(struct ctdb_client *client)
183 struct ctdb_db_context *ctdb_db;
185 ctdb_takeover_client_destructor_hook(client);
186 ctdb_reqid_remove(client->ctdb, client->client_id);
187 if (client->ctdb->statistics.num_clients) {
188 client->ctdb->statistics.num_clients--;
191 if (client->num_persistent_updates != 0) {
192 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
193 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
195 ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
/* NOTE(review): the condition guarding the statements below is not
   visible in this excerpt; per the log text it covers a client that
   exits while a transaction commit is active. */
197 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
198 "commit active. Forcing recovery.\n"));
199 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
200 ctdb_db->transaction_active = false;
/*
 * Route a message received from a local client. When the destination
 * node is this node (ctdb_get_pnn() == hdr.destnode) the packet is
 * handed to ctdb_request_message(); otherwise the payload is forwarded
 * with ctdb_daemon_send_message() and a failure is logged.
 */
208 this is called when the ctdb daemon received a ctdb request message
209 from a local client over the unix domain socket
211 static void daemon_request_message_from_client(struct ctdb_client *client,
212 struct ctdb_req_message *c)
217 /* maybe the message is for another client on this node */
218 if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
219 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
223 /* its for a remote node */
/* the TDB_DATA points into the request packet; nothing is copied here */
224 data.dptr = &c->data[0];
225 data.dsize = c->datalen;
226 res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
229 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
/*
 * State kept for one call issued on behalf of a client: the requesting
 * client, the call itself and the time the call was started (used for
 * latency accounting).
 * NOTE(review): code in this file also uses a `reqid` member of this
 * struct; its declaration is not visible in this excerpt.
 */
235 struct daemon_call_state {
236 struct ctdb_client *client;
238 struct ctdb_call *call;
239 struct timeval start_time;
/*
 * Completion callback for a call issued on behalf of a client (set as
 * state->async.fn in daemon_request_call_from_client()). Collects the
 * result with ctdb_daemon_call_recv(), builds a CTDB_REPLY_CALL packet
 * holding the reply data and queues it to the client. The visible exit
 * paths decrement the pending_calls statistic (if non-zero) and record
 * the call latency.
 */
243 complete a call from a client
245 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
247 struct daemon_call_state *dstate = talloc_get_type(state->async.private_data,
248 struct daemon_call_state);
249 struct ctdb_reply_call *r;
252 struct ctdb_client *client = dstate->client;
253 struct ctdb_db_context *ctdb_db = state->ctdb_db;
/* re-parent dstate onto the client, and the call onto dstate */
255 talloc_steal(client, dstate);
256 talloc_steal(dstate, dstate->call);
258 res = ctdb_daemon_call_recv(state, dstate->call);
260 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
261 if (client->ctdb->statistics.pending_calls > 0) {
262 client->ctdb->statistics.pending_calls--;
264 ctdb_latency(ctdb_db, "call_from_client_cb 1", &client->ctdb->statistics.max_call_latency, dstate->start_time);
/* reply size: everything up to the data[] member plus the reply payload */
268 length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
269 r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL,
270 length, struct ctdb_reply_call);
272 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
273 if (client->ctdb->statistics.pending_calls > 0) {
274 client->ctdb->statistics.pending_calls--;
276 ctdb_latency(ctdb_db, "call_from_client_cb 2", &client->ctdb->statistics.max_call_latency, dstate->start_time);
/* echo the client's request id so it can match the reply */
279 r->hdr.reqid = dstate->reqid;
280 r->datalen = dstate->call->reply_data.dsize;
281 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
283 res = daemon_queue_send(client, &r->hdr);
285 /* client is dead - return immediately */
289 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
291 ctdb_latency(ctdb_db, "call_from_client_cb 3", &client->ctdb->statistics.max_call_latency, dstate->start_time);
293 if (client->ctdb->statistics.pending_calls > 0) {
294 client->ctdb->statistics.pending_calls--;
/*
 * Context handed to daemon_incoming_packet_wrap() so that a requeued
 * packet can be matched back to its client by id.
 * NOTE(review): code in this file also uses a `client_id` member of this
 * struct; its declaration is not visible in this excerpt.
 */
298 struct ctdb_daemon_packet_wrap {
299 struct ctdb_context *ctdb;
/*
 * Entry point for packets replayed after a deferred record lock. `p` is
 * a ctdb_daemon_packet_wrap; the client is looked up again by id, since
 * it may have gone away in the meantime. A wrong talloc type or a
 * vanished client is logged; otherwise the packet is passed on to
 * daemon_incoming_packet().
 */
304 a wrapper to catch disconnected clients
306 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
308 struct ctdb_client *client;
309 struct ctdb_daemon_packet_wrap *w = talloc_get_type(p,
310 struct ctdb_daemon_packet_wrap);
312 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
316 client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
317 if (client == NULL) {
318 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
326 daemon_incoming_packet(client, hdr);
331 this is called when the ctdb daemon received a ctdb request call
332 from a local client over the unix domain socket
334 static void daemon_request_call_from_client(struct ctdb_client *client,
335 struct ctdb_req_call *c)
337 struct ctdb_call_state *state;
338 struct ctdb_db_context *ctdb_db;
339 struct daemon_call_state *dstate;
340 struct ctdb_call *call;
341 struct ctdb_ltdb_header header;
344 struct ctdb_context *ctdb = client->ctdb;
345 struct ctdb_daemon_packet_wrap *w;
347 ctdb->statistics.total_calls++;
348 if (client->ctdb->statistics.pending_calls > 0) {
349 ctdb->statistics.pending_calls++;
352 ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
354 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
356 if (client->ctdb->statistics.pending_calls > 0) {
357 ctdb->statistics.pending_calls--;
362 if (ctdb_db->unhealthy_reason) {
364 * this is just a warning, as the tdb should be empty anyway,
365 * and only persistent databases can be unhealthy, which doesn't
366 * use this code patch
368 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
369 ctdb_db->db_name, ctdb_db->unhealthy_reason));
373 key.dsize = c->keylen;
375 w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
376 CTDB_NO_MEMORY_VOID(ctdb, w);
379 w->client_id = client->client_id;
381 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header,
382 (struct ctdb_req_header *)c, &data,
383 daemon_incoming_packet_wrap, w, True);
385 /* will retry later */
386 if (client->ctdb->statistics.pending_calls > 0) {
387 ctdb->statistics.pending_calls--;
395 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
396 if (client->ctdb->statistics.pending_calls > 0) {
397 ctdb->statistics.pending_calls--;
402 dstate = talloc(client, struct daemon_call_state);
403 if (dstate == NULL) {
404 ret = ctdb_ltdb_unlock(ctdb_db, key);
406 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
409 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
410 if (client->ctdb->statistics.pending_calls > 0) {
411 ctdb->statistics.pending_calls--;
415 dstate->start_time = timeval_current();
416 dstate->client = client;
417 dstate->reqid = c->hdr.reqid;
418 talloc_steal(dstate, data.dptr);
420 call = dstate->call = talloc_zero(dstate, struct ctdb_call);
422 ret = ctdb_ltdb_unlock(ctdb_db, key);
424 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
427 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
428 if (client->ctdb->statistics.pending_calls > 0) {
429 ctdb->statistics.pending_calls--;
431 ctdb_latency(ctdb_db, "call_from_client 1", &ctdb->statistics.max_call_latency, dstate->start_time);
435 call->call_id = c->callid;
437 call->call_data.dptr = c->data + c->keylen;
438 call->call_data.dsize = c->calldatalen;
439 call->flags = c->flags;
441 if (header.dmaster == ctdb->pnn) {
442 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
444 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
447 ret = ctdb_ltdb_unlock(ctdb_db, key);
449 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
453 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
454 if (client->ctdb->statistics.pending_calls > 0) {
455 ctdb->statistics.pending_calls--;
457 ctdb_latency(ctdb_db, "call_from_client 2", &ctdb->statistics.max_call_latency, dstate->start_time);
460 talloc_steal(state, dstate);
461 talloc_steal(client, state);
463 state->async.fn = daemon_call_from_client_callback;
464 state->async.private_data = dstate;
/* Forward declaration: defined later in this file, called from the
   CTDB_REQ_CONTROL case of daemon_incoming_packet(). */
468 static void daemon_request_control_from_client(struct ctdb_client *client,
469 struct ctdb_req_control *c);
/*
 * Validate and dispatch one packet from a client. `p` is the
 * ctdb_client. The packet is rejected (with ctdb_set_error) when its
 * magic or version does not match CTDB_MAGIC / CTDB_VERSION; otherwise
 * the per-operation client statistic is incremented and the packet is
 * handed to the call, message or control handler. Unrecognised
 * operations are logged at critical level.
 */
471 /* data contains a packet from the client */
472 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
474 struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
476 struct ctdb_context *ctdb = client->ctdb;
478 /* place the packet as a child of a tmp_ctx. We then use
479 talloc_free() below to free it. If any of the calls want
480 to keep it, then they will steal it somewhere else, and the
481 talloc_free() will be a no-op */
482 tmp_ctx = talloc_new(client);
483 talloc_steal(tmp_ctx, hdr);
485 if (hdr->ctdb_magic != CTDB_MAGIC) {
486 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
490 if (hdr->ctdb_version != CTDB_VERSION) {
491 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
495 switch (hdr->operation) {
/* NOTE(review): the case label for the first branch is not visible in
   this excerpt; it updates the req_call statistic and runs the call
   handler. */
497 ctdb->statistics.client.req_call++;
498 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
501 case CTDB_REQ_MESSAGE:
502 ctdb->statistics.client.req_message++;
503 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
506 case CTDB_REQ_CONTROL:
507 ctdb->statistics.client.req_control++;
508 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
512 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
517 talloc_free(tmp_ctx);
/*
 * Read callback for a client's queue (passed to ctdb_queue_setup() in
 * ctdb_accept_client()). `data`/`cnt` is one received packet and `args`
 * the ctdb_client. Counts the packet in client_packets_recv, checks
 * that it is at least a header long, that cnt equals hdr->length and
 * that magic and version match, then hands it to
 * daemon_incoming_packet().
 * NOTE(review): in the "Bad header length" message the newline sits
 * before the trailing " in daemon" text, and the arguments are passed
 * as (hdr->length, cnt) against the wording "length ... expected ...".
 */
521 called when the daemon gets a incoming packet
523 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
525 struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
526 struct ctdb_req_header *hdr;
533 client->ctdb->statistics.client_packets_recv++;
535 if (cnt < sizeof(*hdr)) {
536 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n",
540 hdr = (struct ctdb_req_header *)data;
541 if (cnt != hdr->length) {
542 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon",
543 (unsigned)hdr->length, (unsigned)cnt);
547 if (hdr->ctdb_magic != CTDB_MAGIC) {
548 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
552 if (hdr->ctdb_version != CTDB_VERSION) {
553 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
557 DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
558 "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
559 hdr->srcnode, hdr->destnode));
561 /* it is the responsibility of the incoming packet function to free 'data' */
562 daemon_incoming_packet(client, hdr);
/* talloc destructor for a client pid list entry: unlinks the entry from
   ctdb->client_pids, unless that list is already empty. */
566 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
568 if (client_pid->ctdb->client_pids != NULL) {
569 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
/*
 * fd event handler for the listening unix domain socket. Accepts one
 * connection, marks it close-on-exec, allocates a ctdb_client for it,
 * asks the kernel for the peer's credentials (SO_PEERID or SO_PEERCRED;
 * NOTE(review): the preprocessor conditionals selecting between the two
 * variants are not visible in this excerpt), records the peer pid in a
 * ctdb_client_pid_list entry on ctdb->client_pids, sets up the client's
 * packet queue and installs the destructors.
 */
576 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde,
577 uint16_t flags, void *private_data)
579 struct sockaddr_un addr;
582 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
583 struct ctdb_client *client;
584 struct ctdb_client_pid_list *client_pid;
586 struct peercred_struct cr;
587 socklen_t crl = sizeof(struct peercred_struct);
590 socklen_t crl = sizeof(struct ucred);
593 memset(&addr, 0, sizeof(addr));
595 fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
601 set_close_on_exec(fd);
603 DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
/* NOTE(review): no NULL check of `client` is visible between this
   allocation and its first use below -- confirm. */
605 client = talloc_zero(ctdb, struct ctdb_client);
607 if (getsockopt(fd, SOL_SOCKET, SO_PEERID, &cr, &crl) == 0) {
609 if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl) == 0) {
611 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)cr.pid));
616 client->client_id = ctdb_reqid_new(ctdb, client);
/* NOTE(review): cr.pid is used here regardless of whether the
   getsockopt() above succeeded, as far as the visible lines show. */
617 client->pid = cr.pid;
/* the pid entry is a talloc child of the client, so it goes away with it */
619 client_pid = talloc(client, struct ctdb_client_pid_list);
620 if (client_pid == NULL) {
621 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
626 client_pid->ctdb = ctdb;
627 client_pid->pid = cr.pid;
628 client_pid->client = client;
630 DLIST_ADD(ctdb->client_pids, client_pid);
632 client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT,
633 ctdb_daemon_read_cb, client,
634 "client-%u", client->pid);
636 talloc_set_destructor(client, ctdb_client_destructor);
637 talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
638 ctdb->statistics.num_clients++;
644 create a unix domain socket and bind it
645 return a file descriptor open on the socket
647 static int ux_socket_bind(struct ctdb_context *ctdb)
649 struct sockaddr_un addr;
651 ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
652 if (ctdb->daemon.sd == -1) {
656 set_close_on_exec(ctdb->daemon.sd);
657 set_nonblocking(ctdb->daemon.sd);
659 memset(&addr, 0, sizeof(addr));
660 addr.sun_family = AF_UNIX;
661 strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
663 if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
664 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
668 if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
669 chmod(ctdb->daemon.name, 0700) != 0) {
670 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
675 if (listen(ctdb->daemon.sd, 100) != 0) {
676 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
683 close(ctdb->daemon.sd);
684 ctdb->daemon.sd = -1;
/*
 * SIGCHLD event handler: collects exited children with a non-blocking
 * waitpid(-1, ..., WNOHANG), logging a waitpid() error together with
 * errno and logging the pid of each child reported.
 * NOTE(review): the loop/termination structure around waitpid() is not
 * visible in this excerpt.
 */
688 static void sig_child_handler(struct event_context *ev,
689 struct signal_event *se, int signum, int count,
693 // struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
698 pid = waitpid(-1, &status, WNOHANG);
700 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
704 DEBUG(DEBUG_DEBUG, ("SIGCHLD from %d\n", (int)pid));
/*
 * Completion callback for the "setup" event (passed to
 * ctdb_event_script_callback() in ctdb_start_daemon()). A failure is
 * fatal (NOTE(review): the test on `status` is not visible in this
 * excerpt). Otherwise the "setup" notification script is run and a
 * CTDB_CONTROL_STARTUP control is broadcast to all nodes with
 * CTDB_CTRL_FLAG_NOREPLY, i.e. without waiting for replies.
 */
709 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
713 ctdb_fatal(ctdb, "Failed to run setup event\n");
716 ctdb_run_notification_script(ctdb, "setup");
718 /* tell all other nodes we've just started up */
719 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
720 0, CTDB_CONTROL_STARTUP, 0,
721 CTDB_CTRL_FLAG_NOREPLY,
722 tdb_null, NULL, NULL);
/*
 * Daemon entry point. In order, as far as the visible lines show:
 * removes a stale socket file and binds the unix domain socket,
 * optionally forks (do_fork), reopens tdbs, redirects stdin to
 * /dev/null, ignores SIGPIPE, records the pid in ctdbd_pid, raises
 * priority, creates the event context (nesting allowed), initialises
 * the transport named by ctdb->transport ("tcp", or "ib" when built
 * with USE_INFINIBAND), attaches the databases, takes the initial
 * freeze, runs the "init" event, starts accepting clients, releases
 * previously held IPs, starts the transport, installs the SIGCHLD
 * handler, schedules the "setup" event, and finally enters the event
 * loop, which is not expected to return.
 * NOTE(review): how `use_syslog` gates start_syslog_daemon() is not
 * visible in this excerpt.
 */
726 start the protocol going as a daemon
728 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog)
731 struct fd_event *fde;
732 const char *domain_socket_name;
733 struct signal_event *se;
735 /* get rid of any old sockets */
736 unlink(ctdb->daemon.name);
738 /* create a unix domain stream socket to listen to */
739 res = ux_socket_bind(ctdb);
741 DEBUG(DEBUG_ALERT,(__location__ " Failed to open CTDB unix domain socket\n"));
/* fork() returns non-zero in the parent (and on error), so only the
   child continues past this test when do_fork is set */
745 if (do_fork && fork()) {
749 tdb_reopen_all(False);
/* open() must hand back descriptor 0 for this check to pass */
754 if (open("/dev/null", O_RDONLY) != 0) {
755 DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
759 block_signal(SIGPIPE);
761 ctdbd_pid = getpid();
764 DEBUG(DEBUG_ERR, ("Starting CTDBD as pid : %u\n", ctdbd_pid));
766 ctdb_high_priority(ctdb);
768 /* ensure the socket is deleted on exit of the daemon */
769 domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
770 if (domain_socket_name == NULL) {
771 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
775 ctdb->ev = event_context_init(NULL);
776 tevent_loop_allow_nesting(ctdb->ev);
778 ctdb_set_child_logging(ctdb);
780 /* force initial recovery for election */
781 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
783 if (strcmp(ctdb->transport, "tcp") == 0) {
784 int ctdb_tcp_init(struct ctdb_context *);
785 ret = ctdb_tcp_init(ctdb);
787 #ifdef USE_INFINIBAND
788 if (strcmp(ctdb->transport, "ib") == 0) {
789 int ctdb_ibw_init(struct ctdb_context *);
790 ret = ctdb_ibw_init(ctdb);
794 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
798 if (ctdb->methods == NULL) {
799 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
800 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
803 /* initialise the transport */
804 if (ctdb->methods->initialise(ctdb) != 0) {
805 ctdb_fatal(ctdb, "transport failed to initialise");
808 /* attach to existing databases */
809 if (ctdb_attach_databases(ctdb) != 0) {
810 ctdb_fatal(ctdb, "Failed to attach to databases\n");
813 /* start frozen, then let the first election sort things out */
814 if (ctdb_blocking_freeze(ctdb)) {
815 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
818 ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
820 ctdb_fatal(ctdb, "Failed to run init event\n");
822 ctdb_run_notification_script(ctdb, "init");
824 /* now start accepting clients, only can do this once frozen */
825 fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd,
827 ctdb_accept_client, ctdb);
828 tevent_fd_set_auto_close(fde);
830 /* release any IPs we hold from previous runs of the daemon */
831 ctdb_release_all_ips(ctdb);
833 /* start the transport going */
834 ctdb_start_transport(ctdb);
836 /* set up a handler to pick up sigchld */
837 se = event_add_signal(ctdb->ev, ctdb,
842 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
846 ret = ctdb_event_script_callback(ctdb,
848 ctdb_setup_event_callback,
854 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
859 if (start_syslog_daemon(ctdb)) {
860 DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
865 ctdb_lockdown_memory(ctdb);
867 /* go into a wait loop to allow other nodes to complete */
868 event_loop_wait(ctdb->ev);
870 DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
/*
 * Allocate a packet through the transport's allocate_pkt() method and
 * fill in the common header. The requested length is raised to at least
 * `slength` and the allocation size is rounded up to a multiple of
 * CTDB_DS_ALIGNMENT; hdr->length keeps the unrounded length. The first
 * `slength` bytes are zeroed, the talloc name is set to `type`, and
 * operation, magic, version, generation (from ctdb->vnn_map) and source
 * node are filled in. Both failure cases (no transport, allocation
 * failure) are logged.
 */
875 allocate a packet for use in daemon<->daemon communication
877 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
879 enum ctdb_operation operation,
880 size_t length, size_t slength,
884 struct ctdb_req_header *hdr;
886 length = MAX(length, slength);
/* round up; the mask form assumes CTDB_DS_ALIGNMENT is a power of two */
887 size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
889 if (ctdb->methods == NULL) {
890 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
891 operation, (unsigned)length));
895 hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
897 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
898 operation, (unsigned)length));
901 talloc_set_name_const(hdr, type);
902 memset(hdr, 0, slength);
903 hdr->length = length;
904 hdr->operation = operation;
905 hdr->ctdb_magic = CTDB_MAGIC;
906 hdr->ctdb_version = CTDB_VERSION;
907 hdr->generation = ctdb->vnn_map->generation;
908 hdr->srcnode = ctdb->pnn;
/*
 * State for one control request issued by a client: the client, the
 * request packet, and the destination node whose pending_controls list
 * this entry is linked into through next/prev (node may be NULL, see
 * daemon_control_destructor()).
 * NOTE(review): code in this file also uses a `reqid` member of this
 * struct; its declaration is not visible in this excerpt.
 */
913 struct daemon_control_state {
914 struct daemon_control_state *next, *prev;
915 struct ctdb_client *client;
916 struct ctdb_req_control *c;
918 struct ctdb_node *node;
/*
 * Completion callback for a control sent on behalf of a client. Builds
 * a CTDB_REPLY_CONTROL packet (talloc child of the control state) that
 * carries the reply data followed by the error message text, stamps it
 * with the original request id and queues it to the client.
 * NOTE(review): the conditionals guarding the errormsg handling are not
 * visible in this excerpt; errormsg is passed to strlen()/memcpy() on
 * the visible lines.
 */
922 callback when a control reply comes in
924 static void daemon_control_callback(struct ctdb_context *ctdb,
925 int32_t status, TDB_DATA data,
926 const char *errormsg,
929 struct daemon_control_state *state = talloc_get_type(private_data,
930 struct daemon_control_state);
931 struct ctdb_client *client = state->client;
932 struct ctdb_reply_control *r;
936 /* construct a message to send to the client containing the data */
937 len = offsetof(struct ctdb_reply_control, data) + data.dsize;
939 len += strlen(errormsg);
941 r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len,
942 struct ctdb_reply_control);
943 CTDB_NO_MEMORY_VOID(ctdb, r);
945 r->hdr.reqid = state->reqid;
947 r->datalen = data.dsize;
949 memcpy(&r->data[0], data.dptr, data.dsize);
/* the error text is appended right after the data, without a NUL */
951 r->errorlen = strlen(errormsg);
952 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
955 ret = daemon_queue_send(client, &r->hdr);
/*
 * Drain node->pending_controls: each entry is unlinked and its client
 * is answered through daemon_control_callback() with status -1 (cast
 * from uint32_t), no data and the message "node is disconnected".
 */
962 fail all pending controls to a disconnected node
964 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
966 struct daemon_control_state *state;
/* always take the list head; the loop ends once the list is empty */
967 while ((state = node->pending_controls)) {
968 DLIST_REMOVE(node->pending_controls, state);
969 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null,
970 "node is disconnected", state);
/*
 * talloc destructor for a daemon_control_state: unlinks the state from
 * its node's pending_controls list.
 * NOTE(review): state->node can be absent for an invalid destination
 * (see daemon_request_control_from_client()); the guard around the
 * DLIST_REMOVE is not visible in this excerpt.
 */
975 destroy a daemon_control_state
977 static int daemon_control_destructor(struct daemon_control_state *state)
980 DLIST_REMOVE(state->node->pending_controls, state);
/*
 * Handle a control request from a local client. CTDB_CURRENT_NODE is
 * translated to this node's pnn. A daemon_control_state (talloc child
 * of the client) takes ownership of the request packet and, for a valid
 * destination, is linked into that node's pending_controls list. The
 * control is then sent with daemon_control_callback() as completion
 * handler. For CTDB_CTRL_FLAG_NOREPLY requests the state is moved under
 * tmp_ctx, so it is released by the talloc_free() at the end.
 * NOTE(review): if the state allocation fails, CTDB_NO_MEMORY_VOID
 * presumably returns without freeing tmp_ctx -- confirm against the
 * macro's definition.
 */
986 this is called when the ctdb daemon received a ctdb request control
987 from a local client over the unix domain socket
989 static void daemon_request_control_from_client(struct ctdb_client *client,
990 struct ctdb_req_control *c)
994 struct daemon_control_state *state;
995 TALLOC_CTX *tmp_ctx = talloc_new(client);
997 if (c->hdr.destnode == CTDB_CURRENT_NODE) {
998 c->hdr.destnode = client->ctdb->pnn;
1001 state = talloc(client, struct daemon_control_state);
1002 CTDB_NO_MEMORY_VOID(client->ctdb, state);
1004 state->client = client;
/* keep the request packet alive for as long as the state exists */
1005 state->c = talloc_steal(state, c);
1006 state->reqid = c->hdr.reqid;
1007 if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1008 state->node = client->ctdb->nodes[c->hdr.destnode];
1009 DLIST_ADD(state->node->pending_controls, state);
1014 talloc_set_destructor(state, daemon_control_destructor);
1016 if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1017 talloc_steal(tmp_ctx, state);
/* the TDB_DATA points into the request packet; nothing is copied */
1020 data.dptr = &c->data[0];
1021 data.dsize = c->datalen;
1022 res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1023 c->srvid, c->opcode, client->client_id,
1025 data, daemon_control_callback,
1028 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1032 talloc_free(tmp_ctx);
/*
 * Register a call function for the database identified by db_id: a
 * ctdb_registered_call is allocated as a talloc child of the database
 * context and added to ctdb_db->calls. An unknown db_id is detected
 * through find_ctdb_db() returning NULL.
 * NOTE(review): the lines storing `fn` and `id` into the new entry and
 * the return statements are not visible in this excerpt.
 */
1036 register a call function
1038 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1039 ctdb_fn_t fn, int id)
1041 struct ctdb_registered_call *call;
1042 struct ctdb_db_context *ctdb_db;
1044 ctdb_db = find_ctdb_db(ctdb, db_id);
1045 if (ctdb_db == NULL) {
1049 call = talloc(ctdb_db, struct ctdb_registered_call);
1053 DLIST_ADD(ctdb_db->calls, call);
/*
 * Deferred message addressed to this node itself; it is dispatched from
 * a timed event by ctdb_local_message_trigger().
 * NOTE(review): code in this file also uses `srvid` and `data` members
 * of this struct; their declarations are not visible in this excerpt.
 */
1060 this local messaging handler is ugly, but is needed to prevent
1061 recursion in ctdb_send_message() when the destination node is the
1062 same as the source node
1064 struct ctdb_local_message {
1065 struct ctdb_context *ctdb;
/*
 * Timed-event handler scheduled by ctdb_local_message(): dispatches the
 * stored message with ctdb_dispatch_message() and logs a failure
 * together with the srvid.
 */
1070 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te,
1071 struct timeval t, void *private_data)
1073 struct ctdb_local_message *m = talloc_get_type(private_data,
1074 struct ctdb_local_message);
1077 res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1079 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n",
1080 (unsigned long long)m->srvid));
/*
 * Queue a message for delivery on this node. A ctdb_local_message is
 * allocated under `ctdb`, the payload is duplicated into it with
 * talloc_memdup(), and a zero-delay timed event (owned by the message)
 * is added so that ctdb_local_message_trigger() delivers it from the
 * event loop rather than from the caller's stack.
 * NOTE(review): the assignments filling m->ctdb/srvid/data before the
 * memdup are not visible in this excerpt.
 */
1085 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1087 struct ctdb_local_message *m;
1088 m = talloc(ctdb, struct ctdb_local_message);
1089 CTDB_NO_MEMORY(ctdb, m);
1094 m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1095 if (m->data.dptr == NULL) {
1100 /* this needs to be done as an event to prevent recursion */
1101 event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
/*
 * Send a message with service id `srvid` and payload `data` to node
 * `pnn`. Fails (with a log entry) when no transport is available. A
 * message addressed to this node is delivered through
 * ctdb_local_message(); otherwise a CTDB_REQ_MESSAGE packet is
 * allocated from the transport, filled in and queued with
 * ctdb_queue_packet().
 */
1108 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1109 uint64_t srvid, TDB_DATA data)
1111 struct ctdb_req_message *r;
1114 if (ctdb->methods == NULL) {
1115 DEBUG(DEBUG_ERR,(__location__ " Failed to send message. Transport is DOWN\n"));
1119 /* see if this is a message to ourselves */
1120 if (pnn == ctdb->pnn) {
1121 return ctdb_local_message(ctdb, srvid, data);
/* packet size: everything up to the data[] member plus the payload */
1124 len = offsetof(struct ctdb_req_message, data) + data.dsize;
1125 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1126 struct ctdb_req_message);
1127 CTDB_NO_MEMORY(ctdb, r);
1129 r->hdr.destnode = pnn;
1131 r->datalen = data.dsize;
1132 memcpy(&r->data[0], data.dptr, data.dsize);
1134 ctdb_queue_packet(ctdb, &r->hdr);
/*
 * One notification registered by a client; entries are linked into
 * client->notify through next/prev.
 * NOTE(review): code in this file also uses `srvid` and `data` members
 * of this struct; their declarations are not visible in this excerpt.
 */
1142 struct ctdb_client_notify_list {
1143 struct ctdb_client_notify_list *next, *prev;
1144 struct ctdb_context *ctdb;
/*
 * talloc destructor for a registered notification: broadcasts the
 * stored payload to CTDB_BROADCAST_CONNECTED under the registered
 * srvid and logs a send failure. Installed by
 * ctdb_control_register_notify() and cleared again by
 * ctdb_control_deregister_notify().
 */
1150 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1154 DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1156 ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1158 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1164 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1166 struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1167 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1168 struct ctdb_client_notify_list *nl;
1170 DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1172 if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1173 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1177 if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1178 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1183 if (client == NULL) {
1184 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1188 for(nl=client->notify; nl; nl=nl->next) {
1189 if (nl->srvid == notify->srvid) {
1194 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1198 nl = talloc(client, struct ctdb_client_notify_list);
1199 CTDB_NO_MEMORY(ctdb, nl);
1201 nl->srvid = notify->srvid;
1202 nl->data.dsize = notify->len;
1203 nl->data.dptr = talloc_size(nl, nl->data.dsize);
1204 CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1205 memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1207 DLIST_ADD(client->notify, nl);
1208 talloc_set_destructor(nl, ctdb_client_notify_destructor);
/*
 * Remove a notification previously registered by a local client for the
 * srvid given in indata. The client's notify list is searched for a
 * matching srvid; a missing client or a missing entry is logged as an
 * error. A matching entry is unlinked and its destructor cleared, so
 * the notification message is not sent for it.
 * NOTE(review): indata.dsize is not checked in the visible lines before
 * notify->srvid is read; presumably the caller validates the size --
 * confirm.
 */
1213 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1215 struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1216 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1217 struct ctdb_client_notify_list *nl;
1219 DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1221 if (client == NULL) {
1222 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1226 for(nl=client->notify; nl; nl=nl->next) {
1227 if (nl->srvid == notify->srvid) {
1232 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1236 DLIST_REMOVE(client->notify, nl);
1237 talloc_set_destructor(nl, NULL);
/*
 * Walk ctdb->client_pids and return the client of the first entry whose
 * pid equals `pid`.
 * NOTE(review): the return for the not-found case is not visible in
 * this excerpt; callers compare the result against NULL.
 */
1243 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1245 struct ctdb_client_pid_list *client_pid;
1247 for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1248 if (client_pid->pid == pid) {
1249 return client_pid->client;
/*
 * Report whether process `pid` exists, as the result of kill(pid, 0).
 * When this node has NODE_FLAGS_BANNED or NODE_FLAGS_STOPPED set and
 * `pid` belongs to a connected client, that client is freed first
 * (running its destructor).
 * NOTE(review): what is returned after the client has been freed is not
 * visible in this excerpt.
 */
1256 /* This control is used by samba when probing if a process (of a samba daemon)
1258 Samba does this when it needs/wants to check if a subrecord in one of the
1259 databases is still valied, or if it is stale and can be removed.
1260 If the node is in unhealthy or stopped state we just kill of the samba
1261 process holding htis sub-record and return to the calling samba that
1262 the process does not exist.
1263 This allows us to forcefully recall subrecords registered by samba processes
1264 on banned and stopped nodes.
1266 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1268 struct ctdb_client *client;
1270 if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1271 client = ctdb_find_client_by_pid(ctdb, pid);
1272 if (client != NULL) {
1273 DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1274 talloc_free(client);
/* signal 0 sends nothing; kill() only performs its existence and
   permission checks */
1279 return kill(pid, 0);