ctdbd: Replace ctdb->done_startup with ctdb->runstate
[vlendec/samba-autobuild/.git] / ctdb / server / ctdb_ltdb_server.c
index 2a881c5d5af1635ec77f8c8ffbf31553522f0918..0426d96bdbc584284d500c94996ba4f5d7efb2bc 100644 (file)
 */
 
 #include "includes.h"
-#include "lib/events/events.h"
 #include "lib/tdb/include/tdb.h"
 #include "system/network.h"
 #include "system/filesys.h"
 #include "system/dir.h"
+#include "system/time.h"
 #include "../include/ctdb_private.h"
+#include "../common/rb_tree.h"
 #include "db_wrap.h"
 #include "lib/util/dlinklist.h"
+#include <ctype.h>
 
-/*
-  this is the dummy null procedure that all databases support
-*/
-static int ctdb_null_func(struct ctdb_call_info *call)
-{
-       return 0;
-}
+#define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
 
-/*
-  this is a plain fetch procedure that all databases support
-*/
-static int ctdb_fetch_func(struct ctdb_call_info *call)
+/**
+ * write a record to a normal database
+ *
+ * This is the server-variant of the ctdb_ltdb_store function.
+ * It contains logic to determine whether a record should be
+ * stored or deleted. It also sends SCHEDULE_FOR_DELETION
+ * controls to the local ctdb daemon if apporpriate.
+ */
+static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
+                                 TDB_DATA key,
+                                 struct ctdb_ltdb_header *header,
+                                 TDB_DATA data)
 {
-       call->reply_data = &call->record_data;
-       return 0;
-}
+       struct ctdb_context *ctdb = ctdb_db->ctdb;
+       TDB_DATA rec;
+       int ret;
+       bool seqnum_suppressed = false;
+       bool keep = false;
+       bool schedule_for_deletion = false;
+       bool remove_from_delete_queue = false;
+       uint32_t lmaster;
+
+       if (ctdb->flags & CTDB_FLAG_TORTURE) {
+               struct ctdb_ltdb_header *h2;
+               rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
+               h2 = (struct ctdb_ltdb_header *)rec.dptr;
+               if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
+                       DEBUG(DEBUG_CRIT,("RSN regression! %llu %llu\n",
+                                (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
+               }
+               if (rec.dptr) free(rec.dptr);
+       }
+
+       if (ctdb->vnn_map == NULL) {
+               /*
+                * Called from a client: always store the record
+                * Also don't call ctdb_lmaster since it uses the vnn_map!
+                */
+               keep = true;
+               goto store;
+       }
+
+       lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
+
+       /*
+        * If we migrate an empty record off to another node
+        * and the record has not been migrated with data,
+        * delete the record instead of storing the empty record.
+        */
+       if (data.dsize != 0) {
+               keep = true;
+       } else if (header->flags & CTDB_REC_RO_FLAGS) {
+               keep = true;
+       } else if (ctdb_db->persistent) {
+               keep = true;
+       } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
+               /*
+                * The record is not created by the client but
+                * automatically by the ctdb_ltdb_fetch logic that
+                * creates a record with an initial header in the
+                * ltdb before trying to migrate the record from
+                * the current lmaster. Keep it instead of trying
+                * to delete the non-existing record...
+                */
+               keep = true;
+               schedule_for_deletion = true;
+       } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
+               keep = true;
+       } else if (ctdb_db->ctdb->pnn == lmaster) {
+               /*
+                * If we are lmaster, then we usually keep the record.
+                * But if we retrieve the dmaster role by a VACUUM_MIGRATE
+                * and the record is empty and has never been migrated
+                * with data, then we should delete it instead of storing it.
+                * This is part of the vacuuming process.
+                *
+                * The reason that we usually need to store even empty records
+                * on the lmaster is that a client operating directly on the
+                * lmaster (== dmaster) expects the local copy of the record to
+                * exist after successful ctdb migrate call. If the record does
+                * not exist, the client goes into a migrate loop and eventually
+                * fails. So storing the empty record makes sure that we do not
+                * need to change the client code.
+                */
+               if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
+                       keep = true;
+               } else if (ctdb_db->ctdb->pnn != header->dmaster) {
+                       keep = true;
+               }
+       } else if (ctdb_db->ctdb->pnn == header->dmaster) {
+               keep = true;
+       }
 
