schema: Do not read different schema sequence values during a read transaction
authorGarming Sam <garming@catalyst.net.nz>
Thu, 1 Feb 2018 23:05:27 +0000 (12:05 +1300)
committerAndrew Bartlett <abartlet@samba.org>
Mon, 5 Mar 2018 19:50:14 +0000 (20:50 +0100)
During a read lock, we find ourselves seeing an unchanged schema, but
reading any updates to the metadata.tdb (in the case of lmdb, where
reads do not block writes).

The alternative is to read-lock the entire metadata.tdb, however, this
allows more concurrency by allowing reads not to block writes.

Signed-off-by: Garming Sam <garming@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
source4/dsdb/samdb/ldb_modules/schema_load.c

index 0bbd5fc60f03fa9839b663712f5e52728e1ba82f..f1a01d73f5fb2a8f2620c33295f2e89a42e10626 100644 (file)
 #include "system/filesys.h"
 struct schema_load_private_data {
        struct ldb_module *module;
-       bool in_transaction;
+       uint64_t in_transaction;
+       uint64_t in_read_transaction;
        struct tdb_wrap *metadata;
+       uint64_t schema_seq_num_read_lock;
        uint64_t schema_seq_num_cache;
        int tdb_seqnum;
 };
@@ -193,7 +195,7 @@ static struct dsdb_schema *dsdb_schema_refresh(struct ldb_module *module, struct
                return schema;
        }
 
-       if (private_data->in_transaction) {
+       if (private_data->in_transaction > 0) {
 
                /*
                 * If the refresh is not an expected part of a larger
@@ -221,7 +223,19 @@ static struct dsdb_schema *dsdb_schema_refresh(struct ldb_module *module, struct
         * continue to hit the database to get the highest USN.
         */
 
-       ret = schema_metadata_get_uint64(private_data, DSDB_METADATA_SCHEMA_SEQ_NUM, &schema_seq_num, 0);
+       if (private_data->in_read_transaction > 0) {
+               /*
+                * We must give a static value of the metadata sequence number
+                * during a read lock, otherwise, we will fail to load the
+                * schema at runtime.
+                */
+               schema_seq_num = private_data->schema_seq_num_read_lock;
+               ret = LDB_SUCCESS;
+       } else {
+               ret = schema_metadata_get_uint64(private_data,
+                                                DSDB_METADATA_SCHEMA_SEQ_NUM,
+                                                &schema_seq_num, 0);
+       }
 
        if (schema != NULL) {
                if (ret == LDB_SUCCESS) {
@@ -564,7 +578,7 @@ static int schema_load_start_transaction(struct ldb_module *module)
                              "schema_load_init: dsdb_get_schema failed");
                return LDB_ERR_OPERATIONS_ERROR;
        }
-       private_data->in_transaction = true;
+       private_data->in_transaction++;
 
        return ldb_next_start_trans(module);
 }
@@ -573,8 +587,14 @@ static int schema_load_end_transaction(struct ldb_module *module)
 {
        struct schema_load_private_data *private_data =
                talloc_get_type(ldb_module_get_private(module), struct schema_load_private_data);
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
 
-       private_data->in_transaction = false;
+       if (private_data->in_transaction == 0) {
+               ldb_debug_set(ldb, LDB_DEBUG_FATAL,
+                             "schema_load_end_transaction: transaction mismatch");
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+       private_data->in_transaction--;
 
        return ldb_next_end_trans(module);
 }
@@ -583,8 +603,14 @@ static int schema_load_del_transaction(struct ldb_module *module)
 {
        struct schema_load_private_data *private_data =
                talloc_get_type(ldb_module_get_private(module), struct schema_load_private_data);
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
 
-       private_data->in_transaction = false;
+       if (private_data->in_transaction == 0) {
+               ldb_debug_set(ldb, LDB_DEBUG_FATAL,
+                             "schema_load_del_transaction: transaction mismatch");
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+       private_data->in_transaction--;
 
        return ldb_next_del_trans(module);
 }
@@ -618,6 +644,52 @@ static int schema_load_extended(struct ldb_module *module, struct ldb_request *r
        return ldb_next_request(module, req);
 }
 
+static int schema_read_lock(struct ldb_module *module)
+{
+       struct schema_load_private_data *private_data =
+               talloc_get_type(ldb_module_get_private(module), struct schema_load_private_data);
+       uint64_t schema_seq_num = 0;
+
+       if (private_data == NULL) {
+               return ldb_next_read_lock(module);
+       }
+
+       if (private_data->in_transaction == 0 &&
+           private_data->in_read_transaction == 0) {
+               /*
+                * This appears to fail during the init path, so do not bother
+                * checking the return, and return 0 (reload schema).
+                */
+               schema_metadata_get_uint64(private_data,
+                                          DSDB_METADATA_SCHEMA_SEQ_NUM,
+                                          &schema_seq_num, 0);
+
+               private_data->schema_seq_num_read_lock = schema_seq_num;
+       }
+       private_data->in_read_transaction++;
+
+       return ldb_next_read_lock(module);
+
+}
+
+static int schema_read_unlock(struct ldb_module *module)
+{
+       struct schema_load_private_data *private_data =
+               talloc_get_type(ldb_module_get_private(module), struct schema_load_private_data);
+
+       if (private_data == NULL) {
+               return ldb_next_read_unlock(module);
+       }
+
+       if (private_data->in_transaction == 0 &&
+           private_data->in_read_transaction == 1) {
+               private_data->schema_seq_num_read_lock = 0;
+       }
+       private_data->in_read_transaction--;
+
+       return ldb_next_read_unlock(module);
+}
+
 
 static const struct ldb_module_ops ldb_schema_load_module_ops = {
        .name           = "schema_load",
@@ -627,6 +699,8 @@ static const struct ldb_module_ops ldb_schema_load_module_ops = {
        .start_transaction = schema_load_start_transaction,
        .end_transaction   = schema_load_end_transaction,
        .del_transaction   = schema_load_del_transaction,
+       .read_lock      = schema_read_lock,
+       .read_unlock    = schema_read_unlock,
 };
 
 int ldb_schema_load_module_init(const char *version)