ctdb-daemon: Increment pending calls statistics correctly
[sfrench/samba-autobuild/.git] / ctdb / server / ctdb_daemon.c
index 75344ad386c73bd2ea8b52fa9d859e2e14eb42c9..4ae56a9eb7bc9e81af81fa0db0ee5665d5608ff4 100644 (file)
 */
 
 #include "includes.h"
-#include "db_wrap.h"
-#include "lib/tdb/include/tdb.h"
-#include "lib/tevent/tevent.h"
+#include "lib/tdb_wrap/tdb_wrap.h"
+#include "tdb.h"
 #include "lib/util/dlinklist.h"
 #include "system/network.h"
 #include "system/filesys.h"
 #include "system/wait.h"
+#include "../include/ctdb_version.h"
 #include "../include/ctdb_client.h"
 #include "../include/ctdb_private.h"
+#include "../common/rb_tree.h"
 #include <sys/socket.h>
 
 struct ctdb_client_pid_list {
@@ -36,11 +37,20 @@ struct ctdb_client_pid_list {
        struct ctdb_client *client;
 };
 
+const char *ctdbd_pidfile = NULL;
+
 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
 
 static void print_exit_message(void)
 {
-       DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
+       if (debug_extra != NULL && debug_extra[0] != '\0') {
+               DEBUG(DEBUG_NOTICE,("CTDB %s shutting down\n", debug_extra));
+       } else {
+               DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
+
+               /* Wait a second to allow pending log messages to be flushed */
+               sleep(1);
+       }
 }
 
 
@@ -50,7 +60,7 @@ static void ctdb_time_tick(struct event_context *ev, struct timed_event *te,
 {
        struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
 
-       if (getpid() != ctdbd_pid) {
+       if (getpid() != ctdb->ctdbd_pid) {
                return;
        }
 
@@ -69,36 +79,11 @@ static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
                        ctdb_time_tick, ctdb);
 }
 
-
-/* called when the "startup" event script has finished */
-static void ctdb_start_transport(struct ctdb_context *ctdb)
+static void ctdb_start_periodic_events(struct ctdb_context *ctdb)
 {
-       if (ctdb->methods == NULL) {
-               DEBUG(DEBUG_ALERT,(__location__ " startup event finished but transport is DOWN.\n"));
-               ctdb_fatal(ctdb, "transport is not initialized but startup completed");
-       }
-
-       /* start the transport running */
-       if (ctdb->methods->start(ctdb) != 0) {
-               DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
-               ctdb_fatal(ctdb, "transport failed to start");
-       }
-
-       /* start the recovery daemon process */
-       if (ctdb_start_recoverd(ctdb) != 0) {
-               DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
-               exit(11);
-       }
-
-       /* Make sure we log something when the daemon terminates */
-       atexit(print_exit_message);
-
        /* start monitoring for connected/disconnected nodes */
        ctdb_start_keepalive(ctdb);
 
-       /* start monitoring for node health */
-       ctdb_start_monitoring(ctdb);
-
        /* start periodic update of tcp tickle lists */
                ctdb_start_tcp_tickle_update(ctdb);
 
@@ -109,7 +94,7 @@ static void ctdb_start_transport(struct ctdb_context *ctdb)
        ctdb_start_time_tickd(ctdb);
 }
 
-static void block_signal(int signum)
+static void ignore_signal(int signum)
 {
        struct sigaction act;
 
@@ -204,6 +189,36 @@ int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client
        return ctdb_deregister_message_handler(ctdb, srvid, client);
 }
 
+int daemon_check_srvids(struct ctdb_context *ctdb, TDB_DATA indata,
+                       TDB_DATA *outdata)
+{
+       uint64_t *ids;
+       int i, num_ids;
+       uint8_t *results;
+
+       if ((indata.dsize % sizeof(uint64_t)) != 0) {
+               DEBUG(DEBUG_ERR, ("Bad indata in daemon_check_srvids, "
+                                 "size=%d\n", (int)indata.dsize));
+               return -1;
+       }
+
+       ids = (uint64_t *)indata.dptr;
+       num_ids = indata.dsize / 8;
+
+       results = talloc_zero_array(outdata, uint8_t, (num_ids+7)/8);
+       if (results == NULL) {
+               DEBUG(DEBUG_ERR, ("talloc failed in daemon_check_srvids\n"));
+               return -1;
+       }
+       for (i=0; i<num_ids; i++) {
+               if (ctdb_check_message_handler(ctdb, ids[i])) {
+                       results[i/8] |= (1 << (i%8));
+               }
+       }
+       outdata->dptr = (uint8_t *)results;
+       outdata->dsize = talloc_get_size(results);
+       return 0;
+}
 
 /*
   destroy a ctdb_client
@@ -214,7 +229,7 @@ static int ctdb_client_destructor(struct ctdb_client *client)
 
        ctdb_takeover_client_destructor_hook(client);
        ctdb_reqid_remove(client->ctdb, client->client_id);
-       CTDB_DECREMENT_STAT(client->ctdb, num_clients);
+       client->ctdb->num_clients--;
 
        if (client->num_persistent_updates != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
@@ -226,9 +241,6 @@ static int ctdb_client_destructor(struct ctdb_client *client)
                                  "commit active. Forcing recovery.\n"));
                client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
 
-               /* legacy trans2 transaction state: */
-               ctdb_db->transaction_active = false;
-
                /*
                 * trans3 transaction state:
                 *
@@ -251,6 +263,10 @@ static void daemon_request_message_from_client(struct ctdb_client *client,
        TDB_DATA data;
        int res;
 
+       if (c->hdr.destnode == CTDB_CURRENT_NODE) {
+               c->hdr.destnode = ctdb_get_pnn(client->ctdb);
+       }
+
        /* maybe the message is for another client on this node */
        if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
                ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
@@ -274,6 +290,10 @@ struct daemon_call_state {
        uint32_t reqid;
        struct ctdb_call *call;
        struct timeval start_time;
+
+       /* readonly request ? */
+       uint32_t readonly_fetch;
+       uint32_t client_callid;
 };
 
 /* 
@@ -302,6 +322,16 @@ static void daemon_call_from_client_callback(struct ctdb_call_state *state)
        }
 
        length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
+       /* If the client asked for readonly FETCH, we remapped this to 
+          FETCH_WITH_HEADER when calling the daemon. So we must
+          strip the extra header off the reply data before passing
+          it back to the client.
+       */
+       if (dstate->readonly_fetch
+       && dstate->client_callid == CTDB_FETCH_FUNC) {
+               length -= sizeof(struct ctdb_ltdb_header);
+       }
+
        r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
                               length, struct ctdb_reply_call);
        if (r == NULL) {
@@ -311,8 +341,19 @@ static void daemon_call_from_client_callback(struct ctdb_call_state *state)
                return;
        }
        r->hdr.reqid        = dstate->reqid;
-       r->datalen          = dstate->call->reply_data.dsize;
-       memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
+       r->status           = dstate->call->status;
+
+       if (dstate->readonly_fetch
+       && dstate->client_callid == CTDB_FETCH_FUNC) {
+               /* client only asked for a FETCH so we must strip off
+                  the extra ctdb_ltdb header
+               */
+               r->datalen          = dstate->call->reply_data.dsize - sizeof(struct ctdb_ltdb_header);
+               memcpy(&r->data[0], dstate->call->reply_data.dptr + sizeof(struct ctdb_ltdb_header), r->datalen);
+       } else {
+               r->datalen          = dstate->call->reply_data.dsize;
+               memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
+       }
 
        res = daemon_queue_send(client, &r->hdr);
        if (res == -1) {
@@ -358,6 +399,184 @@ static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
        daemon_incoming_packet(client, hdr);    
 }
 
+struct ctdb_deferred_fetch_call {
+       struct ctdb_deferred_fetch_call *next, *prev;
+       struct ctdb_req_call *c;
+       struct ctdb_daemon_packet_wrap *w;
+};
+
+struct ctdb_deferred_fetch_queue {
+       struct ctdb_deferred_fetch_call *deferred_calls;
+};
+
+struct ctdb_deferred_requeue {
+       struct ctdb_deferred_fetch_call *dfc;
+       struct ctdb_client *client;
+};
+
+/* called from a timer event and starts reprocessing the deferred call.*/
+static void reprocess_deferred_call(struct event_context *ev, struct timed_event *te, 
+                                      struct timeval t, void *private_data)
+{
+       struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
+       struct ctdb_client *client = dfr->client;
+
+       talloc_steal(client, dfr->dfc->c);
+       daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
+       talloc_free(dfr);
+}
+
+/* the referral context is destroyed either after a timeout or when the initial
+   fetch-lock has finished.
+   at this stage, immediately start reprocessing the queued up deferred
+   calls so they get reprocessed immediately (and since we are dmaster at
+   this stage, trigger the waiting smbd processes to pick up and aquire the
+   record right away.
+*/
+static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
+{
+
+       /* need to reprocess the packets from the queue explicitely instead of
+          just using a normal destructor since we want, need, to
+          call the clients in the same oder as the requests queued up
+       */
+       while (dfq->deferred_calls != NULL) {
+               struct ctdb_client *client;
+               struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
+               struct ctdb_deferred_requeue *dfr;
+
+               DLIST_REMOVE(dfq->deferred_calls, dfc);
+
+               client = ctdb_reqid_find(dfc->w->ctdb, dfc->w->client_id, struct ctdb_client);
+               if (client == NULL) {
+                       DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
+                                dfc->w->client_id));
+                       continue;
+               }
+
+               /* process it by pushing it back onto the eventloop */
+               dfr = talloc(client, struct ctdb_deferred_requeue);
+               if (dfr == NULL) {
+                       DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
+                       continue;
+               }
+
+               dfr->dfc    = talloc_steal(dfr, dfc);
+               dfr->client = client;
+
+               event_add_timed(dfc->w->ctdb->ev, client, timeval_zero(), reprocess_deferred_call, dfr);
+       }
+
+       return 0;
+}
+
+/* insert the new deferral context into the rb tree.
+   there should never be a pre-existing context here, but check for it
+   warn and destroy the previous context if there is already a deferral context
+   for this key.
+*/
+static void *insert_dfq_callback(void *parm, void *data)
+{
+        if (data) {
+               DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
+                talloc_free(data);
+        }
+        return parm;
+}
+
+/* if the original fetch-lock did not complete within a reasonable time,
+   free the context and context for all deferred requests to cause them to be
+   re-inserted into the event system.
+*/
+static void dfq_timeout(struct event_context *ev, struct timed_event *te, 
+                                 struct timeval t, void *private_data)
+{
+       talloc_free(private_data);
+}
+
+/* This function is used in the local daemon to register a KEY in a database
+   for being "fetched"
+   While the remote fetch is in-flight, any futher attempts to re-fetch the
+   same record will be deferred until the fetch completes.
+*/
+static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
+{
+       uint32_t *k;
+       struct ctdb_deferred_fetch_queue *dfq;
+
+       k = ctdb_key_to_idkey(call, call->key);
+       if (k == NULL) {
+               DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
+               return -1;
+       }
+
+       dfq  = talloc(call, struct ctdb_deferred_fetch_queue);
+       if (dfq == NULL) {
+               DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch queue structure\n"));
+               talloc_free(k);
+               return -1;
+       }
+       dfq->deferred_calls = NULL;
+
+       trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
+
+       talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
+
+       /* if the fetch havent completed in 30 seconds, just tear it all down
+          and let it try again as the events are reissued */
+       event_add_timed(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0), dfq_timeout, dfq);
+
+       talloc_free(k);
+       return 0;
+}
+
+/* check if this is a duplicate request to a fetch already in-flight
+   if it is, make this call deferred to be reprocessed later when
+   the in-flight fetch completes.
+*/
+static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call *c)
+{
+       uint32_t *k;
+       struct ctdb_deferred_fetch_queue *dfq;
+       struct ctdb_deferred_fetch_call *dfc;
+
+       k = ctdb_key_to_idkey(c, key);
+       if (k == NULL) {
+               DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
+               return -1;
+       }
+
+       dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
+       if (dfq == NULL) {
+               talloc_free(k);
+               return -1;
+       }
+
+
+       talloc_free(k);
+
+       dfc = talloc(dfq, struct ctdb_deferred_fetch_call);
+       if (dfc == NULL) {
+               DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
+               return -1;
+       }
+
+       dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
+       if (dfc->w == NULL) {
+               DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
+               talloc_free(dfc);
+               return -1;
+       }
+
+       dfc->c = talloc_steal(dfc, c);
+       dfc->w->ctdb = ctdb_db->ctdb;
+       dfc->w->client_id = client->client_id;
+
+       DLIST_ADD_END(dfq->deferred_calls, dfc, NULL);
+
+       return 0;
+}
+
 
 /*
   this is called when the ctdb daemon received a ctdb request call
@@ -377,7 +596,7 @@ static void daemon_request_call_from_client(struct ctdb_client *client,
        struct ctdb_daemon_packet_wrap *w;
 
        CTDB_INCREMENT_STAT(ctdb, total_calls);
-       CTDB_DECREMENT_STAT(ctdb, pending_calls);
+       CTDB_INCREMENT_STAT(ctdb, pending_calls);
 
        ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
        if (!ctdb_db) {
@@ -408,7 +627,7 @@ static void daemon_request_call_from_client(struct ctdb_client *client,
 
        ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
                                           (struct ctdb_req_header *)c, &data,
-                                          daemon_incoming_packet_wrap, w, True);
+                                          daemon_incoming_packet_wrap, w, true);
        if (ret == -2) {
                /* will retry later */
                CTDB_DECREMENT_STAT(ctdb, pending_calls);