+       if (keep) {
+               if (!ctdb_db->persistent &&
+                   (ctdb_db->ctdb->pnn == header->dmaster) &&
+                   !(header->flags & CTDB_REC_RO_FLAGS))
+               {
+                       header->rsn++;
+
+                       if (data.dsize == 0) {
+                               schedule_for_deletion = true;
+                       }
+               }
+               remove_from_delete_queue = !schedule_for_deletion;
+       }
+
+store:
+       /*
+        * The VACUUM_MIGRATED flag is only set temporarily for
+        * the above logic when the record was retrieved by a
+        * VACUUM_MIGRATE call and should not be stored in the
+        * database.
+        *
+        * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
+        * and there are two cases in which the corresponding record
+        * is stored in the local database:
+        * 1. The record has been migrated with data in the past
+        *    (the MIGRATED_WITH_DATA record flag is set).
+        * 2. The record has been filled with data again since it
+        *    had been submitted in the VACUUM_FETCH message to the
+        *    lmaster.
+        * For such records it is important to not store the
+        * VACUUM_MIGRATED flag in the database.
+        */
+       header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
+
+       /*
+        * Similarly, clear the AUTOMATIC flag which should not enter
+        * the local database copy since this would require client
+        * modifications to clear the flag when the client stores
+        * the record.
+        */
+       header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
+
+       rec.dsize = sizeof(*header) + data.dsize;
+       rec.dptr = talloc_size(ctdb, rec.dsize);
+       CTDB_NO_MEMORY(ctdb, rec.dptr);
+
+       memcpy(rec.dptr, header, sizeof(*header));
+       memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
+
+       /* Databases with seqnum updates enabled only get their seqnum
+          changes when/if we modify the data */
+       if (ctdb_db->seqnum_update != NULL) {
+               TDB_DATA old;
+               old = tdb_fetch(ctdb_db->ltdb->tdb, key);
+
+               if ( (old.dsize == rec.dsize)
+               && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
+                         rec.dptr+sizeof(struct ctdb_ltdb_header),
+                         rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
+                       tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
+                       seqnum_suppressed = true;
+               }
+               if (old.dptr) free(old.dptr);
+       }
 
