4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/tevent/tevent.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/wait.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include <sys/socket.h>
/*
 * Entry in the doubly linked list (next/prev) kept at ctdb->client_pids,
 * tying a connected client to its owning ctdb context.
 * NOTE(review): a `pid` member is assigned and compared elsewhere in this
 * file, but its declaration is not visible in this excerpt.
 */
32 struct ctdb_client_pid_list {
33 struct ctdb_client_pid_list *next, *prev;
34 struct ctdb_context *ctdb;
36 struct ctdb_client *client;
/* forward declaration: the packet dispatcher is defined further down but is
   needed earlier by daemon_incoming_packet_wrap() */
39 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
/* Log a notice that the daemon is shutting down. Registered with atexit()
   in ctdb_start_transport(). */
41 static void print_exit_message(void)
43 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
/*
 * Bring the node's services up: start the transport, the recovery daemon and
 * the periodic keepalive / monitoring / tickle-update / recd-ping activities.
 * In this file it is invoked directly from ctdb_start_daemon().
 * A missing or failing transport is reported through ctdb_fatal().
 */
46 /* called when the "startup" event script has finished */
47 static void ctdb_start_transport(struct ctdb_context *ctdb)
/* without transport methods nothing can be started */
49 if (ctdb->methods == NULL) {
50 DEBUG(DEBUG_ALERT,(__location__ " startup event finished but transport is DOWN.\n"));
51 ctdb_fatal(ctdb, "transport is not initialized but startup completed");
54 /* start the transport running */
55 if (ctdb->methods->start(ctdb) != 0) {
56 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
57 ctdb_fatal(ctdb, "transport failed to start");
60 /* start the recovery daemon process */
61 if (ctdb_start_recoverd(ctdb) != 0) {
/* NOTE(review): what happens after this log line is not visible in this excerpt */
62 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
66 /* Make sure we log something when the daemon terminates */
67 atexit(print_exit_message);
69 /* start monitoring for connected/disconnected nodes */
70 ctdb_start_keepalive(ctdb);
72 /* start monitoring for node health */
73 ctdb_start_monitoring(ctdb);
75 /* start periodic update of tcp tickle lists */
76 ctdb_start_tcp_tickle_update(ctdb);
78 /* start listening for recovery daemon pings */
79 ctdb_control_recd_ping(ctdb);
/*
 * Set the disposition of `signum` to SIG_IGN through sigaction().
 * NOTE(review): the declaration of `act` is not visible in this excerpt.
 */
82 static void block_signal(int signum)
/* start from an all-zero sigaction so no stale flags are passed in */
86 memset(&act, 0, sizeof(act));
88 act.sa_handler = SIG_IGN;
89 sigemptyset(&act.sa_mask);
90 sigaddset(&act.sa_mask, signum);
91 sigaction(signum, &act, NULL);
96 send a packet to a client
/*
 * Queue packet `hdr` (hdr->length bytes) on the client's queue and return
 * the result of ctdb_queue_send(). Every call is counted in the
 * client_packets_sent statistic.
 */
98 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
100 CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
/* only CTDB_REQ_MESSAGE packets are checked against the queue depth tunable */
101 if (hdr->operation == CTDB_REQ_MESSAGE) {
102 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
/* NOTE(review): the statements that follow this log line are not visible in this excerpt */
103 DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
108 return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
112 message handler for when we are in daemon mode. This redirects the message
/*
 * Message handler registered on behalf of a local client (see
 * daemon_register_message_handler()). Wraps `data` in a CTDB_REQ_MESSAGE
 * packet and queues it to the client passed as `private_data`.
 */
115 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
116 TDB_DATA data, void *private_data)
118 struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
119 struct ctdb_req_message *r;
122 /* construct a message to send to the client containing the data */
/* packet size = fixed part of ctdb_req_message plus the payload */
123 len = offsetof(struct ctdb_req_message, data) + data.dsize;
124 r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE,
125 len, struct ctdb_req_message);
126 CTDB_NO_MEMORY_VOID(ctdb, r);
128 talloc_set_name_const(r, "req_message packet");
131 r->datalen = data.dsize;
132 memcpy(&r->data[0], data.dptr, data.dsize);
/* the return value of the send is not examined here */
134 daemon_queue_send(client, &r->hdr);
140 this is called when the ctdb daemon received a ctdb request to
141 set the srvid from the client
/*
 * Register daemon_message_handler() for `srvid` on behalf of the client
 * identified by `client_id`. The client structure is both the talloc parent
 * and the private data of the registration.
 * NOTE(review): the return statements are not visible in this excerpt.
 */
143 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
/* map the request id back to the client structure */
145 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
147 if (client == NULL) {
148 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
151 res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
153 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n",
154 (unsigned long long)srvid));
156 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n",
157 (unsigned long long)srvid));
164 this is called when the ctdb daemon received a ctdb request to
165 remove a srvid from the client
/*
 * Remove the message handler for `srvid` that belongs to the client
 * identified by `client_id`. Returns the result of
 * ctdb_deregister_message_handler() once the client has been found.
 */
