4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/tevent/tevent.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/wait.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include <sys/socket.h>
/* List node tying a connected client to the ctdb context. Nodes are chained
 * through next/prev; ctdb_accept_client() adds them to ctdb->client_pids.
 * NOTE(review): code elsewhere in this file also assigns and reads a `pid`
 * member that is not visible in this excerpt - the declaration looks
 * truncated here. */
32 struct ctdb_client_pid_list {
33 struct ctdb_client_pid_list *next, *prev;
34 struct ctdb_context *ctdb;
36 struct ctdb_client *client;
/* Forward declaration: the definition comes later in the file, but
 * daemon_incoming_packet_wrap() needs to call it before that point. */
39 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
/* Logs a shutdown notice. Registered with atexit() by ctdb_start_transport()
 * so that something is logged when the daemon terminates. */
41 static void print_exit_message(void)
43 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
/* Timed-event callback that re-arms itself to fire again one second later.
 * private_data is the struct ctdb_context.
 * The getpid() != ctdbd_pid test singles out a process other than the one
 * recorded in ctdbd_pid.
 * NOTE(review): the body of that branch is not visible in this excerpt;
 * presumably it returns without re-arming - confirm. */
48 static void ctdb_time_tick(struct event_context *ev, struct timed_event *te,
49 struct timeval t, void *private_data)
51 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
53 if (getpid() != ctdbd_pid) {
57 event_add_timed(ctdb->ev, ctdb,
58 timeval_current_ofs(1, 0),
59 ctdb_time_tick, ctdb);
62 /* Used to trigger a dummy event once per second, to make
63 * detection of hangs more reliable. */
/* Schedules the first ctdb_time_tick() callback one second from now, with
 * the ctdb context as both the talloc parent and the callback argument. */
65 static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
67 event_add_timed(ctdb->ev, ctdb,
68 timeval_current_ofs(1, 0),
69 ctdb_time_tick, ctdb);
73 /* called when the "startup" event script has finished */
/* Checks that a transport is configured (ctdb->methods), starts it, starts
 * the recovery daemon and then kicks off the periodic services listed below.
 * A missing transport or a failing start() is reported through ctdb_fatal().
 * NOTE(review): what happens after the "Failed to start recovery daemon"
 * log line is not visible in this excerpt. */
74 static void ctdb_start_transport(struct ctdb_context *ctdb)
76 if (ctdb->methods == NULL) {
77 DEBUG(DEBUG_ALERT,(__location__ " startup event finished but transport is DOWN.\n"));
78 ctdb_fatal(ctdb, "transport is not initialized but startup completed");
81 /* start the transport running */
82 if (ctdb->methods->start(ctdb) != 0) {
83 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
84 ctdb_fatal(ctdb, "transport failed to start");
87 /* start the recovery daemon process */
88 if (ctdb_start_recoverd(ctdb) != 0) {
89 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
93 /* Make sure we log something when the daemon terminates */
94 atexit(print_exit_message);
96 /* start monitoring for connected/disconnected nodes */
97 ctdb_start_keepalive(ctdb);
99 /* start monitoring for node health */
100 ctdb_start_monitoring(ctdb);
102 /* start periodic update of tcp tickle lists */
103 ctdb_start_tcp_tickle_update(ctdb);
105 /* start listening for recovery daemon pings */
106 ctdb_control_recd_ping(ctdb);
108 /* start listening to timer ticks */
109 ctdb_start_time_tickd(ctdb);
/* Installs SIG_IGN as the disposition for signum via sigaction(), so the
 * signal is ignored. ctdb_start_daemon() uses this for SIGPIPE.
 * The return value of sigaction() is not checked. */
112 static void block_signal(int signum)
114 struct sigaction act;
116 memset(&act, 0, sizeof(act));
118 act.sa_handler = SIG_IGN;
119 sigemptyset(&act.sa_mask);
120 sigaddset(&act.sa_mask, signum);
121 sigaction(signum, &act, NULL);
126 send a packet to a client
/* Queues the packet hdr (hdr->length bytes) on the client's queue and counts
 * it in the client_packets_sent statistic.
 * For CTDB_REQ_MESSAGE packets the current queue length is first compared
 * with the max_queue_depth_drop_msg tunable; exceeding it is logged as
 * "queue full - killing client connection".
 * On the normal path the result of ctdb_queue_send() is returned.
 * NOTE(review): callers in this file treat a result of -1 as "client is
 * dead"; the statements of the queue-full branch are not visible in this
 * excerpt, so confirm that it frees the client and returns -1. */
128 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
130 CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
131 if (hdr->operation == CTDB_REQ_MESSAGE) {
132 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
133 DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
138 return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
142 message handler for when we are in daemon mode. This redirects the message to the client that registered for it
/* Handler installed per client by daemon_register_message_handler().
 * private_data is the struct ctdb_client; `data` is copied into a freshly
 * allocated CTDB_REQ_MESSAGE packet which is then queued to that client.
 * The result of daemon_queue_send() is ignored here.
 * NOTE(review): the packet is allocated with ctdb as talloc parent; the
 * statement that releases it is not visible in this excerpt. */
145 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
146 TDB_DATA data, void *private_data)
148 struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
149 struct ctdb_req_message *r;
152 /* construct a message to send to the client containing the data */
153 len = offsetof(struct ctdb_req_message, data) + data.dsize;
154 r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE,
155 len, struct ctdb_req_message);
156 CTDB_NO_MEMORY_VOID(ctdb, r);
158 talloc_set_name_const(r, "req_message packet");
161 r->datalen = data.dsize;
162 memcpy(&r->data[0], data.dptr, data.dsize);
164 daemon_queue_send(client, &r->hdr);
170 this is called when the ctdb daemon received a ctdb request to
171 set the srvid from the client
/* Looks up the client by client_id and registers daemon_message_handler()
 * for srvid, with the client as both talloc parent and handler argument.
 * An unknown client_id is logged as "Bad client_id"; the outcome of the
 * registration is logged either as an error or at info level.
 * NOTE(review): the return statements are not visible in this excerpt. */
173 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
175 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
177 if (client == NULL) {
178 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
181 res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
183 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n",
184 (unsigned long long)srvid));
186 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n",
187 (unsigned long long)srvid));
194 this is called when the ctdb daemon received a ctdb request to
195 remove a srvid from the client
/* Looks up the client by client_id and removes its handler for srvid.
 * Returns the result of ctdb_deregister_message_handler() when the client
 * is found; an unknown client_id is logged as "Bad client_id"
 * (the value returned in that case is not visible in this excerpt). */
197 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
199 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
200 if (client == NULL) {
201 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
204 return ctdb_deregister_message_handler(ctdb, srvid, client);
209 destroy a ctdb_client
/* talloc destructor for struct ctdb_client (installed by ctdb_accept_client).
 * Runs the takeover hook, releases the client's request id and decrements
 * the num_clients statistic.
 * If the client still had persistent updates in flight, or was in the
 * middle of a transaction commit, the node is switched to
 * CTDB_RECOVERY_ACTIVE so that a recovery cleans up.
 * NOTE(review): the condition guarding the "transaction commit active"
 * branch is not visible in this excerpt; it must at least ensure ctdb_db is
 * non-NULL because ctdb_db is dereferenced below. */
211 static int ctdb_client_destructor(struct ctdb_client *client)
213 struct ctdb_db_context *ctdb_db;
215 ctdb_takeover_client_destructor_hook(client);
216 ctdb_reqid_remove(client->ctdb, client->client_id);
217 CTDB_DECREMENT_STAT(client->ctdb, num_clients);
219 if (client->num_persistent_updates != 0) {
220 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
221 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
223 ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
225 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
226 "commit active. Forcing recovery.\n"));
227 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
229 /* legacy trans2 transaction state: */
230 ctdb_db->transaction_active = false;
233 * trans3 transaction state:
235 * The destructor sets the pointer to NULL.
237 talloc_free(ctdb_db->persistent_state);
245 this is called when the ctdb daemon received a ctdb request message
246 from a local client over the unix domain socket
/* Routes a message received from a local client: if the destination node is
 * this node the packet is handed to ctdb_request_message(), otherwise its
 * payload is forwarded with ctdb_daemon_send_message(). A failure to forward
 * is only logged. */
248 static void daemon_request_message_from_client(struct ctdb_client *client,
249 struct ctdb_req_message *c)
254 /* maybe the message is for another client on this node */
255 if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
256 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
260 /* it's for a remote node */
261 data.dptr = &c->data[0];
262 data.dsize = c->datalen;
263 res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
266 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
/* Per-call bookkeeping for a call issued by a local client: the client that
 * issued it, the call itself, and the time it started (used for the
 * call_latency statistic).
 * NOTE(review): code in this file also uses a `reqid` member that is not
 * visible in this excerpt. */
272 struct daemon_call_state {
273 struct ctdb_client *client;
275 struct ctdb_call *call;
276 struct timeval start_time;
280 complete a call from a client
/* Completion callback for a client call (installed as state->async.fn by
 * daemon_request_call_from_client). Collects the result with
 * ctdb_daemon_call_recv(), builds a CTDB_REPLY_CALL packet carrying the
 * reply data and the original reqid, and queues it to the client.
 * Each exit path that is visible here decrements pending_calls and updates
 * the call_latency statistic, except the "client is dead" path noted below. */
282 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
284 struct daemon_call_state *dstate = talloc_get_type(state->async.private_data,
285 struct daemon_call_state);
286 struct ctdb_reply_call *r;
289 struct ctdb_client *client = dstate->client;
290 struct ctdb_db_context *ctdb_db = state->ctdb_db;
/* re-parent dstate under the client and the call under dstate before the
 * result is received; presumably so they outlive `state` - TODO confirm */
292 talloc_steal(client, dstate);
293 talloc_steal(dstate, dstate->call);
295 res = ctdb_daemon_call_recv(state, dstate->call);
297 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
298 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
300 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
304 length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
305 r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL,
306 length, struct ctdb_reply_call);
308 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
309 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
310 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
313 r->hdr.reqid = dstate->reqid;
314 r->datalen = dstate->call->reply_data.dsize;
315 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
317 res = daemon_queue_send(client, &r->hdr);
/* NOTE(review): no statistics update is visible on the "client is dead"
 * path - confirm whether pending_calls should be decremented there too */
319 /* client is dead - return immediately */
323 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
325 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
326 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
/* Context handed to daemon_incoming_packet_wrap() when a packet is requeued:
 * identifies the client by id rather than by pointer, so a client that has
 * disconnected in the meantime can be detected.
 * NOTE(review): the `client_id` member used by the code is not visible in
 * this excerpt. */
330 struct ctdb_daemon_packet_wrap {
331 struct ctdb_context *ctdb;
336 a wrapper to catch disconnected clients
/* Requeue callback passed to ctdb_ltdb_lock_fetch_requeue().
 * p must be a struct ctdb_daemon_packet_wrap; a pointer of any other talloc
 * type is logged as "Bad packet type". The client is looked up by
 * w->client_id, and the packet is only passed on to
 * daemon_incoming_packet() if that client still exists.
 * NOTE(review): the statements that free w and hdr are not visible in this
 * excerpt. */
338 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
340 struct ctdb_client *client;
341 struct ctdb_daemon_packet_wrap *w = talloc_get_type(p,
342 struct ctdb_daemon_packet_wrap);
344 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
348 client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
349 if (client == NULL) {
350 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
358 daemon_incoming_packet(client, hdr);
363 this is called when the ctdb daemon received a ctdb request call
364 from a local client over the unix domain socket
/*
 * Handles a CTDB_REQ_CALL packet from a local client.
 *
 * Looks up the database named by c->db_id, locks and fetches the record for
 * the key carried in the packet (the packet is requeued through
 * daemon_incoming_packet_wrap() if the lock cannot be taken immediately),
 * then dispatches the call locally when this node is the record's dmaster
 * or to the remote dmaster otherwise. The reply is sent to the client from
 * daemon_call_from_client_callback().
 *
 * Statistics: total_calls is incremented once per request. pending_calls is
 * incremented on entry and decremented again on every early-exit path below
 * and in the completion callback.
 */
366 static void daemon_request_call_from_client(struct ctdb_client *client,
367 struct ctdb_req_call *c)
369 struct ctdb_call_state *state;
370 struct ctdb_db_context *ctdb_db;
371 struct daemon_call_state *dstate;
372 struct ctdb_call *call;
373 struct ctdb_ltdb_header header;
376 struct ctdb_context *ctdb = client->ctdb;
377 struct ctdb_daemon_packet_wrap *w;
379 CTDB_INCREMENT_STAT(ctdb, total_calls);
/* count the call as pending; every exit path and the completion callback
 * decrement this again, so it must be incremented (not decremented) here */
380 CTDB_INCREMENT_STAT(ctdb, pending_calls);
382 ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
384 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x\n",
386 CTDB_DECREMENT_STAT(ctdb, pending_calls);
390 if (ctdb_db->unhealthy_reason) {
392 * this is just a warning, as the tdb should be empty anyway,
393 * and only persistent databases can be unhealthy, which doesn't
394 * use this code path
396 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
397 ctdb_db->db_name, ctdb_db->unhealthy_reason));
401 key.dsize = c->keylen;
/* wrapper identifying the client by id, used if the packet is requeued */
403 w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
404 CTDB_NO_MEMORY_VOID(ctdb, w);
407 w->client_id = client->client_id;
409 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header,
410 (struct ctdb_req_header *)c, &data,
411 daemon_incoming_packet_wrap, w, True);
413 /* will retry later */
414 CTDB_DECREMENT_STAT(ctdb, pending_calls);
421 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
422 CTDB_DECREMENT_STAT(ctdb, pending_calls);
426 dstate = talloc(client, struct daemon_call_state);
427 if (dstate == NULL) {
428 ret = ctdb_ltdb_unlock(ctdb_db, key);
430 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
433 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
434 CTDB_DECREMENT_STAT(ctdb, pending_calls);
437 dstate->start_time = timeval_current();
438 dstate->client = client;
439 dstate->reqid = c->hdr.reqid;
/* the fetched record data now lives as long as dstate */
440 talloc_steal(dstate, data.dptr);
442 call = dstate->call = talloc_zero(dstate, struct ctdb_call);
444 ret = ctdb_ltdb_unlock(ctdb_db, key);
446 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
449 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
450 CTDB_DECREMENT_STAT(ctdb, pending_calls);
451 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
455 call->call_id = c->callid;
/* call data follows the key inside the packet payload */
457 call->call_data.dptr = c->data + c->keylen;
458 call->call_data.dsize = c->calldatalen;
459 call->flags = c->flags;
461 if (header.dmaster == ctdb->pnn) {
462 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
464 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
467 ret = ctdb_ltdb_unlock(ctdb_db, key);
469 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
473 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
474 CTDB_DECREMENT_STAT(ctdb, pending_calls);
475 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
/* dstate is owned by the call state, which in turn is owned by the client */
478 talloc_steal(state, dstate);
479 talloc_steal(client, state);
481 state->async.fn = daemon_call_from_client_callback;
482 state->async.private_data = dstate;
/* Forward declaration: the definition appears further down, but
 * daemon_incoming_packet() dispatches CTDB_REQ_CONTROL packets to it. */
486 static void daemon_request_control_from_client(struct ctdb_client *client,
487 struct ctdb_req_control *c);
489 /* data contains a packet from the client */
/* Validates and dispatches one packet from a local client.
 * p is the struct ctdb_client; hdr is the packet, which this function takes
 * ownership of by re-parenting it under a temporary talloc context.
 * Packets with a wrong magic or version are rejected with ctdb_set_error().
 * CTDB_REQ_MESSAGE and CTDB_REQ_CONTROL (and the call case handled by
 * daemon_request_call_from_client) are dispatched to their handlers after
 * bumping the matching client.* statistic; anything else is logged as an
 * unrecognized operation. */
490 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
492 struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
494 struct ctdb_context *ctdb = client->ctdb;
496 /* place the packet as a child of a tmp_ctx. We then use
497 talloc_free() below to free it. If any of the calls want
498 to keep it, then they will steal it somewhere else, and the
499 talloc_free() will be a no-op */
500 tmp_ctx = talloc_new(client);
501 talloc_steal(tmp_ctx, hdr);
503 if (hdr->ctdb_magic != CTDB_MAGIC) {
504 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
508 if (hdr->ctdb_version != CTDB_VERSION) {
509 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
513 switch (hdr->operation) {
515 CTDB_INCREMENT_STAT(ctdb, client.req_call);
516 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
519 case CTDB_REQ_MESSAGE:
520 CTDB_INCREMENT_STAT(ctdb, client.req_message);
521 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
524 case CTDB_REQ_CONTROL:
525 CTDB_INCREMENT_STAT(ctdb, client.req_control);
526 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
530 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
535 talloc_free(tmp_ctx);
539 called when the daemon gets an incoming packet
/*
 * Queue read callback for a client connection (installed by
 * ctdb_accept_client through ctdb_queue_setup).
 *
 * data/cnt: the bytes received; args: the struct ctdb_client.
 * Counts the packet in client_packets_recv, then checks that the buffer is
 * at least one header long, that cnt equals the length announced in the
 * header, and that magic and version match. Failures are recorded with
 * ctdb_set_error(). A packet that passes is logged at debug level and
 * handed to daemon_incoming_packet(), which becomes responsible for freeing
 * it.
 */
541 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
543 struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
544 struct ctdb_req_header *hdr;
551 CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
/* too short to even contain a header: must not look at hdr fields */
553 if (cnt < sizeof(*hdr)) {
554 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n",
558 hdr = (struct ctdb_req_header *)data;
559 if (cnt != hdr->length) {
560 ctdb_set_error(client->ctdb, "Bad header length %u expected %u in daemon\n",
561 (unsigned)hdr->length, (unsigned)cnt);
565 if (hdr->ctdb_magic != CTDB_MAGIC) {
566 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
570 if (hdr->ctdb_version != CTDB_VERSION) {
571 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
575 DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
576 "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
577 hdr->srcnode, hdr->destnode));
579 /* it is the responsibility of the incoming packet function to free 'data' */
580 daemon_incoming_packet(client, hdr);
/* talloc destructor for a ctdb_client_pid_list node (installed by
 * ctdb_accept_client): unlinks the node from ctdb->client_pids, unless that
 * list is already empty. */
584 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
586 if (client_pid->ctdb->client_pids != NULL) {
587 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
/* fd-event callback for the listening unix socket (installed by
 * ctdb_start_daemon). private_data is the struct ctdb_context.
 * Accepts one connection, allocates a struct ctdb_client for it, records
 * the peer's pid, links a ctdb_client_pid_list node into ctdb->client_pids,
 * sets up the client's packet queue with ctdb_daemon_read_cb() as reader,
 * installs the destructors and bumps num_clients.
 * Two alternative peer-credential variants are visible (peercred_struct with
 * SO_PEERID, and ucred with SO_PEERCRED); the preprocessor lines selecting
 * between them are not visible in this excerpt.
 * NOTE(review): no NULL check of the talloc_zero() result is visible before
 * `client` is dereferenced.
 * NOTE(review): cr.pid is stored even when getsockopt() fails; no
 * initialisation of `cr` is visible, so the pid may be indeterminate then. */
594 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde,
595 uint16_t flags, void *private_data)
597 struct sockaddr_un addr;
600 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
601 struct ctdb_client *client;
602 struct ctdb_client_pid_list *client_pid;
604 struct peercred_struct cr;
605 socklen_t crl = sizeof(struct peercred_struct);
608 socklen_t crl = sizeof(struct ucred);
611 memset(&addr, 0, sizeof(addr));
613 fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
619 set_close_on_exec(fd);
621 DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
623 client = talloc_zero(ctdb, struct ctdb_client);
625 if (getsockopt(fd, SOL_SOCKET, SO_PEERID, &cr, &crl) == 0) {
627 if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl) == 0) {
629 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)cr.pid));
634 client->client_id = ctdb_reqid_new(ctdb, client);
635 client->pid = cr.pid;
/* the pid node is a talloc child of the client, so it goes away with it */
637 client_pid = talloc(client, struct ctdb_client_pid_list);
638 if (client_pid == NULL) {
639 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
644 client_pid->ctdb = ctdb;
645 client_pid->pid = cr.pid;
646 client_pid->client = client;
648 DLIST_ADD(ctdb->client_pids, client_pid);
650 client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT,
651 ctdb_daemon_read_cb, client,
652 "client-%u", client->pid);
654 talloc_set_destructor(client, ctdb_client_destructor);
655 talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
656 CTDB_INCREMENT_STAT(ctdb, num_clients);
662 create a unix domain socket and bind it
663 return a file descriptor open on the socket
/*
 * Creates the daemon's listening unix-domain stream socket and stores the
 * descriptor in ctdb->daemon.sd.
 *
 * The socket is marked close-on-exec and non-blocking, bound to
 * ctdb->daemon.name, restricted to the effective user (chown + chmod 0700)
 * and put into listening state with a backlog of 100. Each failing step is
 * logged; the tail of the function closes the descriptor and resets
 * ctdb->daemon.sd to -1.
 */
665 static int ux_socket_bind(struct ctdb_context *ctdb)
667 struct sockaddr_un addr;
669 ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
670 if (ctdb->daemon.sd == -1) {
674 set_close_on_exec(ctdb->daemon.sd);
675 set_nonblocking(ctdb->daemon.sd);
677 memset(&addr, 0, sizeof(addr));
678 addr.sun_family = AF_UNIX;
/* copy at most size-1 bytes: addr was zeroed above, so sun_path is always
 * NUL-terminated even when the configured name is too long */
679 strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path) - 1);
681 if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
682 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
/* only the effective user may talk to the daemon socket */
686 if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
687 chmod(ctdb->daemon.name, 0700) != 0) {
688 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s'\n", ctdb->daemon.name));
693 if (listen(ctdb->daemon.sd, 100) != 0) {
694 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
701 close(ctdb->daemon.sd);
702 ctdb->daemon.sd = -1;
/* SIGCHLD signal-event handler: reaps terminated children with a
 * non-blocking waitpid(-1, ..., WNOHANG). A waitpid() error is logged with
 * errno; a reaped child is logged at debug level with its pid.
 * NOTE(review): the loop structure around waitpid() is not visible in this
 * excerpt. */
706 static void sig_child_handler(struct event_context *ev,
707 struct signal_event *se, int signum, int count,
711 // struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
716 pid = waitpid(-1, &status, WNOHANG);
718 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
722 DEBUG(DEBUG_DEBUG, ("SIGCHLD from %d\n", (int)pid));
/* Completion callback for the "setup" event script (registered by
 * ctdb_start_daemon). A failure is fatal ("Failed to run setup event");
 * otherwise the "setup" notification script is run and a
 * CTDB_CONTROL_STARTUP control is broadcast to all nodes without asking
 * for a reply.
 * NOTE(review): the condition guarding ctdb_fatal() is not visible in this
 * excerpt; presumably it tests `status`. */
727 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
731 ctdb_fatal(ctdb, "Failed to run setup event\n");
734 ctdb_run_notification_script(ctdb, "setup");
736 /* tell all other nodes we've just started up */
737 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
738 0, CTDB_CONTROL_STARTUP, 0,
739 CTDB_CTRL_FLAG_NOREPLY,
740 tdb_null, NULL, NULL);
744 start the protocol going as a daemon
/* Daemon entry point. In order: removes a stale socket file and binds the
 * unix socket, optionally forks, ignores SIGPIPE, records the daemon pid,
 * creates the event context, initialises logging, statistics and the
 * configured transport ("tcp", or "ib" when built with USE_INFINIBAND),
 * sets up public addresses, attaches databases, runs the "init" event,
 * freezes the databases, starts accepting clients, releases previously held
 * IPs, starts the transport, installs the SIGCHLD handler, schedules the
 * "setup" event and finally enters the event loop.
 *
 * do_fork: when true, fork() is called and the branch taken for a non-zero
 *          fork() result ends this function (statement not visible here).
 * use_syslog: not referenced on any line visible in this excerpt;
 *          presumably guards start_syslog_daemon() - TODO confirm.
 * public_address_list: passed to ctdb_set_public_addresses() when non-NULL.
 *
 * The event loop is not expected to return; if it does, that is logged as
 * critical. */
746 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog, const char *public_address_list)
749 struct fd_event *fde;
750 const char *domain_socket_name;
751 struct signal_event *se;
753 /* get rid of any old sockets */
754 unlink(ctdb->daemon.name);
756 /* create a unix domain stream socket to listen to */
757 res = ux_socket_bind(ctdb);
759 DEBUG(DEBUG_ALERT,(__location__ " Failed to open CTDB unix domain socket\n"));
763 if (do_fork && fork()) {
767 tdb_reopen_all(False);
/* open() is expected to return descriptor 0 here, i.e. stdin must have been
 * closed just before (that statement is not visible in this excerpt) */
772 if (open("/dev/null", O_RDONLY) != 0) {
773 DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
777 block_signal(SIGPIPE);
779 ctdbd_pid = getpid();
782 DEBUG(DEBUG_ERR, ("Starting CTDBD as pid : %u\n", ctdbd_pid));
784 if (ctdb->do_setsched) {
785 /* try to set us up as realtime */
786 ctdb_set_scheduler(ctdb);
789 /* ensure the socket is deleted on exit of the daemon */
790 domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
791 if (domain_socket_name == NULL) {
792 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
/* NOTE(review): no NULL check of the new event context is visible before it
 * is used */
796 ctdb->ev = event_context_init(NULL);
797 tevent_loop_allow_nesting(ctdb->ev);
798 ret = ctdb_init_tevent_logging(ctdb);
800 DEBUG(DEBUG_ALERT,("Failed to initialize TEVENT logging\n"));
804 ctdb_set_child_logging(ctdb);
806 /* initialize statistics collection */
807 ctdb_statistics_init(ctdb);
809 /* force initial recovery for election */
810 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
812 if (strcmp(ctdb->transport, "tcp") == 0) {
813 int ctdb_tcp_init(struct ctdb_context *);
814 ret = ctdb_tcp_init(ctdb);
816 #ifdef USE_INFINIBAND
817 if (strcmp(ctdb->transport, "ib") == 0) {
818 int ctdb_ibw_init(struct ctdb_context *);
819 ret = ctdb_ibw_init(ctdb);
823 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
827 if (ctdb->methods == NULL) {
828 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
829 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
832 /* initialise the transport */
833 if (ctdb->methods->initialise(ctdb) != 0) {
834 ctdb_fatal(ctdb, "transport failed to initialise");
836 if (public_address_list) {
837 ret = ctdb_set_public_addresses(ctdb, public_address_list);
839 DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
845 /* attach to existing databases */
846 if (ctdb_attach_databases(ctdb) != 0) {
847 ctdb_fatal(ctdb, "Failed to attach to databases\n");
850 ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
852 ctdb_fatal(ctdb, "Failed to run init event\n");
854 ctdb_run_notification_script(ctdb, "init");
856 /* start frozen, then let the first election sort things out */
857 if (ctdb_blocking_freeze(ctdb)) {
858 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
861 /* now start accepting clients, only can do this once frozen */
862 fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd,
864 ctdb_accept_client, ctdb);
/* the listening socket is closed together with its fd event */
865 tevent_fd_set_auto_close(fde);
867 /* release any IPs we hold from previous runs of the daemon */
868 if (ctdb->tunable.disable_ip_failover == 0) {
869 ctdb_release_all_ips(ctdb);
872 /* start the transport going */
873 ctdb_start_transport(ctdb);
875 /* set up a handler to pick up sigchld */
876 se = event_add_signal(ctdb->ev, ctdb,
881 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
885 ret = ctdb_event_script_callback(ctdb,
887 ctdb_setup_event_callback,
893 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
898 if (start_syslog_daemon(ctdb)) {
899 DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
904 ctdb_lockdown_memory(ctdb);
906 /* go into a wait loop to allow other nodes to complete */
907 event_loop_wait(ctdb->ev);
909 DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
914 allocate a packet for use in daemon<->daemon communication
/* Allocates a packet through the transport's allocate_pkt() method and fills
 * in the common header.
 * length:  requested packet length; raised to slength if smaller.
 * slength: number of leading bytes that are zeroed (memset) after
 *          allocation.
 * The allocation size is `length` rounded up to a multiple of
 * CTDB_DS_ALIGNMENT (the mask arithmetic assumes that constant is a power
 * of two), while hdr->length records the unrounded length.
 * The header's generation is taken from ctdb->vnn_map and srcnode from
 * ctdb->pnn.
 * Both failure cases (transport down, allocation failure) are logged.
 * NOTE(review): the parameter lines declaring mem_ctx and type, and the
 * return statements, are not visible in this excerpt. */
916 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
918 enum ctdb_operation operation,
919 size_t length, size_t slength,
923 struct ctdb_req_header *hdr;
925 length = MAX(length, slength);
926 size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
928 if (ctdb->methods == NULL) {
929 DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
930 operation, (unsigned)length));
934 hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
936 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
937 operation, (unsigned)length));
940 talloc_set_name_const(hdr, type);
941 memset(hdr, 0, slength);
942 hdr->length = length;
943 hdr->operation = operation;
944 hdr->ctdb_magic = CTDB_MAGIC;
945 hdr->ctdb_version = CTDB_VERSION;
946 hdr->generation = ctdb->vnn_map->generation;
947 hdr->srcnode = ctdb->pnn;
/* State of one control request issued by a local client: the client, the
 * request packet (owned by this state, see talloc_steal in
 * daemon_request_control_from_client) and the destination node. The
 * next/prev links chain the state into node->pending_controls.
 * NOTE(review): a `reqid` member used by the code is not visible in this
 * excerpt. */
952 struct daemon_control_state {
953 struct daemon_control_state *next, *prev;
954 struct ctdb_client *client;
955 struct ctdb_req_control *c;
957 struct ctdb_node *node;
961 callback when a control reply comes in
/* Called when the reply to a client-issued control arrives (or when the
 * control is cancelled by ctdb_daemon_cancel_controls). private_data is the
 * struct daemon_control_state.
 * Builds a CTDB_REPLY_CONTROL packet whose payload is the reply data
 * followed by the error message text (not NUL-terminated; its length goes
 * into r->errorlen), copies the original reqid and queues the packet to the
 * client.
 * NOTE(review): the guards around the strlen(errormsg) uses are not visible
 * in this excerpt; presumably they test errormsg for NULL. What is done
 * with `ret` is not visible either. */
963 static void daemon_control_callback(struct ctdb_context *ctdb,
964 int32_t status, TDB_DATA data,
965 const char *errormsg,
968 struct daemon_control_state *state = talloc_get_type(private_data,
969 struct daemon_control_state);
970 struct ctdb_client *client = state->client;
971 struct ctdb_reply_control *r;
975 /* construct a message to send to the client containing the data */
976 len = offsetof(struct ctdb_reply_control, data) + data.dsize;
978 len += strlen(errormsg);
980 r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len,
981 struct ctdb_reply_control);
982 CTDB_NO_MEMORY_VOID(ctdb, r);
984 r->hdr.reqid = state->reqid;
986 r->datalen = data.dsize;
988 memcpy(&r->data[0], data.dptr, data.dsize);
990 r->errorlen = strlen(errormsg);
991 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
994 ret = daemon_queue_send(client, &r->hdr);
1001 fail all pending controls to a disconnected node
/* Drains node->pending_controls: each state is unlinked from the list and
 * completed through daemon_control_callback() with status (uint32_t)-1 and
 * the error text "node is disconnected".
 * The loop ends when the list head becomes NULL, so it relies on
 * DLIST_REMOVE advancing the head. */
1003 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
1005 struct daemon_control_state *state;
1006 while ((state = node->pending_controls)) {
1007 DLIST_REMOVE(node->pending_controls, state);
1008 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null,
1009 "node is disconnected", state);
1014 destroy a daemon_control_state
/* talloc destructor for struct daemon_control_state (installed by
 * daemon_request_control_from_client): unlinks the state from its node's
 * pending_controls list.
 * NOTE(review): the guard for states without a node is not visible in this
 * excerpt; state->node is only assigned on a visible line when the
 * destination pnn validates. */
1016 static int daemon_control_destructor(struct daemon_control_state *state)
1019 DLIST_REMOVE(state->node->pending_controls, state);
1025 this is called when the ctdb daemon received a ctdb request control
1026 from a local client over the unix domain socket
/* Handles a CTDB_REQ_CONTROL packet from a local client.
 * CTDB_CURRENT_NODE is translated to this node's pnn. A
 * daemon_control_state is allocated under the client, takes ownership of
 * the packet, and - when the destination pnn is valid - is linked into that
 * node's pending_controls list. The control is then sent with
 * daemon_control_callback() as completion handler; a send failure is
 * logged.
 * For CTDB_CTRL_FLAG_NOREPLY controls the state is moved under tmp_ctx, so
 * it is released by the talloc_free(tmp_ctx) at the end.
 * NOTE(review): tmp_ctx is created before the allocation check on `state`;
 * if CTDB_NO_MEMORY_VOID returns early, tmp_ctx stays allocated until the
 * client is freed. */
1028 static void daemon_request_control_from_client(struct ctdb_client *client,
1029 struct ctdb_req_control *c)
1033 struct daemon_control_state *state;
1034 TALLOC_CTX *tmp_ctx = talloc_new(client);
1036 if (c->hdr.destnode == CTDB_CURRENT_NODE) {
1037 c->hdr.destnode = client->ctdb->pnn;
1040 state = talloc(client, struct daemon_control_state);
1041 CTDB_NO_MEMORY_VOID(client->ctdb, state);
1043 state->client = client;
1044 state->c = talloc_steal(state, c);
1045 state->reqid = c->hdr.reqid;
1046 if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1047 state->node = client->ctdb->nodes[c->hdr.destnode];
1048 DLIST_ADD(state->node->pending_controls, state);
1053 talloc_set_destructor(state, daemon_control_destructor);
1055 if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1056 talloc_steal(tmp_ctx, state);
1059 data.dptr = &c->data[0];
1060 data.dsize = c->datalen;
1061 res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1062 c->srvid, c->opcode, client->client_id,
1064 data, daemon_control_callback,
1067 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1071 talloc_free(tmp_ctx);
1075 register a call function
/* Registers call function `fn` under `id` for the database db_id: a
 * ctdb_registered_call is allocated as a talloc child of the database
 * context and added to ctdb_db->calls.
 * An unknown db_id takes the ctdb_db == NULL branch (body not visible in
 * this excerpt).
 * NOTE(review): no NULL check of the talloc() result is visible between the
 * allocation and DLIST_ADD - confirm against the full file. */
1077 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1078 ctdb_fn_t fn, int id)
1080 struct ctdb_registered_call *call;
1081 struct ctdb_db_context *ctdb_db;
1083 ctdb_db = find_ctdb_db(ctdb, db_id);
1084 if (ctdb_db == NULL) {
1088 call = talloc(ctdb_db, struct ctdb_registered_call);
1092 DLIST_ADD(ctdb_db->calls, call);
1099 this local messaging handler is ugly, but is needed to prevent
1100 recursion in ctdb_send_message() when the destination node is the
1101 same as the source node
/* A message addressed to this node itself, parked until the event loop
 * delivers it (see ctdb_local_message_trigger).
 * NOTE(review): the `srvid` and `data` members used by the code are not
 * visible in this excerpt. */
1103 struct ctdb_local_message {
1104 struct ctdb_context *ctdb;
/* Timed-event callback that delivers a parked local message through
 * ctdb_dispatch_message(); a dispatch failure is logged with the srvid.
 * private_data is the struct ctdb_local_message.
 * NOTE(review): the statement releasing `m` is not visible in this excerpt. */
1109 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te,
1110 struct timeval t, void *private_data)
1112 struct ctdb_local_message *m = talloc_get_type(private_data,
1113 struct ctdb_local_message);
1116 res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1118 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n",
1119 (unsigned long long)m->srvid));
/* Queues a message to this node for delivery from the event loop instead of
 * dispatching it directly (which would recurse). The payload is duplicated
 * with talloc_memdup() under the message object, and a zero-delay timed
 * event owned by that object runs ctdb_local_message_trigger().
 * NOTE(review): the assignments that initialise m->ctdb, m->srvid and
 * m->data before the memdup are not visible in this excerpt. */
1124 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1126 struct ctdb_local_message *m;
1127 m = talloc(ctdb, struct ctdb_local_message);
1128 CTDB_NO_MEMORY(ctdb, m);
1133 m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1134 if (m->data.dptr == NULL) {
1139 /* this needs to be done as an event to prevent recursion */
1140 event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
/* Sends `data` as a CTDB_REQ_MESSAGE for `srvid` to node `pnn`.
 * Fails (logged at info level) when no transport is configured. A message
 * to this node itself is handed to ctdb_local_message(); otherwise a
 * transport packet is allocated, filled in and queued with
 * ctdb_queue_packet().
 * NOTE(review): the return statements after the transport check and after
 * queuing, and the release of `r`, are not visible in this excerpt. */
1147 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1148 uint64_t srvid, TDB_DATA data)
1150 struct ctdb_req_message *r;
1153 if (ctdb->methods == NULL) {
1154 DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1158 /* see if this is a message to ourselves */
1159 if (pnn == ctdb->pnn) {
1160 return ctdb_local_message(ctdb, srvid, data);
1163 len = offsetof(struct ctdb_req_message, data) + data.dsize;
1164 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1165 struct ctdb_req_message);
1166 CTDB_NO_MEMORY(ctdb, r);
1168 r->hdr.destnode = pnn;
1170 r->datalen = data.dsize;
1171 memcpy(&r->data[0], data.dptr, data.dsize);
1173 ctdb_queue_packet(ctdb, &r->hdr);
/* One registered "notify on disconnect" entry of a client, chained through
 * next/prev into client->notify.
 * NOTE(review): the `srvid` and `data` members used by the code are not
 * visible in this excerpt. */
1181 struct ctdb_client_notify_list {
1182 struct ctdb_client_notify_list *next, *prev;
1183 struct ctdb_context *ctdb;
/* talloc destructor for a notify entry (installed by
 * ctdb_control_register_notify): broadcasts the stored data for nl->srvid
 * to all connected nodes (CTDB_BROADCAST_CONNECTED). Because entries are
 * talloc children of the client, this fires when the client is freed.
 * A send failure is only logged.
 * NOTE(review): the routine "Sending client notify message" line is logged
 * at DEBUG_ERR level. */
1189 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1193 DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1195 ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1197 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
/* Control handler: registers a notification (srvid + payload) that is
 * broadcast when the issuing client goes away.
 * indata must be a struct ctdb_client_notify_register whose total size is
 * exactly the fixed part plus notify->len bytes of notify_data.
 * Rejected (logged) when: indata is too short or has the wrong size, the
 * client_id does not belong to a local client, or the client already has a
 * notification for the same srvid.
 * On success a copy of the payload is stored in a new list entry owned by
 * the client, and ctdb_client_notify_destructor() is installed on it.
 * NOTE(review): the first DEBUG line reads notify->srvid before
 * indata.dsize has been validated by the checks that follow, so an
 * undersized indata is dereferenced - move the log after the size checks. */
1203 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1205 struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1206 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1207 struct ctdb_client_notify_list *nl;
1209 DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1211 if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1212 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1216 if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1217 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1222 if (client == NULL) {
1223 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
/* look for an existing registration of the same srvid */
1227 for(nl=client->notify; nl; nl=nl->next) {
1228 if (nl->srvid == notify->srvid) {
1233 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1237 nl = talloc(client, struct ctdb_client_notify_list);
1238 CTDB_NO_MEMORY(ctdb, nl);
1240 nl->srvid = notify->srvid;
1241 nl->data.dsize = notify->len;
1242 nl->data.dptr = talloc_size(nl, nl->data.dsize);
1243 CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1244 memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1246 DLIST_ADD(client->notify, nl);
1247 talloc_set_destructor(nl, ctdb_client_notify_destructor);
/* Control handler: removes the client's notification for notify->srvid.
 * The entry is unlinked from client->notify and its destructor is cleared,
 * so removing it does not trigger the broadcast.
 * Rejected (logged) when the client_id does not belong to a local client or
 * no entry for the srvid exists.
 * NOTE(review): no check of indata.dsize is visible in this excerpt before
 * notify->srvid is read - confirm against the full file.
 * NOTE(review): the statement that frees the entry is not visible here. */
1252 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1254 struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1255 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1256 struct ctdb_client_notify_list *nl;
1258 DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1260 if (client == NULL) {
1261 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1265 for(nl=client->notify; nl; nl=nl->next) {
1266 if (nl->srvid == notify->srvid) {
1271 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1275 DLIST_REMOVE(client->notify, nl);
1276 talloc_set_destructor(nl, NULL);
/* Linear search of ctdb->client_pids for an entry whose pid matches;
 * returns the associated client for the first match.
 * (The value returned when nothing matches is not visible in this excerpt.) */
1282 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1284 struct ctdb_client_pid_list *client_pid;
1286 for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1287 if (client_pid->pid == pid) {
1288 return client_pid->client;
1295 /* This control is used by samba when probing if a process (of a samba daemon) exists on the node.
1297 Samba does this when it needs/wants to check if a subrecord in one of the
1298 databases is still valid, or if it is stale and can be removed.
1299 If the node is in unhealthy or stopped state we just kill off the samba
1300 process holding this sub-record and return to the calling samba that
1301 the process does not exist.
1302 This allows us to forcefully recall subrecords registered by samba processes
1303 on banned and stopped nodes. */
/* Control handler answering "does process `pid` exist?".
 * Normal case: returns the result of kill(pid, 0), i.e. 0 when the process
 * exists and a signal could be sent to it.
 * If this node is flagged BANNED or STOPPED, a local client with that pid
 * is disconnected by freeing its ctdb_client (which runs its destructor).
 * NOTE(review): the value returned from the banned/stopped branch is not
 * visible in this excerpt. */
1305 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1307 struct ctdb_client *client;
1309 if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1310 client = ctdb_find_client_by_pid(ctdb, pid);
1311 if (client != NULL) {
1312 DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1313 talloc_free(client);
1318 return kill(pid, 0);