@@ -423,6 +642,73 @@ static void daemon_request_call_from_client(struct ctdb_client *client,
                return;
        }
 
+
+       /* check if this fetch request is a duplicate for a
+          request we already have in flight. If so defer it until
+          the first request completes.
+       */
+       if (ctdb->tunable.fetch_collapse == 1) {
+               if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
+                       ret = ctdb_ltdb_unlock(ctdb_db, key);
+                       if (ret != 0) {
+                               DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+                       }
+                       return;
+               }
+       }
+
+       /* Dont do READONLY if we dont have a tracking database */
+       if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
+               c->flags &= ~CTDB_WANT_READONLY;
+       }
+
+       if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
+               header.flags &= ~CTDB_REC_RO_FLAGS;
+               CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
+               CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
+               if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
+                       ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
+               }
+               /* and clear out the tracking data */
+               if (tdb_delete(ctdb_db->rottdb, key) != 0) {
+                       DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
+               }
+       }
+
+       /* if we are revoking, we must defer all other calls until the revoke
+        * had completed.
+        */
+       if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
+               talloc_free(data.dptr);
+               ret = ctdb_ltdb_unlock(ctdb_db, key);
+
+               if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
+                       ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
+               }
+               return;
+       }
+
+       if ((header.dmaster == ctdb->pnn)
+       && (!(c->flags & CTDB_WANT_READONLY))
+       && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
+               header.flags   |= CTDB_REC_RO_REVOKING_READONLY;
+               if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
+                       ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
+               }
+               ret = ctdb_ltdb_unlock(ctdb_db, key);
+
+               if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, key, &header, data) != 0) {
+                       ctdb_fatal(ctdb, "Failed to start record revoke");
+               }
+               talloc_free(data.dptr);
+
+               if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
+                       ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
+               }
+
+               return;
+       }               
+
        dstate = talloc(client, struct daemon_call_state);
        if (dstate == NULL) {
                ret = ctdb_ltdb_unlock(ctdb_db, key);
@@ -452,16 +738,36 @@ static void daemon_request_call_from_client(struct ctdb_client *client,
                return;
        }
 
+       dstate->readonly_fetch = 0;
        call->call_id = c->callid;
        call->key = key;
        call->call_data.dptr = c->data + c->keylen;
        call->call_data.dsize = c->calldatalen;
        call->flags = c->flags;
 
+       if (c->flags & CTDB_WANT_READONLY) {
+               /* client wants readonly record, so translate this into a 
+                  fetch with header. remember what the client asked for
+                  so we can remap the reply back to the proper format for
+                  the client in the reply
+                */
+               dstate->client_callid = call->call_id;
+               call->call_id = CTDB_FETCH_WITH_HEADER_FUNC;
+               dstate->readonly_fetch = 1;
+       }
+
        if (header.dmaster == ctdb->pnn) {
                state = ctdb_call_local_send(ctdb_db, call, &header, &data);
        } else {
                state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
+               if (ctdb->tunable.fetch_collapse == 1) {
+                       /* This request triggered a remote fetch-lock.
+                          set up a deferral for this key so any additional
+                          fetch-locks are deferred until the current one
+                          finishes.
+                        */
+                       setup_deferred_fetch_locks(ctdb_db, call);
+               }
        }
 
        ret = ctdb_ltdb_unlock(ctdb_db, key);
@@ -600,13 +906,7 @@ static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde,
        struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
        struct ctdb_client *client;
        struct ctdb_client_pid_list *client_pid;
-#ifdef _AIX
-       struct peercred_struct cr;
-       socklen_t crl = sizeof(struct peercred_struct);
-#else
-       struct ucred cr;
-       socklen_t crl = sizeof(struct ucred);
-#endif
+       pid_t peer_pid = 0;
 
        memset(&addr, 0, sizeof(addr));
        len = sizeof(addr);
@@ -621,18 +921,14 @@ static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde,
        DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
 
        client = talloc_zero(ctdb, struct ctdb_client);
-#ifdef _AIX
-       if (getsockopt(fd, SOL_SOCKET, SO_PEERID, &cr, &crl) == 0) {
-#else
-       if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl) == 0) {
-#endif
-               DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)cr.pid));
+       if (ctdb_get_peer_pid(fd, &peer_pid) == 0) {
+               DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)peer_pid));
        }
 
        client->ctdb = ctdb;
        client->fd = fd;
        client->client_id = ctdb_reqid_new(ctdb, client);
