Move platform-specific code to common/system_*
[ctdb.git] / server / ctdb_daemon.c
index 3978e28fdf48d9957512f4dc42765ef3f9c3733b..69488dfac4258cb7bc4e20c8d4a64ab65a753c39 100644 (file)
 #include "includes.h"
 #include "db_wrap.h"
 #include "lib/tdb/include/tdb.h"
-#include "lib/events/events.h"
+#include "lib/tevent/tevent.h"
 #include "lib/util/dlinklist.h"
 #include "system/network.h"
 #include "system/filesys.h"
 #include "system/wait.h"
-#include "../include/ctdb.h"
+#include "../include/ctdb_client.h"
 #include "../include/ctdb_private.h"
+#include "../common/rb_tree.h"
 #include <sys/socket.h>
 
+struct ctdb_client_pid_list {
+       struct ctdb_client_pid_list *next, *prev;
+       struct ctdb_context *ctdb;
+       pid_t pid;
+       struct ctdb_client *client;
+};
+
 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
 
-/*
-  handler for when a node changes its flags
-*/
-static void flag_change_handler(struct ctdb_context *ctdb, uint64_t srvid, 
-                               TDB_DATA data, void *private_data)
+static void print_exit_message(void)
 {
-       struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
+       DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
+}
 
-       if (data.dsize != sizeof(*c) || !ctdb_validate_pnn(ctdb, c->pnn)) {
-               DEBUG(DEBUG_CRIT,(__location__ "Invalid data in ctdb_node_flag_change\n"));
-               return;
-       }
 
-       if (!ctdb_validate_pnn(ctdb, c->pnn)) {
-               DEBUG(DEBUG_CRIT,("Bad pnn %u in flag_change_handler\n", c->pnn));
-               return;
-       }
 
-       /* don't get the disconnected flag from the other node */
-       ctdb->nodes[c->pnn]->flags = 
-               (ctdb->nodes[c->pnn]->flags&NODE_FLAGS_DISCONNECTED) 
-               | (c->new_flags & ~NODE_FLAGS_DISCONNECTED);    
-       DEBUG(DEBUG_DEBUG,("Node flags for node %u are now 0x%x\n", c->pnn, ctdb->nodes[c->pnn]->flags));
+static void ctdb_time_tick(struct event_context *ev, struct timed_event *te, 
+                                 struct timeval t, void *private_data)
+{
+       struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
 
-       /* make sure we don't hold any IPs when we shouldn't */
-       if (c->pnn == ctdb->pnn &&
-           (ctdb->nodes[c->pnn]->flags & (NODE_FLAGS_INACTIVE|NODE_FLAGS_BANNED))) {
-               ctdb_release_all_ips(ctdb);
+       if (getpid() != ctdbd_pid) {
+               return;
        }
+
+       event_add_timed(ctdb->ev, ctdb, 
+                       timeval_current_ofs(1, 0), 
+                       ctdb_time_tick, ctdb);
 }
 
-static void print_exit_message(void)
+/* Used to trigger a dummy event once per second, to make
+ * detection of hangs more reliable.
+ */
+static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
 {
-       DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
+       event_add_timed(ctdb->ev, ctdb, 
+                       timeval_current_ofs(1, 0), 
+                       ctdb_time_tick, ctdb);
 }
 
 
@@ -91,10 +94,6 @@ static void ctdb_start_transport(struct ctdb_context *ctdb)
        /* Make sure we log something when the daemon terminates */
        atexit(print_exit_message);
 
-       /* a handler for when nodes are disabled/enabled */
-       ctdb_register_message_handler(ctdb, ctdb, CTDB_SRVID_NODE_FLAGS_CHANGED, 
-                                     flag_change_handler, NULL);
-
        /* start monitoring for connected/disconnected nodes */
        ctdb_start_keepalive(ctdb);
 
@@ -103,6 +102,12 @@ static void ctdb_start_transport(struct ctdb_context *ctdb)
 
        /* start periodic update of tcp tickle lists */
                ctdb_start_tcp_tickle_update(ctdb);
+
+       /* start listening for recovery daemon pings */
+       ctdb_control_recd_ping(ctdb);
+
+       /* start listening to timer ticks */
+       ctdb_start_time_tickd(ctdb);
 }
 
 static void block_signal(int signum)