+       DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
+                           ctdb_db->db_name,
+                           keep?"storing":"deleting",
+                           ctdb_hash(&key)));
+
+       if (keep) {
+               ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
+       } else {
+               ret = tdb_delete(ctdb_db->ltdb->tdb, key);
+       }
+
+       if (ret != 0) {
+               int lvl = DEBUG_ERR;
+
+               if (keep == false &&
+                   tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
+               {
+                       lvl = DEBUG_DEBUG;
+               }
+
+               DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
+                           "%d - %s\n",
+                           ctdb_db->db_name,
+                           keep?"store":"delete", ret,
+                           tdb_errorstr(ctdb_db->ltdb->tdb)));
+
+               schedule_for_deletion = false;
+               remove_from_delete_queue = false;
+       }
+       if (seqnum_suppressed) {
+               tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
+       }
+
+       talloc_free(rec.dptr);
+
+       if (schedule_for_deletion) {
+               int ret2;
+               ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
+               if (ret2 != 0) {
+                       DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
+               }
+       }
+
+       if (remove_from_delete_queue) {
+               ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
+       }
+
+       return ret;
+}
 
 struct lock_fetch_state {
        struct ctdb_context *ctdb;
@@ -58,7 +251,7 @@ struct lock_fetch_state {
 /*
   called when we should retry the operation
  */
-static void lock_fetch_callback(void *p)
+static void lock_fetch_callback(void *p, bool locked)
 {
        struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
        if (!state->ignore_generation &&
@@ -81,9 +274,9 @@ static void lock_fetch_callback(void *p)
    1) tries to get the chainlock. If it succeeds, then it returns 0
 
    2) if it fails to get a chainlock immediately then it sets up a
-   non-blocking chainlock via ctdb_lockwait, and when it gets the
+   non-blocking chainlock via ctdb_lock_record, and when it gets the
    chainlock it re-submits this ctdb request to the main packet
-   receive function
+   receive function.
 
    This effectively queues all ctdb requests that cannot be
    immediately satisfied until it can get the lock. This means that
@@ -103,7 +296,7 @@ int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
 {
        int ret;
        struct tdb_context *tdb = ctdb_db->ltdb->tdb;
-       struct lockwait_handle *h;
+       struct lock_request *lreq;
        struct lock_fetch_state *state;
        
        ret = tdb_chainlock_nonblock(tdb, key);
@@ -135,16 +328,14 @@ int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
        state->ignore_generation = ignore_generation;
 
        /* now the contended path */
-       h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
-       if (h == NULL) {
-               tdb_chainunlock(tdb, key);
+       lreq = ctdb_lock_record(ctdb_db, key, true, lock_fetch_callback, state);
+       if (lreq == NULL) {
                return -1;
        }
 
        /* we need to move the packet off the temporary context in ctdb_input_pkt(),
           so it won't be freed yet */
        talloc_steal(state, hdr);
-       talloc_steal(state, h);
 
        /* now tell the caller than we will retry asynchronously */
        return -2;
@@ -166,7 +357,11 @@ int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
        if (ret == 0) {
                ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
                if (ret != 0) {
-                       ctdb_ltdb_unlock(ctdb_db, key);
+                       int uret;
+                       uret = ctdb_ltdb_unlock(ctdb_db, key);
+                       if (uret != 0) {
+                               DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
+                       }
                }
        }
        return ret;
@@ -187,17 +382,370 @@ static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
        }
 }
 
+int ctdb_load_persistent_health(struct ctdb_context *ctdb,
+                               struct ctdb_db_context *ctdb_db)
+{
+       struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
+       char *old;
+       char *reason = NULL;
+       TDB_DATA key;
+       TDB_DATA val;
+
+       key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
+       key.dsize = strlen(ctdb_db->db_name);
+
+       old = ctdb_db->unhealthy_reason;
+       ctdb_db->unhealthy_reason = NULL;
+
+       val = tdb_fetch(tdb, key);
+       if (val.dsize > 0) {
+               reason = talloc_strndup(ctdb_db,
+                                       (const char *)val.dptr,
+                                       val.dsize);
+               if (reason == NULL) {
+                       DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
+                                          (int)val.dsize));
+                       ctdb_db->unhealthy_reason = old;
+                       free(val.dptr);
+                       return -1;
+               }
+       }
+
+       if (val.dptr) {
+               free(val.dptr);
+       }
+
+       talloc_free(old);
+       ctdb_db->unhealthy_reason = reason;
+       return 0;
+}
+
+int ctdb_update_persistent_health(struct ctdb_context *ctdb,
+                                 struct ctdb_db_context *ctdb_db,
+                                 const char *given_reason,/* NULL means healthy */
+                                 int num_healthy_nodes)
+{
+       struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
+       int ret;
+       TDB_DATA key;
+       TDB_DATA val;
+       char *new_reason = NULL;
+       char *old_reason = NULL;
+
+       ret = tdb_transaction_start(tdb);
+       if (ret != 0) {
+               DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
+                                  tdb_name(tdb), ret, tdb_errorstr(tdb)));
+               return -1;
+       }
+
+       ret = ctdb_load_persistent_health(ctdb, ctdb_db);
+       if (ret != 0) {
+               DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
+                                  ctdb_db->db_name, ret));
+               return -1;
+       }
+       old_reason = ctdb_db->unhealthy_reason;
+
+       key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
+       key.dsize = strlen(ctdb_db->db_name);
+
+       if (given_reason) {
+               new_reason = talloc_strdup(ctdb_db, given_reason);
+               if (new_reason == NULL) {
+                       DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
+                                         given_reason));
+                       return -1;
+               }
+       } else if (old_reason && num_healthy_nodes == 0) {
+               /*
+                * If the reason indicates ok, but there where no healthy nodes
+                * available, that it means, we have not recovered valid content
+                * of the db. So if there's an old reason, prefix it with
+                * "NO-HEALTHY-NODES - "
+                */
+               const char *prefix;
+
+#define _TMP_PREFIX "NO-HEALTHY-NODES - "
+               ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
+               if (ret != 0) {
+                       prefix = _TMP_PREFIX;
+               } else {
+                       prefix = "";
+               }
+               new_reason = talloc_asprintf(ctdb_db, "%s%s",
+                                        prefix, old_reason);
+               if (new_reason == NULL) {
+                       DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
+                                         prefix, old_reason));
+                       return -1;
+               }
+#undef _TMP_PREFIX
+       }
+
+       if (new_reason) {
+               val.dptr = discard_const_p(uint8_t, new_reason);
+               val.dsize = strlen(new_reason);
+
+               ret = tdb_store(tdb, key, val, TDB_REPLACE);
+               if (ret != 0) {
+                       tdb_transaction_cancel(tdb);
+                       DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
+                                          tdb_name(tdb), ctdb_db->db_name, new_reason,
+                                          ret, tdb_errorstr(tdb)));
+                       talloc_free(new_reason);
+                       return -1;
+               }
+               DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
+                                  ctdb_db->db_name, new_reason));
+       } else if (old_reason) {
+               ret = tdb_delete(tdb, key);
+               if (ret != 0) {
+                       tdb_transaction_cancel(tdb);
+                       DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
+                                          tdb_name(tdb), ctdb_db->db_name,
+                                          ret, tdb_errorstr(tdb)));
+                       talloc_free(new_reason);
+                       return -1;
+               }
+               DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
+                                  ctdb_db->db_name));
+       }
+
+       ret = tdb_transaction_commit(tdb);
+       if (ret != TDB_SUCCESS) {
+               DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
+                                  tdb_name(tdb), ret, tdb_errorstr(tdb)));
+               talloc_free(new_reason);
+               return -1;
+       }
+
+       talloc_free(old_reason);
+       ctdb_db->unhealthy_reason = new_reason;
+
+       return 0;
+}
+
+static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
+                                    struct ctdb_db_context *ctdb_db)
+{
+       time_t now = time(NULL);
+       char *new_path;
+       char *new_reason;
+       int ret;
+       struct tm *tm;
+
+       tm = gmtime(&now);
+
+       /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
+       new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
+                                  "%04u%02u%02u%02u%02u%02u.0Z",
+                                  ctdb_db->db_path,
+                                  tm->tm_year+1900, tm->tm_mon+1,
+                                  tm->tm_mday, tm->tm_hour, tm->tm_min,
+                                  tm->tm_sec);
+       if (new_path == NULL) {
+               DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
+               return -1;
+       }
+
+       new_reason = talloc_asprintf(ctdb_db,
+                                    "ERROR - Backup of corrupted TDB in '%s'",
+                                    new_path);
+       if (new_reason == NULL) {
+               DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
+               return -1;
+       }
+       ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
+       talloc_free(new_reason);
+       if (ret != 0) {
+               DEBUG(DEBUG_CRIT,(__location__
+                                ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
+                                ctdb_db->db_path));
+               return -1;
+       }
+
+       ret = rename(ctdb_db->db_path, new_path);
+       if (ret != 0) {
+               DEBUG(DEBUG_CRIT,(__location__
+                                 ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
+                                 ctdb_db->db_path, new_path,
+                                 errno, strerror(errno)));
+               talloc_free(new_path);
+               return -1;
+       }
+
+       DEBUG(DEBUG_CRIT,(__location__
+                        ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
+                        ctdb_db->db_path, new_path));
+       talloc_free(new_path);
+       return 0;
+}
+
+int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
+{
+       struct ctdb_db_context *ctdb_db;
+       int ret;
+       int ok = 0;
+       int fail = 0;
+
+       for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
+               if (!ctdb_db->persistent) {
+                       continue;
+               }
+
+               ret = ctdb_load_persistent_health(ctdb, ctdb_db);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ALERT,(__location__
+                                          " load persistent health for '%s' failed\n",
+                                          ctdb_db->db_path));
+                       return -1;
+               }
+
+               if (ctdb_db->unhealthy_reason == NULL) {
+                       ok++;
+                       DEBUG(DEBUG_INFO,(__location__
+                                  " persistent db '%s' healthy\n",
+                                  ctdb_db->db_path));
+                       continue;
+               }
+
+               fail++;
+               DEBUG(DEBUG_ALERT,(__location__
+                                  " persistent db '%s' unhealthy: %s\n",
+                                  ctdb_db->db_path,
+                                  ctdb_db->unhealthy_reason));
+       }
+       DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
+             ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
+              ok, fail));
+
+       if (fail != 0) {
+               return -1;
+       }
+
+       return 0;
+}
+
+
+/*
+  mark a database - as healthy
+ */
+int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
+{
+       uint32_t db_id = *(uint32_t *)indata.dptr;
+       struct ctdb_db_context *ctdb_db;
+       int ret;
+       bool may_recover = false;
+
+       ctdb_db = find_ctdb_db(ctdb, db_id);
+       if (!ctdb_db) {
+               DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
+               return -1;
+       }
+
+       if (ctdb_db->unhealthy_reason) {
+               may_recover = true;
+       }
+
+       ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,(__location__
+                                " ctdb_update_persistent_health(%s) failed\n",
+                                ctdb_db->db_name));
+               return -1;
+       }
+
+       if (may_recover && ctdb->runstate == CTDB_RUNSTATE_STARTUP) {
+               DEBUG(DEBUG_ERR, (__location__ " db %s become healthy  - force recovery for startup\n",
+                                 ctdb_db->db_name));
+               ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
+       }
+
+       return 0;
+}
+
+int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
+                                  TDB_DATA indata,
+                                  TDB_DATA *outdata)
+{
+       uint32_t db_id = *(uint32_t *)indata.dptr;
+       struct ctdb_db_context *ctdb_db;
+       int ret;
+
+       ctdb_db = find_ctdb_db(ctdb, db_id);
+       if (!ctdb_db) {
+               DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
+               return -1;
+       }
+
+       ret = ctdb_load_persistent_health(ctdb, ctdb_db);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,(__location__
+                                " ctdb_load_persistent_health(%s) failed\n",
+                                ctdb_db->db_name));
+               return -1;
+       }
+
+       *outdata = tdb_null;
+       if (ctdb_db->unhealthy_reason) {
+               outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
+               outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
+       }
+
+       return 0;
+}
+
+
+int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
+{
+       char *ropath;
+
+       if (ctdb_db->readonly) {
+               return 0;
+       }
+
+       if (ctdb_db->persistent) {
+               DEBUG(DEBUG_ERR,("Trying to set persistent database with readonly property\n"));
+               return -1;
+       }
+
+       ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
+       if (ropath == NULL) {
+               DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
+               return -1;
+       }
+       ctdb_db->rottdb = tdb_open(ropath, 
+                             ctdb->tunable.database_hash_size, 
+                             TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
+                             O_CREAT|O_RDWR, 0);
+       if (ctdb_db->rottdb == NULL) {
+               DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
+               talloc_free(ropath);
+               return -1;
+       }
+
+       DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
+
+       ctdb_db->readonly = true;
+       talloc_free(ropath);
+       return 0;
+}
 
 /*
   attach to a database, handling both persistent and non-persistent databases
   return 0 on success, -1 on failure
  */
