4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/events/events.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/wait.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include <sys/socket.h>
/*
  one entry in the daemon's list of connected client processes; it ties
  the pid recorded for a client connection to its ctdb_client so that
  the client can later be looked up by pid
 */
struct ctdb_client_pid_list {
	struct ctdb_client_pid_list *next;	/* DLIST linkage */
	struct ctdb_client_pid_list *prev;
	struct ctdb_context *ctdb;		/* context whose client_pids list holds us */
	pid_t pid;				/* pid recorded for the connection */
	struct ctdb_client *client;		/* client connected from that pid */
};
39 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
41 static void print_exit_message(void)
43 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
46 /* called when the "startup" event script has finished */
47 static void ctdb_start_transport(struct ctdb_context *ctdb)
49 if (ctdb->methods == NULL) {
50 DEBUG(DEBUG_ALERT,(__location__ " startup event finished but transport is DOWN.\n"));
51 ctdb_fatal(ctdb, "transport is not initialized but startup completed");
54 /* start the transport running */
55 if (ctdb->methods->start(ctdb) != 0) {
56 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
57 ctdb_fatal(ctdb, "transport failed to start");
60 /* start the recovery daemon process */
61 if (ctdb_start_recoverd(ctdb) != 0) {
62 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
66 /* Make sure we log something when the daemon terminates */
67 atexit(print_exit_message);
69 /* start monitoring for connected/disconnected nodes */
70 ctdb_start_keepalive(ctdb);
72 /* start monitoring for node health */
73 ctdb_start_monitoring(ctdb);
75 /* start periodic update of tcp tickle lists */
76 ctdb_start_tcp_tickle_update(ctdb);
78 /* start periodic cleanup of holdback cleanup */
79 ctdb_start_holdback_cleanup(ctdb);
81 /* start listening for recovery daemon pings */
82 ctdb_control_recd_ping(ctdb);
85 static void block_signal(int signum)
89 memset(&act, 0, sizeof(act));
91 act.sa_handler = SIG_IGN;
92 sigemptyset(&act.sa_mask);
93 sigaddset(&act.sa_mask, signum);
94 sigaction(signum, &act, NULL);
99 send a packet to a client
101 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
103 client->ctdb->statistics.client_packets_sent++;
104 if (hdr->operation == CTDB_REQ_MESSAGE) {
105 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
106 DEBUG(DEBUG_ERR,("Drop CTDB_REQ_MESSAGE to client. Queue full.\n"));
110 return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
114 message handler for when we are in daemon mode. This redirects the message
117 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
118 TDB_DATA data, void *private_data)
120 struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
121 struct ctdb_req_message *r;
124 /* construct a message to send to the client containing the data */
125 len = offsetof(struct ctdb_req_message, data) + data.dsize;
126 r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE,
127 len, struct ctdb_req_message);
128 CTDB_NO_MEMORY_VOID(ctdb, r);
130 talloc_set_name_const(r, "req_message packet");
133 r->datalen = data.dsize;
134 memcpy(&r->data[0], data.dptr, data.dsize);
136 daemon_queue_send(client, &r->hdr);
142 this is called when the ctdb daemon received a ctdb request to
143 set the srvid from the client
145 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
147 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
149 if (client == NULL) {
150 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
153 res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
155 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n",
156 (unsigned long long)srvid));
158 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n",
159 (unsigned long long)srvid));
166 this is called when the ctdb daemon received a ctdb request to
167 remove a srvid from the client
169 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
171 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
172 if (client == NULL) {
173 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
176 return ctdb_deregister_message_handler(ctdb, srvid, client);
181 destroy a ctdb_client
183 static int ctdb_client_destructor(struct ctdb_client *client)
185 struct ctdb_db_context *ctdb_db;
187 ctdb_takeover_client_destructor_hook(client);
188 ctdb_reqid_remove(client->ctdb, client->client_id);
189 if (client->ctdb->statistics.num_clients) {
190 client->ctdb->statistics.num_clients--;
193 if (client->num_persistent_updates != 0) {
194 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
195 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
197 ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
199 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
200 "commit active. Forcing recovery.\n"));
201 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
202 ctdb_db->transaction_active = false;
210 this is called when the ctdb daemon received a ctdb request message
211 from a local client over the unix domain socket
213 static void daemon_request_message_from_client(struct ctdb_client *client,
214 struct ctdb_req_message *c)
219 /* maybe the message is for another client on this node */
220 if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
221 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
225 /* its for a remote node */
226 data.dptr = &c->data[0];
227 data.dsize = c->datalen;
228 res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
231 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
/*
  per-request state kept by the daemon while a call issued by a local
  client is being processed
 */
struct daemon_call_state {
	struct ctdb_client *client;	/* client that issued the call */
	uint32_t reqid;			/* request id copied into the reply */
	struct ctdb_call *call;		/* the call being executed */
	struct timeval start_time;	/* start of processing, for latency statistics */
};
245 complete a call from a client
247 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
249 struct daemon_call_state *dstate = talloc_get_type(state->async.private_data,
250 struct daemon_call_state);
251 struct ctdb_reply_call *r;
254 struct ctdb_client *client = dstate->client;
255 struct ctdb_db_context *ctdb_db = state->ctdb_db;
257 talloc_steal(client, dstate);
258 talloc_steal(dstate, dstate->call);
260 res = ctdb_daemon_call_recv(state, dstate->call);
262 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
263 if (client->ctdb->statistics.pending_calls > 0) {
264 client->ctdb->statistics.pending_calls--;
266 ctdb_latency(ctdb_db, "call_from_client_cb 1", &client->ctdb->statistics.max_call_latency, dstate->start_time);
270 length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
271 r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL,
272 length, struct ctdb_reply_call);
274 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
275 if (client->ctdb->statistics.pending_calls > 0) {
276 client->ctdb->statistics.pending_calls--;
278 ctdb_latency(ctdb_db, "call_from_client_cb 2", &client->ctdb->statistics.max_call_latency, dstate->start_time);
281 r->hdr.reqid = dstate->reqid;
282 r->datalen = dstate->call->reply_data.dsize;
283 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
285 res = daemon_queue_send(client, &r->hdr);
287 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
289 ctdb_latency(ctdb_db, "call_from_client_cb 3", &client->ctdb->statistics.max_call_latency, dstate->start_time);
291 if (client->ctdb->statistics.pending_calls > 0) {
292 client->ctdb->statistics.pending_calls--;
/*
  identifies the client a deferred packet belongs to by id rather than
  by pointer, so the client can be looked up again when the packet is
  processed
 */
struct ctdb_daemon_packet_wrap {
	struct ctdb_context *ctdb;	/* context used for the client lookup */
	uint32_t client_id;		/* request id of the originating client */
};
302 a wrapper to catch disconnected clients
304 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
306 struct ctdb_client *client;
307 struct ctdb_daemon_packet_wrap *w = talloc_get_type(p,
308 struct ctdb_daemon_packet_wrap);
310 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
314 client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
315 if (client == NULL) {
316 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
324 daemon_incoming_packet(client, hdr);
329 this is called when the ctdb daemon received a ctdb request call
330 from a local client over the unix domain socket
332 static void daemon_request_call_from_client(struct ctdb_client *client,
333 struct ctdb_req_call *c)
335 struct ctdb_call_state *state;
336 struct ctdb_db_context *ctdb_db;
337 struct daemon_call_state *dstate;
338 struct ctdb_call *call;
339 struct ctdb_ltdb_header header;
342 struct ctdb_context *ctdb = client->ctdb;
343 struct ctdb_daemon_packet_wrap *w;
345 ctdb->statistics.total_calls++;
346 if (client->ctdb->statistics.pending_calls > 0) {
347 ctdb->statistics.pending_calls++;
350 ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
352 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
354 if (client->ctdb->statistics.pending_calls > 0) {
355 ctdb->statistics.pending_calls--;
361 key.dsize = c->keylen;
363 w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
364 CTDB_NO_MEMORY_VOID(ctdb, w);
367 w->client_id = client->client_id;
369 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header,
370 (struct ctdb_req_header *)c, &data,
371 daemon_incoming_packet_wrap, w, True);
373 /* will retry later */
374 if (client->ctdb->statistics.pending_calls > 0) {
375 ctdb->statistics.pending_calls--;
383 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
384 if (client->ctdb->statistics.pending_calls > 0) {
385 ctdb->statistics.pending_calls--;
390 dstate = talloc(client, struct daemon_call_state);
391 if (dstate == NULL) {
392 ctdb_ltdb_unlock(ctdb_db, key);
393 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
394 if (client->ctdb->statistics.pending_calls > 0) {
395 ctdb->statistics.pending_calls--;
399 dstate->start_time = timeval_current();
400 dstate->client = client;
401 dstate->reqid = c->hdr.reqid;
402 talloc_steal(dstate, data.dptr);
404 call = dstate->call = talloc_zero(dstate, struct ctdb_call);
406 ctdb_ltdb_unlock(ctdb_db, key);
407 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
408 if (client->ctdb->statistics.pending_calls > 0) {
409 ctdb->statistics.pending_calls--;
411 ctdb_latency(ctdb_db, "call_from_client 1", &ctdb->statistics.max_call_latency, dstate->start_time);
415 call->call_id = c->callid;
417 call->call_data.dptr = c->data + c->keylen;
418 call->call_data.dsize = c->calldatalen;
419 call->flags = c->flags;
421 if (header.dmaster == ctdb->pnn) {
422 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
424 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
427 ctdb_ltdb_unlock(ctdb_db, key);
430 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
431 if (client->ctdb->statistics.pending_calls > 0) {
432 ctdb->statistics.pending_calls--;
434 ctdb_latency(ctdb_db, "call_from_client 2", &ctdb->statistics.max_call_latency, dstate->start_time);
437 talloc_steal(state, dstate);
438 talloc_steal(client, state);
440 state->async.fn = daemon_call_from_client_callback;
441 state->async.private_data = dstate;
445 static void daemon_request_control_from_client(struct ctdb_client *client,
446 struct ctdb_req_control *c);
448 /* data contains a packet from the client */
449 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
451 struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
453 struct ctdb_context *ctdb = client->ctdb;
455 /* place the packet as a child of a tmp_ctx. We then use
456 talloc_free() below to free it. If any of the calls want
457 to keep it, then they will steal it somewhere else, and the
458 talloc_free() will be a no-op */
459 tmp_ctx = talloc_new(client);
460 talloc_steal(tmp_ctx, hdr);
462 if (hdr->ctdb_magic != CTDB_MAGIC) {
463 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
467 if (hdr->ctdb_version != CTDB_VERSION) {
468 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
472 switch (hdr->operation) {
474 ctdb->statistics.client.req_call++;
475 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
478 case CTDB_REQ_MESSAGE:
479 ctdb->statistics.client.req_message++;
480 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
483 case CTDB_REQ_CONTROL:
484 ctdb->statistics.client.req_control++;
485 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
489 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
494 talloc_free(tmp_ctx);
498 called when the daemon gets a incoming packet
500 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
502 struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
503 struct ctdb_req_header *hdr;
510 client->ctdb->statistics.client_packets_recv++;
512 if (cnt < sizeof(*hdr)) {
513 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n",
517 hdr = (struct ctdb_req_header *)data;
518 if (cnt != hdr->length) {
519 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon",
520 (unsigned)hdr->length, (unsigned)cnt);
524 if (hdr->ctdb_magic != CTDB_MAGIC) {
525 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
529 if (hdr->ctdb_version != CTDB_VERSION) {
530 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
534 DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
535 "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
536 hdr->srcnode, hdr->destnode));
538 /* it is the responsibility of the incoming packet function to free 'data' */
539 daemon_incoming_packet(client, hdr);
543 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
545 if (client_pid->ctdb->client_pids != NULL) {
546 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
553 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde,
554 uint16_t flags, void *private_data)
556 struct sockaddr_un addr;
559 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
560 struct ctdb_client *client;
561 struct ctdb_client_pid_list *client_pid;
563 struct peercred_struct cr;
564 socklen_t crl = sizeof(struct peercred_struct);
567 socklen_t crl = sizeof(struct ucred);
570 memset(&addr, 0, sizeof(addr));
572 fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
578 set_close_on_exec(fd);
580 DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
582 client = talloc_zero(ctdb, struct ctdb_client);
584 if (getsockopt(fd, SOL_SOCKET, SO_PEERID, &cr, &crl) == 0) {
586 if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl) == 0) {
588 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)cr.pid));
593 client->client_id = ctdb_reqid_new(ctdb, client);
594 client->pid = cr.pid;
596 client_pid = talloc(client, struct ctdb_client_pid_list);
597 if (client_pid == NULL) {
598 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
603 client_pid->ctdb = ctdb;
604 client_pid->pid = cr.pid;
605 client_pid->client = client;
607 DLIST_ADD(ctdb->client_pids, client_pid);
609 client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT,
610 ctdb_daemon_read_cb, client);
612 talloc_set_destructor(client, ctdb_client_destructor);
613 talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
614 ctdb->statistics.num_clients++;
620 create a unix domain socket and bind it
621 return a file descriptor open on the socket
623 static int ux_socket_bind(struct ctdb_context *ctdb)
625 struct sockaddr_un addr;
627 ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
628 if (ctdb->daemon.sd == -1) {
632 set_close_on_exec(ctdb->daemon.sd);
633 set_nonblocking(ctdb->daemon.sd);
635 memset(&addr, 0, sizeof(addr));
636 addr.sun_family = AF_UNIX;
637 strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
639 if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
640 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
644 if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
645 chmod(ctdb->daemon.name, 0700) != 0) {
646 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
651 if (listen(ctdb->daemon.sd, 100) != 0) {
652 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
659 close(ctdb->daemon.sd);
660 ctdb->daemon.sd = -1;
664 static void sig_child_handler(struct event_context *ev,
665 struct signal_event *se, int signum, int count,
669 // struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
674 pid = waitpid(-1, &status, WNOHANG);
676 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
680 DEBUG(DEBUG_DEBUG, ("SIGCHLD from %d\n", (int)pid));
686 start the protocol going as a daemon
688 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog)
691 struct fd_event *fde;
692 const char *domain_socket_name;
693 struct signal_event *se;
695 /* get rid of any old sockets */
696 unlink(ctdb->daemon.name);
698 /* create a unix domain stream socket to listen to */
699 res = ux_socket_bind(ctdb);
701 DEBUG(DEBUG_ALERT,(__location__ " Failed to open CTDB unix domain socket\n"));
705 if (do_fork && fork()) {
709 tdb_reopen_all(False);
714 if (open("/dev/null", O_RDONLY) != 0) {
715 DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
719 block_signal(SIGPIPE);
721 ctdbd_pid = getpid();
724 DEBUG(DEBUG_ERR, ("Starting CTDBD as pid : %u\n", ctdbd_pid));
726 if (ctdb->do_setsched) {
727 /* try to set us up as realtime */
728 ctdb_set_scheduler(ctdb);
731 /* ensure the socket is deleted on exit of the daemon */
732 domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
733 if (domain_socket_name == NULL) {
734 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
738 ctdb->ev = event_context_init(NULL);
740 ctdb_set_child_logging(ctdb);
742 /* force initial recovery for election */
743 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
745 if (strcmp(ctdb->transport, "tcp") == 0) {
746 int ctdb_tcp_init(struct ctdb_context *);
747 ret = ctdb_tcp_init(ctdb);
749 #ifdef USE_INFINIBAND
750 if (strcmp(ctdb->transport, "ib") == 0) {
751 int ctdb_ibw_init(struct ctdb_context *);
752 ret = ctdb_ibw_init(ctdb);
756 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
760 if (ctdb->methods == NULL) {
761 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
762 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
765 /* initialise the transport */
766 if (ctdb->methods->initialise(ctdb) != 0) {
767 ctdb_fatal(ctdb, "transport failed to initialise");
770 /* attach to any existing persistent databases */
771 if (ctdb_attach_persistent(ctdb) != 0) {
772 ctdb_fatal(ctdb, "Failed to attach to persistent databases\n");
775 /* start frozen, then let the first election sort things out */
776 if (ctdb_blocking_freeze(ctdb)) {
777 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
780 /* now start accepting clients, only can do this once frozen */
781 fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd,
782 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
783 ctdb_accept_client, ctdb);
785 /* tell all other nodes we've just started up */
786 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
787 0, CTDB_CONTROL_STARTUP, 0,
788 CTDB_CTRL_FLAG_NOREPLY,
789 tdb_null, NULL, NULL);
791 /* release any IPs we hold from previous runs of the daemon */
792 ctdb_release_all_ips(ctdb);
794 /* start the transport going */
795 ctdb_start_transport(ctdb);
797 /* set up a handler to pick up sigchld */
798 se = event_add_signal(ctdb->ev, ctdb,
803 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
808 if (start_syslog_daemon(ctdb)) {
809 DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
815 /* go into a wait loop to allow other nodes to complete */
816 event_loop_wait(ctdb->ev);
818 DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
823 allocate a packet for use in daemon<->daemon communication
825 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
827 enum ctdb_operation operation,
828 size_t length, size_t slength,
832 struct ctdb_req_header *hdr;
834 length = MAX(length, slength);
835 size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
837 if (ctdb->methods == NULL) {
838 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
839 operation, (unsigned)length));
843 hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
845 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
846 operation, (unsigned)length));
849 talloc_set_name_const(hdr, type);
850 memset(hdr, 0, slength);
851 hdr->length = length;
852 hdr->operation = operation;
853 hdr->ctdb_magic = CTDB_MAGIC;
854 hdr->ctdb_version = CTDB_VERSION;
855 hdr->generation = ctdb->vnn_map->generation;
856 hdr->srcnode = ctdb->pnn;
/*
  state of one control issued by a local client; while the destination
  node is valid it is also linked into that node's pending_controls
 */
struct daemon_control_state {
	struct daemon_control_state *next;	/* DLIST linkage in node->pending_controls */
	struct daemon_control_state *prev;
	struct ctdb_client *client;		/* client that issued the control */
	struct ctdb_req_control *c;		/* the request packet, owned by this state */
	uint32_t reqid;				/* request id echoed in the reply */
	struct ctdb_node *node;			/* destination node, NULL when not valid */
};
870 callback when a control reply comes in
872 static void daemon_control_callback(struct ctdb_context *ctdb,
873 int32_t status, TDB_DATA data,
874 const char *errormsg,
877 struct daemon_control_state *state = talloc_get_type(private_data,
878 struct daemon_control_state);
879 struct ctdb_client *client = state->client;
880 struct ctdb_reply_control *r;
883 /* construct a message to send to the client containing the data */
884 len = offsetof(struct ctdb_reply_control, data) + data.dsize;
886 len += strlen(errormsg);
888 r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len,
889 struct ctdb_reply_control);
890 CTDB_NO_MEMORY_VOID(ctdb, r);
892 r->hdr.reqid = state->reqid;
894 r->datalen = data.dsize;
896 memcpy(&r->data[0], data.dptr, data.dsize);
898 r->errorlen = strlen(errormsg);
899 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
902 daemon_queue_send(client, &r->hdr);
908 fail all pending controls to a disconnected node
910 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
912 struct daemon_control_state *state;
913 while ((state = node->pending_controls)) {
914 DLIST_REMOVE(node->pending_controls, state);
915 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null,
916 "node is disconnected", state);
921 destroy a daemon_control_state
923 static int daemon_control_destructor(struct daemon_control_state *state)
926 DLIST_REMOVE(state->node->pending_controls, state);
932 this is called when the ctdb daemon received a ctdb request control
933 from a local client over the unix domain socket
935 static void daemon_request_control_from_client(struct ctdb_client *client,
936 struct ctdb_req_control *c)
940 struct daemon_control_state *state;
941 TALLOC_CTX *tmp_ctx = talloc_new(client);
943 if (c->hdr.destnode == CTDB_CURRENT_NODE) {
944 c->hdr.destnode = client->ctdb->pnn;
947 state = talloc(client, struct daemon_control_state);
948 CTDB_NO_MEMORY_VOID(client->ctdb, state);
950 state->client = client;
951 state->c = talloc_steal(state, c);
952 state->reqid = c->hdr.reqid;
953 if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
954 state->node = client->ctdb->nodes[c->hdr.destnode];
955 DLIST_ADD(state->node->pending_controls, state);
960 talloc_set_destructor(state, daemon_control_destructor);
962 if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
963 talloc_steal(tmp_ctx, state);
966 data.dptr = &c->data[0];
967 data.dsize = c->datalen;
968 res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
969 c->srvid, c->opcode, client->client_id,
971 data, daemon_control_callback,
974 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
978 talloc_free(tmp_ctx);
982 register a call function
984 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
985 ctdb_fn_t fn, int id)
987 struct ctdb_registered_call *call;
988 struct ctdb_db_context *ctdb_db;
990 ctdb_db = find_ctdb_db(ctdb, db_id);
991 if (ctdb_db == NULL) {
995 call = talloc(ctdb_db, struct ctdb_registered_call);
999 DLIST_ADD(ctdb_db->calls, call);
1006 this local messaging handler is ugly, but is needed to prevent
1007 recursion in ctdb_send_message() when the destination node is the
1008 same as the source node
1010 struct ctdb_local_message {
1011 struct ctdb_context *ctdb;
1016 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te,
1017 struct timeval t, void *private_data)
1019 struct ctdb_local_message *m = talloc_get_type(private_data,
1020 struct ctdb_local_message);
1023 res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1025 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n",
1026 (unsigned long long)m->srvid));
1031 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1033 struct ctdb_local_message *m;
1034 m = talloc(ctdb, struct ctdb_local_message);
1035 CTDB_NO_MEMORY(ctdb, m);
1040 m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1041 if (m->data.dptr == NULL) {
1046 /* this needs to be done as an event to prevent recursion */
1047 event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
1054 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1055 uint64_t srvid, TDB_DATA data)
1057 struct ctdb_req_message *r;
1060 if (ctdb->methods == NULL) {
1061 DEBUG(DEBUG_ERR,(__location__ " Failed to send message. Transport is DOWN\n"));
1065 /* see if this is a message to ourselves */
1066 if (pnn == ctdb->pnn) {
1067 return ctdb_local_message(ctdb, srvid, data);
1070 len = offsetof(struct ctdb_req_message, data) + data.dsize;
1071 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1072 struct ctdb_req_message);
1073 CTDB_NO_MEMORY(ctdb, r);
1075 r->hdr.destnode = pnn;
1077 r->datalen = data.dsize;
1078 memcpy(&r->data[0], data.dptr, data.dsize);
1080 ctdb_queue_packet(ctdb, &r->hdr);
1088 struct ctdb_client_notify_list {
1089 struct ctdb_client_notify_list *next, *prev;
1090 struct ctdb_context *ctdb;
1096 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1100 DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1102 ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1104 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1110 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1112 struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1113 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1114 struct ctdb_client_notify_list *nl;
1116 DEBUG(DEBUG_ERR,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1118 if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1119 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1123 if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1124 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1129 if (client == NULL) {
1130 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1134 for(nl=client->notify; nl; nl=nl->next) {
1135 if (nl->srvid == notify->srvid) {
1140 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1144 nl = talloc(client, struct ctdb_client_notify_list);
1145 CTDB_NO_MEMORY(ctdb, nl);
1147 nl->srvid = notify->srvid;
1148 nl->data.dsize = notify->len;
1149 nl->data.dptr = talloc_size(nl, nl->data.dsize);
1150 CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1151 memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1153 DLIST_ADD(client->notify, nl);
1154 talloc_set_destructor(nl, ctdb_client_notify_destructor);
1159 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1161 struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1162 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1163 struct ctdb_client_notify_list *nl;
1165 DEBUG(DEBUG_ERR,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1167 if (client == NULL) {
1168 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1172 for(nl=client->notify; nl; nl=nl->next) {
1173 if (nl->srvid == notify->srvid) {
1178 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1182 DLIST_REMOVE(client->notify, nl);
1183 talloc_set_destructor(nl, NULL);
1189 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1191 struct ctdb_client_pid_list *client_pid;
1193 for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1194 if (client_pid->pid == pid) {
1195 return client_pid->client;
1202 /* This control is used by samba when probing if a process (of a samba daemon)
1204 Samba does this when it needs/wants to check if a subrecord in one of the
1205 databases is still valied, or if it is stale and can be removed.
1206 If the node is in unhealthy or stopped state we just kill of the samba
1207 process holding htis sub-record and return to the calling samba that
1208 the process does not exist.
1209 This allows us to forcefully recall subrecords registered by samba processes
1210 on banned and stopped nodes.
1212 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1214 struct ctdb_client *client;
1216 if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1217 client = ctdb_find_client_by_pid(ctdb, pid);
1218 if (client != NULL) {
1219 DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1220 talloc_free(client);
1225 return kill(pid, 0);