@@ -123,7 +128,14 @@ static void block_signal(int signum)
  */
 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
 {
-       client->ctdb->statistics.client_packets_sent++;
+       CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
+       if (hdr->operation == CTDB_REQ_MESSAGE) {
+               if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
+                       DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
+                       talloc_free(client);
+                       return -1;
+               }
+       }
        return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
 }
 
@@ -154,7 +166,6 @@ static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
 
        talloc_free(r);
 }
-                                          
 
 /*
   this is called when the ctdb daemon received a ctdb request to 
@@ -177,13 +188,6 @@ int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_i
                         (unsigned long long)srvid));
        }
 
-       /* this is a hack for Samba - we now know the pid of the Samba client */
-       if ((srvid & 0xFFFFFFFF) == srvid &&
-           kill(srvid, 0) == 0) {
-               client->pid = srvid;
-               DEBUG(DEBUG_INFO,(__location__ " Registered PID %u for client %u\n",
-                        (unsigned)client->pid, client_id));
-       }
        return res;
 }
 
@@ -201,20 +205,74 @@ int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client
        return ctdb_deregister_message_handler(ctdb, srvid, client);
 }
 
+int daemon_check_srvids(struct ctdb_context *ctdb, TDB_DATA indata,
+                       TDB_DATA *outdata)
+{
+       uint64_t *ids;
+       int i, num_ids;
+       uint8_t *results;
+
+       if ((indata.dsize % sizeof(uint64_t)) != 0) {
+               DEBUG(DEBUG_ERR, ("Bad indata in daemon_check_srvids, "
+                                 "size=%d\n", (int)indata.dsize));
+               return -1;
+       }
+
+       ids = (uint64_t *)indata.dptr;
+       num_ids = indata.dsize / 8;
+
+       results = talloc_zero_array(outdata, uint8_t, (num_ids+7)/8);
+       if (results == NULL) {
+               DEBUG(DEBUG_ERR, ("talloc failed in daemon_check_srvids\n"));
+               return -1;
+       }
+       for (i=0; i<num_ids; i++) {
+               struct ctdb_message_list *ml;
+               for (ml=ctdb->message_list; ml; ml=ml->next) {
+                       if (ml->srvid == ids[i]) {
+                               break;
+                       }
+               }
+               if (ml != NULL) {
+                       results[i/8] |= (1 << (i%8));
+               }
+       }
+       outdata->dptr = (uint8_t *)results;
+       outdata->dsize = talloc_get_size(results);
+       return 0;
+}
 
 /*
   destroy a ctdb_client
 */
 static int ctdb_client_destructor(struct ctdb_client *client)
 {
+       struct ctdb_db_context *ctdb_db;
+
        ctdb_takeover_client_destructor_hook(client);
        ctdb_reqid_remove(client->ctdb, client->client_id);
-       client->ctdb->statistics.num_clients--;
+       CTDB_DECREMENT_STAT(client->ctdb, num_clients);
 
        if (client->num_persistent_updates != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
                client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
        }
+       ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
+       if (ctdb_db) {
+               DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
+                                 "commit active. Forcing recovery.\n"));
+               client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
+
+               /* legacy trans2 transaction state: */
+               ctdb_db->transaction_active = false;
+
+               /*
+                * trans3 transaction state:
+                *
+                * The destructor sets the pointer to NULL.
+                */
+               talloc_free(ctdb_db->persistent_state);
+       }
 
        return 0;
 }
@@ -266,6 +324,7 @@ static void daemon_call_from_client_callback(struct ctdb_call_state *state)
        int res;
        uint32_t length;
        struct ctdb_client *client = dstate->client;
+       struct ctdb_db_context *ctdb_db = state->ctdb_db;
 
        talloc_steal(client, dstate);
        talloc_steal(dstate, dstate->call);
@@ -273,8 +332,9 @@ static void daemon_call_from_client_callback(struct ctdb_call_state *state)
        res = ctdb_daemon_call_recv(state, dstate->call);
        if (res != 0) {
                DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
-               client->ctdb->statistics.pending_calls--;
-               ctdb_latency(&client->ctdb->statistics.max_call_latency, dstate->start_time);
+               CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
+
+               CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
                return;
        }
 
