ctdbd: Replace ctdb->done_startup with ctdb->runstate
[vlendec/samba-autobuild/.git] / ctdb / server / ctdb_ltdb_server.c
index 48b085277724c2db5bb40fe9c2e205d206a8cebf..0426d96bdbc584284d500c94996ba4f5d7efb2bc 100644 (file)
@@ -18,7 +18,6 @@
 */
 
 #include "includes.h"
-#include "lib/tevent/tevent.h"
 #include "lib/tdb/include/tdb.h"
 #include "system/network.h"
 #include "system/filesys.h"
 
 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
 
-/*
-  this is the dummy null procedure that all databases support
-*/
-static int ctdb_null_func(struct ctdb_call_info *call)
-{
-       return 0;
-}
-
-/*
-  this is a plain fetch procedure that all databases support
-*/
-static int ctdb_fetch_func(struct ctdb_call_info *call)
-{
-       call->reply_data = &call->record_data;
-       return 0;
-}
-
-
 /**
  * write a record to a normal database
  *
  * This is the server-variant of the ctdb_ltdb_store function.
  * It contains logic to determine whether a record should be
- * stored or deleted.
+ * stored or deleted. It also sends SCHEDULE_FOR_DELETION
+ * controls to the local ctdb daemon if appropriate.
  */
 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
                                  TDB_DATA key,
@@ -67,6 +49,8 @@ static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
        int ret;
        bool seqnum_suppressed = false;
        bool keep = false;
+       bool schedule_for_deletion = false;
+       bool remove_from_delete_queue = false;
        uint32_t lmaster;
 
        if (ctdb->flags & CTDB_FLAG_TORTURE) {
@@ -98,8 +82,21 @@ static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
         */
        if (data.dsize != 0) {
                keep = true;
+       } else if (header->flags & CTDB_REC_RO_FLAGS) {
+               keep = true;
        } else if (ctdb_db->persistent) {
                keep = true;
+       } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
+               /*
+                * The record is not created by the client but
+                * automatically by the ctdb_ltdb_fetch logic that
+                * creates a record with an initial header in the
+                * ltdb before trying to migrate the record from
+                * the current lmaster. Keep it instead of trying
+                * to delete the non-existing record...
+                */
+               keep = true;
+               schedule_for_deletion = true;
        } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
                keep = true;
        } else if (ctdb_db->ctdb->pnn == lmaster) {
@@ -127,6 +124,20 @@ static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
                keep = true;
        }
 
+       if (keep) {
+               if (!ctdb_db->persistent &&
+                   (ctdb_db->ctdb->pnn == header->dmaster) &&
+                   !(header->flags & CTDB_REC_RO_FLAGS))
+               {
+                       header->rsn++;
+
+                       if (data.dsize == 0) {
+                               schedule_for_deletion = true;
+                       }
+               }
+               remove_from_delete_queue = !schedule_for_deletion;
+       }
+
 store:
        /*
         * The VACUUM_MIGRATED flag is only set temporarily for
@@ -147,6 +158,14 @@ store:
         */
        header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
 
+       /*
+        * Similarly, clear the AUTOMATIC flag which should not enter
+        * the local database copy since this would require client
+        * modifications to clear the flag when the client stores
+        * the record.
+        */
+       header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
+
        rec.dsize = sizeof(*header) + data.dsize;
        rec.dptr = talloc_size(ctdb, rec.dsize);
        CTDB_NO_MEMORY(ctdb, rec.dptr);
@@ -195,6 +214,9 @@ store:
                            ctdb_db->db_name,
                            keep?"store":"delete", ret,
                            tdb_errorstr(ctdb_db->ltdb->tdb)));
+
+               schedule_for_deletion = false;
+               remove_from_delete_queue = false;
        }
        if (seqnum_suppressed) {
                tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
@@ -202,6 +224,18 @@ store:
 
        talloc_free(rec.dptr);
 
+       if (schedule_for_deletion) {
+               int ret2;
+               ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
+               if (ret2 != 0) {
+                       DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
+               }
+       }
+
+       if (remove_from_delete_queue) {
+               ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
+       }
+
        return ret;
 }
 
