4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/util/dlinklist.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "system/wait.h"
27 #include "../include/ctdb_version.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include "../common/rb_tree.h"
31 #include <sys/socket.h>
33 struct ctdb_client_pid_list {
34 struct ctdb_client_pid_list *next, *prev;
35 struct ctdb_context *ctdb;
37 struct ctdb_client *client;
40 const char *ctdbd_pidfile = NULL;
42 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
44 static void print_exit_message(void)
46 if (debug_extra != NULL && debug_extra[0] != '\0') {
47 DEBUG(DEBUG_NOTICE,("CTDB %s shutting down\n", debug_extra));
49 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
55 static void ctdb_time_tick(struct event_context *ev, struct timed_event *te,
56 struct timeval t, void *private_data)
58 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
60 if (getpid() != ctdbd_pid) {
64 event_add_timed(ctdb->ev, ctdb,
65 timeval_current_ofs(1, 0),
66 ctdb_time_tick, ctdb);
69 /* Used to trigger a dummy event once per second, to make
70 * detection of hangs more reliable.
72 static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
74 event_add_timed(ctdb->ev, ctdb,
75 timeval_current_ofs(1, 0),
76 ctdb_time_tick, ctdb);
79 static void ctdb_start_periodic_events(struct ctdb_context *ctdb)
81 /* start monitoring for connected/disconnected nodes */
82 ctdb_start_keepalive(ctdb);
84 /* start monitoring for node health */
85 ctdb_start_monitoring(ctdb);
87 /* start periodic update of tcp tickle lists */
88 ctdb_start_tcp_tickle_update(ctdb);
90 /* start listening for recovery daemon pings */
91 ctdb_control_recd_ping(ctdb);
93 /* start listening to timer ticks */
94 ctdb_start_time_tickd(ctdb);
97 static void block_signal(int signum)
101 memset(&act, 0, sizeof(act));
103 act.sa_handler = SIG_IGN;
104 sigemptyset(&act.sa_mask);
105 sigaddset(&act.sa_mask, signum);
106 sigaction(signum, &act, NULL);
111 send a packet to a client
113 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
115 CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
116 if (hdr->operation == CTDB_REQ_MESSAGE) {
117 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
118 DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
123 return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
127 message handler for when we are in daemon mode. This redirects the message
130 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
131 TDB_DATA data, void *private_data)
133 struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
134 struct ctdb_req_message *r;
137 /* construct a message to send to the client containing the data */
138 len = offsetof(struct ctdb_req_message, data) + data.dsize;
139 r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE,
140 len, struct ctdb_req_message);
141 CTDB_NO_MEMORY_VOID(ctdb, r);
143 talloc_set_name_const(r, "req_message packet");
146 r->datalen = data.dsize;
147 memcpy(&r->data[0], data.dptr, data.dsize);
149 daemon_queue_send(client, &r->hdr);
155 this is called when the ctdb daemon received a ctdb request to
156 set the srvid from the client
158 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
160 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
162 if (client == NULL) {
163 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
166 res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
168 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n",
169 (unsigned long long)srvid));
171 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n",
172 (unsigned long long)srvid));
179 this is called when the ctdb daemon received a ctdb request to
180 remove a srvid from the client
182 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
184 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
185 if (client == NULL) {
186 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
189 return ctdb_deregister_message_handler(ctdb, srvid, client);
192 int daemon_check_srvids(struct ctdb_context *ctdb, TDB_DATA indata,
199 if ((indata.dsize % sizeof(uint64_t)) != 0) {
200 DEBUG(DEBUG_ERR, ("Bad indata in daemon_check_srvids, "
201 "size=%d\n", (int)indata.dsize));
205 ids = (uint64_t *)indata.dptr;
206 num_ids = indata.dsize / 8;
208 results = talloc_zero_array(outdata, uint8_t, (num_ids+7)/8);
209 if (results == NULL) {
210 DEBUG(DEBUG_ERR, ("talloc failed in daemon_check_srvids\n"));
213 for (i=0; i<num_ids; i++) {
214 if (ctdb_check_message_handler(ctdb, ids[i])) {
215 results[i/8] |= (1 << (i%8));
218 outdata->dptr = (uint8_t *)results;
219 outdata->dsize = talloc_get_size(results);
224 destroy a ctdb_client
226 static int ctdb_client_destructor(struct ctdb_client *client)
228 struct ctdb_db_context *ctdb_db;
230 ctdb_takeover_client_destructor_hook(client);
231 ctdb_reqid_remove(client->ctdb, client->client_id);
232 client->ctdb->num_clients--;
234 if (client->num_persistent_updates != 0) {
235 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
236 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
238 ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
240 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
241 "commit active. Forcing recovery.\n"));
242 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
244 /* legacy trans2 transaction state: */
245 ctdb_db->transaction_active = false;
248 * trans3 transaction state:
250 * The destructor sets the pointer to NULL.
252 talloc_free(ctdb_db->persistent_state);
260 this is called when the ctdb daemon received a ctdb request message
261 from a local client over the unix domain socket
263 static void daemon_request_message_from_client(struct ctdb_client *client,
264 struct ctdb_req_message *c)
269 if (c->hdr.destnode == CTDB_CURRENT_NODE) {
270 c->hdr.destnode = ctdb_get_pnn(client->ctdb);
273 /* maybe the message is for another client on this node */
274 if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
275 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
279 /* it's for a remote node */
280 data.dptr = &c->data[0];
281 data.dsize = c->datalen;
282 res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
285 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
291 struct daemon_call_state {
292 struct ctdb_client *client;
294 struct ctdb_call *call;
295 struct timeval start_time;
297 /* readonly request ? */
298 uint32_t readonly_fetch;
299 uint32_t client_callid;
303 complete a call from a client
305 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
307 struct daemon_call_state *dstate = talloc_get_type(state->async.private_data,
308 struct daemon_call_state);
309 struct ctdb_reply_call *r;
312 struct ctdb_client *client = dstate->client;
313 struct ctdb_db_context *ctdb_db = state->ctdb_db;
315 talloc_steal(client, dstate);
316 talloc_steal(dstate, dstate->call);
318 res = ctdb_daemon_call_recv(state, dstate->call);
320 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
321 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
323 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
327 length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
328 /* If the client asked for readonly FETCH, we remapped this to
329 FETCH_WITH_HEADER when calling the daemon. So we must
330 strip the extra header off the reply data before passing
331 it back to the client.
333 if (dstate->readonly_fetch
334 && dstate->client_callid == CTDB_FETCH_FUNC) {
335 length -= sizeof(struct ctdb_ltdb_header);
338 r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL,
339 length, struct ctdb_reply_call);
341 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
342 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
343 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
346 r->hdr.reqid = dstate->reqid;
347 r->status = dstate->call->status;
349 if (dstate->readonly_fetch
350 && dstate->client_callid == CTDB_FETCH_FUNC) {
351 /* client only asked for a FETCH so we must strip off
352 the extra ctdb_ltdb header
354 r->datalen = dstate->call->reply_data.dsize - sizeof(struct ctdb_ltdb_header);
355 memcpy(&r->data[0], dstate->call->reply_data.dptr + sizeof(struct ctdb_ltdb_header), r->datalen);
357 r->datalen = dstate->call->reply_data.dsize;
358 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
361 res = daemon_queue_send(client, &r->hdr);
363 /* client is dead - return immediately */
367 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
369 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
370 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
374 struct ctdb_daemon_packet_wrap {
375 struct ctdb_context *ctdb;
380 a wrapper to catch disconnected clients
382 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
384 struct ctdb_client *client;
385 struct ctdb_daemon_packet_wrap *w = talloc_get_type(p,
386 struct ctdb_daemon_packet_wrap);
388 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
392 client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
393 if (client == NULL) {
394 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
402 daemon_incoming_packet(client, hdr);
405 struct ctdb_deferred_fetch_call {
406 struct ctdb_deferred_fetch_call *next, *prev;
407 struct ctdb_req_call *c;
408 struct ctdb_daemon_packet_wrap *w;
411 struct ctdb_deferred_fetch_queue {
412 struct ctdb_deferred_fetch_call *deferred_calls;
415 struct ctdb_deferred_requeue {
416 struct ctdb_deferred_fetch_call *dfc;
417 struct ctdb_client *client;
420 /* called from a timer event and starts reprocessing the deferred call.*/
421 static void reprocess_deferred_call(struct event_context *ev, struct timed_event *te,
422 struct timeval t, void *private_data)
424 struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
425 struct ctdb_client *client = dfr->client;
427 talloc_steal(client, dfr->dfc->c);
428 daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
432 /* the referral context is destroyed either after a timeout or when the initial
433 fetch-lock has finished.
434 at this stage, immediately start reprocessing the queued up deferred
435 calls so they get reprocessed immediately (and since we are dmaster at
436 this stage, trigger the waiting smbd processes to pick up and acquire the
439 static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
442 /* need to reprocess the packets from the queue explicitly instead of
443 just using a normal destructor since we want, need, to
444 call the clients in the same order as the requests queued up
446 while (dfq->deferred_calls != NULL) {
447 struct ctdb_client *client;
448 struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
449 struct ctdb_deferred_requeue *dfr;
451 DLIST_REMOVE(dfq->deferred_calls, dfc);
453 client = ctdb_reqid_find(dfc->w->ctdb, dfc->w->client_id, struct ctdb_client);
454 if (client == NULL) {
455 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
460 /* process it by pushing it back onto the eventloop */
461 dfr = talloc(client, struct ctdb_deferred_requeue);
463 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
467 dfr->dfc = talloc_steal(dfr, dfc);
468 dfr->client = client;
470 event_add_timed(dfc->w->ctdb->ev, client, timeval_zero(), reprocess_deferred_call, dfr);
476 /* insert the new deferral context into the rb tree.
477 there should never be a pre-existing context here, but check for it
478 warn and destroy the previous context if there is already a deferral context
481 static void *insert_dfq_callback(void *parm, void *data)
484 DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
490 /* if the original fetch-lock did not complete within a reasonable time,
491 free the context and context for all deferred requests to cause them to be
492 re-inserted into the event system.
494 static void dfq_timeout(struct event_context *ev, struct timed_event *te,
495 struct timeval t, void *private_data)
497 talloc_free(private_data);
500 /* This function is used in the local daemon to register a KEY in a database
502 While the remote fetch is in-flight, any further attempts to re-fetch the
503 same record will be deferred until the fetch completes.
505 static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
508 struct ctdb_deferred_fetch_queue *dfq;
510 k = talloc_zero_size(call, ((call->key.dsize + 3) & 0xfffffffc) + 4);
512 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
516 k[0] = (call->key.dsize + 3) / 4 + 1;
517 memcpy(&k[1], call->key.dptr, call->key.dsize);
519 dfq = talloc(call, struct ctdb_deferred_fetch_queue);
521 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch queue structure\n"));
525 dfq->deferred_calls = NULL;
527 trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
529 talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
531 /* if the fetch hasn't completed in 30 seconds, just tear it all down
532 and let it try again as the events are reissued */
533 event_add_timed(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0), dfq_timeout, dfq);
539 /* check if this is a duplicate request to a fetch already in-flight
540 if it is, make this call deferred to be reprocessed later when
541 the in-flight fetch completes.
543 static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call *c)
546 struct ctdb_deferred_fetch_queue *dfq;
547 struct ctdb_deferred_fetch_call *dfc;
549 k = talloc_zero_size(c, ((key.dsize + 3) & 0xfffffffc) + 4);
551 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
555 k[0] = (key.dsize + 3) / 4 + 1;
556 memcpy(&k[1], key.dptr, key.dsize);
558 dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
567 dfc = talloc(dfq, struct ctdb_deferred_fetch_call);
569 DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
573 dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
574 if (dfc->w == NULL) {
575 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
580 dfc->c = talloc_steal(dfc, c);
581 dfc->w->ctdb = ctdb_db->ctdb;
582 dfc->w->client_id = client->client_id;
584 DLIST_ADD_END(dfq->deferred_calls, dfc, NULL);
591 this is called when the ctdb daemon received a ctdb request call
592 from a local client over the unix domain socket
594 static void daemon_request_call_from_client(struct ctdb_client *client,
595 struct ctdb_req_call *c)
597 struct ctdb_call_state *state;
598 struct ctdb_db_context *ctdb_db;
599 struct daemon_call_state *dstate;
600 struct ctdb_call *call;
601 struct ctdb_ltdb_header header;
604 struct ctdb_context *ctdb = client->ctdb;
605 struct ctdb_daemon_packet_wrap *w;
607 CTDB_INCREMENT_STAT(ctdb, total_calls);
608 CTDB_DECREMENT_STAT(ctdb, pending_calls);
610 ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
612 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
614 CTDB_DECREMENT_STAT(ctdb, pending_calls);
618 if (ctdb_db->unhealthy_reason) {
620 * this is just a warning, as the tdb should be empty anyway,
621 * and only persistent databases can be unhealthy, which doesn't
622 * use this code patch
624 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
625 ctdb_db->db_name, ctdb_db->unhealthy_reason));
629 key.dsize = c->keylen;
631 w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
632 CTDB_NO_MEMORY_VOID(ctdb, w);
635 w->client_id = client->client_id;
637 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header,
638 (struct ctdb_req_header *)c, &data,
639 daemon_incoming_packet_wrap, w, true);
641 /* will retry later */
642 CTDB_DECREMENT_STAT(ctdb, pending_calls);
649 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
650 CTDB_DECREMENT_STAT(ctdb, pending_calls);
655 /* check if this fetch request is a duplicate for a
656 request we already have in flight. If so defer it until
657 the first request completes.
659 if (ctdb->tunable.fetch_collapse == 1) {
660 if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
661 ret = ctdb_ltdb_unlock(ctdb_db, key);
663 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
669 /* Dont do READONLY if we dont have a tracking database */
670 if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
671 c->flags &= ~CTDB_WANT_READONLY;
674 if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
675 header.flags &= ~CTDB_REC_RO_FLAGS;
676 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
677 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
678 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
679 ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
681 /* and clear out the tracking data */
682 if (tdb_delete(ctdb_db->rottdb, key) != 0) {
683 DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
687 /* if we are revoking, we must defer all other calls until the revoke
690 if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
691 talloc_free(data.dptr);
692 ret = ctdb_ltdb_unlock(ctdb_db, key);
694 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
695 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
700 if ((header.dmaster == ctdb->pnn)
701 && (!(c->flags & CTDB_WANT_READONLY))
702 && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
703 header.flags |= CTDB_REC_RO_REVOKING_READONLY;
704 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
705 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
707 ret = ctdb_ltdb_unlock(ctdb_db, key);
709 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, key, &header, data) != 0) {
710 ctdb_fatal(ctdb, "Failed to start record revoke");
712 talloc_free(data.dptr);
714 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
715 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
721 dstate = talloc(client, struct daemon_call_state);
722 if (dstate == NULL) {
723 ret = ctdb_ltdb_unlock(ctdb_db, key);
725 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
728 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
729 CTDB_DECREMENT_STAT(ctdb, pending_calls);
732 dstate->start_time = timeval_current();
733 dstate->client = client;
734 dstate->reqid = c->hdr.reqid;
735 talloc_steal(dstate, data.dptr);
737 call = dstate->call = talloc_zero(dstate, struct ctdb_call);
739 ret = ctdb_ltdb_unlock(ctdb_db, key);
741 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
744 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
745 CTDB_DECREMENT_STAT(ctdb, pending_calls);
746 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
750 dstate->readonly_fetch = 0;
751 call->call_id = c->callid;
753 call->call_data.dptr = c->data + c->keylen;
754 call->call_data.dsize = c->calldatalen;
755 call->flags = c->flags;
757 if (c->flags & CTDB_WANT_READONLY) {
758 /* client wants readonly record, so translate this into a
759 fetch with header. remember what the client asked for
760 so we can remap the reply back to the proper format for
761 the client in the reply
763 dstate->client_callid = call->call_id;
764 call->call_id = CTDB_FETCH_WITH_HEADER_FUNC;
765 dstate->readonly_fetch = 1;
768 if (header.dmaster == ctdb->pnn) {
769 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
771 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
772 if (ctdb->tunable.fetch_collapse == 1) {
773 /* This request triggered a remote fetch-lock.
774 set up a deferral for this key so any additional
775 fetch-locks are deferred until the current one
778 setup_deferred_fetch_locks(ctdb_db, call);
782 ret = ctdb_ltdb_unlock(ctdb_db, key);
784 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
788 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
789 CTDB_DECREMENT_STAT(ctdb, pending_calls);
790 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
793 talloc_steal(state, dstate);
794 talloc_steal(client, state);
796 state->async.fn = daemon_call_from_client_callback;
797 state->async.private_data = dstate;
801 static void daemon_request_control_from_client(struct ctdb_client *client,
802 struct ctdb_req_control *c);
804 /* data contains a packet from the client */
805 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
807 struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
809 struct ctdb_context *ctdb = client->ctdb;
811 /* place the packet as a child of a tmp_ctx. We then use
812 talloc_free() below to free it. If any of the calls want
813 to keep it, then they will steal it somewhere else, and the
814 talloc_free() will be a no-op */
815 tmp_ctx = talloc_new(client);
816 talloc_steal(tmp_ctx, hdr);
818 if (hdr->ctdb_magic != CTDB_MAGIC) {
819 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
823 if (hdr->ctdb_version != CTDB_VERSION) {
824 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
828 switch (hdr->operation) {
830 CTDB_INCREMENT_STAT(ctdb, client.req_call);
831 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
834 case CTDB_REQ_MESSAGE:
835 CTDB_INCREMENT_STAT(ctdb, client.req_message);
836 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
839 case CTDB_REQ_CONTROL:
840 CTDB_INCREMENT_STAT(ctdb, client.req_control);
841 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
845 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
850 talloc_free(tmp_ctx);
854 called when the daemon gets an incoming packet
856 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
858 struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
859 struct ctdb_req_header *hdr;
866 CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
868 if (cnt < sizeof(*hdr)) {
869 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n",
873 hdr = (struct ctdb_req_header *)data;
874 if (cnt != hdr->length) {
875 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon",
876 (unsigned)hdr->length, (unsigned)cnt);
880 if (hdr->ctdb_magic != CTDB_MAGIC) {
881 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
885 if (hdr->ctdb_version != CTDB_VERSION) {
886 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
890 DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
891 "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
892 hdr->srcnode, hdr->destnode));
894 /* it is the responsibility of the incoming packet function to free 'data' */
895 daemon_incoming_packet(client, hdr);
899 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
901 if (client_pid->ctdb->client_pids != NULL) {
902 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
909 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde,
910 uint16_t flags, void *private_data)
912 struct sockaddr_un addr;
915 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
916 struct ctdb_client *client;
917 struct ctdb_client_pid_list *client_pid;
920 memset(&addr, 0, sizeof(addr));
922 fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
928 set_close_on_exec(fd);
930 DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
932 client = talloc_zero(ctdb, struct ctdb_client);
933 if (ctdb_get_peer_pid(fd, &peer_pid) == 0) {
934 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)peer_pid));
939 client->client_id = ctdb_reqid_new(ctdb, client);
940 client->pid = peer_pid;
942 client_pid = talloc(client, struct ctdb_client_pid_list);
943 if (client_pid == NULL) {
944 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
949 client_pid->ctdb = ctdb;
950 client_pid->pid = peer_pid;
951 client_pid->client = client;
953 DLIST_ADD(ctdb->client_pids, client_pid);
955 client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT,
956 ctdb_daemon_read_cb, client,
957 "client-%u", client->pid);
959 talloc_set_destructor(client, ctdb_client_destructor);
960 talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
967 create a unix domain socket and bind it
968 return a file descriptor open on the socket
970 static int ux_socket_bind(struct ctdb_context *ctdb)
972 struct sockaddr_un addr;
974 ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
975 if (ctdb->daemon.sd == -1) {
979 set_close_on_exec(ctdb->daemon.sd);
980 set_nonblocking(ctdb->daemon.sd);
982 memset(&addr, 0, sizeof(addr));
983 addr.sun_family = AF_UNIX;
984 strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
986 if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
987 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
991 if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
992 chmod(ctdb->daemon.name, 0700) != 0) {
993 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
998 if (listen(ctdb->daemon.sd, 100) != 0) {
999 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
1006 close(ctdb->daemon.sd);
1007 ctdb->daemon.sd = -1;
1011 static void initialise_node_flags (struct ctdb_context *ctdb)
1013 if (ctdb->pnn == -1) {
1014 ctdb_fatal(ctdb, "PNN is set to -1 (unknown value)");
1017 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_DISCONNECTED;
1019 /* do we start out in DISABLED mode? */
1020 if (ctdb->start_as_disabled != 0) {
1021 DEBUG(DEBUG_INFO, ("This node is configured to start in DISABLED state\n"));
1022 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_DISABLED;
1024 /* do we start out in STOPPED mode? */
1025 if (ctdb->start_as_stopped != 0) {
1026 DEBUG(DEBUG_INFO, ("This node is configured to start in STOPPED state\n"));
1027 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1031 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
1035 DEBUG(DEBUG_ALERT,("Failed to run setup event - exiting\n"));
1038 ctdb_run_notification_script(ctdb, "setup");
1040 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_FIRST_RECOVERY);
1042 /* tell all other nodes we've just started up */
1043 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
1044 0, CTDB_CONTROL_STARTUP, 0,
1045 CTDB_CTRL_FLAG_NOREPLY,
1046 tdb_null, NULL, NULL);
1048 /* Start the recovery daemon */
1049 if (ctdb_start_recoverd(ctdb) != 0) {
1050 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
1054 ctdb_start_periodic_events(ctdb);
1057 static struct timeval tevent_before_wait_ts;
1058 static struct timeval tevent_after_wait_ts;
1060 static void ctdb_tevent_trace(enum tevent_trace_point tp,
1063 struct timeval diff;
1066 if (getpid() != ctdbd_pid) {
1070 now = timeval_current();
1073 case TEVENT_TRACE_BEFORE_WAIT:
1074 if (!timeval_is_zero(&tevent_after_wait_ts)) {
1075 diff = timeval_until(&tevent_after_wait_ts, &now);
1076 if (diff.tv_sec > 3) {
1078 ("Handling event took %ld seconds!\n",
1082 tevent_before_wait_ts = now;
1085 case TEVENT_TRACE_AFTER_WAIT:
1086 if (!timeval_is_zero(&tevent_before_wait_ts)) {
1087 diff = timeval_until(&tevent_before_wait_ts, &now);
1088 if (diff.tv_sec > 3) {
1090 ("No event for %ld seconds!\n",
1094 tevent_after_wait_ts = now;
1098 /* Do nothing for future tevent trace points */ ;
1102 static void ctdb_remove_pidfile(void)
1104 if (ctdbd_pidfile != NULL && !ctdb_is_child_process()) {
1105 if (unlink(ctdbd_pidfile) == 0) {
1106 DEBUG(DEBUG_NOTICE, ("Removed PID file %s\n",
1109 DEBUG(DEBUG_WARNING, ("Failed to Remove PID file %s\n",
1115 static void ctdb_create_pidfile(pid_t pid)
1117 if (ctdbd_pidfile != NULL) {
1120 fp = fopen(ctdbd_pidfile, "w");
1123 ("Failed to open PID file %s\n", ctdbd_pidfile));
1127 fprintf(fp, "%d\n", pid);
1129 DEBUG(DEBUG_NOTICE, ("Created PID file %s\n", ctdbd_pidfile));
1130 atexit(ctdb_remove_pidfile);
1135 start the protocol going as a daemon
/*
  Start the CTDB daemon.

  In the order visible below: binds the unix domain socket, optionally
  forks, records the daemon pid and writes the pidfile, creates the event
  context, initialises the configured transport, attaches to the existing
  databases, runs the init event, takes the initial freeze, starts
  accepting clients on the domain socket, starts the transport, queues
  the setup event and finally enters the event loop.

  ctdb                 daemon context to run
  do_fork              when true, fork() is called before the rest of the
                       start-up sequence
  use_syslog           NOTE(review): no use of this parameter is visible
                       in the statements of this function - confirm
  public_address_list  optional; when non-NULL it is stored in
                       ctdb->public_addresses_file and loaded through
                       ctdb_set_public_addresses()

  Start-up failures are reported with DEBUG messages at ALERT/CRIT level
  or through ctdb_fatal().
 */
1137 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog, const char *public_address_list)
1140 struct fd_event *fde;
1141 const char *domain_socket_name;
1143 /* get rid of any old sockets */
1144 unlink(ctdb->daemon.name);
1146 /* create a unix domain stream socket to listen to */
1147 res = ux_socket_bind(ctdb);
1149 DEBUG(DEBUG_ALERT,(__location__ " Failed to open CTDB unix domain socket\n"));
/* fork() is only attempted when do_fork is set; this branch is taken
 * when fork() returns non-zero, i.e. not in the child process */
1153 if (do_fork && fork()) {
1157 tdb_reopen_all(false);
/* open() hands out the lowest free descriptor, so a result of 0 means
 * that /dev/null has become stdin */
1162 if (open("/dev/null", O_RDONLY) != 0) {
1163 DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
1167 block_signal(SIGPIPE);
/* remember the daemon pid both in the file-level variable and in the
 * context, then publish it in the pidfile */
1169 ctdbd_pid = getpid();
1170 ctdb->ctdbd_pid = ctdbd_pid;
1171 DEBUG(DEBUG_ERR, ("Starting CTDBD (Version %s) as PID: %u\n",
1172 CTDB_VERSION_STRING, ctdbd_pid));
1173 ctdb_create_pidfile(ctdb->ctdbd_pid);
1175 if (ctdb->do_setsched) {
1176 /* try to set us up as realtime */
1177 ctdb_set_scheduler(ctdb);
1180 /* ensure the socket is deleted on exit of the daemon */
1181 domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
1182 if (domain_socket_name == NULL) {
1183 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
/* create the event context and enable nested event loops, tracing and
 * tevent logging on it */
1187 ctdb->ev = event_context_init(NULL);
1188 tevent_loop_allow_nesting(ctdb->ev);
1189 tevent_set_trace_callback(ctdb->ev, ctdb_tevent_trace, NULL);
1190 ret = ctdb_init_tevent_logging(ctdb);
1192 DEBUG(DEBUG_ALERT,("Failed to initialize TEVENT logging\n"));
1196 /* set up a handler to pick up sigchld */
1197 if (ctdb_init_sigchld(ctdb) == NULL) {
1198 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
1202 ctdb_set_child_logging(ctdb);
1204 if (start_syslog_daemon(ctdb)) {
1205 DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
1210 /* initialize statistics collection */
1211 ctdb_statistics_init(ctdb);
1213 /* force initial recovery for election */
1214 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
/* select the transport init routine by name; each prototype is declared
 * locally inside its branch */
1216 if (strcmp(ctdb->transport, "tcp") == 0) {
1217 int ctdb_tcp_init(struct ctdb_context *);
1218 ret = ctdb_tcp_init(ctdb);
1220 #ifdef USE_INFINIBAND
1221 if (strcmp(ctdb->transport, "ib") == 0) {
1222 int ctdb_ibw_init(struct ctdb_context *);
1223 ret = ctdb_ibw_init(ctdb);
1227 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
/* presumably ctdb->methods is installed by the transport init above;
 * without it the daemon cannot continue */
1231 if (ctdb->methods == NULL) {
1232 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
1233 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
1236 /* initialise the transport */
1237 if (ctdb->methods->initialise(ctdb) != 0) {
1238 ctdb_fatal(ctdb, "transport failed to initialise");
1241 initialise_node_flags(ctdb);
/* the public address list is optional; interface monitoring is only
 * started when do_checkpublicip is set */
1243 if (public_address_list) {
1244 ctdb->public_addresses_file = public_address_list;
1245 ret = ctdb_set_public_addresses(ctdb, true);
1247 DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
1250 if (ctdb->do_checkpublicip) {
1251 ctdb_start_monitoring_interfaces(ctdb);
1256 /* attach to existing databases */
1257 if (ctdb_attach_databases(ctdb) != 0) {
1258 ctdb_fatal(ctdb, "Failed to attach to databases\n");
/* enter the INIT run state, run the init event and then the init
 * notification script */
1261 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_INIT);
1262 ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
1264 ctdb_fatal(ctdb, "Failed to run init event\n");
1266 ctdb_run_notification_script(ctdb, "init");
1268 /* start frozen, then let the first election sort things out */
1269 if (!ctdb_blocking_freeze(ctdb)) {
1270 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
1273 /* now start accepting clients, only can do this once frozen */
1274 fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd,
1276 ctdb_accept_client, ctdb);
/* let the fd event own the listening socket so that it is closed
 * together with the event */
1277 tevent_fd_set_auto_close(fde);
1279 /* release any IPs we hold from previous runs of the daemon */
1280 if (ctdb->tunable.disable_ip_failover == 0) {
1281 ctdb_release_all_ips(ctdb);
1285 /* Make sure we log something when the daemon terminates */
1286 atexit(print_exit_message);
1288 /* Start the transport */
1289 if (ctdb->methods->start(ctdb) != 0) {
1290 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
1291 ctdb_fatal(ctdb, "transport failed to start");
1294 /* Recovery daemon and timed events are started from the
1295 * callback, only after the setup event completes
1298 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SETUP);
1299 ret = ctdb_event_script_callback(ctdb,
1301 ctdb_setup_event_callback,
1308 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
1312 ctdb_lockdown_memory(ctdb);
1314 /* go into a wait loop to allow other nodes to complete */
1315 event_loop_wait(ctdb->ev);
/* the event loop is expected to run for the lifetime of the daemon, so
 * getting here is reported as a critical error */
1317 DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
1322 allocate a packet for use in daemon<->daemon communication
/*
  Allocate a packet through the transport and fill in its request header.

  ctdb       daemon context; supplies the transport methods, the current
             generation (ctdb->vnn_map->generation) and the local pnn
  mem_ctx    talloc parent handed to the transport allocate_pkt() hook
  operation  stored in hdr->operation
  length     requested packet length
  slength    size of the leading structure; this many bytes are zeroed
             and the packet is never shorter than this

  The buffer is allocated with the length rounded up to CTDB_DS_ALIGNMENT,
  while hdr->length records the unrounded value. Both failure cases
  (no transport methods, allocation failure) are logged.
 */
1324 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
1325 TALLOC_CTX *mem_ctx,
1326 enum ctdb_operation operation,
1327 size_t length, size_t slength,
1331 struct ctdb_req_header *hdr;
/* never allocate less than the leading structure needs */
1333 length = MAX(length, slength);
/* round up to a multiple of CTDB_DS_ALIGNMENT; the mask form assumes the
 * alignment is a power of two - TODO confirm */
1334 size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
1336 if (ctdb->methods == NULL) {
1337 DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
1338 operation, (unsigned)length));
1342 hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
1344 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
1345 operation, (unsigned)length));
/* label the talloc chunk with the caller supplied type name */
1348 talloc_set_name_const(hdr, type);
/* only the leading structure is cleared, not the trailing payload */
1349 memset(hdr, 0, slength);
1350 hdr->length = length;
1351 hdr->operation = operation;
1352 hdr->ctdb_magic = CTDB_MAGIC;
1353 hdr->ctdb_version = CTDB_VERSION;
1354 hdr->generation = ctdb->vnn_map->generation;
1355 hdr->srcnode = ctdb->pnn;
/*
  State kept for a control that a local client asked the daemon to send.

  next/prev  links for the pending_controls list of the destination node
  client     the client that issued the control
  c          the original request, owned by this state
  node       destination node, set when the destination pnn is valid
 */
1360 struct daemon_control_state {
1361 struct daemon_control_state *next, *prev;
1362 struct ctdb_client *client;
1363 struct ctdb_req_control *c;
1365 struct ctdb_node *node;
1369 callback when a control reply comes in
/*
  Completion callback for a control sent on behalf of a local client.

  Builds a ctdb_reply_control packet that carries the reply data followed
  by the error text and queues it to the client that issued the control.

  ctdb          daemon context
  status        status of the control
  data          reply payload, copied into the packet
  errormsg      error text, appended after the payload
  private_data  the daemon_control_state of the request
 */
1371 static void daemon_control_callback(struct ctdb_context *ctdb,
1372 int32_t status, TDB_DATA data,
1373 const char *errormsg,
1376 struct daemon_control_state *state = talloc_get_type(private_data,
1377 struct daemon_control_state);
1378 struct ctdb_client *client = state->client;
1379 struct ctdb_reply_control *r;
1383 /* construct a message to send to the client containing the data */
1384 len = offsetof(struct ctdb_reply_control, data) + data.dsize;
/* the error text travels in the same buffer, so it adds to the length */
1386 len += strlen(errormsg);
/* the reply is allocated as a talloc child of the request state */
1388 r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len,
1389 struct ctdb_reply_control);
1390 CTDB_NO_MEMORY_VOID(ctdb, r);
/* echo the request id so the client can match the reply */
1392 r->hdr.reqid = state->reqid;
1394 r->datalen = data.dsize;
1396 memcpy(&r->data[0], data.dptr, data.dsize);
/* the error string is stored directly after the data bytes and without
 * its terminating NUL; errorlen tells the receiver how long it is */
1398 r->errorlen = strlen(errormsg);
1399 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
1402 ret = daemon_queue_send(client, &r->hdr);
1409 fail all pending controls to a disconnected node
/*
  Fail every control that is still pending towards the given node.

  Each pending state is unlinked from node->pending_controls and then
  completed through daemon_control_callback() with status -1, no data
  and the error text "node is disconnected".
 */
1411 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
1413 struct daemon_control_state *state;
/* always take the list head; the loop ends because every iteration
 * removes that head from the list */
1414 while ((state = node->pending_controls)) {
1415 DLIST_REMOVE(node->pending_controls, state);
/* NOTE(review): the status is written as (uint32_t)-1 although the
 * callback takes an int32_t status */
1416 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null,
1417 "node is disconnected", state);
1422 destroy a daemon_control_state
/*
  talloc destructor for a daemon_control_state: unlinks the state from
  the pending_controls list of its destination node.
 */
1424 static int daemon_control_destructor(struct daemon_control_state *state)
1427 DLIST_REMOVE(state->node->pending_controls, state);
1433 this is called when the ctdb daemon received a ctdb request control
1434 from a local client over the unix domain socket
/*
  Handle a control request received from a local client.

  Creates a daemon_control_state for the request, links it to the
  destination node when the destination pnn is valid, and passes the
  control on via ctdb_daemon_send_control() with
  daemon_control_callback() as completion callback.

  client  the local client that sent the request
  c       the request; ownership moves to the control state
 */
1436 static void daemon_request_control_from_client(struct ctdb_client *client,
1437 struct ctdb_req_control *c)
1441 struct daemon_control_state *state;
/* scratch context, released at the end of this function */
1442 TALLOC_CTX *tmp_ctx = talloc_new(client);
/* resolve the current-node placeholder to the local pnn */
1444 if (c->hdr.destnode == CTDB_CURRENT_NODE) {
1445 c->hdr.destnode = client->ctdb->pnn;
1448 state = talloc(client, struct daemon_control_state);
/* NOTE(review): presumably this macro leaves the function when state is
 * NULL; tmp_ctx would then stay allocated under client - confirm */
1449 CTDB_NO_MEMORY_VOID(client->ctdb, state);
1451 state->client = client;
1452 state->c = talloc_steal(state, c);
1453 state->reqid = c->hdr.reqid;
/* only a valid destination gets tracked on that node's pending list */
1454 if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1455 state->node = client->ctdb->nodes[c->hdr.destnode];
1456 DLIST_ADD(state->node->pending_controls, state);
1461 talloc_set_destructor(state, daemon_control_destructor);
/* with NOREPLY the state moves under tmp_ctx, so it is released by the
 * talloc_free(tmp_ctx) at the end of this function */
1463 if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1464 talloc_steal(tmp_ctx, state);
1467 data.dptr = &c->data[0];
1468 data.dsize = c->datalen;
1469 res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1470 c->srvid, c->opcode, client->client_id,
1472 data, daemon_control_callback,
1475 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1479 talloc_free(tmp_ctx);
1483 register a call function
/*
  Register a call function on a database.

  ctdb   daemon context
  db_id  id of the database; looked up with find_ctdb_db()
  fn     the call function to register
  id     the id to register the function under

  The registration record is allocated as a talloc child of the database
  context and added to its calls list.
 */
1485 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1486 ctdb_fn_t fn, int id)
1488 struct ctdb_registered_call *call;
1489 struct ctdb_db_context *ctdb_db;
1491 ctdb_db = find_ctdb_db(ctdb, db_id);
/* unknown database id */
1492 if (ctdb_db == NULL) {
1496 call = talloc(ctdb_db, struct ctdb_registered_call);
1500 DLIST_ADD(ctdb_db->calls, call);
1507 this local messaging handler is ugly, but is needed to prevent
1508 recursion in ctdb_send_message() when the destination node is the
1509 same as the source node
/*
  A message addressed to the local node, parked until the event loop
  delivers it; created by ctdb_local_message() and consumed by
  ctdb_local_message_trigger().
 */
1511 struct ctdb_local_message {
1512 struct ctdb_context *ctdb;
/*
  Timed event handler that delivers a parked local message.

  private_data is the ctdb_local_message set up by ctdb_local_message();
  its data is passed to ctdb_dispatch_message() for its srvid. A dispatch
  failure is logged.
 */
1517 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te,
1518 struct timeval t, void *private_data)
1520 struct ctdb_local_message *m = talloc_get_type(private_data,
1521 struct ctdb_local_message);
1524 res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1526 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n",
1527 (unsigned long long)m->srvid));
/*
  Queue a message for delivery to the local node.

  The payload is copied into a ctdb_local_message and delivery is
  deferred to a zero-delay timed event, so that the dispatch does not run
  inside the caller.

  ctdb   daemon context
  srvid  service id the message is addressed to
  data   message payload
 */
1532 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1534 struct ctdb_local_message *m;
1535 m = talloc(ctdb, struct ctdb_local_message);
1536 CTDB_NO_MEMORY(ctdb, m);
/* replace the pointer held in m with a private copy that is a talloc
 * child of m */
1541 m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1542 if (m->data.dptr == NULL) {
1547 /* this needs to be done as an event to prevent recursion */
/* m is both the talloc parent of the event and its private data */
1548 event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
/*
  Send a message to the given node.

  ctdb   daemon context
  pnn    destination node
  srvid  service id the message is addressed to
  data   message payload, copied into the outgoing packet

  A message for the local node is handed to ctdb_local_message(); any
  other destination gets a CTDB_REQ_MESSAGE packet queued with
  ctdb_queue_packet(). Without transport methods the message is not sent
  and this is logged.
 */
1555 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1556 uint64_t srvid, TDB_DATA data)
1558 struct ctdb_req_message *r;
1561 if (ctdb->methods == NULL) {
1562 DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1566 /* see if this is a message to ourselves */
1567 if (pnn == ctdb->pnn) {
1568 return ctdb_local_message(ctdb, srvid, data);
/* fixed part of the request plus the payload bytes */
1571 len = offsetof(struct ctdb_req_message, data) + data.dsize;
1572 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1573 struct ctdb_req_message);
1574 CTDB_NO_MEMORY(ctdb, r);
1576 r->hdr.destnode = pnn;
1578 r->datalen = data.dsize;
1579 memcpy(&r->data[0], data.dptr, data.dsize);
1581 ctdb_queue_packet(ctdb, &r->hdr);
/*
  One notification registered by a client; entries are linked into
  client->notify through next/prev.
 */
1589 struct ctdb_client_notify_list {
1590 struct ctdb_client_notify_list *next, *prev;
1591 struct ctdb_context *ctdb;
/*
  talloc destructor for a registered notification: sends the stored
  notify data for the registered srvid to CTDB_BROADCAST_CONNECTED.
  A send failure is logged.
 */
1597 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
/* logged at DEBUG_ERR level although this is the normal path */
1601 DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1603 ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1605 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
/*
  Control handler: register a notification for a client.

  The registration is allocated as a talloc child of the client and
  carries ctdb_client_notify_destructor(), which sends the stored notify
  data when the registration is destroyed.

  ctdb       daemon context
  client_id  request id used to look up the ctdb_client
  indata     a ctdb_client_notify_register: fixed header followed by
             notify->len bytes of notify data

  The request is rejected (with a log message) when indata has the wrong
  size, when the client cannot be found, or when the client already has a
  registration for the same srvid.
 */
1611 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1613 struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1614 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1615 struct ctdb_client_notify_list *nl;
/* NOTE(review): notify->srvid is read here for logging before the size
 * of indata has been validated by the checks below - confirm and
 * consider moving this message after them */
1617 DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
/* the fixed header must be present before notify->len can be trusted */
1619 if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1620 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
/* the total size must be exactly the header plus notify->len bytes */
1624 if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1625 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1630 if (client == NULL) {
1631 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
/* look for an existing registration with the same srvid */
1635 for(nl=client->notify; nl; nl=nl->next) {
1636 if (nl->srvid == notify->srvid) {
1641 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1645 nl = talloc(client, struct ctdb_client_notify_list);
1646 CTDB_NO_MEMORY(ctdb, nl);
/* keep a private copy of the notify data, owned by the registration */
1648 nl->srvid = notify->srvid;
1649 nl->data.dsize = notify->len;
1650 nl->data.dptr = talloc_size(nl, nl->data.dsize);
1651 CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1652 memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1654 DLIST_ADD(client->notify, nl);
1655 talloc_set_destructor(nl, ctdb_client_notify_destructor);
/*
  Control handler: remove a notification registered by a client.

  ctdb       daemon context
  client_id  request id used to look up the ctdb_client
  indata     a ctdb_client_notify_deregister naming the srvid

  The matching entry is unlinked from client->notify and its destructor
  is cleared. A missing client or a missing registration is logged.
 */
1660 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1662 struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1663 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1664 struct ctdb_client_notify_list *nl;
/* NOTE(review): notify->srvid is read without any visible check that
 * indata is large enough to hold the structure - confirm */
1666 DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1668 if (client == NULL) {
1669 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
/* find the registration for this srvid */
1673 for(nl=client->notify; nl; nl=nl->next) {
1674 if (nl->srvid == notify->srvid) {
1679 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1683 DLIST_REMOVE(client->notify, nl);
/* clear the destructor so that no notify message is sent for an
 * explicit deregistration */
1684 talloc_set_destructor(nl, NULL);
/*
  Look up a client by process id: walks ctdb->client_pids and returns the
  client of the first entry whose pid matches.
 */
1690 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1692 struct ctdb_client_pid_list *client_pid;
1694 for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1695 if (client_pid->pid == pid) {
1696 return client_pid->client;
1703 /* This control is used by samba when probing if a process (of a samba daemon)
1705 Samba does this when it needs/wants to check if a subrecord in one of the
1706 databases is still valid, or if it is stale and can be removed.
1707 If the node is in banned or stopped state we just kill off the samba
1708 process holding this sub-record and return to the calling samba that
1709 the process does not exist.
1710 This allows us to forcefully recall subrecords registered by samba processes
1711 on banned and stopped nodes.
/*
  Control handler: report whether a process exists.

  ctdb  daemon context
  pid   process id to probe

  When the local node is banned or stopped, a client registered with this
  pid is looked up and freed. The probe itself is kill(pid, 0).
 */
1713 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1715 struct ctdb_client *client;
/* flags of the local node */
1717 if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1718 client = ctdb_find_client_by_pid(ctdb, pid);
1719 if (client != NULL) {
1720 DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1721 talloc_free(client);
/* signal 0 delivers nothing; kill() only reports whether the process
 * could be signalled */
1726 return kill(pid, 0);