@@ -283,21 +343,26 @@ static void daemon_call_from_client_callback(struct ctdb_call_state *state)
                               length, struct ctdb_reply_call);
        if (r == NULL) {
                DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
-               client->ctdb->statistics.pending_calls--;
-               ctdb_latency(&client->ctdb->statistics.max_call_latency, dstate->start_time);
+               CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
+               CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
                return;
        }
        r->hdr.reqid        = dstate->reqid;
        r->datalen          = dstate->call->reply_data.dsize;
+       r->status           = dstate->call->status;
        memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
 
        res = daemon_queue_send(client, &r->hdr);
+       if (res == -1) {
+               /* client is dead - return immediately */
+               return;
+       }
        if (res != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
        }
-       ctdb_latency(&client->ctdb->statistics.max_call_latency, dstate->start_time);
+       CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
+       CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
        talloc_free(dstate);
-       client->ctdb->statistics.pending_calls--;
 }
 
 struct ctdb_daemon_packet_wrap {
@@ -331,6 +396,190 @@ static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
        daemon_incoming_packet(client, hdr);    
 }
 
+struct ctdb_deferred_fetch_call {
+       struct ctdb_deferred_fetch_call *next, *prev;
+       struct ctdb_req_call *c;
+       struct ctdb_daemon_packet_wrap *w;
+};
+
+struct ctdb_deferred_fetch_queue {
+       struct ctdb_deferred_fetch_call *deferred_calls;
+};
+
+struct ctdb_deferred_requeue {
+       struct ctdb_deferred_fetch_call *dfc;
+       struct ctdb_client *client;
+};
+
+/* called from a timer event and starts reprocessing the deferred call.*/
+static void reprocess_deferred_call(struct event_context *ev, struct timed_event *te, 
+                                      struct timeval t, void *private_data)
+{
+       struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
+       struct ctdb_client *client = dfr->client;
+
+       talloc_steal(client, dfr->dfc->c);
+       daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
+       talloc_free(dfr);
+}
+
+/* the referral context is destroyed either after a timeout or when the initial
+   fetch-lock has finished.
+   at this stage, immediately start reprocessing the queued up deferred
+   calls so they get reprocessed immediately (and since we are dmaster at
+   this stage, trigger the waiting smbd processes to pick up and aquire the
+   record right away.
+*/
+static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
+{
+
+       /* need to reprocess the packets from the queue explicitely instead of
+          just using a normal destructor since we want, need, to
+          call the clients in the same oder as the requests queued up
+       */
+       while (dfq->deferred_calls != NULL) {
+               struct ctdb_client *client;
+               struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
+               struct ctdb_deferred_requeue *dfr;
+
+               DLIST_REMOVE(dfq->deferred_calls, dfc);
+
+               client = ctdb_reqid_find(dfc->w->ctdb, dfc->w->client_id, struct ctdb_client);
+               if (client == NULL) {
+                       DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
+                                dfc->w->client_id));
+                       continue;
+               }
+
+               /* process it by pushing it back onto the eventloop */
+               dfr = talloc(client, struct ctdb_deferred_requeue);
+               if (dfr == NULL) {
+                       DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
+                       continue;
+               }
+
+               dfr->dfc    = talloc_steal(dfr, dfc);
+               dfr->client = client;
+
+               event_add_timed(dfc->w->ctdb->ev, client, timeval_zero(), reprocess_deferred_call, dfr);
+       }
+
+       return 0;
+}
+
+/* insert the new deferral context into the rb tree.
+   there should never be a pre-existing context here, but check for it
+   warn and destroy the previous context if there is already a deferral context
+   for this key.
+*/
+static void *insert_dfq_callback(void *parm, void *data)
+{
+        if (data) {
+               DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
+                talloc_free(data);
+        }
+        return parm;
+}
+
+/* if the original fetch-lock did not complete within a reasonable time,
+   free the context and context for all deferred requests to cause them to be
+   re-inserted into the event system.
+*/
+static void dfq_timeout(struct event_context *ev, struct timed_event *te, 
+                                 struct timeval t, void *private_data)
+{
+       talloc_free(private_data);
+}
+
+/* This function is used in the local daemon to register a KEY in a database
+   for being "fetched"
+   While the remote fetch is in-flight, any futher attempts to re-fetch the
+   same record will be deferred until the fetch completes.
+*/
+static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
+{
+       uint32_t *k;
+       struct ctdb_deferred_fetch_queue *dfq;
+
+       k = talloc_zero_size(call, ((call->key.dsize + 3) & 0xfffffffc) + 4);
+       if (k == NULL) {
+               DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
+               return -1;
+       }
+
+       k[0] = (call->key.dsize + 3) / 4 + 1;
+       memcpy(&k[1], call->key.dptr, call->key.dsize);
+
+       dfq  = talloc(call, struct ctdb_deferred_fetch_queue);
+       if (dfq == NULL) {
+               DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch queue structure\n"));
+               talloc_free(k);
+               return -1;
+       }
+       dfq->deferred_calls = NULL;
+
+       trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
+
+       talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
+
+       /* if the fetch havent completed in 30 seconds, just tear it all down
+          and let it try again as the events are reissued */
+       event_add_timed(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0), dfq_timeout, dfq);
+
+       talloc_free(k);
+       return 0;
+}
+
+/* check if this is a duplicate request to a fetch already in-flight
+   if it is, make this call deferred to be reprocessed later when
+   the in-flight fetch completes.
+*/
+static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call *c)
+{
+       uint32_t *k;
+       struct ctdb_deferred_fetch_queue *dfq;
+       struct ctdb_deferred_fetch_call *dfc;
+
+       k = talloc_zero_size(c, ((key.dsize + 3) & 0xfffffffc) + 4);
+       if (k == NULL) {
+               DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
+               return -1;
+       }
+
+       k[0] = (key.dsize + 3) / 4 + 1;
+       memcpy(&k[1], key.dptr, key.dsize);
+
+       dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
+       if (dfq == NULL) {
+               talloc_free(k);
+               return -1;
+       }
+
+
+       talloc_free(k);
+
+       dfc = talloc(dfq, struct ctdb_deferred_fetch_call);
+       if (dfc == NULL) {
+               DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
+               return -1;
+       }
+
+       dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
+       if (dfc->w == NULL) {
+               DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
+               talloc_free(dfc);
+               return -1;
+       }
+
+       dfc->c = talloc_steal(dfc, c);
+       dfc->w->ctdb = ctdb_db->ctdb;
+       dfc->w->client_id = client->client_id;
+
+       DLIST_ADD_END(dfq->deferred_calls, dfc, NULL);
+
+       return 0;
+}
+
 
 /*
   this is called when the ctdb daemon received a ctdb request call
@@ -349,17 +598,27 @@ static void daemon_request_call_from_client(struct ctdb_client *client,
        struct ctdb_context *ctdb = client->ctdb;
        struct ctdb_daemon_packet_wrap *w;
 
-       ctdb->statistics.total_calls++;
-       ctdb->statistics.pending_calls++;
+       CTDB_INCREMENT_STAT(ctdb, total_calls);
+       CTDB_DECREMENT_STAT(ctdb, pending_calls);
 
        ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
        if (!ctdb_db) {
                DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
                          c->db_id));
-               ctdb->statistics.pending_calls--;
+               CTDB_DECREMENT_STAT(ctdb, pending_calls);
                return;
        }
 
+       if (ctdb_db->unhealthy_reason) {
+               /*
+                * this is just a warning, as the tdb should be empty anyway,
+                * and only persistent databases can be unhealthy, which doesn't
+                * use this code patch
+                */
+               DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
+                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
+       }
+
        key.dptr = c->data;
        key.dsize = c->keylen;
 
