#include "includes.h"
#include "db_wrap.h"
#include "lib/tdb/include/tdb.h"
-#include "lib/events/events.h"
+#include "lib/tevent/tevent.h"
#include "lib/util/dlinklist.h"
#include "system/network.h"
#include "system/filesys.h"
#include "system/wait.h"
-#include "../include/ctdb.h"
+#include "../include/ctdb_client.h"
#include "../include/ctdb_private.h"
+#include "../common/rb_tree.h"
#include <sys/socket.h>
-static void daemon_incoming_packet(void *, struct ctdb_req_header *);
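+/* tracks the unix pid of each connected client so that
+ ctdb_find_client_by_pid() and ctdb_control_process_exists()
+ can map a pid back to its client structure */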
+struct ctdb_client_pid_list {
+ struct ctdb_client_pid_list *next, *prev;
+ struct ctdb_context *ctdb;
+ pid_t pid;
+ struct ctdb_client *client;
+};
+static void daemon_incoming_packet(void *, struct ctdb_req_header *);
static void print_exit_message(void)
{
}
+
+static void ctdb_time_tick(struct event_context *ev, struct timed_event *te,
+ struct timeval t, void *private_data)
+{
+ struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
+
+ if (getpid() != ctdbd_pid) {
+ return;
+ }
+
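+ /* re-arm ourselves so the tick fires again one second from now */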
+ event_add_timed(ctdb->ev, ctdb,
+ timeval_current_ofs(1, 0),
+ ctdb_time_tick, ctdb);
+}
+
+/* Used to trigger a dummy event once per second, to make
+ * detection of hangs more reliable.
+ */
+static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
+{
+ event_add_timed(ctdb->ev, ctdb,
+ timeval_current_ofs(1, 0),
+ ctdb_time_tick, ctdb);
+}
+
+
/* called when the "startup" event script has finished */
static void ctdb_start_transport(struct ctdb_context *ctdb)
{
/* start listening for recovery daemon pings */
ctdb_control_recd_ping(ctdb);
+
+ /* start listening to timer ticks */
+ ctdb_start_time_tickd(ctdb);
}
static void block_signal(int signum)
*/
static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
{
- client->ctdb->statistics.client_packets_sent++;
+ CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
if (hdr->operation == CTDB_REQ_MESSAGE) {
if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
- DEBUG(DEBUG_ERR,("Drop CTDB_REQ_MESSAGE to client. Queue full.\n"));
- return 0;
+ DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
+ talloc_free(client);
+ return -1;
}
}
return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
talloc_free(r);
}
-
/*
this is called when the ctdb daemon received a ctdb request to
(unsigned long long)srvid));
}
- /* this is a hack for Samba - we now know the pid of the Samba client */
- if ((srvid & 0xFFFFFFFF) == srvid &&
- kill(srvid, 0) == 0) {
- client->pid = srvid;
- DEBUG(DEBUG_INFO,(__location__ " Registered PID %u for client %u\n",
- (unsigned)client->pid, client_id));
- }
return res;
}
return ctdb_deregister_message_handler(ctdb, srvid, client);
}
+int daemon_check_srvids(struct ctdb_context *ctdb, TDB_DATA indata,
+ TDB_DATA *outdata)
+{
+ uint64_t *ids;
+ int i, num_ids;
+ uint8_t *results;
+
+ if ((indata.dsize % sizeof(uint64_t)) != 0) {
+ DEBUG(DEBUG_ERR, ("Bad indata in daemon_check_srvids, "
+ "size=%d\n", (int)indata.dsize));
+ return -1;
+ }
+
+ ids = (uint64_t *)indata.dptr;
+ num_ids = indata.dsize / sizeof(uint64_t);
+
+ results = talloc_zero_array(outdata, uint8_t, (num_ids+7)/8);
+ if (results == NULL) {
+ DEBUG(DEBUG_ERR, ("talloc failed in daemon_check_srvids\n"));
+ return -1;
+ }
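+ /* build a bitmap: bit i is set if some client has registered a
+ message handler for srvid ids[i] */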
+ for (i=0; i<num_ids; i++) {
+ struct ctdb_message_list *ml;
+ for (ml=ctdb->message_list; ml; ml=ml->next) {
+ if (ml->srvid == ids[i]) {
+ break;
+ }
+ }
+ if (ml != NULL) {
+ results[i/8] |= (1 << (i%8));
+ }
+ }
+ outdata->dptr = (uint8_t *)results;
+ outdata->dsize = talloc_get_size(results);
+ return 0;
+}
/*
destroy a ctdb_client
ctdb_takeover_client_destructor_hook(client);
ctdb_reqid_remove(client->ctdb, client->client_id);
- if (client->ctdb->statistics.num_clients) {
- client->ctdb->statistics.num_clients--;
- }
+ CTDB_DECREMENT_STAT(client->ctdb, num_clients);
if (client->num_persistent_updates != 0) {
DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
"commit active. Forcing recovery.\n"));
client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
+
+ /* legacy trans2 transaction state: */
ctdb_db->transaction_active = false;
+
+ /*
+ * trans3 transaction state:
+ *
+ * The destructor sets the pointer to NULL.
+ */
+ talloc_free(ctdb_db->persistent_state);
}
return 0;
res = ctdb_daemon_call_recv(state, dstate->call);
if (res != 0) {
DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
- if (client->ctdb->statistics.pending_calls > 0) {
- client->ctdb->statistics.pending_calls--;
- }
- ctdb_latency(ctdb_db, "call_from_client_cb 1", &client->ctdb->statistics.max_call_latency, dstate->start_time);
+ CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
+
+ CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
return;
}
length, struct ctdb_reply_call);
if (r == NULL) {
DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
- if (client->ctdb->statistics.pending_calls > 0) {
- client->ctdb->statistics.pending_calls--;
- }
- ctdb_latency(ctdb_db, "call_from_client_cb 2", &client->ctdb->statistics.max_call_latency, dstate->start_time);
+ CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
+ CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
return;
}
r->hdr.reqid = dstate->reqid;
r->datalen = dstate->call->reply_data.dsize;
+ r->status = dstate->call->status;
memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
res = daemon_queue_send(client, &r->hdr);
+ if (res == -1) {
+ /* client is dead - return immediately */
+ return;
+ }
if (res != 0) {
DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
}
- ctdb_latency(ctdb_db, "call_from_client_cb 3", &client->ctdb->statistics.max_call_latency, dstate->start_time);
+ CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
+ CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
talloc_free(dstate);
- if (client->ctdb->statistics.pending_calls > 0) {
- client->ctdb->statistics.pending_calls--;
- }
}
struct ctdb_daemon_packet_wrap {
daemon_incoming_packet(client, hdr);
}
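+
+/* a fetch-lock request that was deferred because an identical fetch
+ for the same record is already in flight */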
+struct ctdb_deferred_fetch_call {
+ struct ctdb_deferred_fetch_call *next, *prev;
+ struct ctdb_req_call *c;
+ struct ctdb_daemon_packet_wrap *w;
+};
+
+struct ctdb_deferred_fetch_queue {
+ struct ctdb_deferred_fetch_call *deferred_calls;
+};
+
+struct ctdb_deferred_requeue {
+ struct ctdb_deferred_fetch_call *dfc;
+ struct ctdb_client *client;
+};
+
+/* called from a timer event; starts reprocessing the deferred call. */
+static void reprocess_deferred_call(struct event_context *ev, struct timed_event *te,
+ struct timeval t, void *private_data)
+{
+ struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
+ struct ctdb_client *client = dfr->client;
+
+ talloc_steal(client, dfr->dfc->c);
+ daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
+ talloc_free(dfr);
+}
+
+/* the deferral context is destroyed either after a timeout or when the
+ initial fetch-lock has finished.
+ at that stage, immediately start reprocessing the queued up deferred
+ calls so they get reprocessed right away (and since we are dmaster at
+ this stage, the waiting smbd processes can pick up and acquire the
+ record immediately).
+*/
+static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
+{
+
+ /* we need to reprocess the packets from the queue explicitly instead
+ of just relying on a normal destructor, since we want, and need, to
+ call the clients in the same order the requests were queued up
+ */
+ while (dfq->deferred_calls != NULL) {
+ struct ctdb_client *client;
+ struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
+ struct ctdb_deferred_requeue *dfr;
+
+ DLIST_REMOVE(dfq->deferred_calls, dfc);
+
+ client = ctdb_reqid_find(dfc->w->ctdb, dfc->w->client_id, struct ctdb_client);
+ if (client == NULL) {
+ DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
+ dfc->w->client_id));
+ continue;
+ }
+
+ /* process it by pushing it back onto the eventloop */
+ dfr = talloc(client, struct ctdb_deferred_requeue);
+ if (dfr == NULL) {
+ DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
+ continue;
+ }
+
+ dfr->dfc = talloc_steal(dfr, dfc);
+ dfr->client = client;
+
+ event_add_timed(dfc->w->ctdb->ev, client, timeval_zero(), reprocess_deferred_call, dfr);
+ }
+
+ return 0;
+}
+
+/* insert the new deferral context into the rb tree.
+ there should never be a pre-existing context for this key, but if there
+ is, warn about it and destroy the previous context.
+*/
+static void *insert_dfq_callback(void *parm, void *data)
+{
+ if (data) {
+ DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
+ talloc_free(data);
+ }
+ return parm;
+}
+
+/* if the original fetch-lock did not complete within a reasonable time,
+ free the deferral context, causing all deferred requests to be
+ re-inserted into the event system.
+*/
+static void dfq_timeout(struct event_context *ev, struct timed_event *te,
+ struct timeval t, void *private_data)
+{
+ talloc_free(private_data);
+}
+
+/* This function is used in the local daemon to register a KEY in a database
+ as being fetched.
+ While the remote fetch is in-flight, any further attempts to re-fetch the
+ same record will be deferred until the fetch completes.
+*/
+static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
+{
+ uint32_t *k;
+ struct ctdb_deferred_fetch_queue *dfq;
+
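+ /* pack the key into an array of uint32_t for the rb-tree: element 0
+ holds the total element count (including itself), followed by the
+ key bytes zero-padded up to a uint32_t boundary */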
+ k = talloc_zero_size(call, ((call->key.dsize + 3) & 0xfffffffc) + 4);
+ if (k == NULL) {
+ DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
+ return -1;
+ }
+
+ k[0] = (call->key.dsize + 3) / 4 + 1;
+ memcpy(&k[1], call->key.dptr, call->key.dsize);
+
+ dfq = talloc(call, struct ctdb_deferred_fetch_queue);
+ if (dfq == NULL) {
+ DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch queue structure\n"));
+ talloc_free(k);
+ return -1;
+ }
+ dfq->deferred_calls = NULL;
+
+ trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
+
+ talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
+
+ /* if the fetch hasn't completed in 30 seconds, just tear it all down
+ and let it try again as the events are reissued */
+ event_add_timed(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0), dfq_timeout, dfq);
+
+ talloc_free(k);
+ return 0;
+}
+
+/* check if this is a duplicate request for a fetch already in-flight.
+ if it is, defer this call to be reprocessed later, when
+ the in-flight fetch completes.
+*/
+static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call *c)
+{
+ uint32_t *k;
+ struct ctdb_deferred_fetch_queue *dfq;
+ struct ctdb_deferred_fetch_call *dfc;
+
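+ /* pack the key exactly as setup_deferred_fetch_locks() does, so the
+ rb-tree lookup finds the queue created by the original fetch */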
+ k = talloc_zero_size(c, ((key.dsize + 3) & 0xfffffffc) + 4);
+ if (k == NULL) {
+ DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
+ return -1;
+ }
+
+ k[0] = (key.dsize + 3) / 4 + 1;
+ memcpy(&k[1], key.dptr, key.dsize);
+
+ dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
+ if (dfq == NULL) {
+ talloc_free(k);
+ return -1;
+ }
+
+ talloc_free(k);
+
+ dfc = talloc(dfq, struct ctdb_deferred_fetch_call);
+ if (dfc == NULL) {
+ DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
+ return -1;
+ }
+
+ dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
+ if (dfc->w == NULL) {
+ DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
+ talloc_free(dfc);
+ return -1;
+ }
+
+ dfc->c = talloc_steal(dfc, c);
+ dfc->w->ctdb = ctdb_db->ctdb;
+ dfc->w->client_id = client->client_id;
+
+ DLIST_ADD_END(dfq->deferred_calls, dfc, NULL);
+
+ return 0;
+}
+
/*
this is called when the ctdb daemon received a ctdb request call
struct ctdb_context *ctdb = client->ctdb;
struct ctdb_daemon_packet_wrap *w;
- ctdb->statistics.total_calls++;
- if (client->ctdb->statistics.pending_calls > 0) {
- ctdb->statistics.pending_calls++;
- }
+ CTDB_INCREMENT_STAT(ctdb, total_calls);
+ CTDB_INCREMENT_STAT(ctdb, pending_calls);
ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
if (!ctdb_db) {
DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
c->db_id));
- if (client->ctdb->statistics.pending_calls > 0) {
- ctdb->statistics.pending_calls--;
- }
+ CTDB_DECREMENT_STAT(ctdb, pending_calls);
return;
}
+ if (ctdb_db->unhealthy_reason) {
+ /*
+ * this is just a warning, as the tdb should be empty anyway,
+ * and only persistent databases can be unhealthy, and those
+ * don't use this code path
+ */
+ DEBUG(DEBUG_WARNING,("warn: db(%s) unhealthy in daemon_request_call_from_client(): %s\n",
+ ctdb_db->db_name, ctdb_db->unhealthy_reason));
+ }
+
key.dptr = c->data;
key.dsize = c->keylen;
daemon_incoming_packet_wrap, w, True);
if (ret == -2) {
/* will retry later */
- if (client->ctdb->statistics.pending_calls > 0) {
- ctdb->statistics.pending_calls--;
- }
+ CTDB_DECREMENT_STAT(ctdb, pending_calls);
return;
}
if (ret != 0) {
DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
- if (client->ctdb->statistics.pending_calls > 0) {
- ctdb->statistics.pending_calls--;
+ CTDB_DECREMENT_STAT(ctdb, pending_calls);
+ return;
+ }
+
+ if (c->flags & CTDB_IMMEDIATE_MIGRATION) {
+ /* check if this fetch-lock request is a duplicate for a
+ request we already have in flight. If so defer it until
+ the first request completes.
+ */
+ if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
+ ret = ctdb_ltdb_unlock(ctdb_db, key);
+ if (ret != 0) {
+ DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+ }
+ return;
+ }
+ }
+
+ /* Don't do READONLY if we don't have a tracking database */
+ if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
+ c->flags &= ~CTDB_WANT_READONLY;
+ }
+
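+ /* a previous revoke cycle has finished: clear all read-only
+ bookkeeping flags before processing this call */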
+ if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
+ header.flags &= ~(CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY|CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_REVOKE_COMPLETE);
+ if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
+ ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
+ }
+ /* and clear out the tracking data */
+ if (tdb_delete(ctdb_db->rottdb, key) != 0) {
+ DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
+ }
+ }
+
+ /* if we are revoking, we must defer all other calls until the revoke
+ * has completed.
+ */
+ if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
+ talloc_free(data.dptr);
+ ret = ctdb_ltdb_unlock(ctdb_db, key);
+
+ if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
+ ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
}
return;
}
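+
+ /* a request that needs a writable copy while delegations or
+ read-only copies exist: revoke them all before proceeding */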
+ if ((header.dmaster == ctdb->pnn)
+ && (!(c->flags & CTDB_WANT_READONLY))
+ && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
+ header.flags |= CTDB_REC_RO_REVOKING_READONLY;
+ if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
+ ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
+ }
+ ret = ctdb_ltdb_unlock(ctdb_db, key);
+
+ if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, key, &header, data) != 0) {
+ ctdb_fatal(ctdb, "Failed to start record revoke");
+ }
+ talloc_free(data.dptr);
+
+ if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
+ ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
+ }
+
+ return;
+ }
+
dstate = talloc(client, struct daemon_call_state);
if (dstate == NULL) {
- ctdb_ltdb_unlock(ctdb_db, key);
- DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
- if (client->ctdb->statistics.pending_calls > 0) {
- ctdb->statistics.pending_calls--;
+ ret = ctdb_ltdb_unlock(ctdb_db, key);
+ if (ret != 0) {
+ DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
}
+
+ DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
+ CTDB_DECREMENT_STAT(ctdb, pending_calls);
return;
}
dstate->start_time = timeval_current();
call = dstate->call = talloc_zero(dstate, struct ctdb_call);
if (call == NULL) {
- ctdb_ltdb_unlock(ctdb_db, key);
- DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
- if (client->ctdb->statistics.pending_calls > 0) {
- ctdb->statistics.pending_calls--;
+ ret = ctdb_ltdb_unlock(ctdb_db, key);
+ if (ret != 0) {
+ DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
}
- ctdb_latency(ctdb_db, "call_from_client 1", &ctdb->statistics.max_call_latency, dstate->start_time);
+
+ DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
+ CTDB_DECREMENT_STAT(ctdb, pending_calls);
+ CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
return;
}
state = ctdb_call_local_send(ctdb_db, call, &header, &data);
} else {
state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
+ if (call->flags & CTDB_IMMEDIATE_MIGRATION) {
+ /* This request triggered a remote fetch-lock.
+ Set up a deferral for this key so any additional
+ fetch-locks are deferred until the current one
+ finishes.
+ */
+ setup_deferred_fetch_locks(ctdb_db, call);
+ }
}
- ctdb_ltdb_unlock(ctdb_db, key);
+ ret = ctdb_ltdb_unlock(ctdb_db, key);
+ if (ret != 0) {
+ DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+ }
if (state == NULL) {
DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
- if (client->ctdb->statistics.pending_calls > 0) {
- ctdb->statistics.pending_calls--;
- }
- ctdb_latency(ctdb_db, "call_from_client 2", &ctdb->statistics.max_call_latency, dstate->start_time);
+ CTDB_DECREMENT_STAT(ctdb, pending_calls);
+ CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
return;
}
talloc_steal(state, dstate);
switch (hdr->operation) {
case CTDB_REQ_CALL:
- ctdb->statistics.client.req_call++;
+ CTDB_INCREMENT_STAT(ctdb, client.req_call);
daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
break;
case CTDB_REQ_MESSAGE:
- ctdb->statistics.client.req_message++;
+ CTDB_INCREMENT_STAT(ctdb, client.req_message);
daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
break;
case CTDB_REQ_CONTROL:
- ctdb->statistics.client.req_control++;
+ CTDB_INCREMENT_STAT(ctdb, client.req_control);
daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
break;
return;
}
- client->ctdb->statistics.client_packets_recv++;
+ CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
if (cnt < sizeof(*hdr)) {
ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n",
daemon_incoming_packet(client, hdr);
}
+
+static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
+{
+ if (client_pid->ctdb->client_pids != NULL) {
+ DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
+ }
+
+ return 0;
+}
+
+
static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde,
uint16_t flags, void *private_data)
{
int fd;
struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
struct ctdb_client *client;
-#ifdef _AIX
- struct peercred_struct cr;
- socklen_t crl = sizeof(struct peercred_struct);
-#else
- struct ucred cr;
- socklen_t crl = sizeof(struct ucred);
-#endif
+ struct ctdb_client_pid_list *client_pid;
+ pid_t peer_pid = 0;
memset(&addr, 0, sizeof(addr));
len = sizeof(addr);
DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
client = talloc_zero(ctdb, struct ctdb_client);
-#ifdef _AIX
- if (getsockopt(fd, SOL_SOCKET, SO_PEERID, &cr, &crl) == 0) {
-#else
- if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl) == 0) {
-#endif
- talloc_asprintf(client, "struct ctdb_client: pid:%u", (unsigned)cr.pid);
+ if (ctdb_get_peer_pid(fd, &peer_pid) == 0) {
+ DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)peer_pid));
}
client->ctdb = ctdb;
client->fd = fd;
client->client_id = ctdb_reqid_new(ctdb, client);
- ctdb->statistics.num_clients++;
+ client->pid = peer_pid;
+
+ client_pid = talloc(client, struct ctdb_client_pid_list);
+ if (client_pid == NULL) {
+ DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
+ close(fd);
+ talloc_free(client);
+ return;
+ }
+ client_pid->ctdb = ctdb;
+ client_pid->pid = peer_pid;
+ client_pid->client = client;
+
+ DLIST_ADD(ctdb->client_pids, client_pid);
client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT,
- ctdb_daemon_read_cb, client);
+ ctdb_daemon_read_cb, client,
+ "client-%u", client->pid);
talloc_set_destructor(client, ctdb_client_destructor);
+ talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
+ CTDB_INCREMENT_STAT(ctdb, num_clients);
}
}
}
+static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
+ void *private_data)
+{
+ if (status != 0) {
+ ctdb_fatal(ctdb, "Failed to run setup event\n");
+ return;
+ }
+ ctdb_run_notification_script(ctdb, "setup");
+
+ /* tell all other nodes we've just started up */
+ ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
+ 0, CTDB_CONTROL_STARTUP, 0,
+ CTDB_CTRL_FLAG_NOREPLY,
+ tdb_null, NULL, NULL);
+}
+
/*
start the protocol going as a daemon
*/
-int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
+int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog, const char *public_address_list)
{
int res, ret = -1;
struct fd_event *fde;
}
block_signal(SIGPIPE);
+ ctdbd_pid = getpid();
+ ctdb->ctdbd_pid = ctdbd_pid;
+
+ DEBUG(DEBUG_ERR, ("Starting CTDBD as pid: %u\n", ctdbd_pid));
+
if (ctdb->do_setsched) {
/* try to set us up as realtime */
ctdb_set_scheduler(ctdb);
}
ctdb->ev = event_context_init(NULL);
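+ /* parts of ctdb block waiting for replies from inside event
+ handlers, which re-enters the event loop; newer tevent aborts on
+ nested loops unless nesting is explicitly allowed */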
+ tevent_loop_allow_nesting(ctdb->ev);
+ ret = ctdb_init_tevent_logging(ctdb);
+ if (ret != 0) {
+ DEBUG(DEBUG_ALERT,("Failed to initialize TEVENT logging\n"));
+ exit(1);
+ }
ctdb_set_child_logging(ctdb);
+ /* initialize statistics collection */
+ ctdb_statistics_init(ctdb);
+
/* force initial recovery for election */
ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
if (ctdb->methods->initialise(ctdb) != 0) {
ctdb_fatal(ctdb, "transport failed to initialise");
}
+ if (public_address_list) {
+ ret = ctdb_set_public_addresses(ctdb, public_address_list);
+ if (ret == -1) {
+ DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
+ exit(1);
+ }
+ }
- /* attach to any existing persistent databases */
- if (ctdb_attach_persistent(ctdb) != 0) {
- ctdb_fatal(ctdb, "Failed to attach to persistent databases\n");
+
+ /* attach to existing databases */
+ if (ctdb_attach_databases(ctdb) != 0) {
+ ctdb_fatal(ctdb, "Failed to attach to databases\n");
}
+ ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
+ if (ret != 0) {
+ ctdb_fatal(ctdb, "Failed to run init event\n");
+ }
+ ctdb_run_notification_script(ctdb, "init");
+
/* start frozen, then let the first election sort things out */
if (ctdb_blocking_freeze(ctdb)) {
ctdb_fatal(ctdb, "Failed to get initial freeze\n");
/* now start accepting clients, only can do this once frozen */
fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd,
- EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
+ EVENT_FD_READ,
ctdb_accept_client, ctdb);
-
- /* tell all other nodes we've just started up */
- ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
- 0, CTDB_CONTROL_STARTUP, 0,
- CTDB_CTRL_FLAG_NOREPLY,
- tdb_null, NULL, NULL);
+ tevent_fd_set_auto_close(fde);
/* release any IPs we hold from previous runs of the daemon */
- ctdb_release_all_ips(ctdb);
+ if (ctdb->tunable.disable_ip_failover == 0) {
+ ctdb_release_all_ips(ctdb);
+ }
/* start the transport going */
ctdb_start_transport(ctdb);
DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
exit(1);
}
+
+ ret = ctdb_event_script_callback(ctdb,
+ ctdb,
+ ctdb_setup_event_callback,
+ ctdb,
+ false,
+ CTDB_EVENT_SETUP,
+ "%s",
+ "");
+ if (ret != 0) {
+ DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
+ exit(1);
+ }
+
+ if (use_syslog) {
+ if (start_syslog_daemon(ctdb)) {
+ DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
+ exit(10);
+ }
+ }
+
+ ctdb_lockdown_memory(ctdb);
/* go into a wait loop to allow other nodes to complete */
event_loop_wait(ctdb->ev);
size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
if (ctdb->methods == NULL) {
- DEBUG(DEBUG_ERR,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
+ DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
operation, (unsigned)length));
return NULL;
}
struct ctdb_client *client = state->client;
struct ctdb_reply_control *r;
size_t len;
+ int ret;
/* construct a message to send to the client containing the data */
len = offsetof(struct ctdb_reply_control, data) + data.dsize;
memcpy(&r->data[r->datalen], errormsg, r->errorlen);
}
- daemon_queue_send(client, &r->hdr);
-
- talloc_free(state);
+ ret = daemon_queue_send(client, &r->hdr);
+ if (ret != -1) {
+ talloc_free(state);
+ }
}
/*
int len;
if (ctdb->methods == NULL) {
- DEBUG(DEBUG_ERR,(__location__ " Failed to send message. Transport is DOWN\n"));
+ DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
return -1;
}
return 0;
}
+
+
+struct ctdb_client_notify_list {
+ struct ctdb_client_notify_list *next, *prev;
+ struct ctdb_context *ctdb;
+ uint64_t srvid;
+ TDB_DATA data;
+};
+
+
+static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
+{
+ int ret;
+
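+ /* fires from the talloc destructor when the registering client goes
+ away: broadcast the stored payload so listeners on the srvid learn
+ the client is gone */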
+ DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
+
+ ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
+ if (ret != 0) {
+ DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
+ }
+
+ return 0;
+}
+
+int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
+{
+ struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
+ struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
+ struct ctdb_client_notify_list *nl;
+
+ DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
+
+ if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
+ DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
+ return -1;
+ }
+
+ if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
+ DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
+ return -1;
+ }
+
+
+ if (client == NULL) {
+ DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
+ return -1;
+ }
+
+ for (nl=client->notify; nl; nl=nl->next) {
+ if (nl->srvid == notify->srvid) {
+ break;
+ }
+ }
+ if (nl != NULL) {
+ DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
+ return -1;
+ }
+
+ nl = talloc(client, struct ctdb_client_notify_list);
+ CTDB_NO_MEMORY(ctdb, nl);
+ nl->ctdb = ctdb;
+ nl->srvid = notify->srvid;
+ nl->data.dsize = notify->len;
+ nl->data.dptr = talloc_size(nl, nl->data.dsize);
+ CTDB_NO_MEMORY(ctdb, nl->data.dptr);
+ memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
+
+ DLIST_ADD(client->notify, nl);
+ talloc_set_destructor(nl, ctdb_client_notify_destructor);
+
+ return 0;
+}
+
+int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
+{
+ struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
+ struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
+ struct ctdb_client_notify_list *nl;
+
+ DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
+
+ if (client == NULL) {
+ DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
+ return -1;
+ }
+
+ for (nl=client->notify; nl; nl=nl->next) {
+ if (nl->srvid == notify->srvid) {
+ break;
+ }
+ }
+ if (nl == NULL) {
+ DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
+ return -1;
+ }
+
+ DLIST_REMOVE(client->notify, nl);
+ talloc_set_destructor(nl, NULL);
+ talloc_free(nl);
+
+ return 0;
+}
+
+struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
+{
+ struct ctdb_client_pid_list *client_pid;
+
+ for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
+ if (client_pid->pid == pid) {
+ return client_pid->client;
+ }
+ }
+ return NULL;
+}
+
+
+/* This control is used by samba when probing whether a process (of a samba
+ daemon) exists on the node.
+ Samba does this when it needs to check if a subrecord in one of the
+ databases is still valid, or if it is stale and can be removed.
+ If the node is in an unhealthy or stopped state we just kill off the samba
+ process holding this sub-record and tell the calling samba that
+ the process does not exist.
+ This allows us to forcefully recall subrecords registered by samba
+ processes on banned and stopped nodes.
+*/
+int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
+{
+ struct ctdb_client *client;
+
+ if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
+ client = ctdb_find_client_by_pid(ctdb, pid);
+ if (client != NULL) {
+ DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
+ talloc_free(client);
+ }
+ return -1;
+ }
+
+ return kill(pid, 0);
+}