4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/events/events.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/wait.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include <sys/socket.h>
/*
  one entry per connected client. the list hangs off ctdb->client_pids
  and lets the daemon map a process id back to the client connection
 */
struct ctdb_client_pid_list {
	/* list linkage used by DLIST_ADD/DLIST_REMOVE */
	struct ctdb_client_pid_list *next, *prev;
	/* daemon context whose client_pids list holds this entry */
	struct ctdb_context *ctdb;
	/* process id of the peer, as recorded when it connected */
	pid_t pid;
	/* the client connection belonging to that process */
	struct ctdb_client *client;
};
/* forward declaration: dispatcher for a packet received from a local client */
static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr);
41 static void print_exit_message(void)
43 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
46 /* called when the "startup" event script has finished */
47 static void ctdb_start_transport(struct ctdb_context *ctdb)
49 if (ctdb->methods == NULL) {
50 DEBUG(DEBUG_ALERT,(__location__ " startup event finished but transport is DOWN.\n"));
51 ctdb_fatal(ctdb, "transport is not initialized but startup completed");
54 /* start the transport running */
55 if (ctdb->methods->start(ctdb) != 0) {
56 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
57 ctdb_fatal(ctdb, "transport failed to start");
60 /* start the recovery daemon process */
61 if (ctdb_start_recoverd(ctdb) != 0) {
62 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
66 /* Make sure we log something when the daemon terminates */
67 atexit(print_exit_message);
69 /* start monitoring for connected/disconnected nodes */
70 ctdb_start_keepalive(ctdb);
72 /* start monitoring for node health */
73 ctdb_start_monitoring(ctdb);
75 /* start periodic update of tcp tickle lists */
76 ctdb_start_tcp_tickle_update(ctdb);
78 /* start listening for recovery daemon pings */
79 ctdb_control_recd_ping(ctdb);
82 static void block_signal(int signum)
86 memset(&act, 0, sizeof(act));
88 act.sa_handler = SIG_IGN;
89 sigemptyset(&act.sa_mask);
90 sigaddset(&act.sa_mask, signum);
91 sigaction(signum, &act, NULL);
96 send a packet to a client
98 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
100 client->ctdb->statistics.client_packets_sent++;
101 if (hdr->operation == CTDB_REQ_MESSAGE) {
102 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
103 DEBUG(DEBUG_ERR,("Drop CTDB_REQ_MESSAGE to client. Queue full.\n"));
107 return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
111 message handler for when we are in daemon mode. This redirects the message
114 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
115 TDB_DATA data, void *private_data)
117 struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
118 struct ctdb_req_message *r;
121 /* construct a message to send to the client containing the data */
122 len = offsetof(struct ctdb_req_message, data) + data.dsize;
123 r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE,
124 len, struct ctdb_req_message);
125 CTDB_NO_MEMORY_VOID(ctdb, r);
127 talloc_set_name_const(r, "req_message packet");
130 r->datalen = data.dsize;
131 memcpy(&r->data[0], data.dptr, data.dsize);
133 daemon_queue_send(client, &r->hdr);
140 this is called when the ctdb daemon received a ctdb request to
141 set the srvid from the client
143 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
145 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
147 if (client == NULL) {
148 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
151 res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
153 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n",
154 (unsigned long long)srvid));
156 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n",
157 (unsigned long long)srvid));
164 this is called when the ctdb daemon received a ctdb request to
165 remove a srvid from the client
167 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
169 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
170 if (client == NULL) {
171 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
174 return ctdb_deregister_message_handler(ctdb, srvid, client);
179 destroy a ctdb_client
181 static int ctdb_client_destructor(struct ctdb_client *client)
183 struct ctdb_db_context *ctdb_db;
185 ctdb_takeover_client_destructor_hook(client);
186 ctdb_reqid_remove(client->ctdb, client->client_id);
187 if (client->ctdb->statistics.num_clients) {
188 client->ctdb->statistics.num_clients--;
191 if (client->num_persistent_updates != 0) {
192 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
193 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
195 ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
197 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
198 "commit active. Forcing recovery.\n"));
199 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
200 ctdb_db->transaction_active = false;
208 this is called when the ctdb daemon received a ctdb request message
209 from a local client over the unix domain socket
211 static void daemon_request_message_from_client(struct ctdb_client *client,
212 struct ctdb_req_message *c)
217 /* maybe the message is for another client on this node */
218 if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
219 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
223 /* its for a remote node */
224 data.dptr = &c->data[0];
225 data.dsize = c->datalen;
226 res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
229 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
/*
  state kept by the daemon while a call issued by a local client is
  in flight
 */
struct daemon_call_state {
	/* the client that issued the call */
	struct ctdb_client *client;
	/* reqid of the client request, echoed back in the reply */
	uint32_t reqid;
	/* the call being processed */
	struct ctdb_call *call;
	/* when processing started, used for latency statistics */
	struct timeval start_time;
};
243 complete a call from a client
245 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
247 struct daemon_call_state *dstate = talloc_get_type(state->async.private_data,
248 struct daemon_call_state);
249 struct ctdb_reply_call *r;
252 struct ctdb_client *client = dstate->client;
253 struct ctdb_db_context *ctdb_db = state->ctdb_db;
255 talloc_steal(client, dstate);
256 talloc_steal(dstate, dstate->call);
258 res = ctdb_daemon_call_recv(state, dstate->call);
260 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
261 if (client->ctdb->statistics.pending_calls > 0) {
262 client->ctdb->statistics.pending_calls--;
264 ctdb_latency(ctdb_db, "call_from_client_cb 1", &client->ctdb->statistics.max_call_latency, dstate->start_time);
268 length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
269 r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL,
270 length, struct ctdb_reply_call);
272 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
273 if (client->ctdb->statistics.pending_calls > 0) {
274 client->ctdb->statistics.pending_calls--;
276 ctdb_latency(ctdb_db, "call_from_client_cb 2", &client->ctdb->statistics.max_call_latency, dstate->start_time);
279 r->hdr.reqid = dstate->reqid;
280 r->datalen = dstate->call->reply_data.dsize;
281 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
283 res = daemon_queue_send(client, &r->hdr);
285 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
287 ctdb_latency(ctdb_db, "call_from_client_cb 3", &client->ctdb->statistics.max_call_latency, dstate->start_time);
289 if (client->ctdb->statistics.pending_calls > 0) {
290 client->ctdb->statistics.pending_calls--;
/*
  wrapper handed to requeued packets. It carries the client id rather
  than a client pointer, so a client that disconnected in the meantime
  can be detected by a failed lookup
 */
struct ctdb_daemon_packet_wrap {
	/* daemon context used for the client lookup */
	struct ctdb_context *ctdb;
	/* reqid of the client the packet came from */
	uint32_t client_id;
};
300 a wrapper to catch disconnected clients
302 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
304 struct ctdb_client *client;
305 struct ctdb_daemon_packet_wrap *w = talloc_get_type(p,
306 struct ctdb_daemon_packet_wrap);
308 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
312 client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
313 if (client == NULL) {
314 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
322 daemon_incoming_packet(client, hdr);
327 this is called when the ctdb daemon received a ctdb request call
328 from a local client over the unix domain socket
330 static void daemon_request_call_from_client(struct ctdb_client *client,
331 struct ctdb_req_call *c)
333 struct ctdb_call_state *state;
334 struct ctdb_db_context *ctdb_db;
335 struct daemon_call_state *dstate;
336 struct ctdb_call *call;
337 struct ctdb_ltdb_header header;
340 struct ctdb_context *ctdb = client->ctdb;
341 struct ctdb_daemon_packet_wrap *w;
343 ctdb->statistics.total_calls++;
344 if (client->ctdb->statistics.pending_calls > 0) {
345 ctdb->statistics.pending_calls++;
348 ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
350 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
352 if (client->ctdb->statistics.pending_calls > 0) {
353 ctdb->statistics.pending_calls--;
359 key.dsize = c->keylen;
361 w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
362 CTDB_NO_MEMORY_VOID(ctdb, w);
365 w->client_id = client->client_id;
367 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header,
368 (struct ctdb_req_header *)c, &data,
369 daemon_incoming_packet_wrap, w, True);
371 /* will retry later */
372 if (client->ctdb->statistics.pending_calls > 0) {
373 ctdb->statistics.pending_calls--;
381 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
382 if (client->ctdb->statistics.pending_calls > 0) {
383 ctdb->statistics.pending_calls--;
388 dstate = talloc(client, struct daemon_call_state);
389 if (dstate == NULL) {
390 ctdb_ltdb_unlock(ctdb_db, key);
391 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
392 if (client->ctdb->statistics.pending_calls > 0) {
393 ctdb->statistics.pending_calls--;
397 dstate->start_time = timeval_current();
398 dstate->client = client;
399 dstate->reqid = c->hdr.reqid;
400 talloc_steal(dstate, data.dptr);
402 call = dstate->call = talloc_zero(dstate, struct ctdb_call);
404 ctdb_ltdb_unlock(ctdb_db, key);
405 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
406 if (client->ctdb->statistics.pending_calls > 0) {
407 ctdb->statistics.pending_calls--;
409 ctdb_latency(ctdb_db, "call_from_client 1", &ctdb->statistics.max_call_latency, dstate->start_time);
413 call->call_id = c->callid;
415 call->call_data.dptr = c->data + c->keylen;
416 call->call_data.dsize = c->calldatalen;
417 call->flags = c->flags;
419 if (header.dmaster == ctdb->pnn) {
420 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
422 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
425 ctdb_ltdb_unlock(ctdb_db, key);
428 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
429 if (client->ctdb->statistics.pending_calls > 0) {
430 ctdb->statistics.pending_calls--;
432 ctdb_latency(ctdb_db, "call_from_client 2", &ctdb->statistics.max_call_latency, dstate->start_time);
435 talloc_steal(state, dstate);
436 talloc_steal(client, state);
438 state->async.fn = daemon_call_from_client_callback;
439 state->async.private_data = dstate;
/* forward declaration: handler for control requests from local clients */
static void daemon_request_control_from_client(struct ctdb_client *client, struct ctdb_req_control *c);
446 /* data contains a packet from the client */
447 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
449 struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
451 struct ctdb_context *ctdb = client->ctdb;
453 /* place the packet as a child of a tmp_ctx. We then use
454 talloc_free() below to free it. If any of the calls want
455 to keep it, then they will steal it somewhere else, and the
456 talloc_free() will be a no-op */
457 tmp_ctx = talloc_new(client);
458 talloc_steal(tmp_ctx, hdr);
460 if (hdr->ctdb_magic != CTDB_MAGIC) {
461 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
465 if (hdr->ctdb_version != CTDB_VERSION) {
466 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
470 switch (hdr->operation) {
472 ctdb->statistics.client.req_call++;
473 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
476 case CTDB_REQ_MESSAGE:
477 ctdb->statistics.client.req_message++;
478 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
481 case CTDB_REQ_CONTROL:
482 ctdb->statistics.client.req_control++;
483 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
487 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
492 talloc_free(tmp_ctx);
496 called when the daemon gets a incoming packet
498 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
500 struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
501 struct ctdb_req_header *hdr;
508 client->ctdb->statistics.client_packets_recv++;
510 if (cnt < sizeof(*hdr)) {
511 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n",
515 hdr = (struct ctdb_req_header *)data;
516 if (cnt != hdr->length) {
517 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon",
518 (unsigned)hdr->length, (unsigned)cnt);
522 if (hdr->ctdb_magic != CTDB_MAGIC) {
523 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
527 if (hdr->ctdb_version != CTDB_VERSION) {
528 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
532 DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
533 "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
534 hdr->srcnode, hdr->destnode));
536 /* it is the responsibility of the incoming packet function to free 'data' */
537 daemon_incoming_packet(client, hdr);
541 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
543 if (client_pid->ctdb->client_pids != NULL) {
544 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
551 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde,
552 uint16_t flags, void *private_data)
554 struct sockaddr_un addr;
557 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
558 struct ctdb_client *client;
559 struct ctdb_client_pid_list *client_pid;
561 struct peercred_struct cr;
562 socklen_t crl = sizeof(struct peercred_struct);
565 socklen_t crl = sizeof(struct ucred);
568 memset(&addr, 0, sizeof(addr));
570 fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
576 set_close_on_exec(fd);
578 DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
580 client = talloc_zero(ctdb, struct ctdb_client);
582 if (getsockopt(fd, SOL_SOCKET, SO_PEERID, &cr, &crl) == 0) {
584 if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl) == 0) {
586 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)cr.pid));
591 client->client_id = ctdb_reqid_new(ctdb, client);
592 client->pid = cr.pid;
594 client_pid = talloc(client, struct ctdb_client_pid_list);
595 if (client_pid == NULL) {
596 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
601 client_pid->ctdb = ctdb;
602 client_pid->pid = cr.pid;
603 client_pid->client = client;
605 DLIST_ADD(ctdb->client_pids, client_pid);
607 client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT,
608 ctdb_daemon_read_cb, client);
610 talloc_set_destructor(client, ctdb_client_destructor);
611 talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
612 ctdb->statistics.num_clients++;
618 create a unix domain socket and bind it
619 return a file descriptor open on the socket
621 static int ux_socket_bind(struct ctdb_context *ctdb)
623 struct sockaddr_un addr;
625 ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
626 if (ctdb->daemon.sd == -1) {
630 set_close_on_exec(ctdb->daemon.sd);
631 set_nonblocking(ctdb->daemon.sd);
633 memset(&addr, 0, sizeof(addr));
634 addr.sun_family = AF_UNIX;
635 strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
637 if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
638 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
642 if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
643 chmod(ctdb->daemon.name, 0700) != 0) {
644 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
649 if (listen(ctdb->daemon.sd, 100) != 0) {
650 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
657 close(ctdb->daemon.sd);
658 ctdb->daemon.sd = -1;
662 static void sig_child_handler(struct event_context *ev,
663 struct signal_event *se, int signum, int count,
667 // struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
672 pid = waitpid(-1, &status, WNOHANG);
674 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
678 DEBUG(DEBUG_DEBUG, ("SIGCHLD from %d\n", (int)pid));
684 start the protocol going as a daemon
686 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog)
689 struct fd_event *fde;
690 const char *domain_socket_name;
691 struct signal_event *se;
693 /* get rid of any old sockets */
694 unlink(ctdb->daemon.name);
696 /* create a unix domain stream socket to listen to */
697 res = ux_socket_bind(ctdb);
699 DEBUG(DEBUG_ALERT,(__location__ " Failed to open CTDB unix domain socket\n"));
703 if (do_fork && fork()) {
707 tdb_reopen_all(False);
712 if (open("/dev/null", O_RDONLY) != 0) {
713 DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
717 block_signal(SIGPIPE);
719 ctdbd_pid = getpid();
722 DEBUG(DEBUG_ERR, ("Starting CTDBD as pid : %u\n", ctdbd_pid));
724 if (ctdb->do_setsched) {
725 /* try to set us up as realtime */
726 ctdb_set_scheduler(ctdb);
729 /* ensure the socket is deleted on exit of the daemon */
730 domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
731 if (domain_socket_name == NULL) {
732 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
736 ctdb->ev = event_context_init(NULL);
738 ctdb_set_child_logging(ctdb);
740 /* force initial recovery for election */
741 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
743 if (strcmp(ctdb->transport, "tcp") == 0) {
744 int ctdb_tcp_init(struct ctdb_context *);
745 ret = ctdb_tcp_init(ctdb);
747 #ifdef USE_INFINIBAND
748 if (strcmp(ctdb->transport, "ib") == 0) {
749 int ctdb_ibw_init(struct ctdb_context *);
750 ret = ctdb_ibw_init(ctdb);
754 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
758 if (ctdb->methods == NULL) {
759 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
760 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
763 /* initialise the transport */
764 if (ctdb->methods->initialise(ctdb) != 0) {
765 ctdb_fatal(ctdb, "transport failed to initialise");
768 /* attach to any existing persistent databases */
769 if (ctdb_attach_persistent(ctdb) != 0) {
770 ctdb_fatal(ctdb, "Failed to attach to persistent databases\n");
773 /* start frozen, then let the first election sort things out */
774 if (ctdb_blocking_freeze(ctdb)) {
775 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
778 /* now start accepting clients, only can do this once frozen */
779 fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd,
780 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
781 ctdb_accept_client, ctdb);
783 /* tell all other nodes we've just started up */
784 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
785 0, CTDB_CONTROL_STARTUP, 0,
786 CTDB_CTRL_FLAG_NOREPLY,
787 tdb_null, NULL, NULL);
789 /* release any IPs we hold from previous runs of the daemon */
790 ctdb_release_all_ips(ctdb);
792 /* start the transport going */
793 ctdb_start_transport(ctdb);
795 /* set up a handler to pick up sigchld */
796 se = event_add_signal(ctdb->ev, ctdb,
801 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
806 if (start_syslog_daemon(ctdb)) {
807 DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
813 /* go into a wait loop to allow other nodes to complete */
814 event_loop_wait(ctdb->ev);
816 DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
821 allocate a packet for use in daemon<->daemon communication
823 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
825 enum ctdb_operation operation,
826 size_t length, size_t slength,
830 struct ctdb_req_header *hdr;
832 length = MAX(length, slength);
833 size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
835 if (ctdb->methods == NULL) {
836 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
837 operation, (unsigned)length));
841 hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
843 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
844 operation, (unsigned)length));
847 talloc_set_name_const(hdr, type);
848 memset(hdr, 0, slength);
849 hdr->length = length;
850 hdr->operation = operation;
851 hdr->ctdb_magic = CTDB_MAGIC;
852 hdr->ctdb_version = CTDB_VERSION;
853 hdr->generation = ctdb->vnn_map->generation;
854 hdr->srcnode = ctdb->pnn;
/*
  state kept by the daemon while a control issued by a local client is
  outstanding
 */
struct daemon_control_state {
	/* linkage on the pending_controls list of the destination node */
	struct daemon_control_state *next, *prev;
	/* the client that sent the control */
	struct ctdb_client *client;
	/* the control request packet, owned by this state */
	struct ctdb_req_control *c;
	/* reqid of the client request, echoed back in the reply */
	uint32_t reqid;
	/* destination node, NULL when the destination is not a valid pnn */
	struct ctdb_node *node;
};
868 callback when a control reply comes in
870 static void daemon_control_callback(struct ctdb_context *ctdb,
871 int32_t status, TDB_DATA data,
872 const char *errormsg,
875 struct daemon_control_state *state = talloc_get_type(private_data,
876 struct daemon_control_state);
877 struct ctdb_client *client = state->client;
878 struct ctdb_reply_control *r;
881 /* construct a message to send to the client containing the data */
882 len = offsetof(struct ctdb_reply_control, data) + data.dsize;
884 len += strlen(errormsg);
886 r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len,
887 struct ctdb_reply_control);
888 CTDB_NO_MEMORY_VOID(ctdb, r);
890 r->hdr.reqid = state->reqid;
892 r->datalen = data.dsize;
894 memcpy(&r->data[0], data.dptr, data.dsize);
896 r->errorlen = strlen(errormsg);
897 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
900 daemon_queue_send(client, &r->hdr);
906 fail all pending controls to a disconnected node
908 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
910 struct daemon_control_state *state;
911 while ((state = node->pending_controls)) {
912 DLIST_REMOVE(node->pending_controls, state);
913 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null,
914 "node is disconnected", state);
919 destroy a daemon_control_state
921 static int daemon_control_destructor(struct daemon_control_state *state)
924 DLIST_REMOVE(state->node->pending_controls, state);
930 this is called when the ctdb daemon received a ctdb request control
931 from a local client over the unix domain socket
933 static void daemon_request_control_from_client(struct ctdb_client *client,
934 struct ctdb_req_control *c)
938 struct daemon_control_state *state;
939 TALLOC_CTX *tmp_ctx = talloc_new(client);
941 if (c->hdr.destnode == CTDB_CURRENT_NODE) {
942 c->hdr.destnode = client->ctdb->pnn;
945 state = talloc(client, struct daemon_control_state);
946 CTDB_NO_MEMORY_VOID(client->ctdb, state);
948 state->client = client;
949 state->c = talloc_steal(state, c);
950 state->reqid = c->hdr.reqid;
951 if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
952 state->node = client->ctdb->nodes[c->hdr.destnode];
953 DLIST_ADD(state->node->pending_controls, state);
958 talloc_set_destructor(state, daemon_control_destructor);
960 if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
961 talloc_steal(tmp_ctx, state);
964 data.dptr = &c->data[0];
965 data.dsize = c->datalen;
966 res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
967 c->srvid, c->opcode, client->client_id,
969 data, daemon_control_callback,
972 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
976 talloc_free(tmp_ctx);
980 register a call function
982 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
983 ctdb_fn_t fn, int id)
985 struct ctdb_registered_call *call;
986 struct ctdb_db_context *ctdb_db;
988 ctdb_db = find_ctdb_db(ctdb, db_id);
989 if (ctdb_db == NULL) {
993 call = talloc(ctdb_db, struct ctdb_registered_call);
997 DLIST_ADD(ctdb_db->calls, call);
1004 this local messaging handler is ugly, but is needed to prevent
1005 recursion in ctdb_send_message() when the destination node is the
1006 same as the source node
1008 struct ctdb_local_message {
1009 struct ctdb_context *ctdb;
1014 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te,
1015 struct timeval t, void *private_data)
1017 struct ctdb_local_message *m = talloc_get_type(private_data,
1018 struct ctdb_local_message);
1021 res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1023 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n",
1024 (unsigned long long)m->srvid));
1029 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1031 struct ctdb_local_message *m;
1032 m = talloc(ctdb, struct ctdb_local_message);
1033 CTDB_NO_MEMORY(ctdb, m);
1038 m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1039 if (m->data.dptr == NULL) {
1044 /* this needs to be done as an event to prevent recursion */
1045 event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
1052 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1053 uint64_t srvid, TDB_DATA data)
1055 struct ctdb_req_message *r;
1058 if (ctdb->methods == NULL) {
1059 DEBUG(DEBUG_ERR,(__location__ " Failed to send message. Transport is DOWN\n"));
1063 /* see if this is a message to ourselves */
1064 if (pnn == ctdb->pnn) {
1065 return ctdb_local_message(ctdb, srvid, data);
1068 len = offsetof(struct ctdb_req_message, data) + data.dsize;
1069 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1070 struct ctdb_req_message);
1071 CTDB_NO_MEMORY(ctdb, r);
1073 r->hdr.destnode = pnn;
1075 r->datalen = data.dsize;
1076 memcpy(&r->data[0], data.dptr, data.dsize);
1078 ctdb_queue_packet(ctdb, &r->hdr);
1086 struct ctdb_client_notify_list {
1087 struct ctdb_client_notify_list *next, *prev;
1088 struct ctdb_context *ctdb;
1094 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1098 DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1100 ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1102 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1108 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1110 struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1111 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1112 struct ctdb_client_notify_list *nl;
1114 DEBUG(DEBUG_ERR,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1116 if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1117 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1121 if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1122 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1127 if (client == NULL) {
1128 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1132 for(nl=client->notify; nl; nl=nl->next) {
1133 if (nl->srvid == notify->srvid) {
1138 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1142 nl = talloc(client, struct ctdb_client_notify_list);
1143 CTDB_NO_MEMORY(ctdb, nl);
1145 nl->srvid = notify->srvid;
1146 nl->data.dsize = notify->len;
1147 nl->data.dptr = talloc_size(nl, nl->data.dsize);
1148 CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1149 memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1151 DLIST_ADD(client->notify, nl);
1152 talloc_set_destructor(nl, ctdb_client_notify_destructor);
1157 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1159 struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1160 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1161 struct ctdb_client_notify_list *nl;
1163 DEBUG(DEBUG_ERR,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1165 if (client == NULL) {
1166 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1170 for(nl=client->notify; nl; nl=nl->next) {
1171 if (nl->srvid == notify->srvid) {
1176 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1180 DLIST_REMOVE(client->notify, nl);
1181 talloc_set_destructor(nl, NULL);
1187 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1189 struct ctdb_client_pid_list *client_pid;
1191 for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1192 if (client_pid->pid == pid) {
1193 return client_pid->client;
1200 /* This control is used by samba when probing if a process (of a samba daemon)
1202 Samba does this when it needs/wants to check if a subrecord in one of the
1203 databases is still valied, or if it is stale and can be removed.
1204 If the node is in unhealthy or stopped state we just kill of the samba
1205 process holding htis sub-record and return to the calling samba that
1206 the process does not exist.
1207 This allows us to forcefully recall subrecords registered by samba processes
1208 on banned and stopped nodes.
1210 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1212 struct ctdb_client *client;
1214 if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1215 client = ctdb_find_client_by_pid(ctdb, pid);
1216 if (client != NULL) {
1217 DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1218 talloc_free(client);
1223 return kill(pid, 0);