@@ -374,7 +633,7 @@ static void daemon_request_call_from_client(struct ctdb_client *client,
                                           daemon_incoming_packet_wrap, w, True);
        if (ret == -2) {
                /* will retry later */
-               ctdb->statistics.pending_calls--;
+               CTDB_DECREMENT_STAT(ctdb, pending_calls);
                return;
        }
 
@@ -382,15 +641,83 @@ static void daemon_request_call_from_client(struct ctdb_client *client,
 
        if (ret != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
-               ctdb->statistics.pending_calls--;
+               CTDB_DECREMENT_STAT(ctdb, pending_calls);
+               return;
+       }
+
+       if (c->flags & CTDB_IMMEDIATE_MIGRATION) {
+               /* check if this fetch-lock request is a duplicate for a
+                  request we already have in flight. If so defer it until
+                  the first request completes.
+                */
+               if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
+                       ret = ctdb_ltdb_unlock(ctdb_db, key);
+                       if (ret != 0) {
+                               DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+                       }
+                       return;
+               }
+       }
+
+       /* Dont do READONLY if we dont have a tracking database */
+       if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
+               c->flags &= ~CTDB_WANT_READONLY;
+       }
+
+       if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
+               header.flags &= ~(CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY|CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_REVOKE_COMPLETE);
+               if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
+                       ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
+               }
+               /* and clear out the tracking data */
+               if (tdb_delete(ctdb_db->rottdb, key) != 0) {
+                       DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
+               }
+       }
+
+       /* if we are revoking, we must defer all other calls until the revoke
+        * had completed.
+        */
+       if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
+               talloc_free(data.dptr);
+               ret = ctdb_ltdb_unlock(ctdb_db, key);
+
+               if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
+                       ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
+               }
                return;
        }
 
