dsdb: Teach the Samba partition module how to lock all the DB backends
authorAndrew Bartlett <abartlet@samba.org>
Mon, 26 Jun 2017 02:13:41 +0000 (14:13 +1200)
committerStefan Metzmacher <metze@samba.org>
Sun, 2 Jul 2017 15:35:20 +0000 (17:35 +0200)
The metadata partition (sam.ldb) lock is not
enough to block another process in prepare_commit(),
because prepare_commit() is a no-op, if nothing
was changed in the specific backend.

Signed-off-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
Reviewed-by: Stefan Metzmacher <metze@samba.org>
selftest/knownfail.d/ldb-locking [deleted file]
source4/dsdb/samdb/ldb_modules/partition.c

diff --git a/selftest/knownfail.d/ldb-locking b/selftest/knownfail.d/ldb-locking
deleted file mode 100644 (file)
index 532bf82..0000000
+++ /dev/null
@@ -1,5 +0,0 @@
-samba.tests.dsdb.samba.tests.dsdb.DsdbTests.test_full_db_lock1\(ad_dc_ntvfs:local\)
-samba.tests.dsdb.samba.tests.dsdb.DsdbTests.test_full_db_lock1_config\(ad_dc_ntvfs:local\)
-samba.tests.dsdb.samba.tests.dsdb.DsdbTests.test_full_db_lock2\(ad_dc_ntvfs:local\)
-samba.tests.dsdb.samba.tests.dsdb.DsdbTests.test_full_db_lock2_config\(ad_dc_ntvfs:local\)
-samba.tests.dsdb.samba.tests.dsdb.DsdbTests.test_db_lock2\(ad_dc_ntvfs:local\)
index 08a959cabb5b751cf53f1774b8a22ae47418af37..c304efa645df28f2c741d9397f40f9389fea8e04 100644 (file)
@@ -814,7 +814,7 @@ static int partition_start_trans(struct ldb_module *module)
                ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_start_trans() -> (metadata partition)");
        }
 
                ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_start_trans() -> (metadata partition)");
        }
 
