4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/tevent/tevent.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/wait.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include <sys/socket.h>
/* One entry per connected client in the list headed by ctdb->client_pids.
   ctdb_accept_client() adds an entry for every accepted connection and
   ctdb_find_client_by_pid() walks the list to map a pid back to its client.
   NOTE(review): a pid member is read and written elsewhere in this file but
   its declaration is not visible in this listing. */
32 struct ctdb_client_pid_list {
/* list linkage used by DLIST_ADD / DLIST_REMOVE */
33 struct ctdb_client_pid_list *next, *prev;
/* owning daemon context; the destructor uses it to reach client_pids */
34 struct ctdb_context *ctdb;
/* the client this pid belongs to */
36 struct ctdb_client *client;
/* Forward declaration: daemon_incoming_packet_wrap() calls this function
   before its definition further down the file. */
39 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
/* Logs a shutdown notice.  ctdb_start_transport() registers this function
   with atexit(). */
41 static void print_exit_message(void)
43 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
46 /* called when the "startup" event script has finished */
/* Starts the transport and then the services layered on top of it: the
   recovery daemon, keepalive, health monitoring, tcp tickle list updates and
   recovery daemon ping handling.  Also registers print_exit_message() with
   atexit().  A missing or failing transport is reported through ctdb_fatal(). */
47 static void ctdb_start_transport(struct ctdb_context *ctdb)
/* a NULL method table means no transport has been set up */
49 if (ctdb->methods == NULL) {
50 DEBUG(DEBUG_ALERT,(__location__ " startup event finished but transport is DOWN.\n"));
51 ctdb_fatal(ctdb, "transport is not initialized but startup completed");
54 /* start the transport running */
55 if (ctdb->methods->start(ctdb) != 0) {
56 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
57 ctdb_fatal(ctdb, "transport failed to start");
60 /* start the recovery daemon process */
/* NOTE(review): what happens after this log on failure is not visible in
   this listing */
61 if (ctdb_start_recoverd(ctdb) != 0) {
62 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
66 /* Make sure we log something when the daemon terminates */
67 atexit(print_exit_message);
69 /* start monitoring for connected/disconnected nodes */
70 ctdb_start_keepalive(ctdb);
72 /* start monitoring for node health */
73 ctdb_start_monitoring(ctdb);
75 /* start periodic update of tcp tickle lists */
76 ctdb_start_tcp_tickle_update(ctdb);
78 /* start listening for recovery daemon pings */
79 ctdb_control_recd_ping(ctdb);
/* Despite the name, this sets the disposition of signum to SIG_IGN through
   sigaction(), so the signal is ignored rather than blocked.  The return
   value of sigaction() is not checked.  ctdb_start_daemon() uses it for
   SIGPIPE. */
82 static void block_signal(int signum)
86 memset(&act, 0, sizeof(act));
88 act.sa_handler = SIG_IGN;
/* sa_mask holds only signum itself */
89 sigemptyset(&act.sa_mask);
90 sigaddset(&act.sa_mask, signum);
91 sigaction(signum, &act, NULL);
96 send a packet to a client
/* Queue hdr (hdr->length bytes) on the queue of the given client and count
   it in statistics.client_packets_sent.  On the normal path the result of
   ctdb_queue_send() is returned. */
98 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
/* counted before the depth check, so rejected packets are counted too */
100 client->ctdb->statistics.client_packets_sent++;
/* only CTDB_REQ_MESSAGE packets are subject to the queue depth limit taken
   from tunable.max_queue_depth_drop_msg */
101 if (hdr->operation == CTDB_REQ_MESSAGE) {
102 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
/* NOTE(review): the statements that follow this log in the overflow branch
   are not visible in this listing */
103 DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
108 return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
112 message handler for when we are in daemon mode. This redirects the message
/* Message handler that daemon_register_message_handler() installs for a
   client.  It copies data into a freshly allocated CTDB_REQ_MESSAGE packet
   and queues that packet to the client passed as private_data. */
115 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
116 TDB_DATA data, void *private_data)
118 struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
119 struct ctdb_req_message *r;
122 /* construct a message to send to the client containing the data */
/* fixed header up to the data member plus the payload */
123 len = offsetof(struct ctdb_req_message, data) + data.dsize;
/* the packet is allocated with ctdb itself as talloc parent */
124 r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE,
125 len, struct ctdb_req_message);
126 CTDB_NO_MEMORY_VOID(ctdb, r);
128 talloc_set_name_const(r, "req_message packet");
131 r->datalen = data.dsize;
132 memcpy(&r->data[0], data.dptr, data.dsize);
/* the return value of the send is not examined here */
134 daemon_queue_send(client, &r->hdr);
140 this is called when the ctdb daemon received a ctdb request to
141 set the srvid from the client
/* Register daemon_message_handler() for srvid on behalf of the client
   identified by client_id.  The client is used both as the talloc context
   and as the private data of the registration.  An unknown client_id is
   logged as an error. */
143 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
/* client_id is a reqid handed out in ctdb_accept_client() */
145 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
147 if (client == NULL) {
148 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
151 res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
153 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n",
154 (unsigned long long)srvid));
156 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n",
157 (unsigned long long)srvid));
164 this is called when the ctdb daemon received a ctdb request to
165 remove a srvid from the client
/* Remove the message handler for srvid that belongs to the client identified
   by client_id.  An unknown client_id is logged as an error; otherwise the
   result of ctdb_deregister_message_handler() is returned. */
