4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/util/dlinklist.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "system/wait.h"
27 #include "../include/ctdb_client.h"
28 #include "../include/ctdb_private.h"
29 #include "../common/rb_tree.h"
30 #include <sys/socket.h>
32 struct ctdb_client_pid_list {
33 struct ctdb_client_pid_list *next, *prev;
34 struct ctdb_context *ctdb;
36 struct ctdb_client *client;
39 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
41 static void print_exit_message(void)
43 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
48 static void ctdb_time_tick(struct event_context *ev, struct timed_event *te,
49 struct timeval t, void *private_data)
51 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
53 if (getpid() != ctdbd_pid) {
57 event_add_timed(ctdb->ev, ctdb,
58 timeval_current_ofs(1, 0),
59 ctdb_time_tick, ctdb);
62 /* Used to trigger a dummy event once per second, to make
63 * detection of hangs more reliable.
65 static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
67 event_add_timed(ctdb->ev, ctdb,
68 timeval_current_ofs(1, 0),
69 ctdb_time_tick, ctdb);
73 /* called when the "startup" event script has finished */
74 static void ctdb_start_transport(struct ctdb_context *ctdb)
76 if (ctdb->methods == NULL) {
77 DEBUG(DEBUG_ALERT,(__location__ " startup event finished but transport is DOWN.\n"));
78 ctdb_fatal(ctdb, "transport is not initialized but startup completed");
81 /* start the transport running */
82 if (ctdb->methods->start(ctdb) != 0) {
83 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
84 ctdb_fatal(ctdb, "transport failed to start");
87 /* start the recovery daemon process */
88 if (ctdb_start_recoverd(ctdb) != 0) {
89 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
93 /* Make sure we log something when the daemon terminates */
94 atexit(print_exit_message);
96 /* start monitoring for connected/disconnected nodes */
97 ctdb_start_keepalive(ctdb);
99 /* start monitoring for node health */
100 ctdb_start_monitoring(ctdb);
102 /* start periodic update of tcp tickle lists */
103 ctdb_start_tcp_tickle_update(ctdb);
105 /* start listening for recovery daemon pings */
106 ctdb_control_recd_ping(ctdb);
108 /* start listening to timer ticks */
109 ctdb_start_time_tickd(ctdb);
112 static void block_signal(int signum)
114 struct sigaction act;
116 memset(&act, 0, sizeof(act));
118 act.sa_handler = SIG_IGN;
119 sigemptyset(&act.sa_mask);
120 sigaddset(&act.sa_mask, signum);
121 sigaction(signum, &act, NULL);
126 send a packet to a client
128 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
130 CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
131 if (hdr->operation == CTDB_REQ_MESSAGE) {
132 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
133 DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
138 return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
142 message handler for when we are in daemon mode. This redirects the message
145 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
146 TDB_DATA data, void *private_data)
148 struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
149 struct ctdb_req_message *r;
152 /* construct a message to send to the client containing the data */
153 len = offsetof(struct ctdb_req_message, data) + data.dsize;
154 r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE,
155 len, struct ctdb_req_message);
156 CTDB_NO_MEMORY_VOID(ctdb, r);
158 talloc_set_name_const(r, "req_message packet");
161 r->datalen = data.dsize;
162 memcpy(&r->data[0], data.dptr, data.dsize);
164 daemon_queue_send(client, &r->hdr);
170 this is called when the ctdb daemon received a ctdb request to
171 set the srvid from the client
173 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
175 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
177 if (client == NULL) {
178 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
181 res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
183 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n",
184 (unsigned long long)srvid));
186 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n",
187 (unsigned long long)srvid));
194 this is called when the ctdb daemon received a ctdb request to
195 remove a srvid from the client
197 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
199 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
200 if (client == NULL) {
201 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
204 return ctdb_deregister_message_handler(ctdb, srvid, client);
207 int daemon_check_srvids(struct ctdb_context *ctdb, TDB_DATA indata,
214 if ((indata.dsize % sizeof(uint64_t)) != 0) {
215 DEBUG(DEBUG_ERR, ("Bad indata in daemon_check_srvids, "
216 "size=%d\n", (int)indata.dsize));
220 ids = (uint64_t *)indata.dptr;
221 num_ids = indata.dsize / 8;
223 results = talloc_zero_array(outdata, uint8_t, (num_ids+7)/8);
224 if (results == NULL) {
225 DEBUG(DEBUG_ERR, ("talloc failed in daemon_check_srvids\n"));
228 for (i=0; i<num_ids; i++) {
229 struct ctdb_message_list *ml;
230 for (ml=ctdb->message_list; ml; ml=ml->next) {
231 if (ml->srvid == ids[i]) {
236 results[i/8] |= (1 << (i%8));
239 outdata->dptr = (uint8_t *)results;
240 outdata->dsize = talloc_get_size(results);
245 destroy a ctdb_client
247 static int ctdb_client_destructor(struct ctdb_client *client)
249 struct ctdb_db_context *ctdb_db;
251 ctdb_takeover_client_destructor_hook(client);
252 ctdb_reqid_remove(client->ctdb, client->client_id);
253 CTDB_DECREMENT_STAT(client->ctdb, num_clients);
255 if (client->num_persistent_updates != 0) {
256 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
257 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
259 ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
261 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
262 "commit active. Forcing recovery.\n"));
263 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
265 /* legacy trans2 transaction state: */
266 ctdb_db->transaction_active = false;
269 * trans3 transaction state:
271 * The destructor sets the pointer to NULL.
273 talloc_free(ctdb_db->persistent_state);
281 this is called when the ctdb daemon received a ctdb request message
282 from a local client over the unix domain socket
284 static void daemon_request_message_from_client(struct ctdb_client *client,
285 struct ctdb_req_message *c)
290 if (c->hdr.destnode == CTDB_CURRENT_NODE) {
291 c->hdr.destnode = ctdb_get_pnn(client->ctdb);
294 /* maybe the message is for another client on this node */
295 if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
296 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
300 /* its for a remote node */
301 data.dptr = &c->data[0];
302 data.dsize = c->datalen;
303 res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
306 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
312 struct daemon_call_state {
313 struct ctdb_client *client;
315 struct ctdb_call *call;
316 struct timeval start_time;
318 /* readonly request ? */
319 uint32_t readonly_fetch;
320 uint32_t client_callid;
324 complete a call from a client
326 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
328 struct daemon_call_state *dstate = talloc_get_type(state->async.private_data,
329 struct daemon_call_state);
330 struct ctdb_reply_call *r;
333 struct ctdb_client *client = dstate->client;
334 struct ctdb_db_context *ctdb_db = state->ctdb_db;
336 talloc_steal(client, dstate);
337 talloc_steal(dstate, dstate->call);
339 res = ctdb_daemon_call_recv(state, dstate->call);
341 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
342 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
344 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
348 length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
349 /* If the client asked for readonly FETCH, we remapped this to
350 FETCH_WITH_HEADER when calling the daemon. So we must
351 strip the extra header off the reply data before passing
352 it back to the client.
354 if (dstate->readonly_fetch
355 && dstate->client_callid == CTDB_FETCH_FUNC) {
356 length -= sizeof(struct ctdb_ltdb_header);
359 r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL,
360 length, struct ctdb_reply_call);
362 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
363 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
364 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
367 r->hdr.reqid = dstate->reqid;
368 r->status = dstate->call->status;
370 if (dstate->readonly_fetch
371 && dstate->client_callid == CTDB_FETCH_FUNC) {
372 /* client only asked for a FETCH so we must strip off
373 the extra ctdb_ltdb header
375 r->datalen = dstate->call->reply_data.dsize - sizeof(struct ctdb_ltdb_header);
376 memcpy(&r->data[0], dstate->call->reply_data.dptr + sizeof(struct ctdb_ltdb_header), r->datalen);
378 r->datalen = dstate->call->reply_data.dsize;
379 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
382 res = daemon_queue_send(client, &r->hdr);
384 /* client is dead - return immediately */
388 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
390 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
391 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
395 struct ctdb_daemon_packet_wrap {
396 struct ctdb_context *ctdb;
401 a wrapper to catch disconnected clients
403 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
405 struct ctdb_client *client;
406 struct ctdb_daemon_packet_wrap *w = talloc_get_type(p,
407 struct ctdb_daemon_packet_wrap);
409 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
413 client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
414 if (client == NULL) {
415 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
423 daemon_incoming_packet(client, hdr);
426 struct ctdb_deferred_fetch_call {
427 struct ctdb_deferred_fetch_call *next, *prev;
428 struct ctdb_req_call *c;
429 struct ctdb_daemon_packet_wrap *w;
432 struct ctdb_deferred_fetch_queue {
433 struct ctdb_deferred_fetch_call *deferred_calls;
436 struct ctdb_deferred_requeue {
437 struct ctdb_deferred_fetch_call *dfc;
438 struct ctdb_client *client;
441 /* called from a timer event and starts reprocessing the deferred call.*/
442 static void reprocess_deferred_call(struct event_context *ev, struct timed_event *te,
443 struct timeval t, void *private_data)
445 struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
446 struct ctdb_client *client = dfr->client;
448 talloc_steal(client, dfr->dfc->c);
449 daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
453 /* the referral context is destroyed either after a timeout or when the initial
454 fetch-lock has finished.
455 at this stage, immediately start reprocessing the queued up deferred
456 calls so they get reprocessed immediately (and since we are dmaster at
457 this stage, trigger the waiting smbd processes to pick up and acquire the
460 static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
463 /* need to reprocess the packets from the queue explicitly instead of
464 just using a normal destructor since we need to
465 call the clients in the same order as the requests were queued up
467 while (dfq->deferred_calls != NULL) {
468 struct ctdb_client *client;
469 struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
470 struct ctdb_deferred_requeue *dfr;
472 DLIST_REMOVE(dfq->deferred_calls, dfc);
474 client = ctdb_reqid_find(dfc->w->ctdb, dfc->w->client_id, struct ctdb_client);
475 if (client == NULL) {
476 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
481 /* process it by pushing it back onto the eventloop */
482 dfr = talloc(client, struct ctdb_deferred_requeue);
484 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
488 dfr->dfc = talloc_steal(dfr, dfc);
489 dfr->client = client;
491 event_add_timed(dfc->w->ctdb->ev, client, timeval_zero(), reprocess_deferred_call, dfr);
497 /* insert the new deferral context into the rb tree.
498 there should never be a pre-existing context here, but check for it
499 warn and destroy the previous context if there is already a deferral context
502 static void *insert_dfq_callback(void *parm, void *data)
505 DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
511 /* if the original fetch-lock did not complete within a reasonable time,
512 free the context and context for all deferred requests to cause them to be
513 re-inserted into the event system.
515 static void dfq_timeout(struct event_context *ev, struct timed_event *te,
516 struct timeval t, void *private_data)
518 talloc_free(private_data);
521 /* This function is used in the local daemon to register a KEY in a database
523 While the remote fetch is in-flight, any further attempts to re-fetch the
524 same record will be deferred until the fetch completes.
526 static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
529 struct ctdb_deferred_fetch_queue *dfq;
531 k = talloc_zero_size(call, ((call->key.dsize + 3) & 0xfffffffc) + 4);
533 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
537 k[0] = (call->key.dsize + 3) / 4 + 1;
538 memcpy(&k[1], call->key.dptr, call->key.dsize);
540 dfq = talloc(call, struct ctdb_deferred_fetch_queue);
542 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch queue structure\n"));
546 dfq->deferred_calls = NULL;
548 trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
550 talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
552 /* if the fetch hasn't completed in 30 seconds, just tear it all down
553 and let it try again as the events are reissued */
554 event_add_timed(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0), dfq_timeout, dfq);
560 /* check if this is a duplicate request to a fetch already in-flight
561 if it is, make this call deferred to be reprocessed later when
562 the in-flight fetch completes.
564 static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call *c)
567 struct ctdb_deferred_fetch_queue *dfq;
568 struct ctdb_deferred_fetch_call *dfc;
570 k = talloc_zero_size(c, ((key.dsize + 3) & 0xfffffffc) + 4);
572 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
576 k[0] = (key.dsize + 3) / 4 + 1;
577 memcpy(&k[1], key.dptr, key.dsize);
579 dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
588 dfc = talloc(dfq, struct ctdb_deferred_fetch_call);
590 DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
594 dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
595 if (dfc->w == NULL) {
596 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
601 dfc->c = talloc_steal(dfc, c);
602 dfc->w->ctdb = ctdb_db->ctdb;
603 dfc->w->client_id = client->client_id;
605 DLIST_ADD_END(dfq->deferred_calls, dfc, NULL);
612 this is called when the ctdb daemon received a ctdb request call
613 from a local client over the unix domain socket
615 static void daemon_request_call_from_client(struct ctdb_client *client,
616 struct ctdb_req_call *c)
618 struct ctdb_call_state *state;
619 struct ctdb_db_context *ctdb_db;
620 struct daemon_call_state *dstate;
621 struct ctdb_call *call;
622 struct ctdb_ltdb_header header;
625 struct ctdb_context *ctdb = client->ctdb;
626 struct ctdb_daemon_packet_wrap *w;
628 CTDB_INCREMENT_STAT(ctdb, total_calls);
629 CTDB_INCREMENT_STAT(ctdb, pending_calls); /* call is now in flight; paired with the CTDB_DECREMENT_STAT(pending_calls) on the failure/retry returns below and in daemon_call_from_client_callback() */
631 ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
633 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
635 CTDB_DECREMENT_STAT(ctdb, pending_calls);
639 if (ctdb_db->unhealthy_reason) {
641 * this is just a warning, as the tdb should be empty anyway,
642 * and only persistent databases can be unhealthy, which doesn't
643 * use this code path
645 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
646 ctdb_db->db_name, ctdb_db->unhealthy_reason));
650 key.dsize = c->keylen;
652 w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
653 CTDB_NO_MEMORY_VOID(ctdb, w);
656 w->client_id = client->client_id;
658 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header,
659 (struct ctdb_req_header *)c, &data,
660 daemon_incoming_packet_wrap, w, True);
662 /* will retry later */
663 CTDB_DECREMENT_STAT(ctdb, pending_calls);
670 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
671 CTDB_DECREMENT_STAT(ctdb, pending_calls);
676 /* check if this fetch request is a duplicate for a
677 request we already have in flight. If so defer it until
678 the first request completes.
680 if (ctdb->tunable.fetch_collapse == 1) {
681 if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
682 ret = ctdb_ltdb_unlock(ctdb_db, key);
684 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
690 /* Dont do READONLY if we dont have a tracking database */
691 if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
692 c->flags &= ~CTDB_WANT_READONLY;
695 if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
696 header.flags &= ~(CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY|CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_REVOKE_COMPLETE);
697 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
698 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
699 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
700 ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
702 /* and clear out the tracking data */
703 if (tdb_delete(ctdb_db->rottdb, key) != 0) {
704 DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
708 /* if we are revoking, we must defer all other calls until the revoke
711 if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
712 talloc_free(data.dptr);
713 ret = ctdb_ltdb_unlock(ctdb_db, key);
715 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
716 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
721 if ((header.dmaster == ctdb->pnn)
722 && (!(c->flags & CTDB_WANT_READONLY))
723 && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
724 header.flags |= CTDB_REC_RO_REVOKING_READONLY;
725 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
726 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
728 ret = ctdb_ltdb_unlock(ctdb_db, key);
730 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, key, &header, data) != 0) {
731 ctdb_fatal(ctdb, "Failed to start record revoke");
733 talloc_free(data.dptr);
735 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
736 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
742 dstate = talloc(client, struct daemon_call_state);
743 if (dstate == NULL) {
744 ret = ctdb_ltdb_unlock(ctdb_db, key);
746 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
749 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
750 CTDB_DECREMENT_STAT(ctdb, pending_calls);
753 dstate->start_time = timeval_current();
754 dstate->client = client;
755 dstate->reqid = c->hdr.reqid;
756 talloc_steal(dstate, data.dptr);
758 call = dstate->call = talloc_zero(dstate, struct ctdb_call);
760 ret = ctdb_ltdb_unlock(ctdb_db, key);
762 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
765 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
766 CTDB_DECREMENT_STAT(ctdb, pending_calls);
767 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
771 dstate->readonly_fetch = 0;
772 call->call_id = c->callid;
774 call->call_data.dptr = c->data + c->keylen;
775 call->call_data.dsize = c->calldatalen;
776 call->flags = c->flags;
778 if (c->flags & CTDB_WANT_READONLY) {
779 /* client wants readonly record, so translate this into a
780 fetch with header. remember what the client asked for
781 so we can remap the reply back to the proper format for
782 the client in the reply
784 dstate->client_callid = call->call_id;
785 call->call_id = CTDB_FETCH_WITH_HEADER_FUNC;
786 dstate->readonly_fetch = 1;
789 if (header.dmaster == ctdb->pnn) {
790 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
792 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
793 if (ctdb->tunable.fetch_collapse == 1) {
794 /* This request triggered a remote fetch-lock.
795 set up a deferral for this key so any additional
796 fetch-locks are deferred until the current one
799 setup_deferred_fetch_locks(ctdb_db, call);
803 ret = ctdb_ltdb_unlock(ctdb_db, key);
805 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
809 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
810 CTDB_DECREMENT_STAT(ctdb, pending_calls);
811 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
814 talloc_steal(state, dstate);
815 talloc_steal(client, state);
817 state->async.fn = daemon_call_from_client_callback;
818 state->async.private_data = dstate;
822 static void daemon_request_control_from_client(struct ctdb_client *client,
823 struct ctdb_req_control *c);
825 /* data contains a packet from the client */
826 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
828 struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
830 struct ctdb_context *ctdb = client->ctdb;
832 /* place the packet as a child of a tmp_ctx. We then use
833 talloc_free() below to free it. If any of the calls want
834 to keep it, then they will steal it somewhere else, and the
835 talloc_free() will be a no-op */
836 tmp_ctx = talloc_new(client);
837 talloc_steal(tmp_ctx, hdr);
839 if (hdr->ctdb_magic != CTDB_MAGIC) {
840 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
844 if (hdr->ctdb_version != CTDB_VERSION) {
845 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
849 switch (hdr->operation) {
851 CTDB_INCREMENT_STAT(ctdb, client.req_call);
852 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
855 case CTDB_REQ_MESSAGE:
856 CTDB_INCREMENT_STAT(ctdb, client.req_message);
857 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
860 case CTDB_REQ_CONTROL:
861 CTDB_INCREMENT_STAT(ctdb, client.req_control);
862 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
866 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
871 talloc_free(tmp_ctx);
875 called when the daemon gets a incoming packet
877 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
879 struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
880 struct ctdb_req_header *hdr;
887 CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
889 if (cnt < sizeof(*hdr)) {
890 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n",
894 hdr = (struct ctdb_req_header *)data;
895 if (cnt != hdr->length) {
896 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon",
897 (unsigned)hdr->length, (unsigned)cnt);
901 if (hdr->ctdb_magic != CTDB_MAGIC) {
902 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
906 if (hdr->ctdb_version != CTDB_VERSION) {
907 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
911 DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
912 "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
913 hdr->srcnode, hdr->destnode));
915 /* it is the responsibility of the incoming packet function to free 'data' */
916 daemon_incoming_packet(client, hdr);
920 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
922 if (client_pid->ctdb->client_pids != NULL) {
923 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
930 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde,
931 uint16_t flags, void *private_data)
933 struct sockaddr_un addr;
936 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
937 struct ctdb_client *client;
938 struct ctdb_client_pid_list *client_pid;
941 memset(&addr, 0, sizeof(addr));
943 fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
949 set_close_on_exec(fd);
951 DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
953 client = talloc_zero(ctdb, struct ctdb_client);
954 if (ctdb_get_peer_pid(fd, &peer_pid) == 0) {
955 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)peer_pid));
960 client->client_id = ctdb_reqid_new(ctdb, client);
961 client->pid = peer_pid;
963 client_pid = talloc(client, struct ctdb_client_pid_list);
964 if (client_pid == NULL) {
965 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
970 client_pid->ctdb = ctdb;
971 client_pid->pid = peer_pid;
972 client_pid->client = client;
974 DLIST_ADD(ctdb->client_pids, client_pid);
976 client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT,
977 ctdb_daemon_read_cb, client,
978 "client-%u", client->pid);
980 talloc_set_destructor(client, ctdb_client_destructor);
981 talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
982 CTDB_INCREMENT_STAT(ctdb, num_clients);
988 create a unix domain socket and bind it
989 return a file descriptor open on the socket
991 static int ux_socket_bind(struct ctdb_context *ctdb)
993 struct sockaddr_un addr;
995 ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
996 if (ctdb->daemon.sd == -1) {
1000 set_close_on_exec(ctdb->daemon.sd);
1001 set_nonblocking(ctdb->daemon.sd);
1003 memset(&addr, 0, sizeof(addr));
1004 addr.sun_family = AF_UNIX;
1005 strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1); /* copy at most size-1 bytes so the last byte stays the NUL written by the memset() above */
1007 if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
1008 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
1012 if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
1013 chmod(ctdb->daemon.name, 0700) != 0) {
1014 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s'\n", ctdb->daemon.name));
1019 if (listen(ctdb->daemon.sd, 100) != 0) {
1020 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
1027 close(ctdb->daemon.sd);
1028 ctdb->daemon.sd = -1;
1032 static void sig_child_handler(struct event_context *ev,
1033 struct signal_event *se, int signum, int count,
1037 // struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
1042 pid = waitpid(-1, &status, WNOHANG);
1044 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
1048 DEBUG(DEBUG_DEBUG, ("SIGCHLD from %d\n", (int)pid));
1053 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
1057 ctdb_fatal(ctdb, "Failed to run setup event\n");
1060 ctdb_run_notification_script(ctdb, "setup");
1062 /* tell all other nodes we've just started up */
1063 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
1064 0, CTDB_CONTROL_STARTUP, 0,
1065 CTDB_CTRL_FLAG_NOREPLY,
1066 tdb_null, NULL, NULL);
1070 start the protocol going as a daemon
1072 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog, const char *public_address_list)
1075 struct fd_event *fde;
1076 const char *domain_socket_name;
1077 struct signal_event *se;
1079 /* get rid of any old sockets */
1080 unlink(ctdb->daemon.name);
1082 /* create a unix domain stream socket to listen to */
1083 res = ux_socket_bind(ctdb);
1085 DEBUG(DEBUG_ALERT,(__location__ " Failed to open CTDB unix domain socket\n"));
1089 if (do_fork && fork()) {
1093 tdb_reopen_all(False);
1098 if (open("/dev/null", O_RDONLY) != 0) {
1099 DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
1103 block_signal(SIGPIPE);
1105 ctdbd_pid = getpid();
1106 ctdb->ctdbd_pid = ctdbd_pid;
1109 DEBUG(DEBUG_ERR, ("Starting CTDBD as pid : %u\n", ctdbd_pid));
1111 if (ctdb->do_setsched) {
1112 /* try to set us up as realtime */
1113 ctdb_set_scheduler(ctdb);
1116 /* ensure the socket is deleted on exit of the daemon */
1117 domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
1118 if (domain_socket_name == NULL) {
1119 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
1123 ctdb->ev = event_context_init(NULL);
1124 tevent_loop_allow_nesting(ctdb->ev);
1125 ret = ctdb_init_tevent_logging(ctdb);
1127 DEBUG(DEBUG_ALERT,("Failed to initialize TEVENT logging\n"));
1131 ctdb_set_child_logging(ctdb);
1133 /* initialize statistics collection */
1134 ctdb_statistics_init(ctdb);
1136 /* force initial recovery for election */
1137 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
1139 if (strcmp(ctdb->transport, "tcp") == 0) {
1140 int ctdb_tcp_init(struct ctdb_context *);
1141 ret = ctdb_tcp_init(ctdb);
1143 #ifdef USE_INFINIBAND
1144 if (strcmp(ctdb->transport, "ib") == 0) {
1145 int ctdb_ibw_init(struct ctdb_context *);
1146 ret = ctdb_ibw_init(ctdb);
1150 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
1154 if (ctdb->methods == NULL) {
1155 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
1156 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
1159 /* initialise the transport */
1160 if (ctdb->methods->initialise(ctdb) != 0) {
1161 ctdb_fatal(ctdb, "transport failed to initialise");
1163 if (public_address_list) {
1164 ret = ctdb_set_public_addresses(ctdb, public_address_list);
1166 DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
1172 /* attach to existing databases */
1173 if (ctdb_attach_databases(ctdb) != 0) {
1174 ctdb_fatal(ctdb, "Failed to attach to databases\n");
1177 ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
1179 ctdb_fatal(ctdb, "Failed to run init event\n");
1181 ctdb_run_notification_script(ctdb, "init");
1183 /* start frozen, then let the first election sort things out */
1184 if (ctdb_blocking_freeze(ctdb)) {
1185 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
1188 /* now start accepting clients, only can do this once frozen */
1189 fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd,
1191 ctdb_accept_client, ctdb);
1192 tevent_fd_set_auto_close(fde);
1194 /* release any IPs we hold from previous runs of the daemon */
1195 if (ctdb->tunable.disable_ip_failover == 0) {
1196 ctdb_release_all_ips(ctdb);
1199 /* start the transport going */
1200 ctdb_start_transport(ctdb);
1202 /* set up a handler to pick up sigchld */
1203 se = event_add_signal(ctdb->ev, ctdb,
1208 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
1212 ret = ctdb_event_script_callback(ctdb,
1214 ctdb_setup_event_callback,
1221 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
1226 if (start_syslog_daemon(ctdb)) {
1227 DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
1232 ctdb_lockdown_memory(ctdb);
1234 /* go into a wait loop to allow other nodes to complete */
1235 event_loop_wait(ctdb->ev);
1237 DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
1242 allocate a packet for use in daemon<->daemon communication
/*
  Allocate a packet through the transport's allocate_pkt() method and fill
  in the common ctdb_req_header fields.

  ctdb      - daemon context; supplies methods, vnn_map->generation and pnn
  mem_ctx   - talloc context passed on to allocate_pkt()
  operation - value stored in hdr->operation
  length    - requested packet length; raised to slength if smaller
  slength   - number of leading bytes of the packet that are zeroed
  type      - string used as the talloc name of the packet

  A missing transport (ctdb->methods == NULL) and a failed allocate_pkt()
  are both logged here.
*/
1244 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
1245 TALLOC_CTX *mem_ctx,
1246 enum ctdb_operation operation,
1247 size_t length, size_t slength,
1251 struct ctdb_req_header *hdr;
/* the packet is never shorter than slength */
1253 length = MAX(length, slength);
/* round the allocation size up to a multiple of CTDB_DS_ALIGNMENT
   (assumes the alignment is a power of two - TODO confirm) */
1254 size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
/* no transport methods registered: the transport is down */
1256 if (ctdb->methods == NULL) {
1257 DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
1258 operation, (unsigned)length));
/* the transport allocates the packet, using the aligned size */
1262 hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
1264 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
1265 operation, (unsigned)length));
/* label the talloc chunk with the caller-supplied type name */
1268 talloc_set_name_const(hdr, type);
/* only the first slength bytes are cleared, not the whole allocation */
1269 memset(hdr, 0, slength);
/* hdr->length records the unrounded length, not the aligned size */
1270 hdr->length = length;
1271 hdr->operation = operation;
1272 hdr->ctdb_magic = CTDB_MAGIC;
1273 hdr->ctdb_version = CTDB_VERSION;
/* stamp the packet with the current generation and this node's pnn */
1274 hdr->generation = ctdb->vnn_map->generation;
1275 hdr->srcnode = ctdb->pnn;
/*
  State kept by the daemon for one control request issued by a local
  client. Entries are chained through next/prev on the destination node's
  pending_controls list.
*/
1280 struct daemon_control_state {
1281 struct daemon_control_state *next, *prev;
/* client that issued the control; the reply is queued back to it */
1282 struct ctdb_client *client;
/* the request packet, reparented onto this state with talloc_steal() */
1283 struct ctdb_req_control *c;
/* destination node whose pending_controls list holds this entry */
1285 struct ctdb_node *node;
1289 callback when a control reply comes in
/*
  Reply callback for a control that was sent on behalf of a local client.

  Packs the reply payload and the error string into a CTDB_REPLY_CONTROL
  packet and queues it to the client recorded in the daemon_control_state
  passed as private_data.

  ctdb     - daemon context
  status   - status of the control
  data     - reply payload, copied to the start of r->data
  errormsg - error string, copied directly after the payload without its
             terminating NUL
*/
1291 static void daemon_control_callback(struct ctdb_context *ctdb,
1292 int32_t status, TDB_DATA data,
1293 const char *errormsg,
1296 struct daemon_control_state *state = talloc_get_type(private_data,
1297 struct daemon_control_state);
1298 struct ctdb_client *client = state->client;
1299 struct ctdb_reply_control *r;
1303 /* construct a message to send to the client containing the data */
/* fixed part of the reply plus the payload bytes */
1304 len = offsetof(struct ctdb_reply_control, data) + data.dsize;
/* plus the error text; strlen() does not count the NUL */
1306 len += strlen(errormsg);
/* the reply packet is allocated with state as its talloc parent */
1308 r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len,
1309 struct ctdb_reply_control);
1310 CTDB_NO_MEMORY_VOID(ctdb, r);
/* reuse the reqid saved from the client's request */
1312 r->hdr.reqid = state->reqid;
1314 r->datalen = data.dsize;
1316 memcpy(&r->data[0], data.dptr, data.dsize);
/* the error text starts at offset datalen within r->data */
1318 r->errorlen = strlen(errormsg);
1319 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
/* hand the finished reply to the client's queue */
1322 ret = daemon_queue_send(client, &r->hdr);
1329 fail all pending controls to a disconnected node
/*
  Complete every control still pending against node with an error.

  Each entry is unlinked from node->pending_controls and then passed to
  daemon_control_callback() with status (uint32_t)-1, no reply data and
  the error message "node is disconnected".

  ctdb - daemon context
  node - node whose pending_controls list is drained
*/
1331 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
1333 struct daemon_control_state *state;
/* keep taking the list head until the list is empty */
1334 while ((state = node->pending_controls)) {
/* unlink before invoking the callback */
1335 DLIST_REMOVE(node->pending_controls, state);
1336 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null,
1337 "node is disconnected", state);
1342 destroy a daemon_control_state
/*
  talloc destructor for a daemon_control_state: unlinks the state from its
  node's pending_controls list.

  NOTE(review): state->node is only assigned when the destination pnn
  validates, so this removal presumably needs a guard for an unset node -
  confirm.
*/
1344 static int daemon_control_destructor(struct daemon_control_state *state)
1347 DLIST_REMOVE(state->node->pending_controls, state);
1353 this is called when the ctdb daemon received a ctdb request control
1354 from a local client over the unix domain socket
/*
  Handle a control request that a local client sent to the daemon.

  Records a daemon_control_state for the request, then forwards the
  control with ctdb_daemon_send_control(), registering
  daemon_control_callback as the reply callback.

  client - the local client that sent the request
  c      - the control request; ownership is moved onto the state
*/
1356 static void daemon_request_control_from_client(struct ctdb_client *client,
1357 struct ctdb_req_control *c)
1361 struct daemon_control_state *state;
/* scratch context, freed at the end of this function */
1362 TALLOC_CTX *tmp_ctx = talloc_new(client);
/* CTDB_CURRENT_NODE is resolved to this node's own pnn */
1364 if (c->hdr.destnode == CTDB_CURRENT_NODE) {
1365 c->hdr.destnode = client->ctdb->pnn;
/* the state is a talloc child of the client */
1368 state = talloc(client, struct daemon_control_state);
/* NOTE(review): if this macro returns on allocation failure, tmp_ctx is
   not freed on that path (it remains a child of client) - confirm */
1369 CTDB_NO_MEMORY_VOID(client->ctdb, state);
1371 state->client = client;
/* reparent the request packet onto the state */
1372 state->c = talloc_steal(state, c);
/* saved so the reply can carry the same reqid */
1373 state->reqid = c->hdr.reqid;
/* only a valid destination pnn is tracked on a node's pending list */
1374 if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1375 state->node = client->ctdb->nodes[c->hdr.destnode];
1376 DLIST_ADD(state->node->pending_controls, state);
/* the destructor takes the state off the pending list again */
1381 talloc_set_destructor(state, daemon_control_destructor);
/* for no-reply controls the state is moved under tmp_ctx, so it is
   released by the talloc_free(tmp_ctx) at the end of this function */
1383 if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1384 talloc_steal(tmp_ctx, state);
/* data points into the request packet; nothing is copied here */
1387 data.dptr = &c->data[0];
1388 data.dsize = c->datalen;
/* forward the control to the destination node */
1389 res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1390 c->srvid, c->opcode, client->client_id,
1392 data, daemon_control_callback,
1395 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1399 talloc_free(tmp_ctx);
1403 register a call function
/*
  Register a call function for a database.

  ctdb  - daemon context
  db_id - id of the database, looked up with find_ctdb_db()
  fn    - call function to register
  id    - id of the call

  The new ctdb_registered_call is allocated as a talloc child of the
  database context and linked onto ctdb_db->calls.
*/
1405 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1406 ctdb_fn_t fn, int id)
1408 struct ctdb_registered_call *call;
1409 struct ctdb_db_context *ctdb_db;
/* resolve the database id to its context */
1411 ctdb_db = find_ctdb_db(ctdb, db_id);
1412 if (ctdb_db == NULL) {
/* owned by the database context */
1416 call = talloc(ctdb_db, struct ctdb_registered_call);
1420 DLIST_ADD(ctdb_db->calls, call);
1427 this local messaging handler is ugly, but is needed to prevent
1428 recursion in ctdb_send_message() when the destination node is the
1429 same as the source node
/*
  A message addressed to the local node that is waiting to be dispatched
  from a timed event by ctdb_local_message_trigger().
*/
1431 struct ctdb_local_message {
/* daemon context handed to ctdb_dispatch_message() */
1432 struct ctdb_context *ctdb;
/*
  Timed-event handler that delivers a deferred local message.

  private_data is the struct ctdb_local_message to deliver; its srvid and
  data are passed to ctdb_dispatch_message(), and a failure to dispatch is
  logged with the srvid.
*/
1437 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te,
1438 struct timeval t, void *private_data)
1440 struct ctdb_local_message *m = talloc_get_type(private_data,
1441 struct ctdb_local_message);
/* deliver the message to the local handlers */
1444 res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1446 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n",
1447 (unsigned long long)m->srvid));
/*
  Queue a message for delivery to this node itself.

  The message is not dispatched directly; it is stored in a
  ctdb_local_message and delivered later by ctdb_local_message_trigger()
  from a timed event.

  ctdb  - daemon context
  srvid - service id of the message
  data  - message payload
*/
1452 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1454 struct ctdb_local_message *m;
/* the message is a talloc child of the daemon context */
1455 m = talloc(ctdb, struct ctdb_local_message);
1456 CTDB_NO_MEMORY(ctdb, m);
/* replace the payload pointer with a private copy owned by m */
1461 m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1462 if (m->data.dptr == NULL) {
1467 /* this needs to be done as an event to prevent recursion */
/* zero timeout; the event is a talloc child of m.
   NOTE(review): the result of event_add_timed() is not checked here */
1468 event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
/*
  Send a message for service id srvid to node pnn.

  A message to the local node is handed to ctdb_local_message(). Any
  other destination gets a CTDB_REQ_MESSAGE packet carrying a copy of
  data, which is passed to ctdb_queue_packet().

  ctdb  - daemon context
  pnn   - destination node number
  srvid - service id of the message
  data  - message payload
*/
1475 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1476 uint64_t srvid, TDB_DATA data)
1478 struct ctdb_req_message *r;
/* no transport methods registered: the transport is down */
1481 if (ctdb->methods == NULL) {
1482 DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1486 /* see if this is a message to ourselves */
1487 if (pnn == ctdb->pnn) {
1488 return ctdb_local_message(ctdb, srvid, data);
/* fixed part of the request plus the payload bytes */
1491 len = offsetof(struct ctdb_req_message, data) + data.dsize;
/* the packet is allocated with the daemon context as talloc parent */
1492 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1493 struct ctdb_req_message);
1494 CTDB_NO_MEMORY(ctdb, r);
1496 r->hdr.destnode = pnn;
1498 r->datalen = data.dsize;
1499 memcpy(&r->data[0], data.dptr, data.dsize);
/* hand the packet over for sending */
1501 ctdb_queue_packet(ctdb, &r->hdr);
/*
  One notification registered by a client, chained through next/prev on
  client->notify. When an entry is destroyed,
  ctdb_client_notify_destructor() sends its stored data for its srvid.
*/
1509 struct ctdb_client_notify_list {
1510 struct ctdb_client_notify_list *next, *prev;
/* daemon context used by the destructor to send the message */
1511 struct ctdb_context *ctdb;
/*
  talloc destructor for a ctdb_client_notify_list entry.

  Sends the entry's stored data for its srvid to CTDB_BROADCAST_CONNECTED
  through ctdb_daemon_send_message() and logs if the send fails.

  NOTE(review): the "Sending client notify message" line is logged at
  DEBUG_ERR although it does not report an error - confirm the level is
  intended.
*/
1517 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1521 DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1523 ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1525 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
/*
  Control handler: register a notification for a local client.

  indata must hold a struct ctdb_client_notify_register whose notify_data
  is exactly notify->len bytes long. The client is looked up by client_id.
  A new ctdb_client_notify_list entry holding a copy of the notify data is
  allocated under the client, added to client->notify and given
  ctdb_client_notify_destructor as its talloc destructor.

  ctdb      - daemon context
  client_id - id used to look up the registering client
  indata    - raw control payload

  NOTE(review): notify->srvid is dereferenced for the first log message
  before indata.dsize has been validated, so a short or empty indata is
  read out of bounds. Consider moving that log below the size checks.
*/
1531 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1533 struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1534 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1535 struct ctdb_client_notify_list *nl;
1537 DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
/* the payload must at least cover the fixed part of the structure */
1539 if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1540 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
/* and must be exactly the fixed part plus notify->len data bytes */
1544 if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1545 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
/* no client structure was found for client_id */
1550 if (client == NULL) {
1551 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
/* look for an existing registration with the same srvid */
1555 for(nl=client->notify; nl; nl=nl->next) {
1556 if (nl->srvid == notify->srvid) {
1561 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
/* the new entry is a talloc child of the client */
1565 nl = talloc(client, struct ctdb_client_notify_list);
1566 CTDB_NO_MEMORY(ctdb, nl);
1568 nl->srvid = notify->srvid;
/* keep a private copy of the notify data, owned by the entry */
1569 nl->data.dsize = notify->len;
1570 nl->data.dptr = talloc_size(nl, nl->data.dsize);
1571 CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1572 memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1574 DLIST_ADD(client->notify, nl);
/* the destructor sends the notification when the entry is destroyed */
1575 talloc_set_destructor(nl, ctdb_client_notify_destructor);
/*
  Control handler: remove a notification previously registered by a local
  client.

  The client is looked up by client_id and its notify list is searched
  for the srvid given in indata. The matching entry is unlinked from
  client->notify and its destructor is cleared.

  ctdb      - daemon context
  client_id - id used to look up the client
  indata    - raw control payload, read as a
              struct ctdb_client_notify_deregister

  NOTE(review): no check of indata.dsize is visible before notify->srvid
  is read - confirm the caller validates the payload size.
*/
1580 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1582 struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1583 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1584 struct ctdb_client_notify_list *nl;
1586 DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
/* no client structure was found for client_id */
1588 if (client == NULL) {
1589 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
/* look for the registration with this srvid */
1593 for(nl=client->notify; nl; nl=nl->next) {
1594 if (nl->srvid == notify->srvid) {
1599 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1603 DLIST_REMOVE(client->notify, nl);
/* with the destructor cleared, freeing nl does not send the notify
   message */
1604 talloc_set_destructor(nl, NULL);
/*
  Find the client registered for a process id.

  Walks ctdb->client_pids and returns the client of the first entry whose
  pid equals pid.

  ctdb - daemon context holding the client_pids list
  pid  - process id to look for
*/
1610 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1612 struct ctdb_client_pid_list *client_pid;
/* linear search of the pid list */
1614 for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1615 if (client_pid->pid == pid) {
1616 return client_pid->client;
1623 /* This control is used by samba when probing if a process (of a samba daemon)
1625 Samba does this when it needs/wants to check if a subrecord in one of the
1626 databases is still valid, or if it is stale and can be removed.
1627 If the node is in banned or stopped state we just kill off the samba
1628 process holding this subrecord and return to the calling samba that
1629 the process does not exist.
1630 This allows us to forcefully recall subrecords registered by samba processes
1631 on banned and stopped nodes.
/*
  Control handler: probe whether process pid exists.

  When this node has NODE_FLAGS_BANNED or NODE_FLAGS_STOPPED set, a client
  registered for pid is looked up and, if found, its client structure is
  freed. Otherwise the result of kill(pid, 0) is returned.

  ctdb - daemon context
  pid  - process id to probe
*/
1633 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1635 struct ctdb_client *client;
/* test this node's own flags */
1637 if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1638 client = ctdb_find_client_by_pid(ctdb, pid);
1639 if (client != NULL) {
1640 DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
/* free the daemon's client structure for that process */
1641 talloc_free(client);
/* signal 0 sends nothing; kill() only performs its existence and
   permission checks, and its result is returned as is */
1646 return kill(pid, 0);