lib ldb key value: add nested transaction support.
authorGary Lockyer <gary@catalyst.net.nz>
Wed, 6 Mar 2019 21:37:18 +0000 (10:37 +1300)
committerAndrew Bartlett <abartlet@samba.org>
Fri, 21 Jun 2019 04:27:12 +0000 (04:27 +0000)
Use the nested transaction support added to the key value back ends to
make key value operations atomic. This will ensure that rename
operation failures, which delete the original record and add a new
record, leave the database in a consistent state.

Signed-off-by: Gary Lockyer <gary@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
lib/ldb/ldb_key_value/ldb_kv.c
lib/ldb/ldb_key_value/ldb_kv.h

index 260087fbfc4085e20ccfac2425ef905674ab7845..16944b8b164516f27ea979dce4f4a3ab8821e2f8 100644 (file)
@@ -469,6 +469,81 @@ static bool ldb_kv_single_valued(const struct ldb_schema_attribute *a,
        return false;
 }
 
+
+/*
+ * Place holder actual implementation to be added in subsequent commits
+ */
+int ldb_kv_index_sub_transaction_start(struct ldb_kv_private *ldb_kv)
+{
+       return LDB_SUCCESS;
+}
+/*
+ * Starts a sub transaction if they are supported by the backend
+ */
+static int ldb_kv_sub_transaction_start(struct ldb_kv_private *ldb_kv)
+{
+       int ret = LDB_SUCCESS;
+
+       ret = ldb_kv->kv_ops->begin_nested_write(ldb_kv);
+       if (ret == LDB_SUCCESS) {
+               ret = ldb_kv_index_sub_transaction_start(ldb_kv);
+       }
+       return ret;
+}
+
+/*
+ * Place holder actual implementation to be added in subsequent commits
+ */
+int ldb_kv_index_sub_transaction_commit(struct ldb_kv_private *ldb_kv)
+{
+       return LDB_SUCCESS;
+}
+/*
+ * Commits a sub transaction if they are supported by the backend
+ */
+static int ldb_kv_sub_transaction_commit(struct ldb_kv_private *ldb_kv)
+{
+       int ret = LDB_SUCCESS;
+
+       ret = ldb_kv_index_sub_transaction_commit(ldb_kv);
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
+       ret = ldb_kv->kv_ops->finish_nested_write(ldb_kv);
+       return ret;
+}
+
+/*
+ * Place holder actual implementation to be added in subsequent commits
+ */
+int ldb_kv_index_sub_transaction_cancel(struct ldb_kv_private *ldb_kv)
+{
+       return LDB_SUCCESS;
+}
+/*
+ * Cancels a sub transaction if they are supported by the backend
+ */
+static int ldb_kv_sub_transaction_cancel(struct ldb_kv_private *ldb_kv)
+{
+       int ret = LDB_SUCCESS;
+
+       ret = ldb_kv_index_sub_transaction_cancel(ldb_kv);
+       if (ret != LDB_SUCCESS) {
+               struct ldb_context *ldb = ldb_module_get_ctx(ldb_kv->module);
+               /*
+                * In the event of a failure we log the failure and continue
+                * as we need to cancel the database transaction.
+                */
+               ldb_debug(ldb,
+                         LDB_DEBUG_ERROR,
+                         __location__": ldb_kv_index_sub_transaction_cancel "
+                         "failed: %s",
+                         ldb_errstring(ldb));
+       }
+       ret = ldb_kv->kv_ops->abort_nested_write(ldb_kv);
+       return ret;
+}
+
 static int ldb_kv_add_internal(struct ldb_module *module,
                               struct ldb_kv_private *ldb_kv,
                               const struct ldb_message *msg,
@@ -615,7 +690,23 @@ static int ldb_kv_add(struct ldb_kv_context *ctx)
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
+       ret = ldb_kv_sub_transaction_start(ldb_kv);
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
        ret = ldb_kv_add_internal(module, ldb_kv, req->op.add.message, true);
+       if (ret != LDB_SUCCESS) {
+               int r = ldb_kv_sub_transaction_cancel(ldb_kv);
+               if (r != LDB_SUCCESS) {
+                       ldb_debug(
+                               ldb_module_get_ctx(module),
+                               LDB_DEBUG_FATAL,
+                               __location__
+                               ": Unable to roll back sub transaction");
+               }
+               return ret;
+       }
+       ret = ldb_kv_sub_transaction_commit(ldb_kv);
 
        return ret;
 }