167 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
169 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
170 if (client == NULL) {
/* NOTE(review): the return for this error path is not visible in this excerpt */
171 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
174 return ctdb_deregister_message_handler(ctdb, srvid, client);
179 destroy a ctdb_client
/*
 * talloc destructor for a ctdb_client (installed in ctdb_accept_client()).
 * Runs the takeover hook, releases the client's request id and updates the
 * num_clients statistic. A client that goes away with persistent updates in
 * flight puts the node into recovery mode.
 */
181 static int ctdb_client_destructor(struct ctdb_client *client)
183 struct ctdb_db_context *ctdb_db;
185 ctdb_takeover_client_destructor_hook(client);
186 ctdb_reqid_remove(client->ctdb, client->client_id);
187 CTDB_DECREMENT_STAT(client->ctdb, num_clients);
/* unfinished persistent updates cannot be trusted: force a recovery */
189 if (client->num_persistent_updates != 0) {
190 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
191 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
193 ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
/* NOTE(review): the condition guarding the following lines is not visible in this excerpt */
195 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
196 "commit active. Forcing recovery.\n"));
197 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
198 ctdb_db->transaction_active = false;
206 this is called when the ctdb daemon received a ctdb request message
207 from a local client over the unix domain socket
/*
 * Handle a CTDB_REQ_MESSAGE received from a local client. Messages addressed
 * to this node are handed to ctdb_request_message(); everything else is
 * forwarded with ctdb_daemon_send_message().
 */
209 static void daemon_request_message_from_client(struct ctdb_client *client,
210 struct ctdb_req_message *c)
215 /* maybe the message is for another client on this node */
216 if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
217 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
221 /* its for a remote node */
/* the payload is referenced in place inside the request packet */
222 data.dptr = &c->data[0];
223 data.dsize = c->datalen;
224 res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
227 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
/*
 * Per-request state kept while a call issued by a local client is in
 * progress: the requesting client, the call itself and the time the request
 * was started (used for the call latency statistics).
 * NOTE(review): a `reqid` member is used elsewhere in this file, but its
 * declaration is not visible in this excerpt.
 */
233 struct daemon_call_state {
234 struct ctdb_client *client;
236 struct ctdb_call *call;
237 struct timeval start_time;
241 complete a call from a client
/*
 * Completion callback for a call started by daemon_request_call_from_client().
 * Collects the call result, builds a CTDB_REPLY_CALL packet carrying the
 * reply data and queues it to the client. Each visible exit path updates
 * the pending_calls and call latency statistics.
 */
243 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
245 struct daemon_call_state *dstate = talloc_get_type(state->async.private_data,
246 struct daemon_call_state);
247 struct ctdb_reply_call *r;
250 struct ctdb_client *client = dstate->client;
251 struct ctdb_db_context *ctdb_db = state->ctdb_db;
/* re-parent dstate onto the client and the call onto dstate */
253 talloc_steal(client, dstate);
254 talloc_steal(dstate, dstate->call);
256 res = ctdb_daemon_call_recv(state, dstate->call);
258 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
259 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
261 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
/* reply size = fixed part of ctdb_reply_call plus the reply payload */
265 length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
266 r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL,
267 length, struct ctdb_reply_call);
269 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
270 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
271 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
/* echo the client's request id so it can match the reply */
274 r->hdr.reqid = dstate->reqid;
275 r->datalen = dstate->call->reply_data.dsize;
276 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
278 res = daemon_queue_send(client, &r->hdr);
280 /* client is dead - return immediately */
284 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
286 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
287 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
/*
 * Context handed to daemon_incoming_packet_wrap() when a packet is requeued.
 * NOTE(review): a `client_id` member is used elsewhere in this file, but its
 * declaration is not visible in this excerpt.
 */