@@ -217,7 +251,7 @@ struct lock_fetch_state {
 /*
   called when we should retry the operation
  */
-static void lock_fetch_callback(void *p)
+static void lock_fetch_callback(void *p, bool locked)
 {
        struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
        if (!state->ignore_generation &&
@@ -240,9 +274,9 @@ static void lock_fetch_callback(void *p)
    1) tries to get the chainlock. If it succeeds, then it returns 0
 
    2) if it fails to get a chainlock immediately then it sets up a
-   non-blocking chainlock via ctdb_lockwait, and when it gets the
+   non-blocking chainlock via ctdb_lock_record, and when it gets the
    chainlock it re-submits this ctdb request to the main packet
-   receive function
+   receive function.
 
    This effectively queues all ctdb requests that cannot be
    immediately satisfied until it can get the lock. This means that
@@ -262,7 +296,7 @@ int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
 {
        int ret;
        struct tdb_context *tdb = ctdb_db->ltdb->tdb;
-       struct lockwait_handle *h;
+       struct lock_request *lreq;
        struct lock_fetch_state *state;
        
        ret = tdb_chainlock_nonblock(tdb, key);
@@ -294,15 +328,14 @@ int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
        state->ignore_generation = ignore_generation;
 
        /* now the contended path */
-       h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
-       if (h == NULL) {
+       lreq = ctdb_lock_record(ctdb_db, key, true, lock_fetch_callback, state);
+       if (lreq == NULL) {
                return -1;
        }
 
        /* we need to move the packet off the temporary context in ctdb_input_pkt(),
           so it won't be freed yet */
        talloc_steal(state, hdr);
-       talloc_steal(state, h);
 
        /* now tell the caller than we will retry asynchronously */
        return -2;
@@ -623,7 +656,7 @@ int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
                return -1;
        }
 
-       if (may_recover && !ctdb->done_startup) {
+       if (may_recover && ctdb->runstate == CTDB_RUNSTATE_STARTUP) {
                DEBUG(DEBUG_ERR, (__location__ " db %s become healthy  - force recovery for startup\n",
                                  ctdb_db->db_name));
                ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
@@ -663,6 +696,42 @@ int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
        return 0;
 }
 
+
+int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
+{
+       char *ropath;
+
+       if (ctdb_db->readonly) {
+               return 0;
+       }
+
+       if (ctdb_db->persistent) {
+               DEBUG(DEBUG_ERR,("Trying to set persistent database with readonly property\n"));
+               return -1;
+       }
+
+       ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
+       if (ropath == NULL) {
+               DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
+               return -1;
+       }
+       ctdb_db->rottdb = tdb_open(ropath, 
+                             ctdb->tunable.database_hash_size, 
+                             TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
+                             O_CREAT|O_RDWR, 0);
+       if (ctdb_db->rottdb == NULL) {
+               DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
+               talloc_free(ropath);
+               return -1;
+       }
+
+       DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
+
+       ctdb_db->readonly = true;
+       talloc_free(ropath);
+       return 0;
+}
+
 /*
   attach to a database, handling both persistent and non-persistent databases
   return 0 on success, -1 on failure
@@ -725,7 +794,7 @@ static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
                if (ctdb->max_persistent_check_errors > 0) {
                        remaining_tries = 1;
                }
-               if (ctdb->done_startup) {
+               if (ctdb->runstate == CTDB_RUNSTATE_RUNNING) {
                        remaining_tries = 0;
                }
 
@@ -866,6 +935,17 @@ again:
                }
        }
 
+       /* set up a rb tree we can use to track which records we have a 
+          fetch-lock in-flight for so we can defer any additional calls
+          for the same record.
+        */
+       ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
+       if (ctdb_db->deferred_fetch == NULL) {
+               DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
+               talloc_free(ctdb_db);
+               return -1;
+       }
+
        DLIST_ADD(ctdb->db_list, ctdb_db);
 
        /* setting this can help some high churn databases */
@@ -893,6 +973,17 @@ again:
                return -1;
        }
 
+       /* 
+          all databases support the "fetch_with_header" function. we need this
+          for efficient readonly record fetches
+       */
+       ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
+       if (ret != 0) {
+               DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
+               talloc_free(ctdb_db);
+               return -1;
+       }
+
        ret = ctdb_vacuum_init(ctdb_db);
        if (ret != 0) {
                DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
@@ -951,7 +1042,7 @@ int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
         */
        while ((da_ctx = ctdb->deferred_attach) != NULL) {
                DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
-               event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
+               event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
        }
 
        return 0;
@@ -971,6 +1062,12 @@ int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
        struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
        struct ctdb_client *client = NULL;
 
+       if (ctdb->tunable.allow_client_db_attach == 0) {
+               DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
+                                 "AllowClientDBAttach == 0\n", db_name));
+               return -1;
+       }
+
        /* dont allow any local clients to attach while we are in recovery mode
         * except for the recovery daemon.
         * allow all attach from the network since these are always from remote
@@ -985,12 +1082,13 @@ int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
                   databases
                */
                if (node->flags & NODE_FLAGS_INACTIVE) {
-                       DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
+                       DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name, node->flags));
                        return -1;
                }
 