+       if ((header.dmaster == ctdb->pnn)
+       && (!(c->flags & CTDB_WANT_READONLY))
+       && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
+               header.flags   |= CTDB_REC_RO_REVOKING_READONLY;
+               if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
+                       ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
+               }
+               ret = ctdb_ltdb_unlock(ctdb_db, key);
+
+               if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, key, &header, data) != 0) {
+                       ctdb_fatal(ctdb, "Failed to start record revoke");
+               }
+               talloc_free(data.dptr);
+
+               if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
+                       ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
+               }
+
+               return;
+       }               
+
        dstate = talloc(client, struct daemon_call_state);
        if (dstate == NULL) {
-               ctdb_ltdb_unlock(ctdb_db, key);
+               ret = ctdb_ltdb_unlock(ctdb_db, key);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+               }
+
                DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
-               ctdb->statistics.pending_calls--;
+               CTDB_DECREMENT_STAT(ctdb, pending_calls);
                return;
        }
        dstate->start_time = timeval_current();
@@ -400,10 +727,14 @@ static void daemon_request_call_from_client(struct ctdb_client *client,
 
        call = dstate->call = talloc_zero(dstate, struct ctdb_call);
        if (call == NULL) {
-               ctdb_ltdb_unlock(ctdb_db, key);
+               ret = ctdb_ltdb_unlock(ctdb_db, key);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+               }
+
                DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
-               ctdb->statistics.pending_calls--;
-               ctdb_latency(&ctdb->statistics.max_call_latency, dstate->start_time);
+               CTDB_DECREMENT_STAT(ctdb, pending_calls);
+               CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
                return;
        }
 
@@ -417,14 +748,25 @@ static void daemon_request_call_from_client(struct ctdb_client *client,
                state = ctdb_call_local_send(ctdb_db, call, &header, &data);
        } else {
                state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
+               if (call->flags & CTDB_IMMEDIATE_MIGRATION) {
+                       /* This request triggered a remote fetch-lock.
+                          set up a deferral for this key so any additional
+                          fetch-locks are deferred until the current one
+                          finishes.
+                        */
+                       setup_deferred_fetch_locks(ctdb_db, call);
+               }
        }
 
-       ctdb_ltdb_unlock(ctdb_db, key);
+       ret = ctdb_ltdb_unlock(ctdb_db, key);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+       }
 
        if (state == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
-               ctdb->statistics.pending_calls--;
-               ctdb_latency(&ctdb->statistics.max_call_latency, dstate->start_time);
+               CTDB_DECREMENT_STAT(ctdb, pending_calls);
+               CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
                return;
        }
        talloc_steal(state, dstate);
@@ -464,17 +806,17 @@ static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
 
        switch (hdr->operation) {
        case CTDB_REQ_CALL:
-               ctdb->statistics.client.req_call++;
+               CTDB_INCREMENT_STAT(ctdb, client.req_call);
                daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
                break;
 
        case CTDB_REQ_MESSAGE:
-               ctdb->statistics.client.req_message++;
+               CTDB_INCREMENT_STAT(ctdb, client.req_message);
                daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
                break;
 
        case CTDB_REQ_CONTROL:
-               ctdb->statistics.client.req_control++;
+               CTDB_INCREMENT_STAT(ctdb, client.req_control);
                daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
                break;
 
@@ -500,7 +842,7 @@ static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
                return;
        }
 