-static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name, bool persistent)
+static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
+                            bool persistent, const char *unhealthy_reason,
+                            bool jenkinshash)
 {
        struct ctdb_db_context *ctdb_db, *tmp_db;
        int ret;
        struct TDB_DATA key;
        unsigned tdb_flags;
+       int mode = 0600;
+       int remaining_tries = 0;
 
        ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
        CTDB_NO_MEMORY(ctdb, ctdb_db);
@@ -212,6 +760,15 @@ static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name, boo
        ctdb_db->db_id = ctdb_hash(&key);
        ctdb_db->persistent = persistent;
 
+       if (!ctdb_db->persistent) {
+               ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
+               if (ctdb_db->delete_queue == NULL) {
+                       CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
+               }
+
+               ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
+       }
+
        /* check for hash collisions */
        for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
                if (tmp_db->db_id == ctdb_db->db_id) {
@@ -222,23 +779,45 @@ static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name, boo
                }
        }
 
-       if (ctdb->db_directory == NULL) {
-               ctdb->db_directory = VARDIR "/ctdb";
+       if (persistent) {
+               if (unhealthy_reason) {
+                       ret = ctdb_update_persistent_health(ctdb, ctdb_db,
+                                                           unhealthy_reason, 0);
+                       if (ret != 0) {
+                               DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
+                                                  ctdb_db->db_name, unhealthy_reason, ret));
+                               talloc_free(ctdb_db);
+                               return -1;
+                       }
+               }
+
+               if (ctdb->max_persistent_check_errors > 0) {
+                       remaining_tries = 1;
+               }
+               if (ctdb->runstate == CTDB_RUNSTATE_RUNNING) {
+                       remaining_tries = 0;
+               }
+
+               ret = ctdb_load_persistent_health(ctdb, ctdb_db);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
+                                  ctdb_db->db_name, ret));
+                       talloc_free(ctdb_db);
+                       return -1;
+               }
        }
 