-               if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE
-                && client->pid != ctdb->recoverd_pid) {
+               if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE &&
+                   client->pid != ctdb->recoverd_pid &&
+                   ctdb->runstate < CTDB_RUNSTATE_RUNNING) {
                        struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
 
                        if (da_ctx == NULL) {
@@ -1020,6 +1118,12 @@ int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
        /* see if we already have this name */
        db = ctdb_db_handle(ctdb, db_name);
        if (db) {
+               if (db->persistent != persistent) {
+                       DEBUG(DEBUG_ERR, ("ERROR: DB Attach %spersistent to %spersistent "
+                                         "database %s\n", persistent ? "" : "non-",
+                                         db->persistent ? "" : "non-", db_name));
+                       return -1;
+               }
                outdata->dptr  = (uint8_t *)&db->db_id;
                outdata->dsize = sizeof(db->db_id);
                tdb_add_flags(db->ltdb->tdb, tdb_flags);
@@ -1370,3 +1474,76 @@ int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
        return 0;
 }
 
+
+int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
+{
+
+       DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
+
+       if (ctdb_db->sticky) {
+               return 0;
+       }
+
+       if (ctdb_db->persistent) {
+               DEBUG(DEBUG_ERR,("Trying to set persistent database with sticky property\n"));
+               return -1;
+       }
+
+       ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
+
+       ctdb_db->sticky = true;
+
+       return 0;
+}
+
+int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
+                               uint32_t db_id,
+                               TDB_DATA *outdata)
+{
+       struct ctdb_db_context *ctdb_db;
+       struct ctdb_db_statistics_wire *stats;
+       int i;
+       int len;
+       char *ptr;
+
+       ctdb_db = find_ctdb_db(ctdb, db_id);
+       if (!ctdb_db) {
+               DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
+               return -1;
+       }
+
+       len = offsetof(struct ctdb_db_statistics_wire, hot_keys);
+       for (i = 0; i < MAX_HOT_KEYS; i++) {
+               len += 8 + ctdb_db->statistics.hot_keys[i].key.dsize;
+       }
+
+       stats = talloc_size(outdata, len);
+       if (stats == NULL) {
+               DEBUG(DEBUG_ERR,("Failed to allocate db statistics wire structure\n"));
+               return -1;
+       }
+
+       stats->db_ro_delegations = ctdb_db->statistics.db_ro_delegations;
+       stats->db_ro_revokes     = ctdb_db->statistics.db_ro_revokes;
+       for (i = 0; i < MAX_COUNT_BUCKETS; i++) {
+               stats->hop_count_bucket[i] = ctdb_db->statistics.hop_count_bucket[i];
+       }
+       stats->num_hot_keys = MAX_HOT_KEYS;
+
+       ptr = &stats->hot_keys[0];
+       for (i = 0; i < MAX_HOT_KEYS; i++) {
+               *(uint32_t *)ptr = ctdb_db->statistics.hot_keys[i].count;
+               ptr += 4;
+
+               *(uint32_t *)ptr = ctdb_db->statistics.hot_keys[i].key.dsize;
+               ptr += 4;
+
+               memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr, ctdb_db->statistics.hot_keys[i].key.dsize);
+               ptr += ctdb_db->statistics.hot_keys[i].key.dsize;
+       }
+
+       outdata->dptr  = (uint8_t *)stats;
+       outdata->dsize = len;
+
+       return 0;
+}