-       client->pid = cr.pid;
+       client->pid = peer_pid;
 
        client_pid = talloc(client, struct ctdb_client_pid_list);
        if (client_pid == NULL) {
@@ -642,7 +938,7 @@ static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde,
                return;
        }               
        client_pid->ctdb   = ctdb;
-       client_pid->pid    = cr.pid;
+       client_pid->pid    = peer_pid;
        client_pid->client = client;
 
        DLIST_ADD(ctdb->client_pids, client_pid);
@@ -653,7 +949,7 @@ static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde,
 
        talloc_set_destructor(client, ctdb_client_destructor);
        talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
-       CTDB_INCREMENT_STAT(ctdb, num_clients);
+       ctdb->num_clients++;
 }
 
 
@@ -671,23 +967,35 @@ static int ux_socket_bind(struct ctdb_context *ctdb)
                return -1;
        }
 
-       set_close_on_exec(ctdb->daemon.sd);
-       set_nonblocking(ctdb->daemon.sd);
-
        memset(&addr, 0, sizeof(addr));
        addr.sun_family = AF_UNIX;
-       strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
+       strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
+
+       /* First check if an old ctdbd might be running */
+       if (connect(ctdb->daemon.sd,
+                   (struct sockaddr *)&addr, sizeof(addr)) == 0) {
+               DEBUG(DEBUG_CRIT,
+                     ("Something is already listening on ctdb socket '%s'\n",
+                      ctdb->daemon.name));
+               goto failed;
+       }
+
+       /* Remove any old socket */
+       unlink(ctdb->daemon.name);
+
+       set_close_on_exec(ctdb->daemon.sd);
+       set_nonblocking(ctdb->daemon.sd);
 
        if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
                DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
                goto failed;
