dsdb: Remove dead code in partition_prep_request()
[samba.git] / source4 / dsdb / samdb / ldb_modules / partition.c
index f6453aac7f2c39173cb9637aec97574de9b45ffb..d44dc19320a33632a1a46e24b85d01f647c81d5d 100644 (file)
@@ -238,6 +238,7 @@ static int partition_prep_request(struct partition_context *ac,
        int ret;
        struct ldb_request *req;
        struct ldb_control *partition_ctrl = NULL;
+       void *part_data = NULL;
 
        ac->part_req = talloc_realloc(ac, ac->part_req,
                                        struct part_request,
@@ -323,42 +324,35 @@ static int partition_prep_request(struct partition_context *ac,
                }
        }
 
-       if (partition) {
-               void *part_data = partition->ctrl;
+       part_data = partition->ctrl;
 
-               ac->part_req[ac->num_requests].module = partition->module;
+       ac->part_req[ac->num_requests].module = partition->module;
 
-               if (partition_ctrl != NULL) {
-                       if (partition_ctrl->data != NULL) {
-                               part_data = partition_ctrl->data;
-                       }
-
-                       /*
-                        * If the provided current partition control is without
-                        * data then use the calculated one.
-                        */
-                       ret = ldb_request_add_control(req,
-                                                     DSDB_CONTROL_CURRENT_PARTITION_OID,
-                                                     false, part_data);
-                       if (ret != LDB_SUCCESS) {
-                               return ret;
-                       }
+       if (partition_ctrl != NULL) {
+               if (partition_ctrl->data != NULL) {
+                       part_data = partition_ctrl->data;
                }
 
-               if (req->operation == LDB_SEARCH) {
-                       /* If the search is for 'more' than this partition,
-                        * then change the basedn, so a remote LDAP server
-                        * doesn't object */
-                       if (ldb_dn_compare_base(partition->ctrl->dn,
-                                               req->op.search.base) != 0) {
-                               req->op.search.base = partition->ctrl->dn;
-                       }
+               /*
+                * If the provided current partition control is without
+                * data then use the calculated one.
+                */
+               ret = ldb_request_add_control(req,
+                                             DSDB_CONTROL_CURRENT_PARTITION_OID,
+                                             false, part_data);
+               if (ret != LDB_SUCCESS) {
+                       return ret;
                }
+       }
 
-       } else {
-               /* make sure you put the module here, or
-                * or ldb_next_request() will skip a module */
-               ac->part_req[ac->num_requests].module = ac->module;
+       if (req->operation == LDB_SEARCH) {
+               /* If the search is for 'more' than this partition,
+                * then change the basedn, so a remote LDAP server
+                * doesn't object */
+               if (ldb_dn_compare_base(partition->ctrl->dn,
+                                       req->op.search.base) != 0) {
+                       req->op.search.base = partition->ctrl->dn;
+               }
        }
 
        ac->num_requests++;
@@ -372,7 +366,7 @@ static int partition_call_first(struct partition_context *ac)
 }
 
 /**
- * Send a request down to all the partitions
+ * Send a request down to all the partitions (but not the sam.ldb file)
  */
 static int partition_send_all(struct ldb_module *module, 
                              struct partition_context *ac, 
@@ -381,10 +375,8 @@ static int partition_send_all(struct ldb_module *module,
        unsigned int i;
        struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
                                                              struct partition_private_data);
-       int ret = partition_prep_request(ac, NULL);
-       if (ret != LDB_SUCCESS) {
-               return ret;
-       }
+       int ret;
+
        for (i=0; data && data->partitions && data->partitions[i]; i++) {
                ret = partition_prep_request(ac, data->partitions[i]);
                if (ret != LDB_SUCCESS) {
@@ -396,32 +388,36 @@ static int partition_send_all(struct ldb_module *module,
        return partition_call_first(ac);
 }
 
+struct partition_copy_context {
+       struct ldb_module *module;
+       struct partition_context *partition_context;
+       struct ldb_request *request;
+       struct ldb_dn *dn;
+};
 
-/**
- * send an operation to the top partition, then copy the resulting
- * object to all other partitions
+/*
+ * A special DN has been updated in the primary partition. Now propagate those
+ * changes to the remaining partitions.
+ *
+ * Note: that the operations are asynchronous and this function is called
+ *       from partition_copy_all_callback_handler in response to an async
+ *       callback.
  */
-static int partition_copy_all(struct ldb_module *module,
-                             struct partition_context *ac,
-                             struct ldb_request *req,
-                             struct ldb_dn *dn)
+static int partition_copy_all_callback_action(
+       struct ldb_module *module,
+       struct partition_context *ac,
+       struct ldb_request *req,
+       struct ldb_dn *dn)
+
 {
+
        unsigned int i;
-       struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
-                                                             struct partition_private_data);
-       int ret, search_ret;
+       struct partition_private_data *data =
+               talloc_get_type(
+                       ldb_module_get_private(module),
+                       struct partition_private_data);
+       int search_ret;
        struct ldb_result *res;
-
-       /* do the request on the top level sam.ldb synchronously */
-       ret = ldb_next_request(module, req);
-       if (ret != LDB_SUCCESS) {
-               return ret;
-       }
-       ret = ldb_wait(req->handle, LDB_WAIT_ALL);
-       if (ret != LDB_SUCCESS) {
-               return ret;
-       }
-
        /* now fetch the resulting object, and then copy it to all the
         * other partitions. We need this approach to cope with the
         * partitions getting out of sync. If for example the
@@ -430,39 +426,258 @@ static int partition_copy_all(struct ldb_module *module,
         * lead to an error
         */
        search_ret = dsdb_module_search_dn(module, ac, &res, dn, NULL, DSDB_FLAG_NEXT_MODULE, req);
-       if (search_ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
+       if (search_ret != LDB_SUCCESS) {
                return search_ret;
        }
 
-       /* now delete the object in the other partitions. Once that is
-          done we will re-add the object, if search_ret was not
-          LDB_ERR_NO_SUCH_OBJECT
+       /* now delete the object in the other partitions, if requried
        */
+       if (search_ret == LDB_ERR_NO_SUCH_OBJECT) {
+               for (i=0; data->partitions && data->partitions[i]; i++) {
+                       int pret;
+                       pret = dsdb_module_del(data->partitions[i]->module,
+                                              dn,
+                                              DSDB_FLAG_NEXT_MODULE,
+                                              req);
+                       if (pret != LDB_SUCCESS && pret != LDB_ERR_NO_SUCH_OBJECT) {
+                               /* we should only get success or no
+                                  such object from the other partitions */
+                               return pret;
+                       }
+               }
+
+               return ldb_module_done(req, NULL, NULL, LDB_SUCCESS);
+       }
+
+       /* now add/modify in the other partitions */
        for (i=0; data->partitions && data->partitions[i]; i++) {
+               struct ldb_message *modify_msg = NULL;
                int pret;
-               pret = dsdb_module_del(data->partitions[i]->module, dn, DSDB_FLAG_NEXT_MODULE, req);
-               if (pret != LDB_SUCCESS && pret != LDB_ERR_NO_SUCH_OBJECT) {
-                       /* we should only get success or no
-                          such object from the other partitions */
+               unsigned int el_idx;
+
+               pret = dsdb_module_add(data->partitions[i]->module,
+                                      res->msgs[0],
+                                      DSDB_FLAG_NEXT_MODULE,
+                                      req);
+               if (pret == LDB_SUCCESS) {
+                       continue;
+               }
+
+               if (pret != LDB_ERR_ENTRY_ALREADY_EXISTS) {
                        return pret;
                }
-       }
 
+               modify_msg = ldb_msg_copy(req, res->msgs[0]);
+               if (modify_msg == NULL) {
+                       return ldb_module_oom(module);
+               }
 
-       if (search_ret != LDB_ERR_NO_SUCH_OBJECT) {
-               /* now re-add in the other partitions */
-               for (i=0; data->partitions && data->partitions[i]; i++) {
-                       int pret;
-                       pret = dsdb_module_add(data->partitions[i]->module, res->msgs[0], DSDB_FLAG_NEXT_MODULE, req);
-                       if (pret != LDB_SUCCESS) {
-                               return pret;
+               /*
+                * mark all the message elements as
+                * LDB_FLAG_MOD_REPLACE
+                */
+               for (el_idx=0;
+                    el_idx < modify_msg->num_elements;
+                    el_idx++) {
+                       modify_msg->elements[el_idx].flags
+                               = LDB_FLAG_MOD_REPLACE;
+               }
+
+               if (req->operation == LDB_MODIFY) {
+                       const struct ldb_message *req_msg = req->op.mod.message;
+                       /*
+                        * mark elements to be removed, if there were
+                        * deleted entirely above we need to delete
+                        * them here too
+                        */
+                       for (el_idx=0; el_idx < req_msg->num_elements; el_idx++) {
+                               if (req_msg->elements[el_idx].flags & LDB_FLAG_MOD_DELETE
+                                   || ((req_msg->elements[el_idx].flags & LDB_FLAG_MOD_REPLACE) &&
+                                       req_msg->elements[el_idx].num_values == 0)) {
+                                       if (ldb_msg_find_element(modify_msg,
+                                                                req_msg->elements[el_idx].name) != NULL) {
+                                               continue;
+                                       }
+                                       pret = ldb_msg_add_empty(
+                                               modify_msg,
+                                               req_msg->elements[el_idx].name,
+                                               LDB_FLAG_MOD_REPLACE,
+                                               NULL);
+                                       if (pret != LDB_SUCCESS) {
+                                               return pret;
+                                       }
+                               }
                        }
                }
+
+               pret = dsdb_module_modify(data->partitions[i]->module,
+                                         modify_msg,
+                                         DSDB_FLAG_NEXT_MODULE,
+                                         req);
+
+               if (pret != LDB_SUCCESS) {
+                       return pret;
+               }
        }
 
        return ldb_module_done(req, NULL, NULL, LDB_SUCCESS);
 }
 
+
+/*
+ * @brief call back function for the ldb operations on special DN's.
+ *
+ * As the LDB operations are async, and we wish to use the result
+ * the operations, a callback needs to be registered to process the results
+ * of the LDB operations.
+ *
+ * @param req the ldb request
+ * @param res the result of the operation
+ *
+ * @return the LDB_STATUS
+ */
+static int partition_copy_all_callback_handler(
+       struct ldb_request *req,
+       struct ldb_reply *ares)
+{
+       struct partition_copy_context *ac = NULL;
+
+       ac = talloc_get_type(
+               req->context,
+               struct partition_copy_context);
+
+       if (!ares) {
+               return ldb_module_done(
+                       ac->request,
+                       NULL,
+                       NULL,
+                       LDB_ERR_OPERATIONS_ERROR);
+       }
+
+       /* pass on to the callback */
+       switch (ares->type) {
+       case LDB_REPLY_ENTRY:
+               return ldb_module_send_entry(
+                       ac->request,
+                       ares->message,
+                       ares->controls);
+
+       case LDB_REPLY_REFERRAL:
+               return ldb_module_send_referral(
+                       ac->request,
+                       ares->referral);
+
+       case LDB_REPLY_DONE: {
+               int error = ares->error;
+               if (error == LDB_SUCCESS) {
+                       error = partition_copy_all_callback_action(
+                               ac->module,
+                               ac->partition_context,
+                               ac->request,
+                               ac->dn);
+               }
+               return ldb_module_done(
+                       ac->request,
+                       ares->controls,
+                       ares->response,
+                       error);
+       }
+
+       default:
+               /* Can't happen */
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+}
+
+/**
+ * send an operation to the top partition, then copy the resulting
+ * object to all other partitions.
+ */
+static int partition_copy_all(
+       struct ldb_module *module,
+       struct partition_context *partition_context,
+       struct ldb_request *req,
+       struct ldb_dn *dn)
+{
+       struct ldb_request *new_req = NULL;
+       struct ldb_context *ldb = NULL;
+       struct partition_copy_context *context = NULL;
+
+       int ret;
+
+       ldb = ldb_module_get_ctx(module);
+
+       context = talloc_zero(req, struct partition_copy_context);
+       if (context == NULL) {
+               return ldb_oom(ldb);
+       }
+       context->module = module;
+       context->request = req;
+       context->dn = dn;
+       context->partition_context = partition_context;
+
+       switch (req->operation) {
+       case LDB_ADD:
+               ret = ldb_build_add_req(
+                       &new_req,
+                       ldb,
+                       req,
+                       req->op.add.message,
+                       req->controls,
+                       context,
+                       partition_copy_all_callback_handler,
+                       req);
+               break;
+       case LDB_MODIFY:
+               ret = ldb_build_mod_req(
+                       &new_req,
+                       ldb,
+                       req,
+                       req->op.mod.message,
+                       req->controls,
+                       context,
+                       partition_copy_all_callback_handler,
+                       req);
+               break;
+       case LDB_DELETE:
+               ret = ldb_build_del_req(
+                       &new_req,
+                       ldb,
+                       req,
+                       req->op.del.dn,
+                       req->controls,
+                       context,
+                       partition_copy_all_callback_handler,
+                       req);
+               break;
+       case LDB_RENAME:
+               ret = ldb_build_rename_req(
+                       &new_req,
+                       ldb,
+                       req,
+                       req->op.rename.olddn,
+                       req->op.rename.newdn,
+                       req->controls,
+                       context,
+                       partition_copy_all_callback_handler,
+                       req);
+               break;
+       default:
+               /*
+                * Shouldn't happen.
+                */
+               ldb_debug(
+                       ldb,
+                       LDB_DEBUG_ERROR,
+                       "Unexpected operation type (%d)\n", req->operation);
+               ret = LDB_ERR_OPERATIONS_ERROR;
+               break;
+       }
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
+       return ldb_next_request(module, new_req);
+}
 /**
  * Figure out which backend a request needs to be aimed at.  Some
  * requests must be replicated to all backends
@@ -531,7 +746,6 @@ static int partition_replicate(struct ldb_module *module, struct ldb_request *re
 /* search */
 static int partition_search(struct ldb_module *module, struct ldb_request *req)
 {
-       struct ldb_control **saved_controls;
        /* Find backend */
        struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
                                                              struct partition_private_data);
@@ -549,12 +763,6 @@ static int partition_search(struct ldb_module *module, struct ldb_request *req)
        int ret;
        bool domain_scope = false, phantom_root = false;
 
-       /* see if we are still up-to-date */
-       ret = partition_reload_if_required(module, data, req);
-       if (ret != LDB_SUCCESS) {
-               return ret;
-       }
-
        p = find_partition(data, NULL, req);
        if (p != NULL) {
                /* the caller specified what partition they want the
@@ -572,12 +780,6 @@ static int partition_search(struct ldb_module *module, struct ldb_request *req)
 
        }
 
-       /* Remove the "domain_scope" control, so we don't confuse a backend
-        * server */
-       if (domain_scope_control && !ldb_save_controls(domain_scope_control, req, &saved_controls)) {
-               return ldb_oom(ldb_module_get_ctx(module));
-       }
-
        /* if we aren't initialised yet go further */
        if (!data || !data->partitions) {
                return ldb_next_request(module, req);
@@ -687,11 +889,17 @@ static int partition_search(struct ldb_module *module, struct ldb_request *req)
                                                 data->partitions[i]->ctrl->dn) == 0) &&
                            (ldb_dn_compare(req->op.search.base,
                                            data->partitions[i]->ctrl->dn) != 0)) {
-                               char *ref = talloc_asprintf(ac,
-                                                           "ldap://%s/%s%s",
-                                                           lpcfg_dnsdomain(lp_ctx),
-                                                           ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
-                                                           req->op.search.scope == LDB_SCOPE_ONELEVEL ? "??base" : "");
+                               const char *scheme = ldb_get_opaque(
+                                   ldb, LDAP_REFERRAL_SCHEME_OPAQUE);
+                               char *ref = talloc_asprintf(
+                                       ac,
+                                       "%s://%s/%s%s",
+                                       scheme == NULL ? "ldap" : scheme,
+                                       lpcfg_dnsdomain(lp_ctx),
+                                       ldb_dn_get_linearized(
+                                           data->partitions[i]->ctrl->dn),
+                                       req->op.search.scope ==
+                                           LDB_SCOPE_ONELEVEL ? "??base" : "");
 
                                if (ref == NULL) {
                                        return ldb_oom(ldb);
@@ -699,7 +907,8 @@ static int partition_search(struct ldb_module *module, struct ldb_request *req)
 
                                /* Initialise the referrals list */
                                if (ac->referrals == NULL) {
-                                       ac->referrals = (const char **) str_list_make_empty(ac);
+                                       char **l = str_list_make_empty(ac);
+                                       ac->referrals = discard_const_p(const char *, l);
                                        if (ac->referrals == NULL) {
                                                return ldb_oom(ldb);
                                        }
@@ -808,28 +1017,71 @@ static int partition_rename(struct ldb_module *module, struct ldb_request *req)
 }
 
 /* start a transaction */
-static int partition_start_trans(struct ldb_module *module)
+int partition_start_trans(struct ldb_module *module)
 {
-       unsigned int i;
-       int ret;
+       int i = 0;
+       int ret = 0;
        struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
                                                              struct partition_private_data);
        /* Look at base DN */
        /* Figure out which partition it is under */
        /* Skip the lot if 'data' isn't here yet (initialization) */
-       if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
+       if (ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING) {
                ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_start_trans() -> (metadata partition)");
        }
+
+       /*
+        * We start a transaction on metadata.tdb first and end it last in
+        * end_trans. This makes locking semantics follow TDB rather than MDB,
+        * and effectively locks all partitions at once.
+        * Detail:
+        * Samba AD is special in that the partitions module (this file)
+        * combines multiple independently locked databases into one overall
+        * transaction. Changes across multiple partition DBs in a single
+        * transaction must ALL be either visible or invisible.
+        * The way this is achieved is by taking out a write lock on
+        * metadata.tdb at the start of prepare_commit, while unlocking it at
+        * the end of end_trans. This is matched by read_lock, ensuring it
+        * can't progress until that write lock is released.
+        *
+        * metadata.tdb needs to be a TDB file because MDB uses independent
+        * locks, which means a read lock and a write lock can be held at the
+        * same time, whereas in TDB, the two locks block each other. The TDB
+        * behaviour is required to implement the functionality described
+        * above.
+        *
+        * An important additional detail here is that if prepare_commit is
+        * called on a TDB without any changes being made, no write lock is
+        * taken. We address this by storing a sequence number in metadata.tdb
+        * which is updated every time a replicated attribute is modified.
+        * The possibility of a few unreplicated attributes being out of date
+        * turns out not to be a problem.
+        * For this reason, a lock on sam.ldb (which is a TDB) won't achieve
+        * the same end as locking metadata.tdb, unless we made a modification
+        * to the @ records found there before every prepare_commit.
+        */
+       ret = partition_metadata_start_trans(module);
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
+
        ret = ldb_next_start_trans(module);
        if (ret != LDB_SUCCESS) {
+               partition_metadata_del_trans(module);
                return ret;
        }
 
        ret = partition_reload_if_required(module, data, NULL);
        if (ret != LDB_SUCCESS) {
+               ldb_next_del_trans(module);
+               partition_metadata_del_trans(module);
                return ret;
        }
 
+       /*
+        * The following per partition locks are required mostly because TDB
+        * and MDB require locks before read and write ops are permitted.
+        */
        for (i=0; data && data->partitions && data->partitions[i]; i++) {
                if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
                        ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_start_trans() -> %s",
@@ -842,6 +1094,7 @@ static int partition_start_trans(struct ldb_module *module)
                                ldb_next_del_trans(data->partitions[i]->module);
                        }
                        ldb_next_del_trans(module);
+                       partition_metadata_del_trans(module);
                        return ret;
                }
        }
@@ -852,15 +1105,28 @@ static int partition_start_trans(struct ldb_module *module)
 }
 
 /* prepare for a commit */
-static int partition_prepare_commit(struct ldb_module *module)
+int partition_prepare_commit(struct ldb_module *module)
 {
        unsigned int i;
        struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
                                                              struct partition_private_data);
+       int ret;
 
-       for (i=0; data && data->partitions && data->partitions[i]; i++) {
-               int ret;
+       /*
+        * Order of prepare_commit calls must match that in
+        * partition_start_trans. See comment in that function for detail.
+        */
+       ret = partition_metadata_prepare_commit(module);
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
+
+       ret = ldb_next_prepare_commit(module);
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
 
+       for (i=0; data && data->partitions && data->partitions[i]; i++) {
                if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
                        ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_prepare_commit() -> %s",
                                  ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
@@ -877,17 +1143,20 @@ static int partition_prepare_commit(struct ldb_module *module)
        if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
                ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_prepare_commit() -> (metadata partition)");
        }
-       return ldb_next_prepare_commit(module);
+
+       return LDB_SUCCESS;
 }
 
 
 /* end a transaction */
-static int partition_end_trans(struct ldb_module *module)
+int partition_end_trans(struct ldb_module *module)
 {
        int ret, ret2;
-       unsigned int i;
+       int i;
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
        struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
                                                              struct partition_private_data);
+       bool trace = module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING;
 
        ret = LDB_SUCCESS;
 
@@ -898,112 +1167,142 @@ static int partition_end_trans(struct ldb_module *module)
                data->in_transaction--;
        }
 
-       for (i=0; data && data->partitions && data->partitions[i]; i++) {
-               if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
-                       ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_end_trans() -> %s",
-                                 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
-               }
-               ret2 = ldb_next_end_trans(data->partitions[i]->module);
-               if (ret2 != LDB_SUCCESS) {
-                       ldb_asprintf_errstring(ldb_module_get_ctx(module), "end_trans error on %s: %s",
-                                              ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
-                                              ldb_errstring(ldb_module_get_ctx(module)));
-                       ret = ret2;
+       /*
+        * Order of end_trans calls must be the reverse of that in
+        * partition_start_trans. See comment in that function for detail.
+        */
+       if (data && data->partitions) {
+               /* Just counting the partitions */
+               for (i=0; data->partitions[i]; i++) {}
+
+               /* now walk them backwards */
+               for (i--; i>=0; i--) {
+                       struct dsdb_partition *p = data->partitions[i];
+                       if (trace) {
+                               ldb_debug(ldb,
+                                         LDB_DEBUG_TRACE,
+                                         "partition_end_trans() -> %s",
+                                         ldb_dn_get_linearized(p->ctrl->dn));
+                       }
+                       ret2 = ldb_next_end_trans(p->module);
+                       if (ret2 != LDB_SUCCESS) {
+                               ldb_asprintf_errstring(ldb,
+                                       "end_trans error on %s: %s",
+                                       ldb_dn_get_linearized(p->ctrl->dn),
+                                       ldb_errstring(ldb));
+                               ret = ret2;
+                       }
                }
        }
 
-       if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
+       if (trace) {
                ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_end_trans() -> (metadata partition)");
        }
        ret2 = ldb_next_end_trans(module);
        if (ret2 != LDB_SUCCESS) {
                ret = ret2;
        }
+
+       ret2 = partition_metadata_end_trans(module);
+       if (ret2 != LDB_SUCCESS) {
+               ret = ret2;
+       }
+
        return ret;
 }
 
 /* delete a transaction */
-static int partition_del_trans(struct ldb_module *module)
+int partition_del_trans(struct ldb_module *module)
 {
        int ret, final_ret = LDB_SUCCESS;
-       unsigned int i;
+       int i;
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
        struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
                                                              struct partition_private_data);
-       for (i=0; data && data->partitions && data->partitions[i]; i++) {
-               if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
-                       ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_del_trans() -> %s",
-                                 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
-               }
-               ret = ldb_next_del_trans(data->partitions[i]->module);
-               if (ret != LDB_SUCCESS) {
-                       ldb_asprintf_errstring(ldb_module_get_ctx(module), "del_trans error on %s: %s",
-                                              ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
-                                              ldb_errstring(ldb_module_get_ctx(module)));
-                       final_ret = ret;
-               }
-       }       
+       bool trace = module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING;
 
-       if (data->in_transaction == 0) {
-               DEBUG(0,("partition del transaction mismatch\n"));
-               return ldb_operr(ldb_module_get_ctx(module));
+       if (data == NULL) {
+               DEBUG(0,("partion delete transaction with no private data\n"));
+               return ldb_operr(ldb);
        }
-       data->in_transaction--;
 
-       if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
+       /*
+        * Order of del_trans calls must be the reverse of that in
+        * partition_start_trans. See comment in that function for detail.
+        */
+       if (data->partitions) {
+               /* Just counting the partitions */
+               for (i=0; data->partitions[i]; i++) {}
+
+               /* now walk them backwards */
+               for (i--; i>=0; i--) {
+                       struct dsdb_partition *p = data->partitions[i];
+                       if (trace) {
+                               ldb_debug(ldb,
+                                         LDB_DEBUG_TRACE,
+                                         "partition_del_trans() -> %s",
+                                         ldb_dn_get_linearized(p->ctrl->dn));
+                       }
+                       ret = ldb_next_del_trans(p->module);
+                       if (ret != LDB_SUCCESS) {
+                               ldb_asprintf_errstring(ldb,
+                                       "del_trans error on %s: %s",
+                                       ldb_dn_get_linearized(p->ctrl->dn),
+                                       ldb_errstring(ldb));
+                               final_ret = ret;
+                       }
+               }
+       }
+
+       if (trace) {
                ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_del_trans() -> (metadata partition)");
        }
        ret = ldb_next_del_trans(module);
        if (ret != LDB_SUCCESS) {
                final_ret = ret;
        }
+
+       ret = partition_metadata_del_trans(module);
+       if (ret != LDB_SUCCESS) {
+               final_ret = ret;
+       }
+
+       if (data->in_transaction == 0) {
+               DEBUG(0,("partition del transaction mismatch\n"));
+               return ldb_operr(ldb_module_get_ctx(module));
+       }
+       data->in_transaction--;
+
        return final_ret;
 }
 
 int partition_primary_sequence_number(struct ldb_module *module, TALLOC_CTX *mem_ctx, 
-                                    enum ldb_sequence_type type, uint64_t *seq_number) 
+                                     uint64_t *seq_number,
+                                     struct ldb_request *parent)
 {
        int ret;
        struct ldb_result *res;
        struct ldb_seqnum_request *tseq;
-       struct ldb_request *treq;
        struct ldb_seqnum_result *seqr;
-       res = talloc_zero(mem_ctx, struct ldb_result);
-       if (res == NULL) {
-               return ldb_oom(ldb_module_get_ctx(module));
-       }
-       tseq = talloc_zero(res, struct ldb_seqnum_request);
+
+       tseq = talloc_zero(mem_ctx, struct ldb_seqnum_request);
        if (tseq == NULL) {
-               talloc_free(res);
                return ldb_oom(ldb_module_get_ctx(module));
        }
-       tseq->type = type;
-       
-       ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
-                                    LDB_EXTENDED_SEQUENCE_NUMBER,
-                                    tseq,
-                                    NULL,
-                                    res,
-                                    ldb_extended_default_callback,
-                                    NULL);
-       LDB_REQ_SET_LOCATION(treq);
-       if (ret != LDB_SUCCESS) {
-               talloc_free(res);
-               return ret;
-       }
+       tseq->type = LDB_SEQ_HIGHEST_SEQ;
        
-       ret = ldb_next_request(module, treq);
-       if (ret != LDB_SUCCESS) {
-               talloc_free(res);
-               return ret;
-       }
-       ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
+       ret = dsdb_module_extended(module, tseq, &res,
+                                  LDB_EXTENDED_SEQUENCE_NUMBER,
+                                  tseq,
+                                  DSDB_FLAG_NEXT_MODULE,
+                                  parent);
        if (ret != LDB_SUCCESS) {
-               talloc_free(res);
+               talloc_free(tseq);
                return ret;
        }
        
-       seqr = talloc_get_type(res->extended->data,
-                              struct ldb_seqnum_result);
+       seqr = talloc_get_type_abort(res->extended->data,
+                                    struct ldb_seqnum_result);
        if (seqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
                talloc_free(res);
                return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
@@ -1011,125 +1310,350 @@ int partition_primary_sequence_number(struct ldb_module *module, TALLOC_CTX *mem
        }
 
        *seq_number = seqr->seq_num;
-       talloc_free(res);
+       talloc_free(tseq);
        return LDB_SUCCESS;
 }
 
-/* FIXME: This function is still semi-async */
-static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
+
+/*
+ * Older version of sequence number as sum of sequence numbers for each partition
+ */
+int partition_sequence_number_from_partitions(struct ldb_module *module,
+                                             uint64_t *seqr)
 {
        int ret;
        unsigned int i;
        uint64_t seq_number = 0;
        struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
                                                              struct partition_private_data);
-       struct ldb_seqnum_request *seq;
-       struct ldb_seqnum_result *seqr;
-       struct ldb_request *treq;
-       struct ldb_seqnum_request *tseq;
-       struct ldb_seqnum_result *tseqr;
-       struct ldb_extended *ext;
-       struct ldb_result *res;
-       struct dsdb_partition *p;
 
-       p = find_partition(data, NULL, req);
-       if (p != NULL) {
-               /* the caller specified what partition they want the
-                * sequence number operation on - just pass it on
-                */
-               return ldb_next_request(p->module, req);                
+       ret = partition_primary_sequence_number(module, data, &seq_number, NULL);
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
+       
+       /* Skip the lot if 'data' isn't here yet (initialisation) */
+       for (i=0; data && data->partitions && data->partitions[i]; i++) {
+               struct ldb_seqnum_request *tseq;
+               struct ldb_seqnum_result *tseqr;
+               struct ldb_request *treq;
+               struct ldb_result *res = talloc_zero(data, struct ldb_result);
+               if (res == NULL) {
+                       return ldb_oom(ldb_module_get_ctx(module));
+               }
+               tseq = talloc_zero(res, struct ldb_seqnum_request);
+               if (tseq == NULL) {
+                       talloc_free(res);
+                       return ldb_oom(ldb_module_get_ctx(module));
+               }
+               tseq->type = LDB_SEQ_HIGHEST_SEQ;
+               
+               ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
+                                            LDB_EXTENDED_SEQUENCE_NUMBER,
+                                            tseq,
+                                            NULL,
+                                            res,
+                                            ldb_extended_default_callback,
+                                            NULL);
+               LDB_REQ_SET_LOCATION(treq);
+               if (ret != LDB_SUCCESS) {
+                       talloc_free(res);
+                       return ret;
+               }
+               
+               ret = partition_request(data->partitions[i]->module, treq);
+               if (ret != LDB_SUCCESS) {
+                       talloc_free(res);
+                       return ret;
+               }
+               ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
+               if (ret != LDB_SUCCESS) {
+                       talloc_free(res);
+                       return ret;
+               }
+               tseqr = talloc_get_type(res->extended->data,
+                                       struct ldb_seqnum_result);
+               seq_number += tseqr->seq_num;
+               talloc_free(res);
        }
 
-       seq = talloc_get_type(req->op.extended.data, struct ldb_seqnum_request);
+       *seqr = seq_number;
+       return LDB_SUCCESS;
+}
 
+
+/*
+ * Newer version of sequence number using metadata tdb
+ */
+static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
+{
+       struct ldb_extended *ext;
+       struct ldb_seqnum_request *seq;
+       struct ldb_seqnum_result *seqr;
+       uint64_t seq_number;
+       int ret;
+
+       seq = talloc_get_type_abort(req->op.extended.data, struct ldb_seqnum_request);
        switch (seq->type) {
        case LDB_SEQ_NEXT:
-       case LDB_SEQ_HIGHEST_SEQ:
-
-               ret = partition_primary_sequence_number(module, req, seq->type, &seq_number);
+               ret = partition_metadata_sequence_number_increment(module, &seq_number);
                if (ret != LDB_SUCCESS) {
                        return ret;
                }
+               break;
 
-               /* Skip the lot if 'data' isn't here yet (initialisation) */
-               for (i=0; data && data->partitions && data->partitions[i]; i++) {
-
-                       res = talloc_zero(req, struct ldb_result);
-                       if (res == NULL) {
-                               return ldb_oom(ldb_module_get_ctx(module));
-                       }
-                       tseq = talloc_zero(res, struct ldb_seqnum_request);
-                       if (tseq == NULL) {
-                               talloc_free(res);
-                               return ldb_oom(ldb_module_get_ctx(module));
-                       }
-                       tseq->type = seq->type;
-
-                       ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
-                                                    LDB_EXTENDED_SEQUENCE_NUMBER,
-                                                    tseq,
-                                                    NULL,
-                                                    res,
-                                                    ldb_extended_default_callback,
-                                                    req);
-                       LDB_REQ_SET_LOCATION(treq);
-                       if (ret != LDB_SUCCESS) {
-                               talloc_free(res);
-                               return ret;
-                       }
-
-                       ret = ldb_request_add_control(treq,
-                                                     DSDB_CONTROL_CURRENT_PARTITION_OID,
-                                                     false, data->partitions[i]->ctrl);
-                       if (ret != LDB_SUCCESS) {
-                               talloc_free(res);
-                               return ret;
-                       }
-
-                       ret = partition_request(data->partitions[i]->module, treq);
-                       if (ret != LDB_SUCCESS) {
-                               talloc_free(res);
-                               return ret;
-                       }
-                       ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
-                       if (ret != LDB_SUCCESS) {
-                               talloc_free(res);
-                               return ret;
-                       }
-                       tseqr = talloc_get_type(res->extended->data,
-                                               struct ldb_seqnum_result);
-                       seq_number += tseqr->seq_num;
-                       talloc_free(res);
+       case LDB_SEQ_HIGHEST_SEQ:
+               ret = partition_metadata_sequence_number(module, &seq_number);
+               if (ret != LDB_SUCCESS) {
+                       return ret;
                }
                break;
 
        case LDB_SEQ_HIGHEST_TIMESTAMP:
-               return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR, "LDB_SEQ_HIGHEST_TIMESTAMP not supported");
+               return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
+                                       "LDB_SEQ_HIGHEST_TIMESTAMP not supported");
        }
 
        ext = talloc_zero(req, struct ldb_extended);
        if (!ext) {
-               return ldb_oom(ldb_module_get_ctx(module));
+               return ldb_module_oom(module);
        }
        seqr = talloc_zero(ext, struct ldb_seqnum_result);
        if (seqr == NULL) {
                talloc_free(ext);
-               return ldb_oom(ldb_module_get_ctx(module));
+               return ldb_module_oom(module);
        }
        ext->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
        ext->data = seqr;
 
        seqr->seq_num = seq_number;
-       if (seq->type == LDB_SEQ_NEXT) {
-               seqr->seq_num++;
-       }
-
        seqr->flags |= LDB_SEQ_GLOBAL_SEQUENCE;
 
        /* send request done */
        return ldb_module_done(req, NULL, ext, LDB_SUCCESS);
 }
 
+/* lock all the backends */
+int partition_read_lock(struct ldb_module *module)
+{
+       int i = 0;
+       int ret = 0;
+       int ret2 = 0;
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
+       struct partition_private_data *data = \
+               talloc_get_type(ldb_module_get_private(module),
+                               struct partition_private_data);
+
+       if (ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING) {
+               ldb_debug(ldb, LDB_DEBUG_TRACE,
+                         "partition_read_lock() -> (metadata partition)");
+       }
+
+       /*
+        * It is important to only do this for LOCK because:
+        * - we don't want to unlock what we did not lock
+        *
+        * - we don't want to make a new lock on the sam.ldb
+        *   (triggered inside this routine due to the seq num check)
+        *   during an unlock phase as that will violate the lock
+        *   ordering
+        */
+
+       if (data == NULL) {
+               TALLOC_CTX *mem_ctx = talloc_new(module);
+
+               data = talloc_zero(mem_ctx, struct partition_private_data);
+               if (data == NULL) {
+                       talloc_free(mem_ctx);
+                       return ldb_operr(ldb);
+               }
+
+               /*
+                * When used from Samba4, this message is set by the
+                * samba4 module, as a fixed value not read from the
+                * DB.  This avoids listing modules in the DB
+                */
+               data->forced_module_msg = talloc_get_type(
+                       ldb_get_opaque(ldb,
+                                      DSDB_OPAQUE_PARTITION_MODULE_MSG_OPAQUE_NAME),
+                       struct ldb_message);
+
+               ldb_module_set_private(module, talloc_steal(module,
+                                                           data));
+               talloc_free(mem_ctx);
+       }
+
+       /*
+        * This will lock sam.ldb and will also call event loops,
+        * so we do it before we get the whole db lock.
+        */
+       ret = partition_reload_if_required(module, data, NULL);
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
+
+       /*
+        * Order of read_lock calls must match that in partition_start_trans.
+        * See comment in that function for detail.
+        */
+       ret = partition_metadata_read_lock(module);
+       if (ret != LDB_SUCCESS) {
+               goto failed;
+       }
+
+       /*
+        * The top level DB (sam.ldb) lock is not enough to block another
+        * process in prepare_commit(), because if nothing was changed in the
+        * specific backend, then prepare_commit() is a no-op. Therefore the
+        * metadata.tdb lock is taken out above, as it is the best we can do
+        * right now.
+        */
+       ret = ldb_next_read_lock(module);
+       if (ret != LDB_SUCCESS) {
+               ldb_debug_set(ldb,
+                             LDB_DEBUG_FATAL,
+                             "Failed to lock db: %s / %s for metadata partition",
+                             ldb_errstring(ldb),
+                             ldb_strerror(ret));
+
+               return ret;
+       }
+
+       /*
+        * The following per partition locks are required mostly because TDB
+        * and MDB require locks before reads are permitted.
+        */
+       for (i=0; data && data->partitions && data->partitions[i]; i++) {
+               if ((module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING)) {
+                       ldb_debug(ldb, LDB_DEBUG_TRACE,
+                                 "partition_read_lock() -> %s",
+                                 ldb_dn_get_linearized(
+                                         data->partitions[i]->ctrl->dn));
+               }
+               ret = ldb_next_read_lock(data->partitions[i]->module);
+               if (ret == LDB_SUCCESS) {
+                       continue;
+               }
+
+               ldb_debug_set(ldb,
+                             LDB_DEBUG_FATAL,
+                             "Failed to lock db: %s / %s for %s",
+                             ldb_errstring(ldb),
+                             ldb_strerror(ret),
+                             ldb_dn_get_linearized(
+                                     data->partitions[i]->ctrl->dn));
+
+               goto failed;
+       }
+
+       return LDB_SUCCESS;
+
+failed:
+       /* Back it out, if it fails on one */
+       for (i--; i >= 0; i--) {
+               ret2 = ldb_next_read_unlock(data->partitions[i]->module);
+               if (ret2 != LDB_SUCCESS) {
+                       ldb_debug(ldb,
+                                 LDB_DEBUG_FATAL,
+                                 "Failed to unlock db: %s / %s",
+                                 ldb_errstring(ldb),
+                                 ldb_strerror(ret2));
+               }
+       }
+       ret2 = ldb_next_read_unlock(module);
+       if (ret2 != LDB_SUCCESS) {
+               ldb_debug(ldb,
+                         LDB_DEBUG_FATAL,
+                         "Failed to unlock db: %s / %s",
+                         ldb_errstring(ldb),
+                         ldb_strerror(ret2));
+       }
+       return ret;
+}
+
+/* unlock all the backends */
+int partition_read_unlock(struct ldb_module *module)
+{
+       int i;
+       int ret = LDB_SUCCESS;
+       int ret2;
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
+       struct partition_private_data *data = \
+               talloc_get_type(ldb_module_get_private(module),
+                               struct partition_private_data);
+       bool trace = module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING;
+
+       /*
+        * Order of read_unlock calls must be the reverse of that in
+        * partition_start_trans. See comment in that function for detail.
+        */
+       if (data && data->partitions) {
+               /* Just counting the partitions */
+               for (i=0; data->partitions[i]; i++) {}
+
+               /* now walk them backwards */
+               for (i--; i>=0; i--) {
+                       struct dsdb_partition *p = data->partitions[i];
+                       if (trace) {
+                               ldb_debug(ldb, LDB_DEBUG_TRACE,
+                                         "partition_read_unlock() -> %s",
+                                         ldb_dn_get_linearized(p->ctrl->dn));
+                       }
+                       ret2 = ldb_next_read_unlock(p->module);
+                       if (ret2 != LDB_SUCCESS) {
+                               ldb_debug_set(ldb,
+                                          LDB_DEBUG_FATAL,
+                                          "Failed to lock db: %s / %s for %s",
+                                          ldb_errstring(ldb),
+                                          ldb_strerror(ret),
+                                          ldb_dn_get_linearized(p->ctrl->dn));
+
+                               /*
+                                * Don't overwrite the original failure code
+                                * if there was one
+                                */
+                               if (ret == LDB_SUCCESS) {
+                                       ret = ret2;
+                               }
+                       }
+               }
+       }
+
+       if (trace) {
+               ldb_debug(ldb, LDB_DEBUG_TRACE,
+                         "partition_read_unlock() -> (metadata partition)");
+       }
+
+       ret2 = ldb_next_read_unlock(module);
+       if (ret2 != LDB_SUCCESS) {
+               ldb_debug_set(ldb,
+                             LDB_DEBUG_FATAL,
+                             "Failed to unlock db: %s / %s for metadata partition",
+                             ldb_errstring(ldb),
+                             ldb_strerror(ret2));
+
+               /*
+                * Don't overwrite the original failure code
+                * if there was one
+                */
+               if (ret == LDB_SUCCESS) {
+                       ret = ret2;
+               }
+       }
+
+       ret = partition_metadata_read_unlock(module);
+
+       /*
+        * Don't overwrite the original failure code
+        * if there was one
+        */
+       if (ret == LDB_SUCCESS) {
+               ret = ret2;
+       }
+
+       return ret;
+}
+
 /* extended */
 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
 {
@@ -1143,10 +1667,11 @@ static int partition_extended(struct ldb_module *module, struct ldb_request *req
                return ldb_next_request(module, req);
        }
 
-       /* see if we are still up-to-date */
-       ret = partition_reload_if_required(module, data, req);
-       if (ret != LDB_SUCCESS) {
-               return ret;
+       if (strcmp(req->op.extended.oid, DSDB_EXTENDED_SCHEMA_UPDATE_NOW_OID) == 0) {
+               /* Update the metadata.tdb to increment the schema version if needed*/
+               DEBUG(10, ("Incrementing the sequence_number after schema_update_now\n"));
+               ret = partition_metadata_inc_schema_sequence(module);
+               return ldb_module_done(req, NULL, NULL, ret);
        }
        
        if (strcmp(req->op.extended.oid, LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
@@ -1183,6 +1708,8 @@ static const struct ldb_module_ops ldb_partition_module_ops = {
        .prepare_commit    = partition_prepare_commit,
        .end_transaction   = partition_end_trans,
        .del_transaction   = partition_del_trans,
+       .read_lock         = partition_read_lock,
+       .read_unlock       = partition_read_unlock
 };
 
 int ldb_partition_module_init(const char *version)