From d21d83219534782c5d75a707aab23a5d7e91ce56 Mon Sep 17 00:00:00 2001 From: Andrew Bartlett Date: Mon, 26 Jun 2017 14:13:41 +1200 Subject: [PATCH] dsdb: Teach the Samba partition module how to lock all the DB backends The metadata partition (sam.ldb) lock is not enough to block another process in prepare_commit(), because prepare_commit() is a no-op, if nothing was changed in the specific backend. Signed-off-by: Andrew Bartlett Reviewed-by: Douglas Bagnall Reviewed-by: Stefan Metzmacher --- selftest/knownfail.d/ldb-locking | 5 - source4/dsdb/samdb/ldb_modules/partition.c | 176 ++++++++++++++++++++- 2 files changed, 175 insertions(+), 6 deletions(-) delete mode 100644 selftest/knownfail.d/ldb-locking diff --git a/selftest/knownfail.d/ldb-locking b/selftest/knownfail.d/ldb-locking deleted file mode 100644 index 532bf82b0ad..00000000000 --- a/selftest/knownfail.d/ldb-locking +++ /dev/null @@ -1,5 +0,0 @@ -samba.tests.dsdb.samba.tests.dsdb.DsdbTests.test_full_db_lock1\(ad_dc_ntvfs:local\) -samba.tests.dsdb.samba.tests.dsdb.DsdbTests.test_full_db_lock1_config\(ad_dc_ntvfs:local\) -samba.tests.dsdb.samba.tests.dsdb.DsdbTests.test_full_db_lock2\(ad_dc_ntvfs:local\) -samba.tests.dsdb.samba.tests.dsdb.DsdbTests.test_full_db_lock2_config\(ad_dc_ntvfs:local\) -samba.tests.dsdb.samba.tests.dsdb.DsdbTests.test_db_lock2\(ad_dc_ntvfs:local\) diff --git a/source4/dsdb/samdb/ldb_modules/partition.c b/source4/dsdb/samdb/ldb_modules/partition.c index 08a959cabb5..c304efa645d 100644 --- a/source4/dsdb/samdb/ldb_modules/partition.c +++ b/source4/dsdb/samdb/ldb_modules/partition.c @@ -814,7 +814,7 @@ static int partition_start_trans(struct ldb_module *module) ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_start_trans() -> (metadata partition)"); } - /* This order must match that in prepare_commit() */ + /* This order must match that in prepare_commit() and read_lock() */ ret = ldb_next_start_trans(module); if (ret != LDB_SUCCESS) { return ret; @@ -1144,6 +1144,178 @@ static int partition_sequence_number(struct ldb_module *module, struct ldb_reque return ldb_module_done(req, NULL, ext, LDB_SUCCESS); } +/* lock all the backends */ +static int partition_read_lock(struct ldb_module *module) +{ + int i; + int ret; + int ret2; + struct ldb_context *ldb = ldb_module_get_ctx(module); + struct partition_private_data *data = \ + talloc_get_type(ldb_module_get_private(module), + struct partition_private_data); + + if (ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING) { + ldb_debug(ldb, LDB_DEBUG_TRACE, + "partition_read_lock() -> (metadata partition)"); + } + + /* + * It is important to only do this for LOCK because: + * - we don't want to unlock what we did not lock + * + * - we don't want to make a new lock on the sam.ldb + * (triggered inside this routine due to the seq num check) + * during an unlock phase as that will violate the lock + * ordering + */ + + /* + * This will lock the metadata partition (sam.ldb) and + * will also call event loops, so we do it before we + * get the whole db lock. + */ + ret = partition_reload_if_required(module, data, NULL); + if (ret != LDB_SUCCESS) { + return ret; + } + + /* + * This order must match that in prepare_commit(), start with + * the metadata partition (sam.ldb) lock + */ + ret = ldb_next_read_lock(module); + if (ret != LDB_SUCCESS) { + ldb_debug_set(ldb, + LDB_DEBUG_FATAL, + "Failed to lock db: %s / %s for metadata partition", + ldb_errstring(ldb), + ldb_strerror(ret)); + + return ret; + } + + /* + * The metadata partition (sam.ldb) lock is not + * enough to block another process in prepare_commit(), + * because prepare_commit() is a no-op, if nothing + * was changed in the specific backend. + * + * That means the following per partition locks are required. + */ + for (i=0; data && data->partitions && data->partitions[i]; i++) { + if ((module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING)) { + ldb_debug(ldb, LDB_DEBUG_TRACE, + "partition_read_lock() -> %s", + ldb_dn_get_linearized( + data->partitions[i]->ctrl->dn)); + } + ret = ldb_next_read_lock(data->partitions[i]->module); + if (ret == LDB_SUCCESS) { + continue; + } + + ldb_debug_set(ldb, + LDB_DEBUG_FATAL, + "Failed to lock db: %s / %s for %s", + ldb_errstring(ldb), + ldb_strerror(ret), + ldb_dn_get_linearized( + data->partitions[i]->ctrl->dn)); + + /* Back it out, if it fails on one */ + for (i--; i >= 0; i--) { + ret2 = ldb_next_read_unlock(data->partitions[i]->module); + if (ret2 != LDB_SUCCESS) { + ldb_debug(ldb, + LDB_DEBUG_FATAL, + "Failed to unlock db: %s / %s", + ldb_errstring(ldb), + ldb_strerror(ret2)); + } + } + ret2 = ldb_next_read_unlock(module); + if (ret2 != LDB_SUCCESS) { + ldb_debug(ldb, + LDB_DEBUG_FATAL, + "Failed to unlock db: %s / %s", + ldb_errstring(ldb), + ldb_strerror(ret2)); + } + return ret; + } + + return LDB_SUCCESS; +} + +/* unlock all the backends */ +static int partition_read_unlock(struct ldb_module *module) +{ + int i; + int ret = LDB_SUCCESS; + int ret2; + struct ldb_context *ldb = ldb_module_get_ctx(module); + struct partition_private_data *data = \ + talloc_get_type(ldb_module_get_private(module), + struct partition_private_data); + + /* + * This order must be similar to partition_{end,del}_trans() + * the metadata partition (sam.ldb) unlock must be at the end. + */ + + for (i=0; data && data->partitions && data->partitions[i]; i++) { + if ((module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING)) { + ldb_debug(ldb, LDB_DEBUG_TRACE, + "partition_read_unlock() -> %s", + ldb_dn_get_linearized( + data->partitions[i]->ctrl->dn)); + } + ret2 = ldb_next_read_unlock(data->partitions[i]->module); + if (ret2 != LDB_SUCCESS) { + ldb_debug_set(ldb, + LDB_DEBUG_FATAL, + "Failed to lock db: %s / %s for %s", + ldb_errstring(ldb), + ldb_strerror(ret), + ldb_dn_get_linearized( + data->partitions[i]->ctrl->dn)); + + /* + * Don't overwrite the original failure code + * if there was one + */ + if (ret == LDB_SUCCESS) { + ret = ret2; + } + } + } + + if (ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING) { + ldb_debug(ldb, LDB_DEBUG_TRACE, + "partition_read_unlock() -> (metadata partition)"); + } + + ret2 = ldb_next_read_unlock(module); + if (ret2 != LDB_SUCCESS) { + ldb_debug_set(ldb, + LDB_DEBUG_FATAL, + "Failed to unlock db: %s / %s for metadata partition", + ldb_errstring(ldb), + ldb_strerror(ret2)); + + /* + * Don't overwrite the original failure code + * if there was one + */ + if (ret == LDB_SUCCESS) { + ret = ret2; + } + } + + return ret; +} + /* extended */ static int partition_extended(struct ldb_module *module, struct ldb_request *req) { @@ -1198,6 +1370,8 @@ static const struct ldb_module_ops ldb_partition_module_ops = { .prepare_commit = partition_prepare_commit, .end_transaction = partition_end_trans, .del_transaction = partition_del_trans, + .read_lock = partition_read_lock, + .read_unlock = partition_read_unlock }; int ldb_partition_module_init(const char *version) -- 2.34.1