-       /* make sure the db directory exists */
-       if (mkdir(ctdb->db_directory, 0700) == -1 && errno != EEXIST) {
-               DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n", 
-                        ctdb->db_directory));
+       if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
+               DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
+                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
                talloc_free(ctdb_db);
                return -1;
        }
 
-       if (persistent && mkdir(ctdb->db_directory_persistent, 0700) == -1 && errno != EEXIST) {
-               DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n", 
-                        ctdb->db_directory_persistent));
-               talloc_free(ctdb_db);
-               return -1;
+       if (ctdb_db->unhealthy_reason) {
+               /* this is just a warning, but we want that in the log file! */
+               DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
+                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
        }
 
        /* open the database */
@@ -247,22 +826,124 @@ static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name, boo
                                           db_name, ctdb->pnn);
 
        tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
-       if (!ctdb->do_setsched) {
+       if (ctdb->valgrinding) {
                tdb_flags |= TDB_NOMMAP;
        }
+       tdb_flags |= TDB_DISALLOW_NESTING;
+       if (jenkinshash) {
+               tdb_flags |= TDB_INCOMPATIBLE_HASH;
+       }
 
+again:
        ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 
                                      ctdb->tunable.database_hash_size, 
                                      tdb_flags, 
-                                     O_CREAT|O_RDWR, 0666);
+                                     O_CREAT|O_RDWR, mode);
        if (ctdb_db->ltdb == NULL) {
-               DEBUG(DEBUG_CRIT,("Failed to open tdb '%s'\n", ctdb_db->db_path));
-               talloc_free(ctdb_db);
-               return -1;
+               struct stat st;
+               int saved_errno = errno;
+
+               if (!persistent) {
+                       DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
+                                         ctdb_db->db_path,
+                                         saved_errno,
+                                         strerror(saved_errno)));
+                       talloc_free(ctdb_db);
+                       return -1;
+               }
+
+               if (remaining_tries == 0) {
+                       DEBUG(DEBUG_CRIT,(__location__
+                                         "Failed to open persistent tdb '%s': %d - %s\n",
+                                         ctdb_db->db_path,
+                                         saved_errno,
+                                         strerror(saved_errno)));
+                       talloc_free(ctdb_db);
+                       return -1;
+               }
+
+               ret = stat(ctdb_db->db_path, &st);
+               if (ret != 0) {
+                       DEBUG(DEBUG_CRIT,(__location__
+                                         "Failed to open persistent tdb '%s': %d - %s\n",
+                                         ctdb_db->db_path,
+                                         saved_errno,
+                                         strerror(saved_errno)));
+                       talloc_free(ctdb_db);
+                       return -1;
+               }
+
+               ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
+               if (ret != 0) {
+                       DEBUG(DEBUG_CRIT,(__location__
+                                         "Failed to open persistent tdb '%s': %d - %s\n",
+                                         ctdb_db->db_path,
+                                         saved_errno,
+                                         strerror(saved_errno)));
+                       talloc_free(ctdb_db);
+                       return -1;
+               }
+
+               remaining_tries--;
+               mode = st.st_mode;
+               goto again;
        }
 
        if (!persistent) {
                ctdb_check_db_empty(ctdb_db);
+       } else {
+               ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
+               if (ret != 0) {
+                       int fd;
+                       struct stat st;
+
+                       DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
+                                         ctdb_db->db_path, ret,
+                                         tdb_errorstr(ctdb_db->ltdb->tdb)));
+                       if (remaining_tries == 0) {
+                               talloc_free(ctdb_db);
+                               return -1;
+                       }
+
+                       fd = tdb_fd(ctdb_db->ltdb->tdb);
+                       ret = fstat(fd, &st);
+                       if (ret != 0) {
+                               DEBUG(DEBUG_CRIT,(__location__
+                                                 "Failed to fstat() persistent tdb '%s': %d - %s\n",
+                                                 ctdb_db->db_path,
+                                                 errno,
+                                                 strerror(errno)));
+                               talloc_free(ctdb_db);
+                               return -1;
+                       }
+
+                       /* close the TDB */
+                       talloc_free(ctdb_db->ltdb);
+                       ctdb_db->ltdb = NULL;
+
+                       ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
+                       if (ret != 0) {
+                               DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
+                                                 ctdb_db->db_path));
+                               talloc_free(ctdb_db);
+                               return -1;
+                       }
+
+                       remaining_tries--;
+                       mode = st.st_mode;
+                       goto again;
+               }
+       }
+
+       /* set up a rb tree we can use to track which records we have a 
+          fetch-lock in-flight for so we can defer any additional calls
+          for the same record.
+        */
+       ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
+       if (ctdb_db->deferred_fetch == NULL) {
+               DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
+               talloc_free(ctdb_db);
+               return -1;
        }
 
        DLIST_ADD(ctdb->db_list, ctdb_db);