-       client->ctdb->statistics.client_packets_recv++;
+       CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
 
        if (cnt < sizeof(*hdr)) {
                ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
@@ -532,21 +874,27 @@ static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
        daemon_incoming_packet(client, hdr);
 }
 
+
+static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
+{
+       if (client_pid->ctdb->client_pids != NULL) {
+               DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
+       }
+
+       return 0;
+}
+
+
 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
                         uint16_t flags, void *private_data)
 {
-       struct sockaddr_in addr;
+       struct sockaddr_un addr;
        socklen_t len;
        int fd;
        struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
        struct ctdb_client *client;
-#ifdef _AIX
-       struct peercred_struct cr;
-       socklen_t crl = sizeof(struct peercred_struct);
-#else
-       struct ucred cr;
-       socklen_t crl = sizeof(struct ucred);
-#endif
+       struct ctdb_client_pid_list *client_pid;
+       pid_t peer_pid = 0;
 
        memset(&addr, 0, sizeof(addr));
        len = sizeof(addr);
@@ -558,24 +906,38 @@ static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde,
        set_nonblocking(fd);
        set_close_on_exec(fd);
 
+       DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
+
        client = talloc_zero(ctdb, struct ctdb_client);
-#ifdef _AIX
-       if (getsockopt(fd, SOL_SOCKET, SO_PEERID, &cr, &crl) == 0) {
-#else
-       if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl) == 0) {
-#endif
-               talloc_asprintf(client, "struct ctdb_client: pid:%u", (unsigned)cr.pid);
+       if (ctdb_get_peer_pid(fd, &peer_pid) == 0) {
+               DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)peer_pid));
        }
 
        client->ctdb = ctdb;
        client->fd = fd;
        client->client_id = ctdb_reqid_new(ctdb, client);
-       ctdb->statistics.num_clients++;
+       client->pid = peer_pid;
+
+       client_pid = talloc(client, struct ctdb_client_pid_list);
+       if (client_pid == NULL) {
+               DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
+               close(fd);
+               talloc_free(client);
+               return;
+       }               
+       client_pid->ctdb   = ctdb;
+       client_pid->pid    = peer_pid;
+       client_pid->client = client;
+
+       DLIST_ADD(ctdb->client_pids, client_pid);
 
        client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
-                                        ctdb_daemon_read_cb, client);
+                                        ctdb_daemon_read_cb, client,
+                                        "client-%u", client->pid);
 
        talloc_set_destructor(client, ctdb_client_destructor);
+       talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
+       CTDB_INCREMENT_STAT(ctdb, num_clients);
 }
 
 
@@ -612,7 +974,7 @@ static int ux_socket_bind(struct ctdb_context *ctdb)
        } 
 
 
-       if (listen(ctdb->daemon.sd, 10) != 0) {
+       if (listen(ctdb->daemon.sd, 100) != 0) {
                DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
                goto failed;
        }
@@ -625,15 +987,6 @@ failed:
        return -1;      
 }
 
-/*
-  delete the socket on exit - called on destruction of autofree context
- */
-static int unlink_destructor(const char *name)
-{
-       unlink(name);
-       return 0;
-}
-
 static void sig_child_handler(struct event_context *ev,
        struct signal_event *se, int signum, int count,
        void *dont_care, 
@@ -655,10 +1008,26 @@ static void sig_child_handler(struct event_context *ev,
        }
 }
 
+static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
+                                     void *private_data)
+{
+       if (status != 0) {
+               ctdb_fatal(ctdb, "Failed to run setup event\n");
+               return;
+       }
+       ctdb_run_notification_script(ctdb, "setup");
+
+       /* tell all other nodes we've just started up */
+       ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
+                                0, CTDB_CONTROL_STARTUP, 0,
+                                CTDB_CTRL_FLAG_NOREPLY,
+                                tdb_null, NULL, NULL);
+}
+
 /*
   start the protocol going as a daemon
 */
-int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
+int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog, const char *public_address_list)
 {
        int res, ret = -1;
        struct fd_event *fde;
@@ -691,6 +1060,12 @@ int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
        }
        block_signal(SIGPIPE);
 