291 struct ctdb_daemon_packet_wrap {
292 struct ctdb_context *ctdb;
297 a wrapper to catch disconnected clients
/*
 * Requeue entry point for packets deferred by daemon_request_call_from_client().
 * `p` is a ctdb_daemon_packet_wrap. The client is looked up again by id, so
 * a client that disconnected in the meantime is detected instead of being
 * dereferenced.
 */
299 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
301 struct ctdb_client *client;
302 struct ctdb_daemon_packet_wrap *w = talloc_get_type(p,
303 struct ctdb_daemon_packet_wrap);
/* NOTE(review): the check guarding this log line is not visible in this excerpt */
305 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
309 client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
310 if (client == NULL) {
311 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
/* client still exists: process the packet as if it had just arrived */
319 daemon_incoming_packet(client, hdr);
324 this is called when the ctdb daemon received a ctdb request call
325 from a local client over the unix domain socket
327 static void daemon_request_call_from_client(struct ctdb_client *client,
328 struct ctdb_req_call *c)
330 struct ctdb_call_state *state;
331 struct ctdb_db_context *ctdb_db;
332 struct daemon_call_state *dstate;
333 struct ctdb_call *call;
334 struct ctdb_ltdb_header header;
337 struct ctdb_context *ctdb = client->ctdb;
338 struct ctdb_daemon_packet_wrap *w;
340 CTDB_INCREMENT_STAT(ctdb, total_calls);
341 CTDB_DECREMENT_STAT(ctdb, pending_calls);
343 ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
345 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
347 CTDB_DECREMENT_STAT(ctdb, pending_calls);
351 if (ctdb_db->unhealthy_reason) {
353 * this is just a warning, as the tdb should be empty anyway,
354 * and only persistent databases can be unhealthy, which doesn't
355 * use this code patch
357 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
358 ctdb_db->db_name, ctdb_db->unhealthy_reason));
362 key.dsize = c->keylen;
364 w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
365 CTDB_NO_MEMORY_VOID(ctdb, w);
368 w->client_id = client->client_id;
370 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header,
371 (struct ctdb_req_header *)c, &data,
372 daemon_incoming_packet_wrap, w, True);
374 /* will retry later */
375 CTDB_DECREMENT_STAT(ctdb, pending_calls);
382 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
383 CTDB_DECREMENT_STAT(ctdb, pending_calls);
387 dstate = talloc(client, struct daemon_call_state);
388 if (dstate == NULL) {
389 ret = ctdb_ltdb_unlock(ctdb_db, key);
391 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
394 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
395 CTDB_DECREMENT_STAT(ctdb, pending_calls);
398 dstate->start_time = timeval_current();
399 dstate->client = client;
400 dstate->reqid = c->hdr.reqid;
401 talloc_steal(dstate, data.dptr);
403 call = dstate->call = talloc_zero(dstate, struct ctdb_call);
405 ret = ctdb_ltdb_unlock(ctdb_db, key);
407 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
410 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
411 CTDB_DECREMENT_STAT(ctdb, pending_calls);
412 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
416 call->call_id = c->callid;
418 call->call_data.dptr = c->data + c->keylen;
419 call->call_data.dsize = c->calldatalen;
420 call->flags = c->flags;
422 if (header.dmaster == ctdb->pnn) {
423 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
425 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
428 ret = ctdb_ltdb_unlock(ctdb_db, key);
430 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
434 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
435 CTDB_DECREMENT_STAT(ctdb, pending_calls);
436 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
439 talloc_steal(state, dstate);
440 talloc_steal(client, state);
442 state->async.fn = daemon_call_from_client_callback;
443 state->async.private_data = dstate;
/* forward declaration: defined further down, used by daemon_incoming_packet() */
447 static void daemon_request_control_from_client(struct ctdb_client *client,
448 struct ctdb_req_control *c);
/*
 * Dispatch one packet received from a local client. `p` is the ctdb_client
 * the packet came from. Packets with a wrong magic or version are rejected
 * with an error set on the context; CTDB_REQ_MESSAGE and CTDB_REQ_CONTROL
 * (and the call case whose label is not visible here) are counted and passed
 * to their handlers. The packet is freed through a temporary talloc context
 * unless a handler has stolen it.
 */
450 /* data contains a packet from the client */
451 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
453 struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
455 struct ctdb_context *ctdb = client->ctdb;
457 /* place the packet as a child of a tmp_ctx. We then use
458 talloc_free() below to free it. If any of the calls want
459 to keep it, then they will steal it somewhere else, and the
460 talloc_free() will be a no-op */
461 tmp_ctx = talloc_new(client);
462 talloc_steal(tmp_ctx, hdr);
464 if (hdr->ctdb_magic != CTDB_MAGIC) {
465 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
469 if (hdr->ctdb_version != CTDB_VERSION) {
470 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
/* each recognised operation bumps its own client.* counter before dispatch */
474 switch (hdr->operation) {
476 CTDB_INCREMENT_STAT(ctdb, client.req_call);
477 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
480 case CTDB_REQ_MESSAGE:
481 CTDB_INCREMENT_STAT(ctdb, client.req_message);
482 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
485 case CTDB_REQ_CONTROL:
486 CTDB_INCREMENT_STAT(ctdb, client.req_control);
487 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
491 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
496 talloc_free(tmp_ctx);
500 called when the daemon gets a incoming packet
/*
 * Read callback of a client queue (installed by ctdb_accept_client()).
 * `data`/`cnt` is one received packet and `args` is the ctdb_client.
 * The packet is validated (minimum size, declared length, magic, version)
 * and then handed to daemon_incoming_packet().
 */
502 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
504 struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
505 struct ctdb_req_header *hdr;
512 CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
/* too short to even contain a request header */
514 if (cnt < sizeof(*hdr)) {
515 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n",
519 hdr = (struct ctdb_req_header *)data;
/* the length declared in the header must match what was received */
520 if (cnt != hdr->length) {
521 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon",
522 (unsigned)hdr->length, (unsigned)cnt);
526 if (hdr->ctdb_magic != CTDB_MAGIC) {
527 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
531 if (hdr->ctdb_version != CTDB_VERSION) {
532 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
536 DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
537 "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
538 hdr->srcnode, hdr->destnode));
540 /* it is the responsibility of the incoming packet function to free 'data' */
541 daemon_incoming_packet(client, hdr);
/* talloc destructor for a ctdb_client_pid_list entry: unlink it from
   ctdb->client_pids, provided that list is not empty */
545 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
547 if (client_pid->ctdb->client_pids != NULL) {
548 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
/*
 * fd event handler for the listening unix domain socket: accept a new local
 * client, create its ctdb_client structure, record its pid in
 * ctdb->client_pids and set up the packet queue that feeds
 * ctdb_daemon_read_cb().
 * NOTE(review): two alternative peer-credential variants (SO_PEERID with
 * peercred_struct, SO_PEERCRED with ucred) are visible below; presumably
 * they are selected by preprocessor conditionals that are not visible in
 * this excerpt.
 */
555 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde,
556 uint16_t flags, void *private_data)
558 struct sockaddr_un addr;
561 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
562 struct ctdb_client *client;
563 struct ctdb_client_pid_list *client_pid;
565 struct peercred_struct cr;
566 socklen_t crl = sizeof(struct peercred_struct);
569 socklen_t crl = sizeof(struct ucred);
572 memset(&addr, 0, sizeof(addr));
574 fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
580 set_close_on_exec(fd);
582 DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
584 client = talloc_zero(ctdb, struct ctdb_client);
/* ask the kernel for the peer's credentials to learn the client's pid */
586 if (getsockopt(fd, SOL_SOCKET, SO_PEERID, &cr, &crl) == 0) {
588 if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl) == 0) {
590 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)cr.pid));
/* the client id doubles as a request id so the client can be found by id later */
595 client->client_id = ctdb_reqid_new(ctdb, client);
596 client->pid = cr.pid;
/* the pid entry is a talloc child of the client */
598 client_pid = talloc(client, struct ctdb_client_pid_list);
599 if (client_pid == NULL) {
600 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
605 client_pid->ctdb = ctdb;
606 client_pid->pid = cr.pid;
607 client_pid->client = client;
609 DLIST_ADD(ctdb->client_pids, client_pid);
611 client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT,
612 ctdb_daemon_read_cb, client,
613 "client-%u", client->pid);
615 talloc_set_destructor(client, ctdb_client_destructor);
616 talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
617 CTDB_INCREMENT_STAT(ctdb, num_clients);
623 create a unix domain socket and bind it
624 return a file descriptor open on the socket
626 static int ux_socket_bind(struct ctdb_context *ctdb)
628 struct sockaddr_un addr;
630 ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
631 if (ctdb->daemon.sd == -1) {
635 set_close_on_exec(ctdb->daemon.sd);
636 set_nonblocking(ctdb->daemon.sd);
638 memset(&addr, 0, sizeof(addr));
639 addr.sun_family = AF_UNIX;
640 strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
642 if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
643 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
647 if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
648 chmod(ctdb->daemon.name, 0700) != 0) {
649 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
654 if (listen(ctdb->daemon.sd, 100) != 0) {
655 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
662 close(ctdb->daemon.sd);
663 ctdb->daemon.sd = -1;
/*
 * Signal event handler for SIGCHLD: collect exited children with a
 * non-blocking waitpid() so they do not stay around as zombies.
 * NOTE(review): the loop and conditions around the calls below are not
 * visible in this excerpt.
 */
667 static void sig_child_handler(struct event_context *ev,
668 struct signal_event *se, int signum, int count,
672 // struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
/* WNOHANG: never block the event loop waiting for a child */
677 pid = waitpid(-1, &status, WNOHANG);
679 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
683 DEBUG(DEBUG_DEBUG, ("SIGCHLD from %d\n", (int)pid));
/*
 * Completion callback for the "setup" event script started from
 * ctdb_start_daemon(). Runs the "setup" notification script and broadcasts
 * CTDB_CONTROL_STARTUP to all nodes without asking for a reply.
 * NOTE(review): the condition guarding ctdb_fatal() is not visible in this
 * excerpt.
 */
688 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
692 ctdb_fatal(ctdb, "Failed to run setup event\n");
695 ctdb_run_notification_script(ctdb, "setup");
697 /* tell all other nodes we've just started up */
698 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
699 0, CTDB_CONTROL_STARTUP, 0,
700 CTDB_CTRL_FLAG_NOREPLY,
701 tdb_null, NULL, NULL);
705 start the protocol going as a daemon
/*
 * Main entry point of the daemon. In order, as far as visible here: bind
 * the unix domain socket, optionally fork, set up the event context and
 * logging, initialise the transport, attach databases, run the "init"
 * event, freeze, start accepting clients, start the transport and helper
 * services, install the SIGCHLD handler, start the "setup" event and
 * finally enter the event loop. The event loop is not expected to return.
 * NOTE(review): return statements and several conditions are not visible
 * in this excerpt.
 */
707 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog)
710 struct fd_event *fde;
711 const char *domain_socket_name;
712 struct signal_event *se;
714 /* get rid of any old sockets */
715 unlink(ctdb->daemon.name);
717 /* create a unix domain stream socket to listen to */
718 res = ux_socket_bind(ctdb);
720 DEBUG(DEBUG_ALERT,(__location__ " Failed to open CTDB unix domain socket\n"));
/* a non-zero fork() result takes this branch (the parent, or a failed fork) */
724 if (do_fork && fork()) {
728 tdb_reopen_all(False);
/* open() is expected to return descriptor 0 here, i.e. stdin */
733 if (open("/dev/null", O_RDONLY) != 0) {
734 DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
/* ignore SIGPIPE (block_signal() installs SIG_IGN) */
738 block_signal(SIGPIPE);
740 ctdbd_pid = getpid();
743 DEBUG(DEBUG_ERR, ("Starting CTDBD as pid : %u\n", ctdbd_pid));
745 ctdb_high_priority(ctdb);
747 /* ensure the socket is deleted on exit of the daemon */
748 domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
749 if (domain_socket_name == NULL) {
750 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
754 ctdb->ev = event_context_init(NULL);
755 tevent_loop_allow_nesting(ctdb->ev);
756 ret = ctdb_init_tevent_logging(ctdb);
758 DEBUG(DEBUG_ALERT,("Failed to initialize TEVENT logging\n"));
762 ctdb_set_child_logging(ctdb);
764 /* initialize statistics collection */
765 ctdb_statistics_init(ctdb);
767 /* force initial recovery for election */
768 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
/* pick the transport backend by name; "ib" is only available with USE_INFINIBAND */
770 if (strcmp(ctdb->transport, "tcp") == 0) {
771 int ctdb_tcp_init(struct ctdb_context *);
772 ret = ctdb_tcp_init(ctdb);
774 #ifdef USE_INFINIBAND
775 if (strcmp(ctdb->transport, "ib") == 0) {
776 int ctdb_ibw_init(struct ctdb_context *);
777 ret = ctdb_ibw_init(ctdb);
781 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
785 if (ctdb->methods == NULL) {
786 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
787 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
790 /* initialise the transport */
791 if (ctdb->methods->initialise(ctdb) != 0) {
792 ctdb_fatal(ctdb, "transport failed to initialise");
795 /* attach to existing databases */
796 if (ctdb_attach_databases(ctdb) != 0) {
797 ctdb_fatal(ctdb, "Failed to attach to databases\n");
800 ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
802 ctdb_fatal(ctdb, "Failed to run init event\n");
804 ctdb_run_notification_script(ctdb, "init");
806 /* start frozen, then let the first election sort things out */
807 if (ctdb_blocking_freeze(ctdb)) {
808 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
811 /* now start accepting clients, only can do this once frozen */
812 fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd,
814 ctdb_accept_client, ctdb);
815 tevent_fd_set_auto_close(fde);
817 /* release any IPs we hold from previous runs of the daemon */
818 ctdb_release_all_ips(ctdb);
820 /* start the transport going */
821 ctdb_start_transport(ctdb);
823 /* set up a handler to pick up sigchld */
824 se = event_add_signal(ctdb->ev, ctdb,
829 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
/* run the "setup" event asynchronously; ctdb_setup_event_callback() is its completion callback */
833 ret = ctdb_event_script_callback(ctdb,
835 ctdb_setup_event_callback,
841 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
846 if (start_syslog_daemon(ctdb)) {
847 DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
852 ctdb_lockdown_memory(ctdb);
854 /* go into a wait loop to allow other nodes to complete */
855 event_loop_wait(ctdb->ev);
857 DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
862 allocate a packet for use in daemon<->daemon communication
/*
 * Allocate a packet through the transport's allocate_pkt() method and fill
 * in the common request header (length, operation, magic, version,
 * generation, source node). `length` is raised to at least `slength`, and
 * the first `slength` bytes of the packet are zeroed. The talloc name of
 * the packet is set to `type`.
 * NOTE(review): the return statements and two parameters (mem_ctx, type)
 * are not visible in this excerpt.
 */
864 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
866 enum ctdb_operation operation,
867 size_t length, size_t slength,
871 struct ctdb_req_header *hdr;
873 length = MAX(length, slength);
/* round the allocation up to a multiple of CTDB_DS_ALIGNMENT
   (the mask form assumes the alignment is a power of two - TODO confirm) */
874 size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
876 if (ctdb->methods == NULL) {
877 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
878 operation, (unsigned)length));
882 hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
884 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
885 operation, (unsigned)length));
888 talloc_set_name_const(hdr, type);
889 memset(hdr, 0, slength);
/* the header carries the requested length, not the rounded-up allocation size */
890 hdr->length = length;
891 hdr->operation = operation;
892 hdr->ctdb_magic = CTDB_MAGIC;
893 hdr->ctdb_version = CTDB_VERSION;
894 hdr->generation = ctdb->vnn_map->generation;
895 hdr->srcnode = ctdb->pnn;
/*
 * State of one control request issued by a local client. Entries are linked
 * (next/prev) into the destination node's pending_controls list so they can
 * be failed when that node disconnects.
 * NOTE(review): a `reqid` member is used elsewhere in this file, but its
 * declaration is not visible in this excerpt.
 */
900 struct daemon_control_state {
901 struct daemon_control_state *next, *prev;
902 struct ctdb_client *client;
903 struct ctdb_req_control *c;
905 struct ctdb_node *node;
909 callback when a control reply comes in
/*
 * Completion callback for a control issued on behalf of a local client.
 * Builds a CTDB_REPLY_CONTROL packet containing the reply data followed by
 * the error message text, and queues it to the client.
 * NOTE(review): the conditions around the errormsg handling and the code
 * after the send are not visible in this excerpt.
 */
911 static void daemon_control_callback(struct ctdb_context *ctdb,
912 int32_t status, TDB_DATA data,
913 const char *errormsg,
916 struct daemon_control_state *state = talloc_get_type(private_data,
917 struct daemon_control_state);
918 struct ctdb_client *client = state->client;
919 struct ctdb_reply_control *r;
923 /* construct a message to send to the client containing the data */
924 len = offsetof(struct ctdb_reply_control, data) + data.dsize;
/* the error text is appended after the data, without a terminating NUL */
926 len += strlen(errormsg);
928 r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len,
929 struct ctdb_reply_control);
930 CTDB_NO_MEMORY_VOID(ctdb, r);
932 r->hdr.reqid = state->reqid;
934 r->datalen = data.dsize;
936 memcpy(&r->data[0], data.dptr, data.dsize);
938 r->errorlen = strlen(errormsg);
939 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
942 ret = daemon_queue_send(client, &r->hdr);
949 fail all pending controls to a disconnected node
/*
 * Fail every control still pending towards `node`: each entry is unlinked
 * from node->pending_controls and completed through
 * daemon_control_callback() with status -1 and the message
 * "node is disconnected".
 */
951 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
953 struct daemon_control_state *state;
/* always take the list head; each iteration removes it */
954 while ((state = node->pending_controls)) {
955 DLIST_REMOVE(node->pending_controls, state);
956 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null,
957 "node is disconnected", state);
962 destroy a daemon_control_state
/*
 * talloc destructor for a daemon_control_state: unlink it from its node's
 * pending_controls list.
 * NOTE(review): any guard around the removal is not visible in this excerpt.
 */
964 static int daemon_control_destructor(struct daemon_control_state *state)
967 DLIST_REMOVE(state->node->pending_controls, state);
973 this is called when the ctdb daemon received a ctdb request control
974 from a local client over the unix domain socket
/*
 * Handle a CTDB_REQ_CONTROL received from a local client. A
 * daemon_control_state is created as a child of the client, takes ownership
 * of the request packet and, for a valid destination, is linked into that
 * node's pending_controls list. The control is then sent with
 * ctdb_daemon_send_control(), with daemon_control_callback() as the reply
 * handler.
 */
976 static void daemon_request_control_from_client(struct ctdb_client *client,
977 struct ctdb_req_control *c)
981 struct daemon_control_state *state;
982 TALLOC_CTX *tmp_ctx = talloc_new(client);
/* resolve the "current node" placeholder to this node's pnn */
984 if (c->hdr.destnode == CTDB_CURRENT_NODE) {
985 c->hdr.destnode = client->ctdb->pnn;
988 state = talloc(client, struct daemon_control_state);
989 CTDB_NO_MEMORY_VOID(client->ctdb, state);
991 state->client = client;
/* the request packet now belongs to the state */
992 state->c = talloc_steal(state, c);
993 state->reqid = c->hdr.reqid;
994 if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
995 state->node = client->ctdb->nodes[c->hdr.destnode];
996 DLIST_ADD(state->node->pending_controls, state);
1001 talloc_set_destructor(state, daemon_control_destructor);
/* no reply expected: park the state on tmp_ctx so it is freed at the end of this function */
1003 if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1004 talloc_steal(tmp_ctx, state);
1007 data.dptr = &c->data[0];
1008 data.dsize = c->datalen;
1009 res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1010 c->srvid, c->opcode, client->client_id,
1012 data, daemon_control_callback,
1015 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1019 talloc_free(tmp_ctx);
1023 register a call function
/*
 * Register call function `fn` under `id` for the database `db_id`: a
 * ctdb_registered_call is allocated as a child of the database context and
 * added to its `calls` list.
 * NOTE(review): the return values and the assignments to the new entry are
 * not visible in this excerpt.
 */
1025 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1026 ctdb_fn_t fn, int id)
1028 struct ctdb_registered_call *call;
1029 struct ctdb_db_context *ctdb_db;
1031 ctdb_db = find_ctdb_db(ctdb, db_id);
1032 if (ctdb_db == NULL) {
1036 call = talloc(ctdb_db, struct ctdb_registered_call);
1040 DLIST_ADD(ctdb_db->calls, call);
1047 this local messaging handler is ugly, but is needed to prevent
1048 recursion in ctdb_send_message() when the destination node is the
1049 same as the source node
/*
 * A message queued for delivery to this node itself, see ctdb_local_message().
 * NOTE(review): `srvid` and `data` members are used elsewhere in this file,
 * but their declarations are not visible in this excerpt.
 */
1051 struct ctdb_local_message {
1052 struct ctdb_context *ctdb;
/*
 * Timed event handler scheduled by ctdb_local_message(): dispatch the
 * stored message to the handlers registered for its srvid.
 * NOTE(review): the condition guarding the error log and any cleanup are
 * not visible in this excerpt.
 */
1057 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te,
1058 struct timeval t, void *private_data)
1060 struct ctdb_local_message *m = talloc_get_type(private_data,
1061 struct ctdb_local_message);
1064 res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1066 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n",
1067 (unsigned long long)m->srvid));
/*
 * Deliver a message addressed to this node. The payload is copied and the
 * dispatch is deferred to a zero-delay timed event
 * (ctdb_local_message_trigger()) instead of being done inline.
 * NOTE(review): the assignments that fill in `m` and the return statements
 * are not visible in this excerpt.
 */
1072 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1074 struct ctdb_local_message *m;
1075 m = talloc(ctdb, struct ctdb_local_message);
1076 CTDB_NO_MEMORY(ctdb, m);
/* replace the data pointer with a private copy owned by m */
1081 m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1082 if (m->data.dptr == NULL) {
1087 /* this needs to be done as an event to prevent recursion */
1088 event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
/*
 * Send a message with the given srvid and payload to node `pnn`. Messages
 * to this node itself go through ctdb_local_message(); all others are
 * wrapped in a CTDB_REQ_MESSAGE transport packet and queued with
 * ctdb_queue_packet(). Requires the transport to be available.
 * NOTE(review): the return statements after the visible lines are not
 * shown in this excerpt.
 */
1095 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1096 uint64_t srvid, TDB_DATA data)
1098 struct ctdb_req_message *r;
1101 if (ctdb->methods == NULL) {
1102 DEBUG(DEBUG_ERR,(__location__ " Failed to send message. Transport is DOWN\n"));
1106 /* see if this is a message to ourselves */
1107 if (pnn == ctdb->pnn) {
1108 return ctdb_local_message(ctdb, srvid, data);
/* packet size = fixed part of ctdb_req_message plus the payload */
1111 len = offsetof(struct ctdb_req_message, data) + data.dsize;
1112 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1113 struct ctdb_req_message);
1114 CTDB_NO_MEMORY(ctdb, r);
1116 r->hdr.destnode = pnn;
1118 r->datalen = data.dsize;
1119 memcpy(&r->data[0], data.dptr, data.dsize);
1121 ctdb_queue_packet(ctdb, &r->hdr);
/*
 * One notification registered by a client, linked (next/prev) into
 * client->notify. See ctdb_control_register_notify().
 * NOTE(review): `srvid` and `data` members are used elsewhere in this file,
 * but their declarations are not visible in this excerpt.
 */
1129 struct ctdb_client_notify_list {
1130 struct ctdb_client_notify_list *next, *prev;
1131 struct ctdb_context *ctdb;
/*
 * talloc destructor for a notification entry: when the entry is destroyed,
 * the stored payload is sent with the registered srvid to
 * CTDB_BROADCAST_CONNECTED.
 * NOTE(review): the condition guarding the failure log is not visible in
 * this excerpt.
 */
1137 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1141 DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1143 ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1145 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1151 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1153 struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1154 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1155 struct ctdb_client_notify_list *nl;
1157 DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1159 if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1160 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1164 if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1165 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1170 if (client == NULL) {
1171 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1175 for(nl=client->notify; nl; nl=nl->next) {
1176 if (nl->srvid == notify->srvid) {
1181 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1185 nl = talloc(client, struct ctdb_client_notify_list);
1186 CTDB_NO_MEMORY(ctdb, nl);
1188 nl->srvid = notify->srvid;
1189 nl->data.dsize = notify->len;
1190 nl->data.dptr = talloc_size(nl, nl->data.dsize);
1191 CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1192 memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1194 DLIST_ADD(client->notify, nl);
1195 talloc_set_destructor(nl, ctdb_client_notify_destructor);
/*
 * Control handler: remove the notification registered for notify->srvid by
 * the calling client. The entry's destructor is cleared first, so removing
 * it this way does not trigger ctdb_client_notify_destructor().
 * NOTE(review): no size check of `indata` is visible in this function;
 * presumably the caller validates it - confirm. Return statements are not
 * visible in this excerpt.
 */
1200 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1202 struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1203 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1204 struct ctdb_client_notify_list *nl;
1206 DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1208 if (client == NULL) {
1209 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
/* look for the entry with the matching srvid */
1213 for(nl=client->notify; nl; nl=nl->next) {
1214 if (nl->srvid == notify->srvid) {
1219 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1223 DLIST_REMOVE(client->notify, nl);
1224 talloc_set_destructor(nl, NULL);
/*
 * Walk ctdb->client_pids and return the client of the first entry whose pid
 * equals `pid`.
 * NOTE(review): the return for the not-found case is not visible in this
 * excerpt.
 */
1230 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1232 struct ctdb_client_pid_list *client_pid;
1234 for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1235 if (client_pid->pid == pid) {
1236 return client_pid->client;
1243 /* This control is used by samba when probing if a process (of a samba daemon)
1245 exists. Samba does this when it needs/wants to check if a subrecord in one of the
1246 databases is still valid, or if it is stale and can be removed.
1247 If the node is in banned or stopped state we just kill off the samba
1248 process holding this sub-record and return to the calling samba that
1249 the process does not exist.
1250 This allows us to forcefully recall subrecords registered by samba processes
1251 on banned and stopped nodes. */
/*
 * Control handler: report whether process `pid` exists, using kill(pid, 0).
 * On a node flagged NODE_FLAGS_BANNED or NODE_FLAGS_STOPPED, a client
 * connected with that pid is freed (disconnected) first.
 * NOTE(review): the return taken on a banned/stopped node is not visible
 * in this excerpt.
 */
1253 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1255 struct ctdb_client *client;
1257 if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1258 client = ctdb_find_client_by_pid(ctdb, pid);
1259 if (client != NULL) {
1260 DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
/* freeing the client runs ctdb_client_destructor() */
1261 talloc_free(client);
/* signal 0 performs only the existence/permission check, nothing is delivered */
1266 return kill(pid, 0);