-       }       
+       }
 
        if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
            chmod(ctdb->daemon.name, 0700) != 0) {
                DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
                goto failed;
-       } 
+       }
 
 
        if (listen(ctdb->daemon.sd, 100) != 0) {
@@ -703,24 +1011,23 @@ failed:
        return -1;      
 }
 
-static void sig_child_handler(struct event_context *ev,
-       struct signal_event *se, int signum, int count,
-       void *dont_care, 
-       void *private_data)
+static void initialise_node_flags (struct ctdb_context *ctdb)
 {
-//     struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
-       int status;
-       pid_t pid = -1;
-
-       while (pid != 0) {
-               pid = waitpid(-1, &status, WNOHANG);
-               if (pid == -1) {
-                       DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
-                       return;
-               }
-               if (pid > 0) {
-                       DEBUG(DEBUG_DEBUG, ("SIGCHLD from %d\n", (int)pid));
-               }
+       if (ctdb->pnn == -1) {
+               ctdb_fatal(ctdb, "PNN is set to -1 (unknown value)");
+       }
+
+       ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_DISCONNECTED;
+
+       /* do we start out in DISABLED mode? */
+       if (ctdb->start_as_disabled != 0) {
+               DEBUG(DEBUG_INFO, ("This node is configured to start in DISABLED state\n"));
+               ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_DISABLED;
+       }
+       /* do we start out in STOPPED mode? */
+       if (ctdb->start_as_stopped != 0) {
+               DEBUG(DEBUG_INFO, ("This node is configured to start in STOPPED state\n"));
+               ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
        }
 }
 
@@ -728,8 +1035,7 @@ static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
                                      void *private_data)
 {
        if (status != 0) {
-               ctdb_fatal(ctdb, "Failed to run setup event\n");
-               return;
+               ctdb_die(ctdb, "Failed to run setup event");
        }
        ctdb_run_notification_script(ctdb, "setup");
 
@@ -738,25 +1044,111 @@ static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
                                 0, CTDB_CONTROL_STARTUP, 0,
                                 CTDB_CTRL_FLAG_NOREPLY,
                                 tdb_null, NULL, NULL);