+       ctdbd_pid = getpid();
+       ctdb->ctdbd_pid = ctdbd_pid;
+
+
+       DEBUG(DEBUG_ERR, ("Starting CTDBD as pid : %u\n", ctdbd_pid));
+
        if (ctdb->do_setsched) {
                /* try to set us up as realtime */
                ctdb_set_scheduler(ctdb);
@@ -698,12 +1073,24 @@ int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
 
        /* ensure the socket is deleted on exit of the daemon */
        domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
-       talloc_set_destructor(domain_socket_name, unlink_destructor);   
+       if (domain_socket_name == NULL) {
+               DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
+               exit(12);
+       }
 
        ctdb->ev = event_context_init(NULL);
+       tevent_loop_allow_nesting(ctdb->ev);
+       ret = ctdb_init_tevent_logging(ctdb);
+       if (ret != 0) {
+               DEBUG(DEBUG_ALERT,("Failed to initialize TEVENT logging\n"));
+               exit(1);
+       }
 
        ctdb_set_child_logging(ctdb);
 
+       /* initialize statistics collection */
+       ctdb_statistics_init(ctdb);
+
        /* force initial recovery for election */
        ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
 
@@ -731,30 +1118,41 @@ int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
        if (ctdb->methods->initialise(ctdb) != 0) {
                ctdb_fatal(ctdb, "transport failed to initialise");
        }
+       if (public_address_list) {
+               ret = ctdb_set_public_addresses(ctdb, public_address_list);
+               if (ret == -1) {
+                       DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
+                       exit(1);
+               }
+       }
+
+
+       /* attach to existing databases */
+       if (ctdb_attach_databases(ctdb) != 0) {
+               ctdb_fatal(ctdb, "Failed to attach to databases\n");
+       }
 
-       /* attach to any existing persistent databases */
-       if (ctdb_attach_persistent(ctdb) != 0) {
-               ctdb_fatal(ctdb, "Failed to attach to persistent databases\n");         
+       ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
+       if (ret != 0) {
+               ctdb_fatal(ctdb, "Failed to run init event\n");
        }
+       ctdb_run_notification_script(ctdb, "init");
 
        /* start frozen, then let the first election sort things out */
-       if (!ctdb_blocking_freeze(ctdb)) {
+       if (ctdb_blocking_freeze(ctdb)) {
                ctdb_fatal(ctdb, "Failed to get initial freeze\n");
        }
 
        /* now start accepting clients, only can do this once frozen */
        fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, 
-                          EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
+                          EVENT_FD_READ,
                           ctdb_accept_client, ctdb);
-
-       /* tell all other nodes we've just started up */
-       ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
-                                0, CTDB_CONTROL_STARTUP, 0,
-                                CTDB_CTRL_FLAG_NOREPLY,
-                                tdb_null, NULL, NULL);
+       tevent_fd_set_auto_close(fde);
 
        /* release any IPs we hold from previous runs of the daemon */
-       ctdb_release_all_ips(ctdb);
+       if (ctdb->tunable.disable_ip_failover == 0) {
+               ctdb_release_all_ips(ctdb);
+       }
 
        /* start the transport going */
        ctdb_start_transport(ctdb);
@@ -768,6 +1166,28 @@ int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
                DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
                exit(1);
        }
+
+       ret = ctdb_event_script_callback(ctdb,
+                                        ctdb,
+                                        ctdb_setup_event_callback,
+                                        ctdb,
+                                        false,
+                                        CTDB_EVENT_SETUP,
+                                        "%s",
+                                        "");
+       if (ret != 0) {
+               DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
+               exit(1);
+       }
+
+       if (use_syslog) {
+               if (start_syslog_daemon(ctdb)) {
+                       DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
+                       exit(10);
+               }
+       }
+
+       ctdb_lockdown_memory(ctdb);
          
        /* go into a wait loop to allow other nodes to complete */
        event_loop_wait(ctdb->ev);
@@ -792,7 +1212,7 @@ struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
        size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
 
        if (ctdb->methods == NULL) {
-               DEBUG(DEBUG_ERR,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
+               DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
                         operation, (unsigned)length));
                return NULL;
        }
