4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/wait.h"
24 #include "system/time.h"
27 /* Allow use of deprecated function tevent_loop_allow_nesting() */
28 #define TEVENT_DEPRECATED
32 #include "lib/tdb_wrap/tdb_wrap.h"
33 #include "lib/util/dlinklist.h"
34 #include "lib/util/debug.h"
35 #include "lib/util/samba_util.h"
36 #include "lib/util/blocking.h"
38 #include "ctdb_version.h"
39 #include "ctdb_private.h"
40 #include "ctdb_client.h"
42 #include "common/rb_tree.h"
43 #include "common/reqid.h"
44 #include "common/system.h"
45 #include "common/common.h"
46 #include "common/logging.h"
/*
 * List node tying one connected client to the daemon context.  The
 * next/prev links are the ones used by DLIST_ADD()/DLIST_REMOVE() on
 * ctdb->client_pids.
 * NOTE(review): ctdb_accept_client() also assigns a `pid` member; its
 * declaration is not visible in this listing.
 */
48 struct ctdb_client_pid_list {
49 struct ctdb_client_pid_list *next, *prev;
50 struct ctdb_context *ctdb;
52 struct ctdb_client *client;
/*
 * Path of the daemon's PID file.  While it is NULL, ctdb_create_pidfile()
 * and ctdb_remove_pidfile() do nothing.
 */
55 const char *ctdbd_pidfile = NULL;
/* Forward declaration: the packet dispatcher is referenced before it is defined. */
57 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
/*
 * Log a NOTICE-level shutdown message, naming debug_extra when it is a
 * non-empty string.  Registered with atexit() by ctdb_start_daemon().
 * NOTE(review): the statement that follows the "Wait a second" comment
 * is not visible in this listing.
 */
59 static void print_exit_message(void)
61 if (debug_extra != NULL && debug_extra[0] != '\0') {
62 DEBUG(DEBUG_NOTICE,("CTDB %s shutting down\n", debug_extra));
64 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
66 /* Wait a second to allow pending log messages to be flushed */
/*
 * Timer callback that re-arms itself to fire again one second later.
 * private_data is the struct ctdb_context.  The getpid() comparison
 * with ctdb->ctdbd_pid singles out processes other than the main daemon.
 * NOTE(review): the body of that branch is not visible in this listing;
 * presumably it returns without re-arming the timer.
 */
73 static void ctdb_time_tick(struct tevent_context *ev, struct tevent_timer *te,
74 struct timeval t, void *private_data)
76 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
78 if (getpid() != ctdb->ctdbd_pid) {
82 tevent_add_timer(ctdb->ev, ctdb,
83 timeval_current_ofs(1, 0),
84 ctdb_time_tick, ctdb);
87 /* Used to trigger a dummy event once per second, to make
88 * detection of hangs more reliable.
/*
 * Arm the first one-second ctdb_time_tick() timer on the daemon's event
 * context.  The return value of tevent_add_timer() is not checked.
 */
90 static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
92 tevent_add_timer(ctdb->ev, ctdb,
93 timeval_current_ofs(1, 0),
94 ctdb_time_tick, ctdb);
/*
 * Start the daemon's recurring background activities: keepalive
 * monitoring, tcp tickle list updates, recovery daemon ping handling and
 * the once-per-second tick timer.  Called from
 * ctdb_setup_event_callback().
 */
97 static void ctdb_start_periodic_events(struct ctdb_context *ctdb)
99 /* start monitoring for connected/disconnected nodes */
100 ctdb_start_keepalive(ctdb);
102 /* start periodic update of tcp tickle lists */
103 ctdb_start_tcp_tickle_update(ctdb);
105 /* start listening for recovery daemon pings */
106 ctdb_control_recd_ping(ctdb);
108 /* start listening to timer ticks */
109 ctdb_start_time_tickd(ctdb);
/*
 * Install SIG_IGN as the disposition for signum via sigaction().
 * The sigaction structure is zeroed first and signum is added to its
 * sa_mask.  The return value of sigaction() is not checked.
 */
112 static void ignore_signal(int signum)
114 struct sigaction act;
116 memset(&act, 0, sizeof(act));
118 act.sa_handler = SIG_IGN;
119 sigemptyset(&act.sa_mask);
120 sigaddset(&act.sa_mask, signum);
121 sigaction(signum, &act, NULL);
126 send a packet to a client
/*
 * Queue a packet for delivery to a local client and bump the
 * client_packets_sent statistic.  For CTDB_REQ_MESSAGE packets the
 * current queue length is first compared with the
 * max_queue_depth_drop_msg tunable and an error is logged when it is
 * exceeded.
 * NOTE(review): what happens after that log message (the text says the
 * client connection is killed) is not visible in this listing.
 * On the normal path the result of ctdb_queue_send() is returned.
 */
128 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
130 CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
131 if (hdr->operation == CTDB_REQ_MESSAGE) {
132 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
133 DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
138 return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
142 message handler for when we are in daemon mode. This redirects the message
/*
 * srvid message callback (registered by
 * daemon_register_message_handler()): wraps `data` in a newly allocated
 * CTDB_REQ_MESSAGE packet and queues it to the client.  private_data is
 * the struct ctdb_client.  The packet is allocated with client->ctdb as
 * its memory context, and the result of daemon_queue_send() is ignored.
 * NOTE(review): the end of the parameter list, the header field
 * assignments and any release of the packet are not visible in this
 * listing.
 */
145 static void daemon_message_handler(uint64_t srvid, TDB_DATA data,
148 struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
149 struct ctdb_req_message_old *r;
152 /* construct a message to send to the client containing the data */
153 len = offsetof(struct ctdb_req_message_old, data) + data.dsize;
154 r = ctdbd_allocate_pkt(client->ctdb, client->ctdb, CTDB_REQ_MESSAGE,
155 len, struct ctdb_req_message_old);
156 CTDB_NO_MEMORY_VOID(client->ctdb, r);
158 talloc_set_name_const(r, "req_message packet");
161 r->datalen = data.dsize;
162 memcpy(&r->data[0], data.dptr, data.dsize);
164 daemon_queue_send(client, &r->hdr);
170 this is called when the ctdb daemon received a ctdb request to
171 set the srvid from the client
/*
 * Register daemon_message_handler() for `srvid` on behalf of the client
 * identified by client_id.  An unknown client_id is logged as an error.
 * Depending on the result of srvid_register() either a "Failed to
 * register handler" error or a "Registered message handler" INFO
 * message is logged.
 * NOTE(review): the remaining srvid_register() arguments, the test on
 * `res` and the return statements are not visible in this listing.
 */
173 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
175 struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
177 if (client == NULL) {
178 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
181 res = srvid_register(ctdb->srv, client, srvid, daemon_message_handler,
184 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n",
185 (unsigned long long)srvid));
187 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n",
188 (unsigned long long)srvid));
195 this is called when the ctdb daemon received a ctdb request to
196 remove a srvid from the client
/*
 * Remove the `srvid` registration held by the client identified by
 * client_id.  An unknown client_id is logged as an error; otherwise the
 * result of srvid_deregister() is returned.
 * NOTE(review): the return value on the unknown-client path is not
 * visible in this listing.
 */
198 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
200 struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
201 if (client == NULL) {
202 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
205 return srvid_deregister(ctdb->srv, srvid, client);
/*
 * Report which of a list of srvids are registered.  indata is treated
 * as an array of uint64_t ids, so its size must be a multiple of
 * sizeof(uint64_t).  outdata receives a zero-initialised bitmap of
 * (num_ids+7)/8 bytes, allocated with outdata as talloc parent, in which
 * bit (i%8) of byte (i/8) is set when srvid_exists() returns 0 for
 * ids[i].
 * NOTE(review): the rest of the parameter list and the return values
 * are not visible in this listing.
 */
208 int daemon_check_srvids(struct ctdb_context *ctdb, TDB_DATA indata,
215 if ((indata.dsize % sizeof(uint64_t)) != 0) {
216 DEBUG(DEBUG_ERR, ("Bad indata in daemon_check_srvids, "
217 "size=%d\n", (int)indata.dsize));
221 ids = (uint64_t *)indata.dptr;
222 num_ids = indata.dsize / 8;
224 results = talloc_zero_array(outdata, uint8_t, (num_ids+7)/8);
225 if (results == NULL) {
226 DEBUG(DEBUG_ERR, ("talloc failed in daemon_check_srvids\n"));
229 for (i=0; i<num_ids; i++) {
230 if (srvid_exists(ctdb->srv, ids[i]) == 0) {
231 results[i/8] |= (1 << (i%8));
234 outdata->dptr = (uint8_t *)results;
235 outdata->dsize = talloc_get_size(results);
240 destroy a ctdb_client
/*
 * talloc destructor for a struct ctdb_client (installed by
 * ctdb_accept_client()).  Runs the takeover hook, releases the client's
 * request id and decrements num_clients.  If persistent updates are
 * still in flight the node is put into CTDB_RECOVERY_ACTIVE; the same
 * happens on the "transaction commit active" path, after which the
 * database's persistent_state is freed.
 * NOTE(review): the conditions guarding the find_ctdb_db() result and
 * the transaction-commit branch, as well as the return value, are not
 * visible in this listing.
 */
242 static int ctdb_client_destructor(struct ctdb_client *client)
244 struct ctdb_db_context *ctdb_db;
246 ctdb_takeover_client_destructor_hook(client);
247 reqid_remove(client->ctdb->idr, client->client_id);
248 client->ctdb->num_clients--;
250 if (client->num_persistent_updates != 0) {
251 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
252 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
254 ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
256 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
257 "commit active. Forcing recovery.\n"));
258 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
261 * trans3 transaction state:
263 * The destructor sets the pointer to NULL.
265 talloc_free(ctdb_db->persistent_state);
273 this is called when the ctdb daemon received a ctdb request message
274 from a local client over the unix domain socket
/*
 * Route a CTDB_REQ_MESSAGE received from a local client.  A destination
 * of CTDB_CURRENT_NODE is first replaced by this node's PNN.  A message
 * addressed to this node is handed to ctdb_request_message(); otherwise
 * the payload is forwarded with ctdb_daemon_send_message() and a send
 * failure is logged.
 * NOTE(review): the statement after ctdb_request_message() (presumably
 * a return) and the test on `res` are not visible in this listing.
 */
276 static void daemon_request_message_from_client(struct ctdb_client *client,
277 struct ctdb_req_message_old *c)
282 if (c->hdr.destnode == CTDB_CURRENT_NODE) {
283 c->hdr.destnode = ctdb_get_pnn(client->ctdb);
286 /* maybe the message is for another client on this node */
287 if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
288 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
292 /* it's for a remote node */
293 data.dptr = &c->data[0];
294 data.dsize = c->datalen;
295 res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
298 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
/*
 * Per-call state kept by the daemon while a client's call request is
 * outstanding; passed to daemon_call_from_client_callback() through
 * state->async.private_data.  client_callid remembers the call id the
 * client originally asked for when a read-only request is remapped.
 * NOTE(review): a `reqid` member is assigned elsewhere in the file; its
 * declaration is not visible in this listing.
 */
304 struct daemon_call_state {
305 struct ctdb_client *client;
307 struct ctdb_call *call;
308 struct timeval start_time;
310 /* readonly request ? */
311 uint32_t readonly_fetch;
312 uint32_t client_callid;
316 complete a call from a client
/*
 * Completion callback for a call issued on behalf of a local client.
 * Reparents dstate onto the client and the call onto dstate, collects
 * the result with ctdb_daemon_call_recv(), builds a CTDB_REPLY_CALL
 * packet carrying the call status and reply data, and queues it to the
 * client.  When the client asked for a read-only CTDB_FETCH_FUNC,
 * sizeof(struct ctdb_ltdb_header) bytes are skipped at the front of the
 * reply data.  pending_calls is decremented and call latency recorded on
 * each visible exit path.
 * NOTE(review): the visible code does not compare reply_data.dsize with
 * sizeof(struct ctdb_ltdb_header) before subtracting; confirm the size
 * is guaranteed elsewhere.  The conditions on `res` and `r` and the
 * cleanup of dstate are not visible in this listing.
 */
318 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
320 struct daemon_call_state *dstate = talloc_get_type(state->async.private_data,
321 struct daemon_call_state);
322 struct ctdb_reply_call_old *r;
325 struct ctdb_client *client = dstate->client;
326 struct ctdb_db_context *ctdb_db = state->ctdb_db;
328 talloc_steal(client, dstate);
329 talloc_steal(dstate, dstate->call);
331 res = ctdb_daemon_call_recv(state, dstate->call);
333 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
334 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
336 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
340 length = offsetof(struct ctdb_reply_call_old, data) + dstate->call->reply_data.dsize;
341 /* If the client asked for readonly FETCH, we remapped this to
342 FETCH_WITH_HEADER when calling the daemon. So we must
343 strip the extra header off the reply data before passing
344 it back to the client.
346 if (dstate->readonly_fetch
347 && dstate->client_callid == CTDB_FETCH_FUNC) {
348 length -= sizeof(struct ctdb_ltdb_header);
351 r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL,
352 length, struct ctdb_reply_call_old);
354 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
355 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
356 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
359 r->hdr.reqid = dstate->reqid;
360 r->status = dstate->call->status;
362 if (dstate->readonly_fetch
363 && dstate->client_callid == CTDB_FETCH_FUNC) {
364 /* client only asked for a FETCH so we must strip off
365 the extra ctdb_ltdb header
367 r->datalen = dstate->call->reply_data.dsize - sizeof(struct ctdb_ltdb_header);
368 memcpy(&r->data[0], dstate->call->reply_data.dptr + sizeof(struct ctdb_ltdb_header), r->datalen);
370 r->datalen = dstate->call->reply_data.dsize;
371 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
374 res = daemon_queue_send(client, &r->hdr);
376 /* client is dead - return immediately */
380 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
382 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
383 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
/*
 * Callback argument that refers to a client by id instead of by
 * pointer, so daemon_incoming_packet_wrap() can detect a client that
 * has disconnected in the meantime.
 * NOTE(review): the client_id member used elsewhere in the file is not
 * visible in this listing.
 */
387 struct ctdb_daemon_packet_wrap {
388 struct ctdb_context *ctdb;
393 a wrapper to catch disconnected clients
/*
 * Packet callback that takes a struct ctdb_daemon_packet_wrap in place
 * of a client pointer.  The client is looked up by id; a packet whose
 * client no longer exists is logged as "Packet for disconnected
 * client", otherwise it is passed on to daemon_incoming_packet().
 * NOTE(review): the check that produces "Bad packet type" and any
 * release of `w` or `hdr` are not visible in this listing.
 */
395 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
397 struct ctdb_client *client;
398 struct ctdb_daemon_packet_wrap *w = talloc_get_type(p,
399 struct ctdb_daemon_packet_wrap);
401 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
405 client = reqid_find(w->ctdb->idr, w->client_id, struct ctdb_client);
406 if (client == NULL) {
407 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
415 daemon_incoming_packet(client, hdr);
/*
 * One deferred fetch request: the queued call packet `c` plus the wrap
 * `w` that identifies the sending client.  Entries are linked into
 * ctdb_deferred_fetch_queue.deferred_calls through next/prev.
 */
418 struct ctdb_deferred_fetch_call {
419 struct ctdb_deferred_fetch_call *next, *prev;
420 struct ctdb_req_call_old *c;
421 struct ctdb_daemon_packet_wrap *w;
/*
 * Head of the list of calls deferred behind one in-flight fetch.  One
 * queue is stored per key in ctdb_db->deferred_fetch.
 */
424 struct ctdb_deferred_fetch_queue {
425 struct ctdb_deferred_fetch_call *deferred_calls;
/*
 * Timer payload for reprocess_deferred_call(): the deferred call and
 * the client it is replayed for.
 */
428 struct ctdb_deferred_requeue {
429 struct ctdb_deferred_fetch_call *dfc;
430 struct ctdb_client *client;
433 /* called from a timer event and starts reprocessing the deferred call.*/
/*
 * Timer callback scheduled by deferred_fetch_queue_destructor().
 * private_data is a struct ctdb_deferred_requeue.  The deferred call
 * packet is reparented onto the client and fed back into
 * daemon_incoming_packet() as if it had just been received.
 * NOTE(review): release of `dfr` is not visible in this listing.
 */
434 static void reprocess_deferred_call(struct tevent_context *ev,
435 struct tevent_timer *te,
436 struct timeval t, void *private_data)
438 struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
439 struct ctdb_client *client = dfr->client;
441 talloc_steal(client, dfr->dfc->c);
442 daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
446 /* the referral context is destroyed either after a timeout or when the initial
447 fetch-lock has finished.
448 at this stage, immediately start reprocessing the queued up deferred
449 calls so they get reprocessed immediately (and since we are dmaster at
450 this stage, trigger the waiting smbd processes to pick up and acquire the
/*
 * talloc destructor for a deferred fetch queue.  Drains deferred_calls
 * from the head, so calls are handled in the order they were queued.
 * For each call whose client still exists, a ctdb_deferred_requeue is
 * allocated on the client, takes ownership of the call, and a zero-delay
 * timer is added that runs reprocess_deferred_call().  Calls for
 * clients that have disconnected are logged.
 * NOTE(review): what happens after each log message and the return
 * value are not visible in this listing.
 */
453 static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
456 /* need to reprocess the packets from the queue explicitly instead of
457 just using a normal destructor since we want, need, to
458 call the clients in the same order as the requests queued up
460 while (dfq->deferred_calls != NULL) {
461 struct ctdb_client *client;
462 struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
463 struct ctdb_deferred_requeue *dfr;
465 DLIST_REMOVE(dfq->deferred_calls, dfc);
467 client = reqid_find(dfc->w->ctdb->idr, dfc->w->client_id, struct ctdb_client);
468 if (client == NULL) {
469 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
474 /* process it by pushing it back onto the eventloop */
475 dfr = talloc(client, struct ctdb_deferred_requeue);
477 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
481 dfr->dfc = talloc_steal(dfr, dfc);
482 dfr->client = client;
484 tevent_add_timer(dfc->w->ctdb->ev, client, timeval_zero(),
485 reprocess_deferred_call, dfr);
491 /* insert the new deferral context into the rb tree.
492 there should never be a pre-existing context here, but check for it
493 warn and destroy the previous context if there is already a deferral context
/*
 * Insert callback passed to trbt_insertarray32_callback() by
 * setup_deferred_fetch_locks().  Going by the log message, `data` is
 * the entry already stored under the key and `parm` the new queue.
 * NOTE(review): the test on `data`, the freeing of the old entry and the
 * return value are not visible in this listing.
 */
496 static void *insert_dfq_callback(void *parm, void *data)
499 DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
505 /* if the original fetch-lock did not complete within a reasonable time,
506 free the context and context for all deferred requests to cause them to be
507 re-inserted into the event system.
/*
 * Timer callback that frees private_data.
 * NOTE(review): presumably private_data is the deferred fetch queue
 * armed in setup_deferred_fetch_locks(); the timer arguments are not
 * visible in this listing.
 */
509 static void dfq_timeout(struct tevent_context *ev, struct tevent_timer *te,
510 struct timeval t, void *private_data)
512 talloc_free(private_data);
515 /* This function is used in the local daemon to register a KEY in a database
517 While the remote fetch is in-flight, any further attempts to re-fetch the
518 same record will be deferred until the fetch completes.
/*
 * Register a deferred fetch queue for call->key in
 * ctdb_db->deferred_fetch.  The id key and the queue are both allocated
 * as talloc children of `call`.  After insertion the queue gets
 * deferred_fetch_queue_destructor() as its destructor, and a 30 second
 * timer is added with the queue as its talloc parent.
 * NOTE(review): the NULL checks behind the two error messages, the
 * timer callback arguments and the return values are not visible in
 * this listing.
 */
520 static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
523 struct ctdb_deferred_fetch_queue *dfq;
525 k = ctdb_key_to_idkey(call, call->key);
527 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
531 dfq = talloc(call, struct ctdb_deferred_fetch_queue);
533 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch queue structure\n"));
537 dfq->deferred_calls = NULL;
539 trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
541 talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
543 /* if the fetch hasn't completed in 30 seconds, just tear it all down
544 and let it try again as the events are reissued */
545 tevent_add_timer(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0),
552 /* check if this is a duplicate request to a fetch already in-flight
553 if it is, make this call deferred to be reprocessed later when
554 the in-flight fetch completes.
/*
 * Look up the deferred fetch queue for `key` and append request `c` to
 * it.  The new queue entry is a talloc child of the queue; it takes
 * ownership of the request packet and records the client by id so that
 * a later disconnect can be detected.
 * The caller treats a return value of 0 as "request was deferred".
 * NOTE(review): the handling of a failed lookup, the allocation failure
 * paths and the return statements are not visible in this listing.
 */
556 static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call_old *c)
559 struct ctdb_deferred_fetch_queue *dfq;
560 struct ctdb_deferred_fetch_call *dfc;
562 k = ctdb_key_to_idkey(c, key);
564 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
568 dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
577 dfc = talloc(dfq, struct ctdb_deferred_fetch_call);
579 DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
583 dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
584 if (dfc->w == NULL) {
585 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
590 dfc->c = talloc_steal(dfc, c);
591 dfc->w->ctdb = ctdb_db->ctdb;
592 dfc->w->client_id = client->client_id;
594 DLIST_ADD_END(dfq->deferred_calls, dfc);
601 this is called when the ctdb daemon received a ctdb request call
602 from a local client over the unix domain socket
/*
 * Handle a call request received from a local client.
 *
 * Steps visible in this listing, in order:
 *  - count the call (total_calls, pending_calls) and look up the
 *    database for c->db_id;
 *  - lock and fetch the record with ctdb_ltdb_lock_fetch_requeue(),
 *    passing daemon_incoming_packet_wrap() and a packet wrap holding the
 *    client id so the request can be retried later;
 *  - when the fetch_collapse tunable is 1, try to defer the request
 *    behind an in-flight fetch of the same key;
 *  - clear CTDB_WANT_READONLY when the database has no readonly
 *    tracking;
 *  - on CTDB_REC_RO_REVOKE_COMPLETE, clear the read-only flags, store
 *    the header and delete the tracking record;
 *  - while a revoke is in progress, or after starting one for a record
 *    this node is dmaster of, park the request with
 *    ctdb_add_revoke_deferred_call();
 *  - otherwise build a struct ctdb_call (remapped to
 *    CTDB_FETCH_WITH_HEADER_FUNC for read-only requests) and send it
 *    locally if this node is dmaster, remotely if not;
 *  - install daemon_call_from_client_callback() as completion callback.
 *
 * pending_calls is decremented again on every visible early-exit path.
 * NOTE(review): many conditions, return statements and the key/data
 * pointer setup are not visible in this listing, so the exact branch
 * structure must be confirmed against the full source.
 */
604 static void daemon_request_call_from_client(struct ctdb_client *client,
605 struct ctdb_req_call_old *c)
607 struct ctdb_call_state *state;
608 struct ctdb_db_context *ctdb_db;
609 struct daemon_call_state *dstate;
610 struct ctdb_call *call;
611 struct ctdb_ltdb_header header;
614 struct ctdb_context *ctdb = client->ctdb;
615 struct ctdb_daemon_packet_wrap *w;
617 CTDB_INCREMENT_STAT(ctdb, total_calls);
618 CTDB_INCREMENT_STAT(ctdb, pending_calls);
620 ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
622 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
624 CTDB_DECREMENT_STAT(ctdb, pending_calls);
628 if (ctdb_db->unhealthy_reason) {
630 * this is just a warning, as the tdb should be empty anyway,
631 * and only persistent databases can be unhealthy, which doesn't
632 * use this code patch
634 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
635 ctdb_db->db_name, ctdb_db->unhealthy_reason));
639 key.dsize = c->keylen;
641 w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
642 CTDB_NO_MEMORY_VOID(ctdb, w);
645 w->client_id = client->client_id;
647 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header,
648 (struct ctdb_req_header *)c, &data,
649 daemon_incoming_packet_wrap, w, true);
651 /* will retry later */
652 CTDB_DECREMENT_STAT(ctdb, pending_calls);
659 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
660 CTDB_DECREMENT_STAT(ctdb, pending_calls);
665 /* check if this fetch request is a duplicate for a
666 request we already have in flight. If so defer it until
667 the first request completes.
669 if (ctdb->tunable.fetch_collapse == 1) {
670 if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
671 ret = ctdb_ltdb_unlock(ctdb_db, key);
673 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
675 CTDB_DECREMENT_STAT(ctdb, pending_calls);
680 /* Dont do READONLY if we don't have a tracking database */
681 if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
682 c->flags &= ~CTDB_WANT_READONLY;
685 if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
686 header.flags &= ~CTDB_REC_RO_FLAGS;
687 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
688 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
689 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
690 ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
692 /* and clear out the tracking data */
693 if (tdb_delete(ctdb_db->rottdb, key) != 0) {
694 DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
698 /* if we are revoking, we must defer all other calls until the revoke
701 if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
702 talloc_free(data.dptr);
703 ret = ctdb_ltdb_unlock(ctdb_db, key);
705 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
706 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
708 CTDB_DECREMENT_STAT(ctdb, pending_calls);
712 if ((header.dmaster == ctdb->pnn)
713 && (!(c->flags & CTDB_WANT_READONLY))
714 && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
715 header.flags |= CTDB_REC_RO_REVOKING_READONLY;
716 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
717 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
719 ret = ctdb_ltdb_unlock(ctdb_db, key);
721 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, key, &header, data) != 0) {
722 ctdb_fatal(ctdb, "Failed to start record revoke");
724 talloc_free(data.dptr);
726 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
727 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
730 CTDB_DECREMENT_STAT(ctdb, pending_calls);
734 dstate = talloc(client, struct daemon_call_state);
735 if (dstate == NULL) {
736 ret = ctdb_ltdb_unlock(ctdb_db, key);
738 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
741 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
742 CTDB_DECREMENT_STAT(ctdb, pending_calls);
745 dstate->start_time = timeval_current();
746 dstate->client = client;
747 dstate->reqid = c->hdr.reqid;
748 talloc_steal(dstate, data.dptr);
750 call = dstate->call = talloc_zero(dstate, struct ctdb_call);
752 ret = ctdb_ltdb_unlock(ctdb_db, key);
754 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
757 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
758 CTDB_DECREMENT_STAT(ctdb, pending_calls);
759 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
763 dstate->readonly_fetch = 0;
764 call->call_id = c->callid;
766 call->call_data.dptr = c->data + c->keylen;
767 call->call_data.dsize = c->calldatalen;
768 call->flags = c->flags;
770 if (c->flags & CTDB_WANT_READONLY) {
771 /* client wants readonly record, so translate this into a
772 fetch with header. remember what the client asked for
773 so we can remap the reply back to the proper format for
774 the client in the reply
776 dstate->client_callid = call->call_id;
777 call->call_id = CTDB_FETCH_WITH_HEADER_FUNC;
778 dstate->readonly_fetch = 1;
781 if (header.dmaster == ctdb->pnn) {
782 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
784 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
785 if (ctdb->tunable.fetch_collapse == 1) {
786 /* This request triggered a remote fetch-lock.
787 set up a deferral for this key so any additional
788 fetch-locks are deferred until the current one
791 setup_deferred_fetch_locks(ctdb_db, call);
795 ret = ctdb_ltdb_unlock(ctdb_db, key);
797 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
801 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
802 CTDB_DECREMENT_STAT(ctdb, pending_calls);
803 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
806 talloc_steal(state, dstate);
807 talloc_steal(client, state);
809 state->async.fn = daemon_call_from_client_callback;
810 state->async.private_data = dstate;
/*
 * Forward declaration so that daemon_incoming_packet() can dispatch
 * CTDB_REQ_CONTROL packets to the control handler.
 */
814 static void daemon_request_control_from_client(struct ctdb_client *client,
815 struct ctdb_req_control_old *c);
817 /* data contains a packet from the client */
/*
 * Dispatch one packet received from a local client.  `p` is the
 * struct ctdb_client.  The packet is parented to a temporary talloc
 * context, its magic and protocol version are checked, and it is then
 * routed by hdr->operation to the call, message or control handler,
 * each of which also bumps its own client.* statistic.  An unknown
 * operation is logged at CRIT level.  The temporary context is freed at
 * the end.
 * NOTE(review): the actions taken after the two ctdb_set_error() calls
 * (presumably a jump to the final talloc_free) are not visible in this
 * listing.
 */
818 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
820 struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
822 struct ctdb_context *ctdb = client->ctdb;
824 /* place the packet as a child of a tmp_ctx. We then use
825 talloc_free() below to free it. If any of the calls want
826 to keep it, then they will steal it somewhere else, and the
827 talloc_free() will be a no-op */
828 tmp_ctx = talloc_new(client);
829 talloc_steal(tmp_ctx, hdr);
831 if (hdr->ctdb_magic != CTDB_MAGIC) {
832 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
836 if (hdr->ctdb_version != CTDB_PROTOCOL) {
837 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
841 switch (hdr->operation) {
843 CTDB_INCREMENT_STAT(ctdb, client.req_call);
844 daemon_request_call_from_client(client, (struct ctdb_req_call_old *)hdr);
847 case CTDB_REQ_MESSAGE:
848 CTDB_INCREMENT_STAT(ctdb, client.req_message);
849 daemon_request_message_from_client(client, (struct ctdb_req_message_old *)hdr);
852 case CTDB_REQ_CONTROL:
853 CTDB_INCREMENT_STAT(ctdb, client.req_control);
854 daemon_request_control_from_client(client, (struct ctdb_req_control_old *)hdr);
858 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
863 talloc_free(tmp_ctx);
867 called when the daemon gets a incoming packet
/*
 * Read callback of a client's packet queue (installed by
 * ctdb_accept_client()).  `args` is the struct ctdb_client.  After
 * bumping client_packets_recv it checks that the buffer holds at least
 * a full header, that hdr->length equals the number of bytes received
 * and that magic and protocol version match, then hands the packet to
 * daemon_incoming_packet().
 * NOTE(review): in the "Bad header length" message the "\n" sits in the
 * middle of the string, before " in daemon".  The handling of an empty
 * read and the exits after each ctdb_set_error() are not visible in
 * this listing.
 */
869 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
871 struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
872 struct ctdb_req_header *hdr;
879 CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
881 if (cnt < sizeof(*hdr)) {
882 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n",
886 hdr = (struct ctdb_req_header *)data;
887 if (cnt != hdr->length) {
888 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon",
889 (unsigned)hdr->length, (unsigned)cnt);
893 if (hdr->ctdb_magic != CTDB_MAGIC) {
894 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
898 if (hdr->ctdb_version != CTDB_PROTOCOL) {
899 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
903 DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
904 "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
905 hdr->srcnode, hdr->destnode));
907 /* it is the responsibility of the incoming packet function to free 'data' */
908 daemon_incoming_packet(client, hdr);
/*
 * talloc destructor for a client pid list entry: unlinks the entry from
 * ctdb->client_pids, provided that list is not already empty.
 * NOTE(review): the return value is not visible in this listing.
 */
912 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
914 if (client_pid->ctdb->client_pids != NULL) {
915 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
/*
 * fd event handler for the daemon's listening unix socket.
 * private_data is the struct ctdb_context.  Accepts one connection,
 * makes it non-blocking and close-on-exec, allocates a zeroed
 * struct ctdb_client with a fresh request id, records the peer pid in a
 * ctdb_client_pid_list entry linked into ctdb->client_pids, and sets up
 * the packet queue whose read callback is ctdb_daemon_read_cb().
 * Destructors are installed on both the client and the pid entry.
 * NOTE(review): no NULL check of the talloc_zero() result is visible
 * before `client` is used; several lines (error exits, remaining client
 * field assignments) are missing from this listing, so confirm against
 * the full source.
 */
922 static void ctdb_accept_client(struct tevent_context *ev,
923 struct tevent_fd *fde, uint16_t flags,
926 struct sockaddr_un addr;
929 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
930 struct ctdb_client *client;
931 struct ctdb_client_pid_list *client_pid;
935 memset(&addr, 0, sizeof(addr));
937 fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
942 ret = set_blocking(fd, false);
946 " failed to set socket non-blocking (%s)\n",
952 set_close_on_exec(fd);
954 DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
956 client = talloc_zero(ctdb, struct ctdb_client);
957 if (ctdb_get_peer_pid(fd, &peer_pid) == 0) {
958 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)peer_pid));
963 client->client_id = reqid_new(ctdb->idr, client);
964 client->pid = peer_pid;
966 client_pid = talloc(client, struct ctdb_client_pid_list);
967 if (client_pid == NULL) {
968 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
973 client_pid->ctdb = ctdb;
974 client_pid->pid = peer_pid;
975 client_pid->client = client;
977 DLIST_ADD(ctdb->client_pids, client_pid);
979 client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT,
980 ctdb_daemon_read_cb, client,
981 "client-%u", client->pid);
983 talloc_set_destructor(client, ctdb_client_destructor);
984 talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
991 create a unix domain socket and bind it
992 return a file descriptor open on the socket
/*
 * Create the daemon's listening unix domain socket at ctdb->daemon.name
 * and store the descriptor in ctdb->daemon.sd.  A successful connect()
 * to the path is reported as "Something is already listening".
 * Otherwise the old path is unlinked, the socket is made close-on-exec
 * and non-blocking, bound, chown()ed to the effective uid/gid, chmod()ed
 * to 0700 and put into listening state with a backlog of 100.  The
 * final two statements close the socket and reset ctdb->daemon.sd to -1.
 * NOTE(review): the "Unable to secure ctdb socket" format string
 * contains the literal text ", ctdb->daemon.name" - this looks like a
 * misplaced quote.  The sun_path copy silently truncates names longer
 * than sizeof(addr.sun_path)-1.  Return values and the label in front
 * of the closing statements are not visible in this listing.
 */
994 static int ux_socket_bind(struct ctdb_context *ctdb)
996 struct sockaddr_un addr;
999 ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
1000 if (ctdb->daemon.sd == -1) {
1004 memset(&addr, 0, sizeof(addr));
1005 addr.sun_family = AF_UNIX;
1006 strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
1008 /* First check if an old ctdbd might be running */
1009 if (connect(ctdb->daemon.sd,
1010 (struct sockaddr *)&addr, sizeof(addr)) == 0) {
1012 ("Something is already listening on ctdb socket '%s'\n",
1013 ctdb->daemon.name));
1017 /* Remove any old socket */
1018 unlink(ctdb->daemon.name);
1020 set_close_on_exec(ctdb->daemon.sd);
1022 ret = set_blocking(ctdb->daemon.sd, false);
1026 " failed to set socket non-blocking (%s)\n",
1031 if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
1032 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
1036 if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
1037 chmod(ctdb->daemon.name, 0700) != 0) {
1038 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
1043 if (listen(ctdb->daemon.sd, 100) != 0) {
1044 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
1048 DEBUG(DEBUG_NOTICE, ("Listening to ctdb socket %s\n",
1049 ctdb->daemon.name));
1053 close(ctdb->daemon.sd);
1054 ctdb->daemon.sd = -1;
/*
 * Set the initial flags of this node's own entry in ctdb->nodes.
 * A PNN of -1 is fatal.  NODE_FLAGS_DISCONNECTED is cleared, and
 * NODE_FLAGS_DISABLED / NODE_FLAGS_STOPPED are set when
 * start_as_disabled / start_as_stopped are non-zero.
 */
1058 static void initialise_node_flags (struct ctdb_context *ctdb)
1060 if (ctdb->pnn == -1) {
1061 ctdb_fatal(ctdb, "PNN is set to -1 (unknown value)");
1064 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_DISCONNECTED;
1066 /* do we start out in DISABLED mode? */
1067 if (ctdb->start_as_disabled != 0) {
1068 DEBUG(DEBUG_NOTICE, ("This node is configured to start in DISABLED state\n"));
1069 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_DISABLED;
1071 /* do we start out in STOPPED mode? */
1072 if (ctdb->start_as_stopped != 0) {
1073 DEBUG(DEBUG_NOTICE, ("This node is configured to start in STOPPED state\n"));
1074 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
/*
 * Completion callback for the "setup" event.  After the setup
 * notification script it broadcasts CTDB_CONTROL_STARTUP to all nodes
 * with CTDB_CTRL_FLAG_NOREPLY, starts the recovery daemon, starts the
 * periodic events and then waits for the first recovery.
 * NOTE(review): the end of the parameter list, the condition in front
 * of ctdb_die() (presumably a non-zero status) and the action after a
 * failed ctdb_start_recoverd() are not visible in this listing.
 */
1078 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
1082 ctdb_die(ctdb, "Failed to run setup event");
1084 ctdb_run_notification_script(ctdb, "setup");
1086 /* tell all other nodes we've just started up */
1087 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
1088 0, CTDB_CONTROL_STARTUP, 0,
1089 CTDB_CTRL_FLAG_NOREPLY,
1090 tdb_null, NULL, NULL);
1092 /* Start the recovery daemon */
1093 if (ctdb_start_recoverd(ctdb) != 0) {
1094 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
1098 ctdb_start_periodic_events(ctdb);
1100 ctdb_wait_for_first_recovery(ctdb);
/*
 * Timestamps of the most recent BEFORE_WAIT and AFTER_WAIT trace points,
 * maintained by ctdb_tevent_trace().  Both start out zeroed, which that
 * function treats as "not recorded yet".
 */
1103 static struct timeval tevent_before_wait_ts;
1104 static struct timeval tevent_after_wait_ts;
/*
 * tevent trace callback used to spot a stalled event loop.
 * private_data is the struct ctdb_context; the getpid() check singles
 * out processes other than the main daemon.
 *  - BEFORE_WAIT: measures the time since the last AFTER_WAIT, i.e. how
 *    long event handling took, then records the current time.
 *  - AFTER_WAIT: measures the time since the last BEFORE_WAIT, i.e. how
 *    long the loop waited without an event, then records the current
 *    time.
 * A message is logged whenever the measured tv_sec is greater than 3.
 * NOTE(review): the log level of those messages and the body of the
 * pid-mismatch branch are not visible in this listing.
 */
1106 static void ctdb_tevent_trace(enum tevent_trace_point tp,
1109 struct timeval diff;
1111 struct ctdb_context *ctdb =
1112 talloc_get_type(private_data, struct ctdb_context);
1114 if (getpid() != ctdb->ctdbd_pid) {
1118 now = timeval_current();
1121 case TEVENT_TRACE_BEFORE_WAIT:
1122 if (!timeval_is_zero(&tevent_after_wait_ts)) {
1123 diff = timeval_until(&tevent_after_wait_ts, &now);
1124 if (diff.tv_sec > 3) {
1126 ("Handling event took %ld seconds!\n",
1127 (long)diff.tv_sec));
1130 tevent_before_wait_ts = now;
1133 case TEVENT_TRACE_AFTER_WAIT:
1134 if (!timeval_is_zero(&tevent_before_wait_ts)) {
1135 diff = timeval_until(&tevent_before_wait_ts, &now);
1136 if (diff.tv_sec > 3) {
1138 ("No event for %ld seconds!\n",
1139 (long)diff.tv_sec));
1142 tevent_after_wait_ts = now;
1146 /* Do nothing for future tevent trace points */ ;
/*
 * atexit() handler registered by ctdb_create_pidfile().  Unlinks the
 * PID file, but only when a PID file is configured and the calling
 * process is its own session leader (getsid(0) == getpid()).  Success
 * is logged at NOTICE level, failure at WARNING level.
 */
1150 static void ctdb_remove_pidfile(void)
1152 /* Only the main ctdbd's PID matches the SID */
1153 if (ctdbd_pidfile != NULL && getsid(0) == getpid()) {
1154 if (unlink(ctdbd_pidfile) == 0) {
1155 DEBUG(DEBUG_NOTICE, ("Removed PID file %s\n",
1158 DEBUG(DEBUG_WARNING, ("Failed to Remove PID file %s\n",
/*
 * If ctdbd_pidfile is set, open it for writing, write `pid` followed by
 * a newline, and register ctdb_remove_pidfile() with atexit().
 * NOTE(review): pid_t is printed with "%d" without a cast.  The failure
 * action after "Failed to open PID file" and the closing of the stream
 * are not visible in this listing.
 */
1164 static void ctdb_create_pidfile(pid_t pid)
1166 if (ctdbd_pidfile != NULL) {
1169 fp = fopen(ctdbd_pidfile, "w");
1172 ("Failed to open PID file %s\n", ctdbd_pidfile));
1176 fprintf(fp, "%d\n", pid);
1178 DEBUG(DEBUG_NOTICE, ("Created PID file %s\n", ctdbd_pidfile));
1179 atexit(ctdb_remove_pidfile);
/*
 * Allocate ctdb->vnn_map and fill it with the indexes of all nodes that
 * are not flagged NODE_FLAGS_DELETED.  The generation starts out as
 * INVALID_GENERATION and the map size is the count taken in the first
 * loop.  Allocation failures are fatal.
 * NOTE(review): the second loop is bounded by vnn_map->size, not by
 * ctdb->num_nodes.  If any node is deleted, nodes with an index at or
 * above that size are never visited and trailing map slots stay
 * unwritten - confirm against the full source, since the counter
 * updates are not visible in this listing.
 */
1183 static void ctdb_initialise_vnn_map(struct ctdb_context *ctdb)
1187 /* initialize the vnn mapping table, skipping any deleted nodes */
1188 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
1189 CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map);
1192 for (i = 0; i < ctdb->num_nodes; i++) {
1193 if ((ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) == 0) {
1198 ctdb->vnn_map->generation = INVALID_GENERATION;
1199 ctdb->vnn_map->size = count;
1200 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, ctdb->vnn_map->size);
1201 CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map->map);
1203 for(i=0, j=0; i < ctdb->vnn_map->size; i++) {
1204 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
1207 ctdb->vnn_map->map[j] = i;
/*
 * Determine this node's PNN: ctdb->address is mapped to an index in
 * ctdb->nodes with ctdb_ip_to_nodeid(), and that entry's pnn is copied
 * to ctdb->pnn and logged.
 * NOTE(review): the calls that carry the two "Can not determine PNN"
 * messages and the test on nodeid are not visible in this listing;
 * presumably both cases are fatal.
 */
1212 static void ctdb_set_my_pnn(struct ctdb_context *ctdb)
1216 if (ctdb->address == NULL) {
1218 "Can not determine PNN - node address is not set\n");
1221 nodeid = ctdb_ip_to_nodeid(ctdb, ctdb->address);
1224 "Can not determine PNN - node address not found in node list\n");
1227 ctdb->pnn = ctdb->nodes[nodeid]->pnn;
1228 DEBUG(DEBUG_NOTICE, ("PNN is %u\n", ctdb->pnn));
1232 start the protocol going as a daemon
1234 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
1237 struct tevent_fd *fde;
1239 /* create a unix domain stream socket to listen to */
1240 res = ux_socket_bind(ctdb);
1242 DEBUG(DEBUG_ALERT,("Cannot continue. Exiting!\n"));
1246 if (do_fork && fork()) {
1250 tdb_reopen_all(false);
1253 if (setsid() == -1) {
1254 ctdb_die(ctdb, "Failed to setsid()\n");
1257 if (open("/dev/null", O_RDONLY) != 0) {
1258 DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
1262 ignore_signal(SIGPIPE);
1263 ignore_signal(SIGUSR1);
1265 ctdb->ctdbd_pid = getpid();
1266 DEBUG(DEBUG_ERR, ("Starting CTDBD (Version %s) as PID: %u\n",
1267 CTDB_VERSION_STRING, ctdb->ctdbd_pid));
1268 ctdb_create_pidfile(ctdb->ctdbd_pid);
1270 /* Make sure we log something when the daemon terminates.
1271 * This must be the first exit handler to run (so the last to
1274 atexit(print_exit_message);
1276 if (ctdb->do_setsched) {
1277 /* try to set us up as realtime */
1278 if (!set_scheduler()) {
1281 DEBUG(DEBUG_NOTICE, ("Set real-time scheduler priority\n"));
1284 ctdb->ev = tevent_context_init(NULL);
1285 if (ctdb->ev == NULL) {
1286 DEBUG(DEBUG_ALERT,("tevent_context_init() failed\n"));
1289 tevent_loop_allow_nesting(ctdb->ev);
1290 tevent_set_trace_callback(ctdb->ev, ctdb_tevent_trace, ctdb);
1291 ret = ctdb_init_tevent_logging(ctdb);
1293 DEBUG(DEBUG_ALERT,("Failed to initialize TEVENT logging\n"));
1297 /* set up a handler to pick up sigchld */
1298 if (ctdb_init_sigchld(ctdb) == NULL) {
1299 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
1303 ctdb_set_child_logging(ctdb);
1305 TALLOC_FREE(ctdb->srv);
1306 if (srvid_init(ctdb, &ctdb->srv) != 0) {
1307 DEBUG(DEBUG_CRIT,("Failed to setup message srvid context\n"));
1311 /* initialize statistics collection */
1312 ctdb_statistics_init(ctdb);
1314 /* force initial recovery for election */
1315 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
1317 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_INIT);
1318 ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
1320 ctdb_die(ctdb, "Failed to run init event\n");
1322 ctdb_run_notification_script(ctdb, "init");
1324 if (strcmp(ctdb->transport, "tcp") == 0) {
1325 ret = ctdb_tcp_init(ctdb);
1327 #ifdef USE_INFINIBAND
1328 if (strcmp(ctdb->transport, "ib") == 0) {
1329 ret = ctdb_ibw_init(ctdb);
1333 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
1337 if (ctdb->methods == NULL) {
1338 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
1339 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
1342 /* Initialise the transport. This sets the node address if it
1343 * was not set via the command-line. */
1344 if (ctdb->methods->initialise(ctdb) != 0) {
1345 ctdb_fatal(ctdb, "transport failed to initialise");
1348 ctdb_set_my_pnn(ctdb);
1350 initialise_node_flags(ctdb);
1352 if (ctdb->public_addresses_file) {
1353 ret = ctdb_set_public_addresses(ctdb, true);
1355 DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
1360 ctdb_initialise_vnn_map(ctdb);
1362 /* attach to existing databases */
1363 if (ctdb_attach_databases(ctdb) != 0) {
1364 ctdb_fatal(ctdb, "Failed to attach to databases\n");
1367 /* start frozen, then let the first election sort things out */
1368 if (!ctdb_blocking_freeze(ctdb)) {
1369 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
1372 /* now start accepting clients, only can do this once frozen */
1373 fde = tevent_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, TEVENT_FD_READ,
1374 ctdb_accept_client, ctdb);
1376 ctdb_fatal(ctdb, "Failed to add daemon socket to event loop");
1378 tevent_fd_set_auto_close(fde);
1380 /* Start the transport */
1381 if (ctdb->methods->start(ctdb) != 0) {
1382 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
1383 ctdb_fatal(ctdb, "transport failed to start");
1386 /* Recovery daemon and timed events are started from the
1387 * callback, only after the setup event completes
1390 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SETUP);
1391 ret = ctdb_event_script_callback(ctdb,
1393 ctdb_setup_event_callback,
1399 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
1403 lockdown_memory(ctdb->valgrinding);
1405 /* go into a wait loop to allow other nodes to complete */
1406 tevent_loop_wait(ctdb->ev);
1408 DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
1413 allocate a packet for use in daemon<->daemon communication
1415 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
1416 TALLOC_CTX *mem_ctx,
1417 enum ctdb_operation operation,
1418 size_t length, size_t slength,
1422 struct ctdb_req_header *hdr;
1424 length = MAX(length, slength);
1425 size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
1427 if (ctdb->methods == NULL) {
1428 DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
1429 operation, (unsigned)length));
1433 hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
1435 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
1436 operation, (unsigned)length));
1439 talloc_set_name_const(hdr, type);
1440 memset(hdr, 0, slength);
1441 hdr->length = length;
1442 hdr->operation = operation;
1443 hdr->ctdb_magic = CTDB_MAGIC;
1444 hdr->ctdb_version = CTDB_PROTOCOL;
1445 hdr->generation = ctdb->vnn_map->generation;
1446 hdr->srcnode = ctdb->pnn;
/*
  state kept for a control that a local client asked the daemon to send
*/
struct daemon_control_state {
	/* links for the per-node pending_controls list */
	struct daemon_control_state *next;
	struct daemon_control_state *prev;
	/* client that issued the control */
	struct ctdb_client *client;
	/* the request packet, owned by this state */
	struct ctdb_req_control_old *c;
	/* request id copied from the request header, echoed in the reply */
	uint32_t reqid;
	/* destination node, or NULL if the destination PNN was not valid */
	struct ctdb_node *node;
};
1460 callback when a control reply comes in
1462 static void daemon_control_callback(struct ctdb_context *ctdb,
1463 int32_t status, TDB_DATA data,
1464 const char *errormsg,
1467 struct daemon_control_state *state = talloc_get_type(private_data,
1468 struct daemon_control_state);
1469 struct ctdb_client *client = state->client;
1470 struct ctdb_reply_control_old *r;
1474 /* construct a message to send to the client containing the data */
1475 len = offsetof(struct ctdb_reply_control_old, data) + data.dsize;
1477 len += strlen(errormsg);
1479 r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len,
1480 struct ctdb_reply_control_old);
1481 CTDB_NO_MEMORY_VOID(ctdb, r);
1483 r->hdr.reqid = state->reqid;
1485 r->datalen = data.dsize;
1487 memcpy(&r->data[0], data.dptr, data.dsize);
1489 r->errorlen = strlen(errormsg);
1490 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
1493 ret = daemon_queue_send(client, &r->hdr);
1500 fail all pending controls to a disconnected node
1502 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
1504 struct daemon_control_state *state;
1505 while ((state = node->pending_controls)) {
1506 DLIST_REMOVE(node->pending_controls, state);
1507 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null,
1508 "node is disconnected", state);
1513 destroy a daemon_control_state
1515 static int daemon_control_destructor(struct daemon_control_state *state)
1518 DLIST_REMOVE(state->node->pending_controls, state);
1524 this is called when the ctdb daemon received a ctdb request control
1525 from a local client over the unix domain socket
1527 static void daemon_request_control_from_client(struct ctdb_client *client,
1528 struct ctdb_req_control_old *c)
1532 struct daemon_control_state *state;
1533 TALLOC_CTX *tmp_ctx = talloc_new(client);
1535 if (c->hdr.destnode == CTDB_CURRENT_NODE) {
1536 c->hdr.destnode = client->ctdb->pnn;
1539 state = talloc(client, struct daemon_control_state);
1540 CTDB_NO_MEMORY_VOID(client->ctdb, state);
1542 state->client = client;
1543 state->c = talloc_steal(state, c);
1544 state->reqid = c->hdr.reqid;
1545 if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1546 state->node = client->ctdb->nodes[c->hdr.destnode];
1547 DLIST_ADD(state->node->pending_controls, state);
1552 talloc_set_destructor(state, daemon_control_destructor);
1554 if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1555 talloc_steal(tmp_ctx, state);
1558 data.dptr = &c->data[0];
1559 data.dsize = c->datalen;
1560 res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1561 c->srvid, c->opcode, client->client_id,
1563 data, daemon_control_callback,
1566 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1570 talloc_free(tmp_ctx);
1574 register a call function
1576 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1577 ctdb_fn_t fn, int id)
1579 struct ctdb_registered_call *call;
1580 struct ctdb_db_context *ctdb_db;
1582 ctdb_db = find_ctdb_db(ctdb, db_id);
1583 if (ctdb_db == NULL) {
1587 call = talloc(ctdb_db, struct ctdb_registered_call);
1591 DLIST_ADD(ctdb_db->calls, call);
/*
  this local messaging handler is ugly, but is needed to prevent
  recursion in ctdb_send_message() when the destination node is the
  same as the source node

  One of these is allocated per message sent to the local node; it
  carries the message from ctdb_local_message() to the timer handler
  ctdb_local_message_trigger().
 */
struct ctdb_local_message {
	/* daemon context whose srvid handlers receive the message */
	struct ctdb_context *ctdb;
	/* service id the message is addressed to */
	uint64_t srvid;
	/* private copy of the message payload */
	TDB_DATA data;
};
1608 static void ctdb_local_message_trigger(struct tevent_context *ev,
1609 struct tevent_timer *te,
1610 struct timeval t, void *private_data)
1612 struct ctdb_local_message *m = talloc_get_type(
1613 private_data, struct ctdb_local_message);
1615 srvid_dispatch(m->ctdb->srv, m->srvid, CTDB_SRVID_ALL, m->data);
1619 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1621 struct ctdb_local_message *m;
1622 m = talloc(ctdb, struct ctdb_local_message);
1623 CTDB_NO_MEMORY(ctdb, m);
1628 m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1629 if (m->data.dptr == NULL) {
1634 /* this needs to be done as an event to prevent recursion */
1635 tevent_add_timer(ctdb->ev, m, timeval_zero(),
1636 ctdb_local_message_trigger, m);
1643 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1644 uint64_t srvid, TDB_DATA data)
1646 struct ctdb_req_message_old *r;
1649 if (ctdb->methods == NULL) {
1650 DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1654 /* see if this is a message to ourselves */
1655 if (pnn == ctdb->pnn) {
1656 return ctdb_local_message(ctdb, srvid, data);
1659 len = offsetof(struct ctdb_req_message_old, data) + data.dsize;
1660 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1661 struct ctdb_req_message_old);
1662 CTDB_NO_MEMORY(ctdb, r);
1664 r->hdr.destnode = pnn;
1666 r->datalen = data.dsize;
1667 memcpy(&r->data[0], data.dptr, data.dsize);
1669 ctdb_queue_packet(ctdb, &r->hdr);
1677 struct ctdb_client_notify_list {
1678 struct ctdb_client_notify_list *next, *prev;
1679 struct ctdb_context *ctdb;
1685 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1689 DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1691 ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1693 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1699 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1701 struct ctdb_notify_data_old *notify = (struct ctdb_notify_data_old *)indata.dptr;
1702 struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1703 struct ctdb_client_notify_list *nl;
1705 DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1707 if (indata.dsize < offsetof(struct ctdb_notify_data_old, notify_data)) {
1708 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1712 if (indata.dsize != (notify->len + offsetof(struct ctdb_notify_data_old, notify_data))) {
1713 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_notify_data_old, notify_data))));
1718 if (client == NULL) {
1719 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1723 for(nl=client->notify; nl; nl=nl->next) {
1724 if (nl->srvid == notify->srvid) {
1729 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1733 nl = talloc(client, struct ctdb_client_notify_list);
1734 CTDB_NO_MEMORY(ctdb, nl);
1736 nl->srvid = notify->srvid;
1737 nl->data.dsize = notify->len;
1738 nl->data.dptr = talloc_memdup(nl, notify->notify_data,
1740 CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1742 DLIST_ADD(client->notify, nl);
1743 talloc_set_destructor(nl, ctdb_client_notify_destructor);
1748 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1750 uint64_t srvid = *(uint64_t *)indata.dptr;
1751 struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1752 struct ctdb_client_notify_list *nl;
1754 DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)srvid, client_id));
1756 if (client == NULL) {
1757 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1761 for(nl=client->notify; nl; nl=nl->next) {
1762 if (nl->srvid == srvid) {
1767 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)srvid));
1771 DLIST_REMOVE(client->notify, nl);
1772 talloc_set_destructor(nl, NULL);
1778 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1780 struct ctdb_client_pid_list *client_pid;
1782 for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1783 if (client_pid->pid == pid) {
1784 return client_pid->client;
1791 /* This control is used by samba when probing if a process (of a samba daemon)
1793 Samba does this when it needs/wants to check if a subrecord in one of the
1794 databases is still valied, or if it is stale and can be removed.
1795 If the node is in unhealthy or stopped state we just kill of the samba
1796 process holding htis sub-record and return to the calling samba that
1797 the process does not exist.
1798 This allows us to forcefully recall subrecords registered by samba processes
1799 on banned and stopped nodes.
1801 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1803 struct ctdb_client *client;
1805 if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1806 client = ctdb_find_client_by_pid(ctdb, pid);
1807 if (client != NULL) {
1808 DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1809 talloc_free(client);
1814 return kill(pid, 0);
1817 int ctdb_control_getnodesfile(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
1819 struct ctdb_node_map_old *node_map = NULL;
1821 CHECK_CONTROL_DATA_SIZE(0);
1823 node_map = ctdb_read_nodes_file(ctdb, ctdb->nodes_file);
1824 if (node_map == NULL) {
1825 DEBUG(DEBUG_ERR, ("Failed to read nodes file\n"));
1829 outdata->dptr = (unsigned char *)node_map;
1830 outdata->dsize = talloc_get_size(outdata->dptr);
1835 void ctdb_shutdown_sequence(struct ctdb_context *ctdb, int exit_code)
1837 if (ctdb->runstate == CTDB_RUNSTATE_SHUTDOWN) {
1838 DEBUG(DEBUG_NOTICE,("Already shutting down so will not proceed.\n"));
1842 DEBUG(DEBUG_NOTICE,("Shutdown sequence commencing.\n"));
1843 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SHUTDOWN);
1844 ctdb_stop_recoverd(ctdb);
1845 ctdb_stop_keepalive(ctdb);
1846 ctdb_stop_monitoring(ctdb);
1847 ctdb_release_all_ips(ctdb);
1848 ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
1849 if (ctdb->methods != NULL && ctdb->methods->shutdown != NULL) {
1850 ctdb->methods->shutdown(ctdb);
1853 DEBUG(DEBUG_NOTICE,("Shutdown sequence complete, exiting.\n"));
1857 /* When forking the main daemon and the child process needs to connect
1858 * back to the daemon as a client process, this function can be used
1859 * to change the ctdb context from daemon into client mode. The child
1860 * process must be created using ctdb_fork() and not fork() -
1861 * ctdb_fork() does some necessary housekeeping.
1863 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
1868 /* Add extra information so we can identify this in the logs */
1870 debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
1873 /* get a new event context */
1874 ctdb->ev = tevent_context_init(ctdb);
1875 if (ctdb->ev == NULL) {
1876 DEBUG(DEBUG_ALERT,("tevent_context_init() failed\n"));
1879 tevent_loop_allow_nesting(ctdb->ev);
1881 /* Connect to main CTDB daemon */
1882 ret = ctdb_socket_connect(ctdb);
1884 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
1888 ctdb->can_send_controls = true;