@@ -705,6 +796,9 @@ static int ldb_kv_delete(struct ldb_kv_context *ctx)
 {
        struct ldb_module *module = ctx->module;
        struct ldb_request *req = ctx->req;
+       void *data = ldb_module_get_private(module);
+       struct ldb_kv_private *ldb_kv =
+           talloc_get_type(data, struct ldb_kv_private);
        int ret = LDB_SUCCESS;
 
        ldb_request_set_state(req, LDB_ASYNC_PENDING);
@@ -713,7 +807,23 @@ static int ldb_kv_delete(struct ldb_kv_context *ctx)
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
+       ret = ldb_kv_sub_transaction_start(ldb_kv);
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
        ret = ldb_kv_delete_internal(module, req->op.del.dn);
+       if (ret != LDB_SUCCESS) {
+               int r = ldb_kv_sub_transaction_cancel(ldb_kv);
+               if (r != LDB_SUCCESS) {
+                       ldb_debug(
+                               ldb_module_get_ctx(module),
+                               LDB_DEBUG_FATAL,
+                               __location__
+                               ": Unable to roll back sub transaction");
+               }
+               return ret;
+       }
+       ret = ldb_kv_sub_transaction_commit(ldb_kv);
 
        return ret;
 }
@@ -1237,6 +1347,9 @@ static int ldb_kv_modify(struct ldb_kv_context *ctx)
 {
        struct ldb_module *module = ctx->module;
        struct ldb_request *req = ctx->req;
+       void *data = ldb_module_get_private(module);
+       struct ldb_kv_private *ldb_kv =
+           talloc_get_type(data, struct ldb_kv_private);
        int ret = LDB_SUCCESS;
 
        ret = ldb_kv_check_special_dn(module, req->op.mod.message);
@@ -1250,7 +1363,56 @@ static int ldb_kv_modify(struct ldb_kv_context *ctx)
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
+       ret = ldb_kv_sub_transaction_start(ldb_kv);
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
        ret = ldb_kv_modify_internal(module, req->op.mod.message, req);
+       if (ret != LDB_SUCCESS) {
+               int r = ldb_kv_sub_transaction_cancel(ldb_kv);
+               if (r != LDB_SUCCESS) {
+                       ldb_debug(
+                               ldb_module_get_ctx(module),
+                               LDB_DEBUG_FATAL,
+                               __location__
+                               ": Unable to roll back sub transaction");
+               }
+               return ret;
+       }
+       ret = ldb_kv_sub_transaction_commit(ldb_kv);
+
+
+       return ret;
+}
+
+static int ldb_kv_rename_internal(struct ldb_module *module,
+                          struct ldb_request *req,
+                          struct ldb_message *msg)
+{
+       void *data = ldb_module_get_private(module);
+       struct ldb_kv_private *ldb_kv =
+           talloc_get_type(data, struct ldb_kv_private);
+       int ret = LDB_SUCCESS;
+
+       /* Always delete first then add, to avoid conflicts with
+        * unique indexes. We rely on the transaction to make this
+        * atomic
+        */
+       ret = ldb_kv_delete_internal(module, msg->dn);
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
+
+       msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
+       if (msg->dn == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       /* We don't check single value as we can have more than 1 with
+        * deleted attributes. We could go through all elements but that's
+        * maybe not the most efficient way
+        */
+       ret = ldb_kv_add_internal(module, ldb_kv, msg, false);
 
        return ret;
 }
@@ -1355,28 +1517,26 @@ static int ldb_kv_rename(struct ldb_kv_context *ctx)
        talloc_free(key_old.data);
        talloc_free(key.data);
 
-       /* Always delete first then add, to avoid conflicts with
-        * unique indexes. We rely on the transaction to make this
-        * atomic
-        */
-       ret = ldb_kv_delete_internal(module, msg->dn);
+
+       ret = ldb_kv_sub_transaction_start(ldb_kv);
        if (ret != LDB_SUCCESS) {
                talloc_free(msg);
                return ret;
        }
-
-       msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
-       if (msg->dn == NULL) {
+       ret = ldb_kv_rename_internal(module, req, msg);
+       if (ret != LDB_SUCCESS) {
+               int r = ldb_kv_sub_transaction_cancel(ldb_kv);
+               if (r != LDB_SUCCESS) {
+                       ldb_debug(
+                               ldb_module_get_ctx(module),
+                               LDB_DEBUG_FATAL,
+                               __location__
+                               ": Unable to roll back sub transaction");
+               }
                talloc_free(msg);
-               return LDB_ERR_OPERATIONS_ERROR;
+               return ret;
        }
-
-       /* We don't check single value as we can have more than 1 with
-        * deleted attributes. We could go through all elements but that's
-        * maybe not the most efficient way
-        */
-       ret = ldb_kv_add_internal(module, ldb_kv, msg, false);
-
+       ret = ldb_kv_sub_transaction_commit(ldb_kv);
        talloc_free(msg);
 
        return ret;
index e7fcbd9be8d42d1723f931a33eccdd830298c2b1..f344155eedd2a5eef7a17c6e3fbdfa8030e17ac8 100644 (file)
@@ -304,4 +304,7 @@ int ldb_kv_init_store(struct ldb_kv_private *ldb_kv,
                      struct ldb_context *ldb,
                      const char *options[],
                      struct ldb_module **_module);
+int ldb_kv_index_sub_transaction_start(struct ldb_kv_private *ldb_kv);
+int ldb_kv_index_sub_transaction_cancel(struct ldb_kv_private *ldb_kv);
+int ldb_kv_index_sub_transaction_commit(struct ldb_kv_private *ldb_kv);
 #endif /* __LDB_KV_H__ */