+
+       /* Start the recovery daemon */
+       if (ctdb_start_recoverd(ctdb) != 0) {
+               DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
+               exit(11);
+       }
+
+       ctdb_start_periodic_events(ctdb);
+
+       ctdb_wait_for_first_recovery(ctdb);
+}
+
+static struct timeval tevent_before_wait_ts;
+static struct timeval tevent_after_wait_ts;
+
+static void ctdb_tevent_trace(enum tevent_trace_point tp,
+                             void *private_data)
+{
+       struct timeval diff;
+       struct timeval now;
+       struct ctdb_context *ctdb =
+               talloc_get_type(private_data, struct ctdb_context);
+
+       if (getpid() != ctdb->ctdbd_pid) {
+               return;
+       }
+
+       now = timeval_current();
+
+       switch (tp) {
+       case TEVENT_TRACE_BEFORE_WAIT:
+               if (!timeval_is_zero(&tevent_after_wait_ts)) {
+                       diff = timeval_until(&tevent_after_wait_ts, &now);
+                       if (diff.tv_sec > 3) {
+                               DEBUG(DEBUG_ERR,
+                                     ("Handling event took %ld seconds!\n",
+                                      diff.tv_sec));
+                       }
+               }
+               tevent_before_wait_ts = now;
+               break;
+
+       case TEVENT_TRACE_AFTER_WAIT:
+               if (!timeval_is_zero(&tevent_before_wait_ts)) {
+                       diff = timeval_until(&tevent_before_wait_ts, &now);
+                       if (diff.tv_sec > 3) {
+                               DEBUG(DEBUG_CRIT,
+                                     ("No event for %ld seconds!\n",
+                                      diff.tv_sec));
+                       }
+               }
+               tevent_after_wait_ts = now;
+               break;
+
+       default:
+               /* Do nothing for future tevent trace points */ ;
+       }
+}
+
+static void ctdb_remove_pidfile(void)
+{
+       /* Only the main ctdbd's PID matches the SID */
+       if (ctdbd_pidfile != NULL && getsid(0) == getpid()) {
+               if (unlink(ctdbd_pidfile) == 0) {
+                       DEBUG(DEBUG_NOTICE, ("Removed PID file %s\n",
+                                            ctdbd_pidfile));
+               } else {
+                       DEBUG(DEBUG_WARNING, ("Failed to Remove PID file %s\n",
+                                             ctdbd_pidfile));
+               }
+       }
+}
+
+static void ctdb_create_pidfile(pid_t pid)
+{
+       if (ctdbd_pidfile != NULL) {
+               FILE *fp;
+
+               fp = fopen(ctdbd_pidfile, "w");
+               if (fp == NULL) {
+                       DEBUG(DEBUG_ALERT,
+                             ("Failed to open PID file %s\n", ctdbd_pidfile));
+                       exit(11);
+               }
+
+               fprintf(fp, "%d\n", pid);
+               fclose(fp);
+               DEBUG(DEBUG_NOTICE, ("Created PID file %s\n", ctdbd_pidfile));
+               atexit(ctdb_remove_pidfile);
+       }
 }
 
 /*
   start the protocol going as a daemon
 */