@@ -292,9 +973,21 @@ static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name, boo
                return -1;
        }
 
+       /* 
+          all databases support the "fetch_with_header" function. we need this
+          for efficient readonly record fetches
+       */
+       ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
+       if (ret != 0) {
+               DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
+               talloc_free(ctdb_db);
+               return -1;
+       }
+
        ret = ctdb_vacuum_init(ctdb_db);
        if (ret != 0) {
-               DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for database '%s'\n", ctdb_db->db_name));
+               DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
+                                 "database '%s'\n", ctdb_db->db_name));
                talloc_free(ctdb_db);
                return -1;
        }
@@ -307,43 +1000,137 @@ static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name, boo
 }
 
 
+struct ctdb_deferred_attach_context {
+       struct ctdb_deferred_attach_context *next, *prev;
+       struct ctdb_context *ctdb;
+       struct ctdb_req_control *c;
+};
+
+
+static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
+{
+       DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
+
+       return 0;
+}
+
+static void ctdb_deferred_attach_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
+{
+       struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
+       struct ctdb_context *ctdb = da_ctx->ctdb;
+
+       ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
+       talloc_free(da_ctx);
+}
+
+static void ctdb_deferred_attach_callback(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
+{
+       struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
+       struct ctdb_context *ctdb = da_ctx->ctdb;
+
+       /* This talloc-steals the packet ->c */
+       ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
+       talloc_free(da_ctx);
+}
+
+int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
+{
+       struct ctdb_deferred_attach_context *da_ctx;
+
+       /* call it from the main event loop as soon as the current event 
+          finishes.
+        */
+       while ((da_ctx = ctdb->deferred_attach) != NULL) {
+               DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
+               event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
+       }
+
+       return 0;
+}
+
 /*
   a client has asked to attach a new database
  */
 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
                               TDB_DATA *outdata, uint64_t tdb_flags, 
-                              bool persistent)
+                              bool persistent, uint32_t client_id,
+                              struct ctdb_req_control *c,
+                              bool *async_reply)
 {
        const char *db_name = (const char *)indata.dptr;
        struct ctdb_db_context *db;
        struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
+       struct ctdb_client *client = NULL;
+
+       if (ctdb->tunable.allow_client_db_attach == 0) {
+               DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
+                                 "AllowClientDBAccess == 0\n", db_name));
+               return -1;
+       }
+
+       /* dont allow any local clients to attach while we are in recovery mode
+        * except for the recovery daemon.
+        * allow all attach from the network since these are always from remote
+        * recovery daemons.
+        */
+       if (client_id != 0) {
+               client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
+       }
+       if (client != NULL) {
+               /* If the node is inactive it is not part of the cluster
+                  and we should not allow clients to attach to any
+                  databases
+               */
+               if (node->flags & NODE_FLAGS_INACTIVE) {
+                       DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name, node->flags));
+                       return -1;
+               }
+
+               if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE &&
+                   client->pid != ctdb->recoverd_pid &&
+                   ctdb->runstate < CTDB_RUNSTATE_RUNNING) {
+                       struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
+
+                       if (da_ctx == NULL) {
+                               DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
+                               return -1;
+                       }
+
+                       da_ctx->ctdb = ctdb;
+                       da_ctx->c = talloc_steal(da_ctx, c);
+                       talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
+                       DLIST_ADD(ctdb->deferred_attach, da_ctx);
+
+                       event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0), ctdb_deferred_attach_timeout, da_ctx);
+
+                       DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
+                       *async_reply = true;
+                       return 0;
+               }
+       }
 
        /* the client can optionally pass additional tdb flags, but we
           only allow a subset of those on the database in ctdb. Note
           that tdb_flags is passed in via the (otherwise unused)
           srvid to the attach control */
-       tdb_flags &= TDB_NOSYNC;
-
-       /* If the node is inactive it is not part of the cluster
-          and we should not allow clients to attach to any
-          databases
-       */
-       if (node->flags & NODE_FLAGS_INACTIVE) {
-               DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
-               return -1;
-       }
-
+       tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
 
        /* see if we already have this name */
        db = ctdb_db_handle(ctdb, db_name);
        if (db) {
+               if (db->persistent != persistent) {
+                       DEBUG(DEBUG_ERR, ("ERROR: DB Attach %spersistent to %spersistent "
+                                         "database %s\n", persistent ? "" : "non-",
+                                         db-> persistent ? "" : "non-", db_name));
+                       return -1;
+               }
                outdata->dptr  = (uint8_t *)&db->db_id;
                outdata->dsize = sizeof(db->db_id);
                tdb_add_flags(db->ltdb->tdb, tdb_flags);
                return 0;
        }
 