-       /* This order must match that in prepare_commit() */
+       /* This order must match that in prepare_commit() and read_lock() */
        ret = ldb_next_start_trans(module);
        if (ret != LDB_SUCCESS) {
                return ret;
        ret = ldb_next_start_trans(module);
        if (ret != LDB_SUCCESS) {
                return ret;
@@ -1144,6 +1144,178 @@ static int partition_sequence_number(struct ldb_module *module, struct ldb_reque
        return ldb_module_done(req, NULL, ext, LDB_SUCCESS);
 }
 
        return ldb_module_done(req, NULL, ext, LDB_SUCCESS);
 }
 
+/* lock all the backends */
+static int partition_read_lock(struct ldb_module *module)
+{
+       int i;
+       int ret;
+       int ret2;
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
+       struct partition_private_data *data = \
+               talloc_get_type(ldb_module_get_private(module),
+                               struct partition_private_data);
+
+       if (ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING) {
+               ldb_debug(ldb, LDB_DEBUG_TRACE,
+                         "partition_read_lock() -> (metadata partition)");
+       }
+
+       /*
+        * It is important to only do this for LOCK because:
+        * - we don't want to unlock what we did not lock
+        *
+        * - we don't want to make a new lock on the sam.ldb
+        *   (triggered inside this routine due to the seq num check)
+        *   during an unlock phase as that will violate the lock
+        *   ordering
+        */
+
+       /*
+        * This will lock the metadata partition (sam.ldb) and
+        * will also call event loops, so we do it before we
+        * get the whole db lock.
+        */
+       ret = partition_reload_if_required(module, data, NULL);
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
+
+       /*
+        * This order must match that in prepare_commit(), start with
+        * the metadata partition (sam.ldb) lock
+        */
+       ret = ldb_next_read_lock(module);
+       if (ret != LDB_SUCCESS) {
+               ldb_debug_set(ldb,
+                             LDB_DEBUG_FATAL,
+                             "Failed to lock db: %s / %s for metadata partition",
+                             ldb_errstring(ldb),
+                             ldb_strerror(ret));
+
+               return ret;
+       }
+
+       /*
+        * The metadata partition (sam.ldb) lock is not
+        * enough to block another process in prepare_commit(),
+        * because prepare_commit() is a no-op, if nothing
+        * was changed in the specific backend.
+        *
+        * That means the following per partition locks are required.
+        */
+       for (i=0; data && data->partitions && data->partitions[i]; i++) {
+               if ((module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING)) {
+                       ldb_debug(ldb, LDB_DEBUG_TRACE,
+                                 "partition_read_lock() -> %s",
+                                 ldb_dn_get_linearized(
+                                         data->partitions[i]->ctrl->dn));
+               }
+               ret = ldb_next_read_lock(data->partitions[i]->module);
+               if (ret == LDB_SUCCESS) {
+                       continue;
+               }
+
+               ldb_debug_set(ldb,
+                             LDB_DEBUG_FATAL,
+                             "Failed to lock db: %s / %s for %s",
+                             ldb_errstring(ldb),
+                             ldb_strerror(ret),
+                             ldb_dn_get_linearized(
+                                     data->partitions[i]->ctrl->dn));
+
+               /* Back it out, if it fails on one */
+               for (i--; i >= 0; i--) {
+                       ret2 = ldb_next_read_unlock(data->partitions[i]->module);
+                       if (ret2 != LDB_SUCCESS) {
+                               ldb_debug(ldb,
+                                         LDB_DEBUG_FATAL,
+                                         "Failed to unlock db: %s / %s",
+                                         ldb_errstring(ldb),
+                                         ldb_strerror(ret2));
+                       }
+               }
+               ret2 = ldb_next_read_unlock(module);
+               if (ret2 != LDB_SUCCESS) {
+                       ldb_debug(ldb,
+                                 LDB_DEBUG_FATAL,
+                                 "Failed to unlock db: %s / %s",
+                                 ldb_errstring(ldb),
+                                 ldb_strerror(ret2));
+               }
+               return ret;
+       }
+
+       return LDB_SUCCESS;
+}
+
+/* unlock all the backends */
+static int partition_read_unlock(struct ldb_module *module)
+{
+       int i;
+       int ret = LDB_SUCCESS;
+       int ret2;
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
+       struct partition_private_data *data = \
+               talloc_get_type(ldb_module_get_private(module),
+                               struct partition_private_data);
+
+       /*
+        * This order must be similar to partition_{end,del}_trans()
+        * the metadata partition (sam.ldb) unlock must be at the end.
+        */
+
+       for (i=0; data && data->partitions && data->partitions[i]; i++) {
+               if ((module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING)) {
+                       ldb_debug(ldb, LDB_DEBUG_TRACE,
+                                 "partition_read_unlock() -> %s",
+                                 ldb_dn_get_linearized(
+                                         data->partitions[i]->ctrl->dn));
+               }
+               ret2 = ldb_next_read_unlock(data->partitions[i]->module);
+               if (ret2 != LDB_SUCCESS) {
+                       ldb_debug_set(ldb,
+                                     LDB_DEBUG_FATAL,
+                                     "Failed to lock db: %s / %s for %s",
+                                     ldb_errstring(ldb),
+                                     ldb_strerror(ret),
+                                     ldb_dn_get_linearized(
+                                             data->partitions[i]->ctrl->dn));
+
+                       /*
+                        * Don't overwrite the original failure code
+                        * if there was one
+                        */
+                       if (ret == LDB_SUCCESS) {
+                               ret = ret2;
+                       }
+               }
+       }
+
+       if (ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING) {
+               ldb_debug(ldb, LDB_DEBUG_TRACE,
+                         "partition_read_unlock() -> (metadata partition)");
+       }
+
+       ret2 = ldb_next_read_unlock(module);
+       if (ret2 != LDB_SUCCESS) {
+               ldb_debug_set(ldb,
+                             LDB_DEBUG_FATAL,
+                             "Failed to unlock db: %s / %s for metadata partition",
+                             ldb_errstring(ldb),
+                             ldb_strerror(ret2));
+
+               /*
+                * Don't overwrite the original failure code
+                * if there was one
+                */
+               if (ret == LDB_SUCCESS) {
+                       ret = ret2;
+               }
+       }
+
+       return ret;
+}
+
 /* extended */
 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
 {
 /* extended */
 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
 {
@@ -1198,6 +1370,8 @@ static const struct ldb_module_ops ldb_partition_module_ops = {
        .prepare_commit    = partition_prepare_commit,
        .end_transaction   = partition_end_trans,
        .del_transaction   = partition_del_trans,
        .prepare_commit    = partition_prepare_commit,
        .end_transaction   = partition_end_trans,
        .del_transaction   = partition_del_trans,
+       .read_lock         = partition_read_lock,
+       .read_unlock       = partition_read_unlock
 };
 
 int ldb_partition_module_init(const char *version)
 };
 
 int ldb_partition_module_init(const char *version)