-int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog, const char *public_address_list)
+int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog)
 {
        int res, ret = -1;
        struct fd_event *fde;
        const char *domain_socket_name;
-       struct signal_event *se;
-
-       /* get rid of any old sockets */
-       unlink(ctdb->daemon.name);
 
        /* create a unix domain stream socket to listen to */
        res = ux_socket_bind(ctdb);
        if (res!=0) {
-               DEBUG(DEBUG_ALERT,(__location__ " Failed to open CTDB unix domain socket\n"));
+               DEBUG(DEBUG_ALERT,("Cannot continue.  Exiting!\n"));
                exit(10);
        }
 
@@ -764,27 +1156,34 @@ int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog,
                return 0;
        }
 
-       tdb_reopen_all(False);
+       tdb_reopen_all(false);
 
        if (do_fork) {
-               setsid();
+               if (setsid() == -1) {
+                       ctdb_die(ctdb, "Failed to setsid()\n");
+               }
                close(0);
                if (open("/dev/null", O_RDONLY) != 0) {
                        DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
                        exit(11);
                }
        }
-       block_signal(SIGPIPE);
+       ignore_signal(SIGPIPE);
 
-       ctdbd_pid = getpid();
-       ctdb->ctdbd_pid = ctdbd_pid;
+       ctdb->ctdbd_pid = getpid();
+       DEBUG(DEBUG_ERR, ("Starting CTDBD (Version %s) as PID: %u\n",
+                         CTDB_VERSION_STRING, ctdb->ctdbd_pid));
+       ctdb_create_pidfile(ctdb->ctdbd_pid);
 