-       if (ctdb_local_attach(ctdb, db_name, persistent) != 0) {
+       if (ctdb_local_attach(ctdb, db_name, persistent, NULL, (tdb_flags&TDB_INCOMPATIBLE_HASH)?true:false) != 0) {
                return -1;
        }
 
@@ -359,8 +1146,11 @@ int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
        outdata->dptr  = (uint8_t *)&db->db_id;
        outdata->dsize = sizeof(db->db_id);
 
+       /* Try to ensure it's locked in mem */
+       ctdb_lockdown_memory(ctdb);
+
        /* tell all the other nodes about this database */
-       ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
+       ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
                                 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
                                                CTDB_CONTROL_DB_ATTACH,
                                 0, CTDB_CTRL_FLAG_NOREPLY,
@@ -374,7 +1164,8 @@ int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
 /*
   attach to all existing persistent databases
  */
-int ctdb_attach_persistent(struct ctdb_context *ctdb)
+static int ctdb_attach_persistent(struct ctdb_context *ctdb,
+                                 const char *unhealthy_reason)
 {
        DIR *d;
        struct dirent *de;
@@ -386,38 +1177,43 @@ int ctdb_attach_persistent(struct ctdb_context *ctdb)
        }
 
        while ((de=readdir(d))) {
-               char *p, *s;
+               char *p, *s, *q;
                size_t len = strlen(de->d_name);
                uint32_t node;
+               int invalid_name = 0;
                
                s = talloc_strdup(ctdb, de->d_name);
                CTDB_NO_MEMORY(ctdb, s);
 
-               /* ignore names ending in .bak */
-               p = strstr(s, ".bak");
-               if (p != NULL) {
-                       continue;
-               }
-
                /* only accept names ending in .tdb */
                p = strstr(s, ".tdb.");
                if (len < 7 || p == NULL) {
                        talloc_free(s);
                        continue;
                }
-               if (sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
+
+               /* only accept names ending with .tdb. and any number of digits */
+               q = p+5;
+               while (*q != 0 && invalid_name == 0) {
+                       if (!isdigit(*q++)) {
+                               invalid_name = 1;
+                       }
+               }
+               if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
+                       DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
                        talloc_free(s);
                        continue;
                }
                p[4] = 0;
 
-               if (ctdb_local_attach(ctdb, s, true) != 0) {
+               if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, 0) != 0) {
                        DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
                        closedir(d);
                        talloc_free(s);
                        return -1;
                }
-               DEBUG(DEBUG_NOTICE,("Attached to persistent database %s\n", s));
+
+               DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
 
                talloc_free(s);
        }
@@ -425,6 +1221,158 @@ int ctdb_attach_persistent(struct ctdb_context *ctdb)
        return 0;
 }
 
+int ctdb_attach_databases(struct ctdb_context *ctdb)
+{
+       int ret;
+       char *persistent_health_path = NULL;
+       char *unhealthy_reason = NULL;
+       bool first_try = true;
+
+       if (ctdb->db_directory == NULL) {
+               ctdb->db_directory = VARDIR "/ctdb";
+       }
+       if (ctdb->db_directory_persistent == NULL) {
+               ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
+       }
+       if (ctdb->db_directory_state == NULL) {
+               ctdb->db_directory_state = VARDIR "/ctdb/state";
+       }
+
+       /* make sure the db directory exists */
+       ret = mkdir(ctdb->db_directory, 0700);
+       if (ret == -1 && errno != EEXIST) {
+               DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
+                        ctdb->db_directory));
+               return -1;
+       }
+
+       /* make sure the persistent db directory exists */
+       ret = mkdir(ctdb->db_directory_persistent, 0700);
+       if (ret == -1 && errno != EEXIST) {
+               DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
+                        ctdb->db_directory_persistent));
+               return -1;
+       }
+
+       /* make sure the internal state db directory exists */
+       ret = mkdir(ctdb->db_directory_state, 0700);
+       if (ret == -1 && errno != EEXIST) {
+               DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
+                        ctdb->db_directory_state));
+               return -1;
+       }
+
+       persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
+                                                ctdb->db_directory_state,
+                                                PERSISTENT_HEALTH_TDB,
+                                                ctdb->pnn);
+       if (persistent_health_path == NULL) {
+               DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
+               return -1;
+       }
+
+again:
+
+       ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
+                                                  0, TDB_DISALLOW_NESTING,
+                                                  O_CREAT | O_RDWR, 0600);
+       if (ctdb->db_persistent_health == NULL) {
+               struct tdb_wrap *tdb;
+
+               if (!first_try) {
+                       DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
+                                         persistent_health_path,
+                                         errno,
+                                         strerror(errno)));
+                       talloc_free(persistent_health_path);
+                       talloc_free(unhealthy_reason);
+                       return -1;
+               }
+               first_try = false;
+
+               unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
+                                                  persistent_health_path,
+                                                  "was cleared after a failure",
+                                                  "manual verification needed");
+               if (unhealthy_reason == NULL) {
+                       DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
+                       talloc_free(persistent_health_path);
+                       return -1;
+               }
+
+               DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
+                                 persistent_health_path));
+               tdb = tdb_wrap_open(ctdb, persistent_health_path,
+                                   0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
+                                   O_CREAT | O_RDWR, 0600);
+               if (tdb) {
+                       DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
+                                         persistent_health_path,
+                                         errno,
+                                         strerror(errno)));
+                       talloc_free(persistent_health_path);
+                       talloc_free(unhealthy_reason);
+                       return -1;
+               }
+
+               talloc_free(tdb);
+               goto again;
+       }
+       ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
+       if (ret != 0) {
+               struct tdb_wrap *tdb;
+
+               talloc_free(ctdb->db_persistent_health);
+               ctdb->db_persistent_health = NULL;
+
+               if (!first_try) {
+                       DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
+                                         persistent_health_path));
+                       talloc_free(persistent_health_path);
+                       talloc_free(unhealthy_reason);
+                       return -1;
+               }
+               first_try = false;
+
+               unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
+                                                  persistent_health_path,
+                                                  "was cleared after a failure",
+                                                  "manual verification needed");
+               if (unhealthy_reason == NULL) {
+                       DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
+                       talloc_free(persistent_health_path);
+                       return -1;
+               }
+
+               DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
+                                 persistent_health_path));
+               tdb = tdb_wrap_open(ctdb, persistent_health_path,
+                                   0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
+                                   O_CREAT | O_RDWR, 0600);
+               if (tdb) {
+                       DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
+                                         persistent_health_path,
+                                         errno,
+                                         strerror(errno)));
+                       talloc_free(persistent_health_path);
+                       talloc_free(unhealthy_reason);
+                       return -1;
+               }
+
+               talloc_free(tdb);
+               goto again;
+       }
+       talloc_free(persistent_health_path);
+
+       ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
+       talloc_free(unhealthy_reason);
+       if (ret != 0) {
+               return ret;
+       }
+
+       return 0;
+}
+
 /*
   called when a broadcast seqnum update comes in
  */