167 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
169 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
170 if (client == NULL) {
171 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
174 return ctdb_deregister_message_handler(ctdb, srvid, client);
179 destroy a ctdb_client
/* talloc destructor for struct ctdb_client, installed by
   ctdb_accept_client().  It runs the takeover hook, releases the client_id
   reqid, decrements statistics.num_clients and switches the daemon to
   CTDB_RECOVERY_ACTIVE when the client leaves work unfinished. */
181 static int ctdb_client_destructor(struct ctdb_client *client)
183 struct ctdb_db_context *ctdb_db;
185 ctdb_takeover_client_destructor_hook(client);
186 ctdb_reqid_remove(client->ctdb, client->client_id);
/* guard keeps the counter from going below zero */
187 if (client->ctdb->statistics.num_clients) {
188 client->ctdb->statistics.num_clients--;
/* persistent updates still in flight force a recovery */
191 if (client->num_persistent_updates != 0) {
192 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
193 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
/* NOTE(review): the condition guarding the block below is not visible in
   this listing; find_ctdb_db() is compared against NULL elsewhere in this
   file, so confirm ctdb_db is checked before it is dereferenced */
195 ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
197 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
198 "commit active. Forcing recovery.\n"));
199 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
200 ctdb_db->transaction_active = false;
208 this is called when the ctdb daemon received a ctdb request message
209 from a local client over the unix domain socket
/* Handle a CTDB_REQ_MESSAGE received from a local client.  A message whose
   destnode is this node is passed to ctdb_request_message(); any other
   message is forwarded with ctdb_daemon_send_message(). */
211 static void daemon_request_message_from_client(struct ctdb_client *client,
212 struct ctdb_req_message *c)
217 /* maybe the message is for another client on this node */
218 if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
219 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
223 /* its for a remote node */
/* data points into the request packet; no copy is made here */
224 data.dptr = &c->data[0];
225 data.dsize = c->datalen;
226 res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
229 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
/* Per-call state kept while a client call is outstanding.  It is created in
   daemon_request_call_from_client() and consumed by
   daemon_call_from_client_callback().
   NOTE(review): a reqid member is used elsewhere in this file but its
   declaration is not visible in this listing. */
235 struct daemon_call_state {
/* client that issued the call and receives the reply */
236 struct ctdb_client *client;
/* the call being executed; its reply_data is copied into the reply packet */
238 struct ctdb_call *call;
/* set with timeval_current() when the state is created; passed to
   ctdb_latency() */
239 struct timeval start_time;
243 complete a call from a client
/* Completion callback for a call started by
   daemon_request_call_from_client().  It collects the call result, builds a
   CTDB_REPLY_CALL packet carrying the reply data and queues it to the
   client.  Each exit path visible here records latency and decrements
   statistics.pending_calls when it is above zero.
   NOTE(review): the conditions and returns between the visible statements
   are not shown in this listing. */
245 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
247 struct daemon_call_state *dstate = talloc_get_type(state->async.private_data,
248 struct daemon_call_state);
249 struct ctdb_reply_call *r;
252 struct ctdb_client *client = dstate->client;
253 struct ctdb_db_context *ctdb_db = state->ctdb_db;
/* reparent dstate to the client and the call to dstate */
255 talloc_steal(client, dstate);
256 talloc_steal(dstate, dstate->call);
258 res = ctdb_daemon_call_recv(state, dstate->call);
260 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
261 if (client->ctdb->statistics.pending_calls > 0) {
262 client->ctdb->statistics.pending_calls--;
264 ctdb_latency(ctdb_db, "call_from_client_cb 1", &client->ctdb->statistics.max_call_latency, dstate->start_time);
/* reply size: fixed header up to the data member plus the reply payload */
268 length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
/* the reply packet is a talloc child of dstate */
269 r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL,
270 length, struct ctdb_reply_call);
272 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
273 if (client->ctdb->statistics.pending_calls > 0) {
274 client->ctdb->statistics.pending_calls--;
276 ctdb_latency(ctdb_db, "call_from_client_cb 2", &client->ctdb->statistics.max_call_latency, dstate->start_time);
/* echo the request id so the client can match the reply */
279 r->hdr.reqid = dstate->reqid;
280 r->datalen = dstate->call->reply_data.dsize;
281 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
283 res = daemon_queue_send(client, &r->hdr);
285 /* client is dead - return immediately */
289 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
291 ctdb_latency(ctdb_db, "call_from_client_cb 3", &client->ctdb->statistics.max_call_latency, dstate->start_time);
293 if (client->ctdb->statistics.pending_calls > 0) {
294 client->ctdb->statistics.pending_calls--;
/* Context handed to daemon_incoming_packet_wrap() when a packet is requeued.
   It identifies the client by id instead of by pointer so that a client
   which disconnected in the meantime can be detected.
   NOTE(review): a client_id member is used elsewhere in this file but its
   declaration is not visible in this listing. */
298 struct ctdb_daemon_packet_wrap {
299 struct ctdb_context *ctdb;
304 a wrapper to catch disconnected clients
/* Re-entry point for a requeued client packet.  p is expected to be a
   struct ctdb_daemon_packet_wrap; the client is looked up again by its id
   and the packet is passed to daemon_incoming_packet() when the client is
   still known.  A packet for a client that has gone is logged. */
306 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
308 struct ctdb_client *client;
/* talloc_get_type checks the talloc name of p; the log below reports the
   actual name on a mismatch */
309 struct ctdb_daemon_packet_wrap *w = talloc_get_type(p,
310 struct ctdb_daemon_packet_wrap);
312 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
316 client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
317 if (client == NULL) {
318 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
326 daemon_incoming_packet(client, hdr);
331 this is called when the ctdb daemon received a ctdb request call
332 from a local client over the unix domain socket
/* Handle a CTDB_REQ_CALL received from a local client.  The function looks
   up the database named in the request, locks and fetches the record (the
   packet may be requeued through daemon_incoming_packet_wrap), builds a
   ctdb_call from the request fields and starts it locally when this node is
   the dmaster or remotely otherwise.  The reply is produced later by
   daemon_call_from_client_callback().
   NOTE(review): several declarations, conditions and returns of this
   function are not visible in this listing. */
334 static void daemon_request_call_from_client(struct ctdb_client *client,
335 struct ctdb_req_call *c)
337 struct ctdb_call_state *state;
338 struct ctdb_db_context *ctdb_db;
339 struct daemon_call_state *dstate;
340 struct ctdb_call *call;
341 struct ctdb_ltdb_header header;
344 struct ctdb_context *ctdb = client->ctdb;
345 struct ctdb_daemon_packet_wrap *w;
347 ctdb->statistics.total_calls++;
/* NOTE(review): the increment below only runs when pending_calls is already
   above zero, so a counter that starts at 0 can never rise.  The guard makes
   sense for the decrements further down but looks unintended here. */
348 if (client->ctdb->statistics.pending_calls > 0) {
349 ctdb->statistics.pending_calls++;
352 ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
354 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
356 if (client->ctdb->statistics.pending_calls > 0) {
357 ctdb->statistics.pending_calls--;
362 if (ctdb_db->unhealthy_reason) {
364 * this is just a warning, as the tdb should be empty anyway,
365 * and only persistent databases can be unhealthy, which doesn't
366 * use this code patch
368 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
369 ctdb_db->db_name, ctdb_db->unhealthy_reason));
373 key.dsize = c->keylen;
/* wrapper passed along with the packet if it has to be requeued; it is
   allocated on ctdb, not on the client */
375 w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
376 CTDB_NO_MEMORY_VOID(ctdb, w);
379 w->client_id = client->client_id;
381 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header,
382 (struct ctdb_req_header *)c, &data,
383 daemon_incoming_packet_wrap, w, True);
385 /* will retry later */
386 if (client->ctdb->statistics.pending_calls > 0) {
387 ctdb->statistics.pending_calls--;
395 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
396 if (client->ctdb->statistics.pending_calls > 0) {
397 ctdb->statistics.pending_calls--;
/* from here on the record lock is held; each failure path below unlocks it */
402 dstate = talloc(client, struct daemon_call_state);
403 if (dstate == NULL) {
404 ret = ctdb_ltdb_unlock(ctdb_db, key);
406 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
409 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
410 if (client->ctdb->statistics.pending_calls > 0) {
411 ctdb->statistics.pending_calls--;
415 dstate->start_time = timeval_current();
416 dstate->client = client;
417 dstate->reqid = c->hdr.reqid;
/* the fetched record data now lives and dies with dstate */
418 talloc_steal(dstate, data.dptr);
420 call = dstate->call = talloc_zero(dstate, struct ctdb_call);
422 ret = ctdb_ltdb_unlock(ctdb_db, key);
424 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
427 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
428 if (client->ctdb->statistics.pending_calls > 0) {
429 ctdb->statistics.pending_calls--;
431 ctdb_latency(ctdb_db, "call_from_client 1", &ctdb->statistics.max_call_latency, dstate->start_time);
435 call->call_id = c->callid;
/* call data follows the key bytes inside the request payload */
437 call->call_data.dptr = c->data + c->keylen;
438 call->call_data.dsize = c->calldatalen;
439 call->flags = c->flags;
/* run the call here when this node is the dmaster of the record */
441 if (header.dmaster == ctdb->pnn) {
442 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
444 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
447 ret = ctdb_ltdb_unlock(ctdb_db, key);
449 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
453 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
454 if (client->ctdb->statistics.pending_calls > 0) {
455 ctdb->statistics.pending_calls--;
457 ctdb_latency(ctdb_db, "call_from_client 2", &ctdb->statistics.max_call_latency, dstate->start_time);
/* talloc hierarchy becomes client -> state -> dstate */
460 talloc_steal(state, dstate);
461 talloc_steal(client, state);
463 state->async.fn = daemon_call_from_client_callback;
464 state->async.private_data = dstate;
/* Forward declaration: daemon_incoming_packet() dispatches CTDB_REQ_CONTROL
   packets to this function, which is defined later in the file. */
468 static void daemon_request_control_from_client(struct ctdb_client *client,
469 struct ctdb_req_control *c);
471 /* data contains a packet from the client */
/* Validate and dispatch one packet from a local client.  p is the
   struct ctdb_client and hdr the packet.  The packet is reparented to a
   temporary talloc context that is freed at the end, so a handler that
   wants to keep the packet has to steal it.  Magic and version are checked
   here as well as in ctdb_daemon_read_cb() because requeued packets arrive
   through daemon_incoming_packet_wrap() without passing the read callback. */
472 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
474 struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
476 struct ctdb_context *ctdb = client->ctdb;
478 /* place the packet as a child of a tmp_ctx. We then use
479 talloc_free() below to free it. If any of the calls want
480 to keep it, then they will steal it somewhere else, and the
481 talloc_free() will be a no-op */
482 tmp_ctx = talloc_new(client);
483 talloc_steal(tmp_ctx, hdr);
485 if (hdr->ctdb_magic != CTDB_MAGIC) {
486 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
490 if (hdr->ctdb_version != CTDB_VERSION) {
491 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
/* each recognised operation bumps its own client statistics counter */
495 switch (hdr->operation) {
497 ctdb->statistics.client.req_call++;
498 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
501 case CTDB_REQ_MESSAGE:
502 ctdb->statistics.client.req_message++;
503 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
506 case CTDB_REQ_CONTROL:
507 ctdb->statistics.client.req_control++;
508 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
512 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
517 talloc_free(tmp_ctx);
521 called when the daemon gets an incoming packet
/* Read callback of a client queue, installed by ctdb_accept_client().  data
   holds cnt bytes and args is the struct ctdb_client.  The packet is counted
   in statistics.client_packets_recv, its length, magic and version are
   checked, and a packet that passes is handed to daemon_incoming_packet(). */
523 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
525 struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
526 struct ctdb_req_header *hdr;
533 client->ctdb->statistics.client_packets_recv++;
/* too short to contain even the common header */
535 if (cnt < sizeof(*hdr)) {
536 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n",
540 hdr = (struct ctdb_req_header *)data;
/* the length field must match the number of bytes delivered.
   NOTE(review): in the message below the newline sits before the words
   in daemon, and the arguments are passed as length then cnt although the
   text reads as if the second value were the expected one */
541 if (cnt != hdr->length) {
542 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon",
543 (unsigned)hdr->length, (unsigned)cnt);
547 if (hdr->ctdb_magic != CTDB_MAGIC) {
548 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
552 if (hdr->ctdb_version != CTDB_VERSION) {
553 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
557 DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
558 "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
559 hdr->srcnode, hdr->destnode));
561 /* it is the responsibility of the incoming packet function to free 'data' */
562 daemon_incoming_packet(client, hdr);
/* talloc destructor for a struct ctdb_client_pid_list entry, installed by
   ctdb_accept_client().  It unlinks the entry from ctdb->client_pids unless
   that list head is already NULL. */
566 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
568 if (client_pid->ctdb->client_pids != NULL) {
569 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
/* fd event handler for the listening unix domain socket; private_data is the
   struct ctdb_context.  It accepts one connection, creates a
   struct ctdb_client for it, records the peer pid in ctdb->client_pids, sets
   up the packet queue with ctdb_daemon_read_cb() as reader and installs the
   destructors.
   NOTE(review): two alternative credential lookups appear below
   (peercred_struct with SO_PEERID, ucred with SO_PEERCRED); presumably they
   are selected by preprocessor conditionals that are not visible in this
   listing. */
576 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde,
577 uint16_t flags, void *private_data)
579 struct sockaddr_un addr;
582 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
583 struct ctdb_client *client;
584 struct ctdb_client_pid_list *client_pid;
586 struct peercred_struct cr;
587 socklen_t crl = sizeof(struct peercred_struct);
590 socklen_t crl = sizeof(struct ucred);
593 memset(&addr, 0, sizeof(addr));
595 fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
601 set_close_on_exec(fd);
603 DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
/* NOTE(review): no NULL check of this allocation is visible before client
   is used below */
605 client = talloc_zero(ctdb, struct ctdb_client);
607 if (getsockopt(fd, SOL_SOCKET, SO_PEERID, &cr, &crl) == 0) {
609 if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl) == 0) {
611 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)cr.pid));
/* the reqid doubles as the client id used for later lookups */
616 client->client_id = ctdb_reqid_new(ctdb, client);
/* cr.pid is used here regardless of whether getsockopt succeeded */
617 client->pid = cr.pid;
/* the pid entry is a talloc child of the client */
619 client_pid = talloc(client, struct ctdb_client_pid_list);
620 if (client_pid == NULL) {
621 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
626 client_pid->ctdb = ctdb;
627 client_pid->pid = cr.pid;
628 client_pid->client = client;
630 DLIST_ADD(ctdb->client_pids, client_pid);
632 client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT,
633 ctdb_daemon_read_cb, client,
634 "client-%u", client->pid);
636 talloc_set_destructor(client, ctdb_client_destructor);
637 talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
638 ctdb->statistics.num_clients++;
644 create a unix domain socket and bind it
645 return a file descriptor open on the socket
647 static int ux_socket_bind(struct ctdb_context *ctdb)
649 struct sockaddr_un addr;
651 ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
652 if (ctdb->daemon.sd == -1) {
656 set_close_on_exec(ctdb->daemon.sd);
657 set_nonblocking(ctdb->daemon.sd);
659 memset(&addr, 0, sizeof(addr));
660 addr.sun_family = AF_UNIX;
661 strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
663 if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
664 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
668 if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
669 chmod(ctdb->daemon.name, 0700) != 0) {
670 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
675 if (listen(ctdb->daemon.sd, 100) != 0) {
676 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
683 close(ctdb->daemon.sd);
684 ctdb->daemon.sd = -1;
/* SIGCHLD event handler.  It collects exited children with a non-blocking
   waitpid() on any child, logs errors from waitpid() and logs the pid of a
   reaped child at debug level.
   NOTE(review): the surrounding loop and its conditions are not visible in
   this listing. */
688 static void sig_child_handler(struct event_context *ev,
689 struct signal_event *se, int signum, int count,
693 // struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
/* WNOHANG: never block inside the event handler */
698 pid = waitpid(-1, &status, WNOHANG);
700 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
704 DEBUG(DEBUG_DEBUG, ("SIGCHLD from %d\n", (int)pid));
/* Completion callback of the setup event started in ctdb_start_daemon().  A
   failure is fatal.  Otherwise the setup notification script is run and a
   CTDB_CONTROL_STARTUP control is broadcast to all nodes without waiting for
   a reply. */
709 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
713 ctdb_fatal(ctdb, "Failed to run setup event\n");
716 ctdb_run_notification_script(ctdb, "setup");
718 /* tell all other nodes we've just started up */
/* NOREPLY with no callback and no private data: fire and forget */
719 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
720 0, CTDB_CONTROL_STARTUP, 0,
721 CTDB_CTRL_FLAG_NOREPLY,
722 tdb_null, NULL, NULL);
726 start the protocol going as a daemon
/* Main entry point of the daemon.  In order it: binds the unix domain
   socket, optionally forks, sets up the event context and the configured
   transport, attaches the databases, freezes them, runs the init event,
   starts accepting clients, starts the transport, installs the SIGCHLD
   handler, starts the setup event and finally enters the event loop.
   The event loop is not expected to return.
   NOTE(review): many argument lines, conditions and returns of this
   function are not visible in this listing. */
728 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog)
731 struct fd_event *fde;
732 const char *domain_socket_name;
733 struct signal_event *se;
735 /* get rid of any old sockets */
736 unlink(ctdb->daemon.name);
738 /* create a unix domain stream socket to listen to */
739 res = ux_socket_bind(ctdb);
741 DEBUG(DEBUG_ALERT,(__location__ " Failed to open CTDB unix domain socket\n"));
/* NOTE(review): fork() returns -1 on failure, which is also non-zero, so a
   failed fork takes the same branch as the parent after a successful one */
745 if (do_fork && fork()) {
749 tdb_reopen_all(False);
/* expects the open to yield descriptor 0, i.e. stdin.
   NOTE(review): presumably descriptor 0 is closed just before this; that
   line is not visible here */
754 if (open("/dev/null", O_RDONLY) != 0) {
755 DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
/* SIGPIPE is set to be ignored, see block_signal() */
759 block_signal(SIGPIPE);
761 ctdbd_pid = getpid();
/* NOTE(review): this routine startup message is logged at DEBUG_ERR level */
764 DEBUG(DEBUG_ERR, ("Starting CTDBD as pid : %u\n", ctdbd_pid));
766 ctdb_high_priority(ctdb);
768 /* ensure the socket is deleted on exit of the daemon */
769 domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
770 if (domain_socket_name == NULL) {
771 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
775 ctdb->ev = event_context_init(NULL);
777 ctdb_set_child_logging(ctdb);
779 /* force initial recovery for election */
780 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
/* pick the transport initialiser by name; the prototypes are declared
   locally inside each branch */
782 if (strcmp(ctdb->transport, "tcp") == 0) {
783 int ctdb_tcp_init(struct ctdb_context *);
784 ret = ctdb_tcp_init(ctdb);
786 #ifdef USE_INFINIBAND
787 if (strcmp(ctdb->transport, "ib") == 0) {
788 int ctdb_ibw_init(struct ctdb_context *);
789 ret = ctdb_ibw_init(ctdb);
793 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
797 if (ctdb->methods == NULL) {
798 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
799 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
802 /* initialise the transport */
803 if (ctdb->methods->initialise(ctdb) != 0) {
804 ctdb_fatal(ctdb, "transport failed to initialise");
807 /* attach to existing databases */
808 if (ctdb_attach_databases(ctdb) != 0) {
809 ctdb_fatal(ctdb, "Failed to attach to databases\n");
812 /* start frozen, then let the first election sort things out */
813 if (ctdb_blocking_freeze(ctdb)) {
814 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
817 ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
819 ctdb_fatal(ctdb, "Failed to run init event\n");
821 ctdb_run_notification_script(ctdb, "init");
823 /* now start accepting clients, only can do this once frozen */
/* NOTE(review): fde is passed to tevent_fd_set_auto_close() without a
   visible NULL check */
824 fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd,
826 ctdb_accept_client, ctdb);
827 tevent_fd_set_auto_close(fde);
829 /* release any IPs we hold from previous runs of the daemon */
830 ctdb_release_all_ips(ctdb);
832 /* start the transport going */
833 ctdb_start_transport(ctdb);
835 /* set up a handler to pick up sigchld */
836 se = event_add_signal(ctdb->ev, ctdb,
841 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
/* the result of the setup event is delivered to
   ctdb_setup_event_callback() */
845 ret = ctdb_event_script_callback(ctdb,
847 ctdb_setup_event_callback,
853 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
858 if (start_syslog_daemon(ctdb)) {
859 DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
864 ctdb_lockdown_memory(ctdb);
866 /* go into a wait loop to allow other nodes to complete */
867 event_loop_wait(ctdb->ev);
869 DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
874 allocate a packet for use in daemon<->daemon communication
/* Allocate and initialise a packet through the allocate_pkt method of the
   transport.  length is the requested packet length and slength the size of
   the fixed structure; the larger of the two is used and the allocation is
   rounded up to a multiple of CTDB_DS_ALIGNMENT.  The first slength bytes
   are zeroed and the common header fields are filled in.  Failure is logged
   when no transport is available or the allocation fails.
   NOTE(review): the remaining parameters and the return statements are not
   visible in this listing. */
876 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
878 enum ctdb_operation operation,
879 size_t length, size_t slength,
883 struct ctdb_req_header *hdr;
/* never allocate less than the fixed structure needs */
885 length = MAX(length, slength);
/* round up; the mask form relies on CTDB_DS_ALIGNMENT being a power of two */
886 size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
888 if (ctdb->methods == NULL) {
889 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
890 operation, (unsigned)length));
894 hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
896 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
897 operation, (unsigned)length));
/* the talloc name is what talloc_get_type() checks later */
900 talloc_set_name_const(hdr, type);
/* only the fixed part is cleared; payload bytes are left as allocated */
901 memset(hdr, 0, slength);
/* the header carries the unrounded length */
902 hdr->length = length;
903 hdr->operation = operation;
904 hdr->ctdb_magic = CTDB_MAGIC;
905 hdr->ctdb_version = CTDB_VERSION;
906 hdr->generation = ctdb->vnn_map->generation;
907 hdr->srcnode = ctdb->pnn;
/* State of one control request issued by a local client, kept until the
   reply arrives in daemon_control_callback().  While the destination node is
   valid the state is also linked into the pending_controls list of that
   node so that ctdb_daemon_cancel_controls() can fail it.
   NOTE(review): a reqid member is used elsewhere in this file but its
   declaration is not visible in this listing. */
912 struct daemon_control_state {
/* linkage for node->pending_controls */
913 struct daemon_control_state *next, *prev;
/* client that sent the control and receives the reply */
914 struct ctdb_client *client;
/* the request packet, stolen onto this state */
915 struct ctdb_req_control *c;
/* destination node when the pnn validated */
917 struct ctdb_node *node;
921 callback when a control reply comes in
/* Called when the reply to a client control arrives (or when the control is
   cancelled).  It builds a CTDB_REPLY_CONTROL packet that carries data
   followed by the error message bytes and queues it to the client stored in
   the state passed as private_data.
   NOTE(review): the conditions around the errormsg handling are not visible
   in this listing; confirm strlen() is only reached for a non-NULL
   errormsg. */
923 static void daemon_control_callback(struct ctdb_context *ctdb,
924 int32_t status, TDB_DATA data,
925 const char *errormsg,
928 struct daemon_control_state *state = talloc_get_type(private_data,
929 struct daemon_control_state);
930 struct ctdb_client *client = state->client;
931 struct ctdb_reply_control *r;
935 /* construct a message to send to the client containing the data */
936 len = offsetof(struct ctdb_reply_control, data) + data.dsize;
/* the message is appended without its terminating NUL */
938 len += strlen(errormsg);
/* the reply is a talloc child of the state */
940 r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len,
941 struct ctdb_reply_control);
942 CTDB_NO_MEMORY_VOID(ctdb, r);
/* echo the request id so the client can match the reply */
944 r->hdr.reqid = state->reqid;
946 r->datalen = data.dsize;
948 memcpy(&r->data[0], data.dptr, data.dsize);
/* error text goes directly after the data bytes */
950 r->errorlen = strlen(errormsg);
951 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
954 ret = daemon_queue_send(client, &r->hdr);
961 fail all pending controls to a disconnected node
/* Fail every control still pending for node.  Each state is unlinked from
   node->pending_controls and completed through daemon_control_callback()
   with status -1, empty data and a fixed error message. */
963 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
965 struct daemon_control_state *state;
/* always take the current list head; the loop ends when the list is empty */
966 while ((state = node->pending_controls)) {
967 DLIST_REMOVE(node->pending_controls, state);
/* the callback parameter is int32_t; the value passed is -1 written as an
   unsigned cast */
968 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null,
969 "node is disconnected", state);
974 destroy a daemon_control_state
/* talloc destructor for struct daemon_control_state, installed in
   daemon_request_control_from_client().  It unlinks the state from the
   pending_controls list of its node.
   NOTE(review): state->node is only assigned for a validated pnn in the
   visible code; the guard around this DLIST_REMOVE is not visible here. */
976 static int daemon_control_destructor(struct daemon_control_state *state)
979 DLIST_REMOVE(state->node->pending_controls, state);
985 this is called when the ctdb daemon received a ctdb request control
986 from a local client over the unix domain socket
/* Handle a CTDB_REQ_CONTROL received from a local client.  The request is
   stolen onto a new daemon_control_state, the state is linked to the
   destination node when the pnn is valid, and the control is sent with
   daemon_control_callback() as completion handler.  For a NOREPLY control
   the state is moved to a temporary context that is freed at the end of
   this function. */
988 static void daemon_request_control_from_client(struct ctdb_client *client,
989 struct ctdb_req_control *c)
993 struct daemon_control_state *state;
994 TALLOC_CTX *tmp_ctx = talloc_new(client);
/* CTDB_CURRENT_NODE is resolved to the pnn of this node */
996 if (c->hdr.destnode == CTDB_CURRENT_NODE) {
997 c->hdr.destnode = client->ctdb->pnn;
/* NOTE(review): CTDB_NO_MEMORY_VOID presumably returns when state is NULL;
   if so tmp_ctx stays allocated under the client on that path - confirm */
1000 state = talloc(client, struct daemon_control_state);
1001 CTDB_NO_MEMORY_VOID(client->ctdb, state);
1003 state->client = client;
/* keep the request alive for as long as the state exists */
1004 state->c = talloc_steal(state, c);
1005 state->reqid = c->hdr.reqid;
1006 if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1007 state->node = client->ctdb->nodes[c->hdr.destnode];
1008 DLIST_ADD(state->node->pending_controls, state);
1013 talloc_set_destructor(state, daemon_control_destructor);
/* no reply will arrive, so the state only needs to live until the
   talloc_free(tmp_ctx) at the end */
1015 if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1016 talloc_steal(tmp_ctx, state);
1019 data.dptr = &c->data[0];
1020 data.dsize = c->datalen;
1021 res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1022 c->srvid, c->opcode, client->client_id,
1024 data, daemon_control_callback,
1027 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1031 talloc_free(tmp_ctx);
1035 register a call function
/* Register the call function fn under id for the database identified by
   db_id.  A ctdb_registered_call is allocated as a talloc child of the
   database context and added to its calls list.  An unknown db_id is
   detected by the NULL check below.
   NOTE(review): no NULL check of the talloc result is visible before the
   entry is linked; confirm against the lines missing from this listing. */
1037 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1038 ctdb_fn_t fn, int id)
1040 struct ctdb_registered_call *call;
1041 struct ctdb_db_context *ctdb_db;
1043 ctdb_db = find_ctdb_db(ctdb, db_id);
1044 if (ctdb_db == NULL) {
1048 call = talloc(ctdb_db, struct ctdb_registered_call);
1052 DLIST_ADD(ctdb_db->calls, call);
1059 this local messaging handler is ugly, but is needed to prevent
1060 recursion in ctdb_send_message() when the destination node is the
1061 same as the source node
/* A message addressed to this node, parked until the event loop runs
   ctdb_local_message_trigger().
   NOTE(review): srvid and data members are used elsewhere in this file but
   their declarations are not visible in this listing. */
1063 struct ctdb_local_message {
1064 struct ctdb_context *ctdb;
/* Timed event handler scheduled by ctdb_local_message().  private_data is
   the parked struct ctdb_local_message; it is dispatched with
   ctdb_dispatch_message() and a failure is logged. */
1069 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te,
1070 struct timeval t, void *private_data)
1072 struct ctdb_local_message *m = talloc_get_type(private_data,
1073 struct ctdb_local_message);
1076 res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1078 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n",
1079 (unsigned long long)m->srvid));
/* Deliver a message to this node without recursing into the dispatcher.
   The payload is copied into a struct ctdb_local_message and a zero-delay
   timed event is scheduled that runs ctdb_local_message_trigger() from the
   event loop. */
1084 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1086 struct ctdb_local_message *m;
1087 m = talloc(ctdb, struct ctdb_local_message);
1088 CTDB_NO_MEMORY(ctdb, m);
/* replace the borrowed payload pointer with a private copy owned by m */
1093 m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1094 if (m->data.dptr == NULL) {
1099 /* this needs to be done as an event to prevent recursion */
/* the event is a talloc child of m; its return value is not checked */
1100 event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
/* Send data as a message for srvid to node pnn.  Without a transport the
   failure is logged.  A message addressed to this node is delivered through
   ctdb_local_message(); any other is packed into a CTDB_REQ_MESSAGE packet
   and queued with ctdb_queue_packet(). */
1107 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1108 uint64_t srvid, TDB_DATA data)
1110 struct ctdb_req_message *r;
1113 if (ctdb->methods == NULL) {
1114 DEBUG(DEBUG_ERR,(__location__ " Failed to send message. Transport is DOWN\n"));
1118 /* see if this is a message to ourselves */
1119 if (pnn == ctdb->pnn) {
1120 return ctdb_local_message(ctdb, srvid, data);
/* fixed header up to the data member plus the payload */
1123 len = offsetof(struct ctdb_req_message, data) + data.dsize;
1124 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1125 struct ctdb_req_message);
1126 CTDB_NO_MEMORY(ctdb, r);
1128 r->hdr.destnode = pnn;
1130 r->datalen = data.dsize;
1131 memcpy(&r->data[0], data.dptr, data.dsize);
1133 ctdb_queue_packet(ctdb, &r->hdr);
/* One registered notification of a client, linked into client->notify.
   When the entry is destroyed, ctdb_client_notify_destructor() broadcasts
   its data to its srvid.
   NOTE(review): srvid and data members are used elsewhere in this file but
   their declarations are not visible in this listing. */
1141 struct ctdb_client_notify_list {
1142 struct ctdb_client_notify_list *next, *prev;
1143 struct ctdb_context *ctdb;
/* talloc destructor for a notification entry, installed by
   ctdb_control_register_notify().  It sends the stored data as a message
   for the stored srvid to CTDB_BROADCAST_CONNECTED and logs a failure.
   NOTE(review): the routine sending message is logged at DEBUG_ERR level. */
1149 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1153 DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
/* the cast on the srvid argument is not needed for the call itself */
1155 ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1157 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1163 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1165 struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1166 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1167 struct ctdb_client_notify_list *nl;
1169 DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1171 if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1172 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1176 if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1177 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1182 if (client == NULL) {
1183 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1187 for(nl=client->notify; nl; nl=nl->next) {
1188 if (nl->srvid == notify->srvid) {
1193 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1197 nl = talloc(client, struct ctdb_client_notify_list);
1198 CTDB_NO_MEMORY(ctdb, nl);
1200 nl->srvid = notify->srvid;
1201 nl->data.dsize = notify->len;
1202 nl->data.dptr = talloc_size(nl, nl->data.dsize);
1203 CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1204 memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1206 DLIST_ADD(client->notify, nl);
1207 talloc_set_destructor(nl, ctdb_client_notify_destructor);
1212 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1214 struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1215 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1216 struct ctdb_client_notify_list *nl;
1218 DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1220 if (client == NULL) {
1221 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1225 for(nl=client->notify; nl; nl=nl->next) {
1226 if (nl->srvid == notify->srvid) {
1231 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1235 DLIST_REMOVE(client->notify, nl);
1236 talloc_set_destructor(nl, NULL);
/* Walk ctdb->client_pids and return the client of the first entry whose pid
   equals pid.
   NOTE(review): the return for the not-found case is not visible in this
   listing; ctdb_control_process_exists() compares the result against NULL. */
1242 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1244 struct ctdb_client_pid_list *client_pid;
1246 for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1247 if (client_pid->pid == pid) {
1248 return client_pid->client;
1255 /* This control is used by samba when probing if a process (of a samba daemon)
1257 Samba does this when it needs/wants to check if a subrecord in one of the
1258 databases is still valid, or if it is stale and can be removed.
1259 If the node is in unhealthy or stopped state we just kill off the samba
1260 process holding this sub-record and return to the calling samba that
1261 the process does not exist.
1262 This allows us to forcefully recall subrecords registered by samba processes
1263 on banned and stopped nodes.
/* Control handler that reports whether the process pid exists.  When this
   node has NODE_FLAGS_BANNED or NODE_FLAGS_STOPPED set and pid belongs to a
   connected client, that client structure is freed, which runs
   ctdb_client_destructor().  Otherwise the result of kill(pid, 0) is
   returned, which is 0 when the signal could be delivered.
   NOTE(review): the value returned after a client has been freed is not
   visible in this listing. */
1265 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1267 struct ctdb_client *client;
1269 if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1270 client = ctdb_find_client_by_pid(ctdb, pid);
1271 if (client != NULL) {
1272 DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1273 talloc_free(client);
/* signal 0 performs only the existence and permission check */
1278 return kill(pid, 0);