-
-       DEBUG(DEBUG_ERR, ("Starting CTDBD as pid : %u\n", ctdbd_pid));
+       /* Make sure we log something when the daemon terminates.
+        * This must be the first exit handler to run (so the last to
+        * be registered.
+        */
+       atexit(print_exit_message);
 
        if (ctdb->do_setsched) {
                /* try to set us up as realtime */
-               ctdb_set_scheduler(ctdb);
+               set_scheduler();
        }
 
        /* ensure the socket is deleted on exit of the daemon */
@@ -796,13 +1195,26 @@ int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog,
 
        ctdb->ev = event_context_init(NULL);
        tevent_loop_allow_nesting(ctdb->ev);
+       tevent_set_trace_callback(ctdb->ev, ctdb_tevent_trace, ctdb);
        ret = ctdb_init_tevent_logging(ctdb);
        if (ret != 0) {
                DEBUG(DEBUG_ALERT,("Failed to initialize TEVENT logging\n"));
                exit(1);
        }
 
+       /* set up a handler to pick up sigchld */
+       if (ctdb_init_sigchld(ctdb) == NULL) {
+               DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
+               exit(1);
+       }
+
        ctdb_set_child_logging(ctdb);
+       if (use_syslog) {
+               if (start_syslog_daemon(ctdb)) {
+                       DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
+                       exit(10);
+               }
+       }
 
        /* initialize statistics collection */
        ctdb_statistics_init(ctdb);
@@ -810,13 +1222,18 @@ int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog,
        /* force initial recovery for election */
        ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
 
+       ctdb_set_runstate(ctdb, CTDB_RUNSTATE_INIT);
+       ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
+       if (ret != 0) {
+               ctdb_die(ctdb, "Failed to run init event\n");
+       }
+       ctdb_run_notification_script(ctdb, "init");
+
        if (strcmp(ctdb->transport, "tcp") == 0) {
-               int ctdb_tcp_init(struct ctdb_context *);
                ret = ctdb_tcp_init(ctdb);
        }
 #ifdef USE_INFINIBAND
        if (strcmp(ctdb->transport, "ib") == 0) {
-               int ctdb_ibw_init(struct ctdb_context *);
                ret = ctdb_ibw_init(ctdb);
        }
 #endif