@@ -442,6 +1390,12 @@ int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint3
                return -1;
        }
 
+       if (ctdb_db->unhealthy_reason) {
+               DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
+                                ctdb_db->db_name, ctdb_db->unhealthy_reason));
+               return -1;
+       }
+
        tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
        ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
        return 0;
@@ -468,7 +1422,7 @@ static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event
        ctdb_db->seqnum = new_seqnum;
 
        /* setup a new timer */
-       ctdb_db->te = 
+       ctdb_db->seqnum_update =
                event_add_timed(ctdb->ev, ctdb_db, 
                                timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
                                ctdb_ltdb_seqnum_check, ctdb_db);
@@ -486,8 +1440,8 @@ int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
                return -1;
        }
 
-       if (ctdb_db->te == NULL) {
-               ctdb_db->te = 
+       if (ctdb_db->seqnum_update == NULL) {
+               ctdb_db->seqnum_update =
                        event_add_timed(ctdb->ev, ctdb_db, 
                                        timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
                                        ctdb_ltdb_seqnum_check, ctdb_db);
@@ -506,7 +1460,12 @@ int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
        ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
        if (!ctdb_db) {
                DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
-               return -1;
+               return 0;
+       }
+
+       if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
+               DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
+               return 0;
        }
 
        ctdb_db->priority = db_prio->priority;
@@ -516,3 +1475,75 @@ int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
 }
 
 
+int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
+{
+
+       DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
+
+       if (ctdb_db->sticky) {
+               return 0;
+       }
+
+       if (ctdb_db->persistent) {
+               DEBUG(DEBUG_ERR,("Trying to set persistent database with sticky property\n"));
+               return -1;
+       }
+
+       ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
+
+       ctdb_db->sticky = true;
+
+       return 0;
+}
+
+int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
+                               uint32_t db_id,
+                               TDB_DATA *outdata)
+{
+       struct ctdb_db_context *ctdb_db;
+       struct ctdb_db_statistics_wire *stats;
+       int i;
+       int len;
+       char *ptr;
+
+       ctdb_db = find_ctdb_db(ctdb, db_id);
+       if (!ctdb_db) {
+               DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
+               return -1;
+       }
+
+       len = offsetof(struct ctdb_db_statistics_wire, hot_keys);
+       for (i = 0; i < MAX_HOT_KEYS; i++) {
+               len += 8 + ctdb_db->statistics.hot_keys[i].key.dsize;
+       }
+
+       stats = talloc_size(outdata, len);
+       if (stats == NULL) {
+               DEBUG(DEBUG_ERR,("Failed to allocate db statistics wire structure\n"));
+               return -1;
+       }
+
+       stats->db_ro_delegations = ctdb_db->statistics.db_ro_delegations;
+       stats->db_ro_revokes     = ctdb_db->statistics.db_ro_revokes;
+       for (i = 0; i < MAX_COUNT_BUCKETS; i++) {
+               stats->hop_count_bucket[i] = ctdb_db->statistics.hop_count_bucket[i];
+       }
+       stats->num_hot_keys = MAX_HOT_KEYS;
+
+       ptr = &stats->hot_keys[0];
+       for (i = 0; i < MAX_HOT_KEYS; i++) {
+               *(uint32_t *)ptr = ctdb_db->statistics.hot_keys[i].count;
+               ptr += 4;
+
+               *(uint32_t *)ptr = ctdb_db->statistics.hot_keys[i].key.dsize;
+               ptr += 4;
+
+               memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr, ctdb_db->statistics.hot_keys[i].key.dsize);
+               ptr += ctdb_db->statistics.hot_keys[i].key.dsize;
+       }
+
+       outdata->dptr  = (uint8_t *)stats;
+       outdata->dsize = len;
+
+       return 0;
+}