@@ -836,6 +1256,7 @@ static void daemon_control_callback(struct ctdb_context *ctdb,
        struct ctdb_client *client = state->client;
        struct ctdb_reply_control *r;
        size_t len;
+       int ret;
 
        /* construct a message to send to the client containing the data */
        len = offsetof(struct ctdb_reply_control, data) + data.dsize;
@@ -856,9 +1277,10 @@ static void daemon_control_callback(struct ctdb_context *ctdb,
                memcpy(&r->data[r->datalen], errormsg, r->errorlen);
        }
 
-       daemon_queue_send(client, &r->hdr);
-
-       talloc_free(state);
+       ret = daemon_queue_send(client, &r->hdr);
+       if (ret != -1) {
+               talloc_free(state);
+       }
 }
 
 /*
@@ -1014,6 +1436,11 @@ int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
        struct ctdb_req_message *r;
        int len;
 
+       if (ctdb->methods == NULL) {
+               DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
+               return -1;
+       }
+
        /* see if this is a message to ourselves */
        if (pnn == ctdb->pnn) {
                return ctdb_local_message(ctdb, srvid, data);
@@ -1035,3 +1462,144 @@ int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
        return 0;
 }
 
+
+
+struct ctdb_client_notify_list {
+       struct ctdb_client_notify_list *next, *prev;
+       struct ctdb_context *ctdb;
+       uint64_t srvid;
+       TDB_DATA data;
+};
+
+
+static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
+{
+       int ret;
+
+       DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
+
+       ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
+       }
+
+       return 0;
+}
+
+int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
+{
+       struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
+        struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
+       struct ctdb_client_notify_list *nl;
+
+       DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
+
+       if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
+               DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
+               return -1;
+       }
+
+       if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
+               DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
+               return -1;
+       }
+
+
+        if (client == NULL) {
+                DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
+                return -1;
+        }
+
+       for(nl=client->notify; nl; nl=nl->next) {
+               if (nl->srvid == notify->srvid) {
+                       break;
+               }
+       }
+       if (nl != NULL) {
+                DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
+                return -1;
+        }
+
+       nl = talloc(client, struct ctdb_client_notify_list);
+       CTDB_NO_MEMORY(ctdb, nl);
+       nl->ctdb       = ctdb;
+       nl->srvid      = notify->srvid;
+       nl->data.dsize = notify->len;
+       nl->data.dptr  = talloc_size(nl, nl->data.dsize);
+       CTDB_NO_MEMORY(ctdb, nl->data.dptr);
+       memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
+       
+       DLIST_ADD(client->notify, nl);
+       talloc_set_destructor(nl, ctdb_client_notify_destructor);
+
+       return 0;
+}
+
+int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
+{
+       struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
+        struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
+       struct ctdb_client_notify_list *nl;
+
+       DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
+
+        if (client == NULL) {
+                DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
+                return -1;
+        }
+
+       for(nl=client->notify; nl; nl=nl->next) {
+               if (nl->srvid == notify->srvid) {
+                       break;
+               }
+       }
+       if (nl == NULL) {
+                DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
+                return -1;
+        }
+
+       DLIST_REMOVE(client->notify, nl);
+       talloc_set_destructor(nl, NULL);
+       talloc_free(nl);
+
+       return 0;
+}
+
+struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
+{
+       struct ctdb_client_pid_list *client_pid;
+
+       for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
+               if (client_pid->pid == pid) {
+                       return client_pid->client;
+               }
+       }
+       return NULL;
+}
+
+
+/* This control is used by samba when probing if a process (of a samba daemon)
+   exists on the node.
+   Samba does this when it needs/wants to check if a subrecord in one of the
+   databases is still valied, or if it is stale and can be removed.
+   If the node is in unhealthy or stopped state we just kill of the samba
+   process holding htis sub-record and return to the calling samba that
+   the process does not exist.
+   This allows us to forcefully recall subrecords registered by samba processes
+   on banned and stopped nodes.
+*/
+int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
+{
+        struct ctdb_client *client;
+
+       if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
+               client = ctdb_find_client_by_pid(ctdb, pid);
+               if (client != NULL) {
+                       DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
+                       talloc_free(client);
+               }
+               return -1;
+       }
+
+       return kill(pid, 0);
+}