@@ -834,12 +1251,18 @@ int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog,
        if (ctdb->methods->initialise(ctdb) != 0) {
                ctdb_fatal(ctdb, "transport failed to initialise");
        }
-       if (public_address_list) {
-               ret = ctdb_set_public_addresses(ctdb, public_address_list);
+
+       initialise_node_flags(ctdb);
+
+       if (ctdb->public_addresses_file) {
+               ret = ctdb_set_public_addresses(ctdb, true);
                if (ret == -1) {
                        DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
                        exit(1);
                }
+               if (ctdb->do_checkpublicip) {
+                       ctdb_start_monitoring_interfaces(ctdb);
+               }
        }
 
 
@@ -848,14 +1271,8 @@ int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog,
                ctdb_fatal(ctdb, "Failed to attach to databases\n");
        }
 
-       ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
-       if (ret != 0) {
-               ctdb_fatal(ctdb, "Failed to run init event\n");
-       }
-       ctdb_run_notification_script(ctdb, "init");
-
        /* start frozen, then let the first election sort things out */
-       if (ctdb_blocking_freeze(ctdb)) {
+       if (!ctdb_blocking_freeze(ctdb)) {
                ctdb_fatal(ctdb, "Failed to get initial freeze\n");
        }
 
@@ -863,6 +1280,9 @@ int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog,
        fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, 
                           EVENT_FD_READ,
                           ctdb_accept_client, ctdb);
+       if (fde == NULL) {
+               ctdb_fatal(ctdb, "Failed to add daemon socket to event loop");
+       }
        tevent_fd_set_auto_close(fde);
 
        /* release any IPs we hold from previous runs of the daemon */
@@ -870,40 +1290,31 @@ int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog,
                ctdb_release_all_ips(ctdb);
        }
 
-       /* start the transport going */
-       ctdb_start_transport(ctdb);
-
-       /* set up a handler to pick up sigchld */
-       se = event_add_signal(ctdb->ev, ctdb,
-                                    SIGCHLD, 0,
-                                    sig_child_handler,
-                                    ctdb);
-       if (se == NULL) {
-               DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
-               exit(1);
+       /* Start the transport */
+       if (ctdb->methods->start(ctdb) != 0) {
+               DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
+               ctdb_fatal(ctdb, "transport failed to start");
        }
 
+       /* Recovery daemon and timed events are started from the
+        * callback, only after the setup event completes
+        * successfully.
+        */
+       ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SETUP);
        ret = ctdb_event_script_callback(ctdb,
                                         ctdb,
                                         ctdb_setup_event_callback,
                                         ctdb,
-                                        false,
                                         CTDB_EVENT_SETUP,
+                                        "%s",
                                         "");
        if (ret != 0) {
                DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
                exit(1);
        }
 
-       if (use_syslog) {
-               if (start_syslog_daemon(ctdb)) {
-                       DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
-                       exit(10);
-               }
-       }
+       lockdown_memory(ctdb->valgrinding);
 
-       ctdb_lockdown_memory(ctdb);
-         
        /* go into a wait loop to allow other nodes to complete */
        event_loop_wait(ctdb->ev);
 
@@ -1318,3 +1729,25 @@ int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
 
        return kill(pid, 0);
 }
+
+void ctdb_shutdown_sequence(struct ctdb_context *ctdb, int exit_code)
+{
+       if (ctdb->runstate == CTDB_RUNSTATE_SHUTDOWN) {
+               DEBUG(DEBUG_NOTICE,("Already shutting down so will not proceed.\n"));
+               return;
+       }
+
+       DEBUG(DEBUG_NOTICE,("Shutdown sequence commencing.\n"));
+       ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SHUTDOWN);
+       ctdb_stop_recoverd(ctdb);
+       ctdb_stop_keepalive(ctdb);
+       ctdb_stop_monitoring(ctdb);
+       ctdb_release_all_ips(ctdb);
+       ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
+       if (ctdb->methods != NULL) {
+               ctdb->methods->shutdown(ctdb);
+       }
+
+       DEBUG(DEBUG_NOTICE,("Shutdown sequence complete, exiting.\n"));
+       exit(exit_code);
+}