4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/events/events.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/wait.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include <sys/socket.h>
/*
  One entry of the ctdb->client_pids list (populated by
  ctdb_accept_client). It links a connected client structure to its
  ctdb context; ctdb_find_client_by_pid() walks this list.
*/
32 struct ctdb_client_pid_list {
33 struct ctdb_client_pid_list *next, *prev;
34 struct ctdb_context *ctdb;
36 struct ctdb_client *client;
/* forward declaration: the dispatcher is defined below but is called
   from daemon_incoming_packet_wrap() */
39 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
/* registered with atexit() by ctdb_start_transport(); logs a notice
   that the daemon is going down */
41 static void print_exit_message(void)
43 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
46 /* called when the "startup" event script has finished */
/*
  Bring the node's services up. Treats a missing transport
  (ctdb->methods == NULL) or a transport that fails to start as fatal,
  starts the recovery daemon, registers the exit message, and then
  starts keepalive, health monitoring, tcp tickle updates and the
  recovery-daemon ping listener, in that order.
*/
47 static void ctdb_start_transport(struct ctdb_context *ctdb)
49 if (ctdb->methods == NULL) {
50 DEBUG(DEBUG_ALERT,(__location__ " startup event finished but transport is DOWN.\n"));
51 ctdb_fatal(ctdb, "transport is not initialized but startup completed");
54 /* start the transport running */
55 if (ctdb->methods->start(ctdb) != 0) {
56 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
57 ctdb_fatal(ctdb, "transport failed to start");
60 /* start the recovery daemon process */
61 if (ctdb_start_recoverd(ctdb) != 0) {
62 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
66 /* Make sure we log something when the daemon terminates */
67 atexit(print_exit_message);
69 /* start monitoring for connected/disconnected nodes */
70 ctdb_start_keepalive(ctdb);
72 /* start monitoring for node health */
73 ctdb_start_monitoring(ctdb);
75 /* start periodic update of tcp tickle lists */
76 ctdb_start_tcp_tickle_update(ctdb);
78 /* start listening for recovery daemon pings */
79 ctdb_control_recd_ping(ctdb);
/*
  Install SIG_IGN as the disposition for signum via sigaction(), so the
  signal is ignored by the process. ctdb_start_daemon() uses this for
  SIGPIPE.
*/
82 static void block_signal(int signum)
86 memset(&act, 0, sizeof(act));
88 act.sa_handler = SIG_IGN;
89 sigemptyset(&act.sa_mask);
90 sigaddset(&act.sa_mask, signum);
91 sigaction(signum, &act, NULL);
96 send a packet to a client
/*
  Queue hdr (hdr->length bytes) on the client's queue and return the
  result of ctdb_queue_send(). Every call is counted in
  statistics.client_packets_sent. For CTDB_REQ_MESSAGE packets the
  current queue length is compared against the
  max_queue_depth_drop_msg tunable and a "Queue full" drop is logged
  when it is exceeded.
  NOTE(review): the statements following the drop log are not visible
  in this view - confirm the packet is actually discarded there.
*/
98 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
100 client->ctdb->statistics.client_packets_sent++;
101 if (hdr->operation == CTDB_REQ_MESSAGE) {
102 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
103 DEBUG(DEBUG_ERR,("Drop CTDB_REQ_MESSAGE to client. Queue full.\n"));
107 return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
111 message handler for when we are in daemon mode. This redirects the message
/*
  Message handler installed by daemon_register_message_handler().
  private_data is the struct ctdb_client that registered for srvid.
  Builds a CTDB_REQ_MESSAGE packet holding a copy of data (data.dsize
  bytes) and queues it to that client with daemon_queue_send(). The
  return value of daemon_queue_send() is not checked.
*/
114 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
115 TDB_DATA data, void *private_data)
117 struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
118 struct ctdb_req_message *r;
121 /* construct a message to send to the client containing the data */
122 len = offsetof(struct ctdb_req_message, data) + data.dsize;
123 r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE,
124 len, struct ctdb_req_message);
125 CTDB_NO_MEMORY_VOID(ctdb, r);
127 talloc_set_name_const(r, "req_message packet");
130 r->datalen = data.dsize;
131 memcpy(&r->data[0], data.dptr, data.dsize);
133 daemon_queue_send(client, &r->hdr);
139 this is called when the ctdb daemon received a ctdb request to
140 set the srvid from the client
/*
  Register daemon_message_handler for srvid on behalf of the local
  client identified by client_id. The client is looked up through the
  reqid table; an unknown client_id is logged as an error. The outcome
  of ctdb_register_message_handler() is logged either as a failure
  (DEBUG_ERR) or as a successful registration (DEBUG_INFO).
*/
142 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
144 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
146 if (client == NULL) {
147 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
150 res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
152 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n",
153 (unsigned long long)srvid));
155 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n",
156 (unsigned long long)srvid));
163 this is called when the ctdb daemon received a ctdb request to
164 remove a srvid from the client
/*
  Remove the message handler for srvid that belongs to the local client
  identified by client_id. An unknown client_id is logged as an error;
  otherwise the result of ctdb_deregister_message_handler() is
  returned.
*/
166 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
168 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
169 if (client == NULL) {
170 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
173 return ctdb_deregister_message_handler(ctdb, srvid, client);
178 destroy a ctdb_client
/*
  talloc destructor for a struct ctdb_client (installed by
  ctdb_accept_client). Runs the takeover hook, releases the client's
  reqid and decrements statistics.num_clients without letting it go
  below zero. A client that disconnects with persistent updates still
  in flight switches the node to CTDB_RECOVERY_ACTIVE. The database
  named by client->db_id is looked up; the visible lines then log
  "client exit while transaction commit active", force recovery mode
  and clear ctdb_db->transaction_active.
  NOTE(review): the condition guarding that last group is not visible
  in this view - confirm what it tests.
*/
180 static int ctdb_client_destructor(struct ctdb_client *client)
182 struct ctdb_db_context *ctdb_db;
184 ctdb_takeover_client_destructor_hook(client);
185 ctdb_reqid_remove(client->ctdb, client->client_id);
186 if (client->ctdb->statistics.num_clients) {
187 client->ctdb->statistics.num_clients--;
190 if (client->num_persistent_updates != 0) {
191 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
192 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
194 ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
196 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
197 "commit active. Forcing recovery.\n"));
198 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
199 ctdb_db->transaction_active = false;
207 this is called when the ctdb daemon received a ctdb request message
208 from a local client over the unix domain socket
/*
  Handle a CTDB_REQ_MESSAGE packet received from a local client. When
  the destination node is this node the packet is passed to
  ctdb_request_message(); otherwise the payload (c->data, c->datalen)
  is sent to c->hdr.destnode with ctdb_daemon_send_message() and a
  failure is logged.
*/
210 static void daemon_request_message_from_client(struct ctdb_client *client,
211 struct ctdb_req_message *c)
216 /* maybe the message is for another client on this node */
217 if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
218 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
222 /* its for a remote node */
223 data.dptr = &c->data[0];
224 data.dsize = c->datalen;
225 res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
228 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
/*
  Per-call bookkeeping for a call issued by a local client: the client
  that asked, the call being executed, and the time the request was
  accepted (used for latency statistics). It is allocated in
  daemon_request_call_from_client() and consumed in
  daemon_call_from_client_callback().
*/
234 struct daemon_call_state {
235 struct ctdb_client *client;
237 struct ctdb_call *call;
238 struct timeval start_time;
242 complete a call from a client
/*
  Completion callback for a call started by
  daemon_request_call_from_client(). state->async.private_data is the
  struct daemon_call_state of the request. The dstate (and its call) is
  first re-parented onto the client, then the result is collected with
  ctdb_daemon_call_recv(). On success a CTDB_REPLY_CALL packet carrying
  the reply data and the original reqid is queued to the client.
  Each visible exit path decrements statistics.pending_calls (never
  below zero) and records the call latency with ctdb_latency().
*/
244 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
246 struct daemon_call_state *dstate = talloc_get_type(state->async.private_data,
247 struct daemon_call_state);
248 struct ctdb_reply_call *r;
251 struct ctdb_client *client = dstate->client;
252 struct ctdb_db_context *ctdb_db = state->ctdb_db;
254 talloc_steal(client, dstate);
255 talloc_steal(dstate, dstate->call);
257 res = ctdb_daemon_call_recv(state, dstate->call);
259 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
260 if (client->ctdb->statistics.pending_calls > 0) {
261 client->ctdb->statistics.pending_calls--;
263 ctdb_latency(ctdb_db, "call_from_client_cb 1", &client->ctdb->statistics.max_call_latency, dstate->start_time);
267 length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
268 r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL,
269 length, struct ctdb_reply_call);
271 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
272 if (client->ctdb->statistics.pending_calls > 0) {
273 client->ctdb->statistics.pending_calls--;
275 ctdb_latency(ctdb_db, "call_from_client_cb 2", &client->ctdb->statistics.max_call_latency, dstate->start_time);
278 r->hdr.reqid = dstate->reqid;
279 r->datalen = dstate->call->reply_data.dsize;
280 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
282 res = daemon_queue_send(client, &r->hdr);
284 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
286 ctdb_latency(ctdb_db, "call_from_client_cb 3", &client->ctdb->statistics.max_call_latency, dstate->start_time);
288 if (client->ctdb->statistics.pending_calls > 0) {
289 client->ctdb->statistics.pending_calls--;
/*
  Context handed to daemon_incoming_packet_wrap() when a client request
  is requeued. The client is referenced by id (w->client_id is read and
  written elsewhere in this file) rather than by pointer, so a client
  that disconnects in the meantime can be detected.
*/
293 struct ctdb_daemon_packet_wrap {
294 struct ctdb_context *ctdb;
299 a wrapper to catch disconnected clients
/*
  Entry point used when a deferred client packet is re-dispatched. p
  must be a struct ctdb_daemon_packet_wrap; any other talloc type is
  logged at DEBUG_CRIT. The client is looked up again by id: if it has
  gone away this is logged, otherwise the packet is handed to
  daemon_incoming_packet().
*/
301 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
303 struct ctdb_client *client;
304 struct ctdb_daemon_packet_wrap *w = talloc_get_type(p,
305 struct ctdb_daemon_packet_wrap);
307 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
311 client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
312 if (client == NULL) {
313 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
321 daemon_incoming_packet(client, hdr);
326 this is called when the ctdb daemon received a ctdb request call
327 from a local client over the unix domain socket
329 static void daemon_request_call_from_client(struct ctdb_client *client,
330 struct ctdb_req_call *c)
332 struct ctdb_call_state *state;
333 struct ctdb_db_context *ctdb_db;
334 struct daemon_call_state *dstate;
335 struct ctdb_call *call;
336 struct ctdb_ltdb_header header;
339 struct ctdb_context *ctdb = client->ctdb;
340 struct ctdb_daemon_packet_wrap *w;
342 ctdb->statistics.total_calls++;
343 if (client->ctdb->statistics.pending_calls > 0) {
344 ctdb->statistics.pending_calls++;
347 ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
349 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
351 if (client->ctdb->statistics.pending_calls > 0) {
352 ctdb->statistics.pending_calls--;
357 if (ctdb_db->unhealthy_reason) {
359 * this is just a warning, as the tdb should be empty anyway,
360 * and only persistent databases can be unhealthy, which doesn't
361 * use this code patch
363 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
364 ctdb_db->db_name, ctdb_db->unhealthy_reason));
368 key.dsize = c->keylen;
370 w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
371 CTDB_NO_MEMORY_VOID(ctdb, w);
374 w->client_id = client->client_id;
376 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header,
377 (struct ctdb_req_header *)c, &data,
378 daemon_incoming_packet_wrap, w, True);
380 /* will retry later */
381 if (client->ctdb->statistics.pending_calls > 0) {
382 ctdb->statistics.pending_calls--;
390 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
391 if (client->ctdb->statistics.pending_calls > 0) {
392 ctdb->statistics.pending_calls--;
397 dstate = talloc(client, struct daemon_call_state);
398 if (dstate == NULL) {
399 ctdb_ltdb_unlock(ctdb_db, key);
400 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
401 if (client->ctdb->statistics.pending_calls > 0) {
402 ctdb->statistics.pending_calls--;
406 dstate->start_time = timeval_current();
407 dstate->client = client;
408 dstate->reqid = c->hdr.reqid;
409 talloc_steal(dstate, data.dptr);
411 call = dstate->call = talloc_zero(dstate, struct ctdb_call);
413 ctdb_ltdb_unlock(ctdb_db, key);
414 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
415 if (client->ctdb->statistics.pending_calls > 0) {
416 ctdb->statistics.pending_calls--;
418 ctdb_latency(ctdb_db, "call_from_client 1", &ctdb->statistics.max_call_latency, dstate->start_time);
422 call->call_id = c->callid;
424 call->call_data.dptr = c->data + c->keylen;
425 call->call_data.dsize = c->calldatalen;
426 call->flags = c->flags;
428 if (header.dmaster == ctdb->pnn) {
429 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
431 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
434 ctdb_ltdb_unlock(ctdb_db, key);
437 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
438 if (client->ctdb->statistics.pending_calls > 0) {
439 ctdb->statistics.pending_calls--;
441 ctdb_latency(ctdb_db, "call_from_client 2", &ctdb->statistics.max_call_latency, dstate->start_time);
444 talloc_steal(state, dstate);
445 talloc_steal(client, state);
447 state->async.fn = daemon_call_from_client_callback;
448 state->async.private_data = dstate;
/* forward declaration: defined further down, called from the
   dispatcher daemon_incoming_packet() */
452 static void daemon_request_control_from_client(struct ctdb_client *client,
453 struct ctdb_req_control *c);
455 /* data contains a packet from the client */
/*
  Dispatch one packet received from a local client. p is the struct
  ctdb_client, hdr the packet. The packet is re-parented onto a
  temporary talloc context that is freed at the end; handlers that need
  the packet to outlive this call steal it elsewhere. Packets with a
  wrong magic or version are rejected through ctdb_set_error(). The
  operation selects the call, message or control handler and bumps the
  matching statistics.client counter; an unrecognized operation is
  logged at DEBUG_CRIT.
*/
456 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
458 struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
460 struct ctdb_context *ctdb = client->ctdb;
462 /* place the packet as a child of a tmp_ctx. We then use
463 talloc_free() below to free it. If any of the calls want
464 to keep it, then they will steal it somewhere else, and the
465 talloc_free() will be a no-op */
466 tmp_ctx = talloc_new(client);
467 talloc_steal(tmp_ctx, hdr);
469 if (hdr->ctdb_magic != CTDB_MAGIC) {
470 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
474 if (hdr->ctdb_version != CTDB_VERSION) {
475 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
479 switch (hdr->operation) {
481 ctdb->statistics.client.req_call++;
482 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
485 case CTDB_REQ_MESSAGE:
486 ctdb->statistics.client.req_message++;
487 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
490 case CTDB_REQ_CONTROL:
491 ctdb->statistics.client.req_control++;
492 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
496 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
501 talloc_free(tmp_ctx);
505 called when the daemon gets a incoming packet
/*
  Read callback of a client queue (installed by ctdb_accept_client).
  args is the struct ctdb_client, data/cnt the received bytes. Counts
  the packet in statistics.client_packets_recv, then checks that cnt
  covers at least a header, that cnt equals hdr->length, and that magic
  and version match. A packet failing any check is reported through
  ctdb_set_error(). A valid packet is logged at DEBUG_DEBUG and passed
  to daemon_incoming_packet(), which takes over responsibility for
  freeing it.
*/
507 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
509 struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
510 struct ctdb_req_header *hdr;
517 client->ctdb->statistics.client_packets_recv++;
519 if (cnt < sizeof(*hdr)) {
520 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n",
524 hdr = (struct ctdb_req_header *)data;
525 if (cnt != hdr->length) {
526 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon",
527 (unsigned)hdr->length, (unsigned)cnt);
531 if (hdr->ctdb_magic != CTDB_MAGIC) {
532 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
536 if (hdr->ctdb_version != CTDB_VERSION) {
537 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
541 DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
542 "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
543 hdr->srcnode, hdr->destnode));
545 /* it is the responsibility of the incoming packet function to free 'data' */
546 daemon_incoming_packet(client, hdr);
/*
  talloc destructor for a client pid list entry (installed by
  ctdb_accept_client): unlinks the entry from ctdb->client_pids. The
  removal is skipped when the list head is already NULL.
*/
550 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
552 if (client_pid->ctdb->client_pids != NULL) {
553 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
560 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde,
561 uint16_t flags, void *private_data)
563 struct sockaddr_un addr;
566 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
567 struct ctdb_client *client;
568 struct ctdb_client_pid_list *client_pid;
570 struct peercred_struct cr;
571 socklen_t crl = sizeof(struct peercred_struct);
574 socklen_t crl = sizeof(struct ucred);
577 memset(&addr, 0, sizeof(addr));
579 fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
585 set_close_on_exec(fd);
587 DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
589 client = talloc_zero(ctdb, struct ctdb_client);
591 if (getsockopt(fd, SOL_SOCKET, SO_PEERID, &cr, &crl) == 0) {
593 if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl) == 0) {
595 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)cr.pid));
600 client->client_id = ctdb_reqid_new(ctdb, client);
601 client->pid = cr.pid;
603 client_pid = talloc(client, struct ctdb_client_pid_list);
604 if (client_pid == NULL) {
605 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
610 client_pid->ctdb = ctdb;
611 client_pid->pid = cr.pid;
612 client_pid->client = client;
614 DLIST_ADD(ctdb->client_pids, client_pid);
616 client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT,
617 ctdb_daemon_read_cb, client);
619 talloc_set_destructor(client, ctdb_client_destructor);
620 talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
621 ctdb->statistics.num_clients++;
627 create a unix domain socket and bind it
628 return a file descriptor open on the socket
630 static int ux_socket_bind(struct ctdb_context *ctdb)
632 struct sockaddr_un addr;
634 ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
635 if (ctdb->daemon.sd == -1) {
639 set_close_on_exec(ctdb->daemon.sd);
640 set_nonblocking(ctdb->daemon.sd);
642 memset(&addr, 0, sizeof(addr));
643 addr.sun_family = AF_UNIX;
644 strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
646 if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
647 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
651 if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
652 chmod(ctdb->daemon.name, 0700) != 0) {
653 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
658 if (listen(ctdb->daemon.sd, 100) != 0) {
659 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
666 close(ctdb->daemon.sd);
667 ctdb->daemon.sd = -1;
/*
  SIGCHLD event handler (installed by ctdb_start_daemon). Reaps
  terminated children with a non-blocking waitpid(-1, ..., WNOHANG),
  logging an error together with errno when waitpid() fails and the pid
  of a reaped child at DEBUG_DEBUG. The private_data context is
  currently unused (see the commented-out line).
*/
671 static void sig_child_handler(struct event_context *ev,
672 struct signal_event *se, int signum, int count,
676 // struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
681 pid = waitpid(-1, &status, WNOHANG);
683 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
687 DEBUG(DEBUG_DEBUG, ("SIGCHLD from %d\n", (int)pid));
693 start the protocol going as a daemon
/*
  Start the ctdb daemon and run its event loop.

  Sequence visible here: remove a stale socket file and bind the unix
  domain socket; optionally fork (do_fork), the parent leaving through
  the "fork()" branch; reopen all tdbs; put /dev/null on stdin; ignore
  SIGPIPE; record the daemon pid; raise scheduling priority; create the
  event context and child logging; force recovery mode so the first
  election sorts things out; initialise the configured transport ("tcp"
  or, with USE_INFINIBAND, "ib"); attach the existing databases; freeze
  them; only then start accepting clients; broadcast
  CTDB_CONTROL_STARTUP; release IPs held by a previous run; start the
  transport and services; install the SIGCHLD handler; optionally start
  the syslog daemon; lock memory; and finally enter event_loop_wait(),
  which is not expected to return.

  NOTE(review): use_syslog presumably guards start_syslog_daemon() -
  the guarding line is not visible in this view.
*/
695 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog)
698 struct fd_event *fde;
699 const char *domain_socket_name;
700 struct signal_event *se;
702 /* get rid of any old sockets */
703 unlink(ctdb->daemon.name);
705 /* create a unix domain stream socket to listen to */
706 res = ux_socket_bind(ctdb);
708 DEBUG(DEBUG_ALERT,(__location__ " Failed to open CTDB unix domain socket\n"));
712 if (do_fork && fork()) {
716 tdb_reopen_all(False);
721 if (open("/dev/null", O_RDONLY) != 0) {
722 DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
726 block_signal(SIGPIPE);
728 ctdbd_pid = getpid();
731 DEBUG(DEBUG_ERR, ("Starting CTDBD as pid : %u\n", ctdbd_pid));
733 ctdb_high_priority(ctdb);
735 /* ensure the socket is deleted on exit of the daemon */
736 domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
737 if (domain_socket_name == NULL) {
738 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
742 ctdb->ev = event_context_init(NULL);
744 ctdb_set_child_logging(ctdb);
746 /* force initial recovery for election */
747 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
749 if (strcmp(ctdb->transport, "tcp") == 0) {
750 int ctdb_tcp_init(struct ctdb_context *);
751 ret = ctdb_tcp_init(ctdb);
753 #ifdef USE_INFINIBAND
754 if (strcmp(ctdb->transport, "ib") == 0) {
755 int ctdb_ibw_init(struct ctdb_context *);
756 ret = ctdb_ibw_init(ctdb);
760 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
764 if (ctdb->methods == NULL) {
765 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
766 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
769 /* initialise the transport */
770 if (ctdb->methods->initialise(ctdb) != 0) {
771 ctdb_fatal(ctdb, "transport failed to initialise");
774 /* attach to existing databases */
775 if (ctdb_attach_databases(ctdb) != 0) {
776 ctdb_fatal(ctdb, "Failed to attach to databases\n");
779 /* start frozen, then let the first election sort things out */
780 if (ctdb_blocking_freeze(ctdb)) {
781 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
784 /* now start accepting clients, only can do this once frozen */
785 fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd,
786 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
787 ctdb_accept_client, ctdb);
789 /* tell all other nodes we've just started up */
790 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
791 0, CTDB_CONTROL_STARTUP, 0,
792 CTDB_CTRL_FLAG_NOREPLY,
793 tdb_null, NULL, NULL);
795 /* release any IPs we hold from previous runs of the daemon */
796 ctdb_release_all_ips(ctdb);
798 /* start the transport going */
799 ctdb_start_transport(ctdb);
801 /* set up a handler to pick up sigchld */
802 se = event_add_signal(ctdb->ev, ctdb,
807 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
812 if (start_syslog_daemon(ctdb)) {
813 DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
818 ctdb_lockdown_memory(ctdb);
820 /* go into a wait loop to allow other nodes to complete */
821 event_loop_wait(ctdb->ev);
823 DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
828 allocate a packet for use in daemon<->daemon communication
/*
  Allocate and initialise a packet for daemon<->daemon traffic using
  the transport's allocate_pkt method.

  length is raised to at least slength (the size of the fixed packet
  structure); the allocation size is that length rounded up to a
  multiple of CTDB_DS_ALIGNMENT. The first slength bytes are zeroed and
  the header is filled in with the (unrounded) length, operation,
  magic, version, the current vnn_map generation and this node's pnn as
  source. type becomes the talloc name of the packet.

  Fails with an error log when the transport is down
  (ctdb->methods == NULL) or when the allocation fails.
*/
830 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
832 enum ctdb_operation operation,
833 size_t length, size_t slength,
837 struct ctdb_req_header *hdr;
839 length = MAX(length, slength);
840 size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
842 if (ctdb->methods == NULL) {
843 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
844 operation, (unsigned)length));
848 hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
850 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
851 operation, (unsigned)length));
854 talloc_set_name_const(hdr, type);
855 memset(hdr, 0, slength);
856 hdr->length = length;
857 hdr->operation = operation;
858 hdr->ctdb_magic = CTDB_MAGIC;
859 hdr->ctdb_version = CTDB_VERSION;
860 hdr->generation = ctdb->vnn_map->generation;
861 hdr->srcnode = ctdb->pnn;
/*
  State of one control request issued by a local client: the client,
  the stolen request packet and, when the destination pnn is valid, the
  destination node. While a node is set the state is linked (next/prev)
  into that node's pending_controls list so it can be failed by
  ctdb_daemon_cancel_controls().
*/
866 struct daemon_control_state {
867 struct daemon_control_state *next, *prev;
868 struct ctdb_client *client;
869 struct ctdb_req_control *c;
871 struct ctdb_node *node;
875 callback when a control reply comes in
/*
  Called when the reply to a client's control arrives (or when the
  control is failed locally by ctdb_daemon_cancel_controls).
  private_data is the struct daemon_control_state of the request.
  Builds a CTDB_REPLY_CONTROL packet for the original reqid containing
  the reply data followed by the error message text (copied without a
  terminating NUL, its length stored in errorlen) and queues it to the
  client.
*/
877 static void daemon_control_callback(struct ctdb_context *ctdb,
878 int32_t status, TDB_DATA data,
879 const char *errormsg,
882 struct daemon_control_state *state = talloc_get_type(private_data,
883 struct daemon_control_state);
884 struct ctdb_client *client = state->client;
885 struct ctdb_reply_control *r;
888 /* construct a message to send to the client containing the data */
889 len = offsetof(struct ctdb_reply_control, data) + data.dsize;
891 len += strlen(errormsg);
893 r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len,
894 struct ctdb_reply_control);
895 CTDB_NO_MEMORY_VOID(ctdb, r);
897 r->hdr.reqid = state->reqid;
899 r->datalen = data.dsize;
901 memcpy(&r->data[0], data.dptr, data.dsize);
903 r->errorlen = strlen(errormsg);
904 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
907 daemon_queue_send(client, &r->hdr);
913 fail all pending controls to a disconnected node
/*
  Fail every control still pending towards node. Each state is unlinked
  from node->pending_controls and its client is answered through
  daemon_control_callback() with status -1 and the error text "node is
  disconnected". The loop ends when the list is empty.
*/
915 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
917 struct daemon_control_state *state;
918 while ((state = node->pending_controls)) {
919 DLIST_REMOVE(node->pending_controls, state);
920 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null,
921 "node is disconnected", state);
926 destroy a daemon_control_state
/*
  talloc destructor for a daemon_control_state (installed by
  daemon_request_control_from_client): unlinks the state from its
  node's pending_controls list.
  NOTE(review): presumably guarded by a state->node check that is not
  visible in this view - confirm.
*/
928 static int daemon_control_destructor(struct daemon_control_state *state)
931 DLIST_REMOVE(state->node->pending_controls, state);
937 this is called when the ctdb daemon received a ctdb request control
938 from a local client over the unix domain socket
/*
  Handle a CTDB_REQ_CONTROL packet from a local client.

  CTDB_CURRENT_NODE as destination is rewritten to this node's pnn. A
  daemon_control_state is allocated on the client, takes ownership of
  the request packet and, for a valid destination pnn, is linked into
  that node's pending_controls list. The control is then sent with
  ctdb_daemon_send_control(), with daemon_control_callback delivering
  the reply to the client; a send failure is logged.

  For CTDB_CTRL_FLAG_NOREPLY controls no reply will arrive, so the
  state is moved onto tmp_ctx and released by the talloc_free() at the
  end of this function.
*/
940 static void daemon_request_control_from_client(struct ctdb_client *client,
941 struct ctdb_req_control *c)
945 struct daemon_control_state *state;
946 TALLOC_CTX *tmp_ctx = talloc_new(client);
948 if (c->hdr.destnode == CTDB_CURRENT_NODE) {
949 c->hdr.destnode = client->ctdb->pnn;
952 state = talloc(client, struct daemon_control_state);
953 CTDB_NO_MEMORY_VOID(client->ctdb, state);
955 state->client = client;
956 state->c = talloc_steal(state, c);
957 state->reqid = c->hdr.reqid;
958 if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
959 state->node = client->ctdb->nodes[c->hdr.destnode];
960 DLIST_ADD(state->node->pending_controls, state);
965 talloc_set_destructor(state, daemon_control_destructor);
967 if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
968 talloc_steal(tmp_ctx, state);
971 data.dptr = &c->data[0];
972 data.dsize = c->datalen;
973 res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
974 c->srvid, c->opcode, client->client_id,
976 data, daemon_control_callback,
979 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
983 talloc_free(tmp_ctx);
987 register a call function
/*
  Register the call function fn under id for the database db_id. The
  database is looked up with find_ctdb_db(); an unknown db_id takes the
  "ctdb_db == NULL" branch. Otherwise a ctdb_registered_call is
  allocated on the database context and added to ctdb_db->calls.
*/
989 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
990 ctdb_fn_t fn, int id)
992 struct ctdb_registered_call *call;
993 struct ctdb_db_context *ctdb_db;
995 ctdb_db = find_ctdb_db(ctdb, db_id);
996 if (ctdb_db == NULL) {
1000 call = talloc(ctdb_db, struct ctdb_registered_call);
1004 DLIST_ADD(ctdb_db->calls, call);
1011 this local messaging handler is ugly, but is needed to prevent
1012 recursion in ctdb_send_message() when the destination node is the
1013 same as the source node
/*
  A message addressed to this node itself, parked until the event loop
  delivers it in ctdb_local_message_trigger(). That function also reads
  the srvid and data members of this structure.
*/
1015 struct ctdb_local_message {
1016 struct ctdb_context *ctdb;
/*
  Timed-event callback scheduled by ctdb_local_message(). private_data
  is the parked struct ctdb_local_message; its payload is delivered
  with ctdb_dispatch_message() and a delivery failure is logged with
  the srvid.
*/
1021 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te,
1022 struct timeval t, void *private_data)
1024 struct ctdb_local_message *m = talloc_get_type(private_data,
1025 struct ctdb_local_message);
1028 res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1030 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n",
1031 (unsigned long long)m->srvid));
/*
  Deliver a message to this node itself without recursing into the
  dispatcher. The message is copied into a ctdb_local_message (the
  payload is duplicated with talloc_memdup so it outlives the caller's
  buffer) and a zero-delay timed event is scheduled, so that
  ctdb_local_message_trigger() runs from the event loop.
*/
1036 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1038 struct ctdb_local_message *m;
1039 m = talloc(ctdb, struct ctdb_local_message);
1040 CTDB_NO_MEMORY(ctdb, m);
1045 m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1046 if (m->data.dptr == NULL) {
1051 /* this needs to be done as an event to prevent recursion */
1052 event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
/*
  Send a message with the given srvid and payload to node pnn.

  Fails with an error log when the transport is down
  (ctdb->methods == NULL). A message addressed to this node is handed
  to ctdb_local_message(). Otherwise a CTDB_REQ_MESSAGE transport
  packet holding a copy of data is allocated, addressed to pnn and
  queued with ctdb_queue_packet().
*/
1059 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1060 uint64_t srvid, TDB_DATA data)
1062 struct ctdb_req_message *r;
1065 if (ctdb->methods == NULL) {
1066 DEBUG(DEBUG_ERR,(__location__ " Failed to send message. Transport is DOWN\n"));
1070 /* see if this is a message to ourselves */
1071 if (pnn == ctdb->pnn) {
1072 return ctdb_local_message(ctdb, srvid, data);
1075 len = offsetof(struct ctdb_req_message, data) + data.dsize;
1076 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1077 struct ctdb_req_message);
1078 CTDB_NO_MEMORY(ctdb, r);
1080 r->hdr.destnode = pnn;
1082 r->datalen = data.dsize;
1083 memcpy(&r->data[0], data.dptr, data.dsize);
1085 ctdb_queue_packet(ctdb, &r->hdr);
/*
  One notification registered by a client, kept in the client->notify
  list. The srvid and data members (used by the functions below) hold
  the message that ctdb_client_notify_destructor() broadcasts when the
  entry is destroyed.
*/
1093 struct ctdb_client_notify_list {
1094 struct ctdb_client_notify_list *next, *prev;
1095 struct ctdb_context *ctdb;
/*
  talloc destructor for a notify entry (installed by
  ctdb_control_register_notify). Sends the stored data on nl->srvid to
  CTDB_BROADCAST_CONNECTED and logs when sending fails.
*/
1101 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1105 DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1107 ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1109 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1115 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1117 struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1118 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1119 struct ctdb_client_notify_list *nl;
1121 DEBUG(DEBUG_ERR,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1123 if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1124 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1128 if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1129 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1134 if (client == NULL) {
1135 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1139 for(nl=client->notify; nl; nl=nl->next) {
1140 if (nl->srvid == notify->srvid) {
1145 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1149 nl = talloc(client, struct ctdb_client_notify_list);
1150 CTDB_NO_MEMORY(ctdb, nl);
1152 nl->srvid = notify->srvid;
1153 nl->data.dsize = notify->len;
1154 nl->data.dptr = talloc_size(nl, nl->data.dsize);
1155 CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1156 memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1158 DLIST_ADD(client->notify, nl);
1159 talloc_set_destructor(nl, ctdb_client_notify_destructor);
1164 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1166 struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1167 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1168 struct ctdb_client_notify_list *nl;
1170 DEBUG(DEBUG_ERR,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1172 if (client == NULL) {
1173 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1177 for(nl=client->notify; nl; nl=nl->next) {
1178 if (nl->srvid == notify->srvid) {
1183 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1187 DLIST_REMOVE(client->notify, nl);
1188 talloc_set_destructor(nl, NULL);
/*
  Look up a connected local client by process id. Walks
  ctdb->client_pids and returns the client of the first entry whose pid
  matches.
*/
1194 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1196 struct ctdb_client_pid_list *client_pid;
1198 for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1199 if (client_pid->pid == pid) {
1200 return client_pid->client;
1207 /* This control is used by samba when probing if a process (of a samba daemon)
1209 Samba does this when it needs/wants to check if a subrecord in one of the
1210 databases is still valid, or if it is stale and can be removed.
1211 If the node is in banned or stopped state we just kill off the samba
1212 process holding this sub-record and return to the calling samba that
1213 the process does not exist.
1214 This allows us to forcefully recall subrecords registered by samba processes
1215 on banned and stopped nodes.
/*
  control handler: probe whether process pid exists on this node.

  When this node is flagged BANNED or STOPPED and pid belongs to a
  connected local client, that client structure is freed (which
  disconnects it) and the fact is logged. The last visible statement
  answers with kill(pid, 0), i.e. 0 when the process can be signalled
  and -1 otherwise.
*/
1217 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1219 struct ctdb_client *client;
1221 if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1222 client = ctdb_find_client_by_pid(ctdb, pid);
1223 if (client != NULL) {
1224 DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1225 talloc_free(client);
1230 return kill(pid, 0);