4 Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5 Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program. If not, see <http://www.gnu.org/licenses/>.
24 * Component: ldb partitions module
26 * Description: Implement LDAP partitions
28 * Author: Andrew Bartlett
29 * Author: Stefan Metzmacher
32 #include "dsdb/samdb/ldb_modules/partition.h"
35 struct ldb_module *module;
36 struct ldb_request *req;
39 struct partition_context {
40 struct ldb_module *module;
41 struct ldb_request *req;
43 struct part_request *part_req;
44 unsigned int num_requests;
45 unsigned int finished_requests;
47 const char **referrals;
50 static struct partition_context *partition_init_ctx(struct ldb_module *module, struct ldb_request *req)
52 struct partition_context *ac;
54 ac = talloc_zero(req, struct partition_context);
56 ldb_set_errstring(ldb_module_get_ctx(module), "Out of Memory");
67 * helper functions to call the next module in chain
69 int partition_request(struct ldb_module *module, struct ldb_request *request)
71 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) { \
72 const struct dsdb_control_current_partition *partition = NULL;
73 struct ldb_control *partition_ctrl = ldb_request_get_control(request, DSDB_CONTROL_CURRENT_PARTITION_OID);
75 partition = talloc_get_type(partition_ctrl->data,
76 struct dsdb_control_current_partition);
79 if (partition != NULL) {
80 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_request() -> %s",
81 ldb_dn_get_linearized(partition->dn));
83 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_request() -> (metadata partition)");
87 return ldb_next_request(module, request);
90 static struct dsdb_partition *find_partition(struct partition_private_data *data,
92 struct ldb_request *req)
95 struct ldb_control *partition_ctrl;
97 /* see if the request has the partition DN specified in a
98 * control. The repl_meta_data module can specify this to
99 * ensure that replication happens to the right partition
101 partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
102 if (partition_ctrl) {
103 const struct dsdb_control_current_partition *partition;
104 partition = talloc_get_type(partition_ctrl->data,
105 struct dsdb_control_current_partition);
106 if (partition != NULL) {
115 /* Look at base DN */
116 /* Figure out which partition it is under */
117 /* Skip the lot if 'data' isn't here yet (initialisation) */
118 for (i=0; data && data->partitions && data->partitions[i]; i++) {
119 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, dn) == 0) {
120 return data->partitions[i];
128 * fire the caller's callback for every entry, but only send 'done' once.
130 static int partition_req_callback(struct ldb_request *req,
131 struct ldb_reply *ares)
133 struct partition_context *ac;
134 struct ldb_module *module;
135 struct ldb_request *nreq;
137 struct ldb_control *partition_ctrl;
139 ac = talloc_get_type(req->context, struct partition_context);
142 return ldb_module_done(ac->req, NULL, NULL,
143 LDB_ERR_OPERATIONS_ERROR);
146 partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
147 if (partition_ctrl && (ac->num_requests == 1 || ares->type == LDB_REPLY_ENTRY)) {
148 /* If we didn't fan this request out to mulitple partitions,
149 * or this is an individual search result, we can
150 * deterministically tell the caller what partition this was
151 * written to (repl_meta_data likes to know) */
152 ret = ldb_reply_add_control(ares,
153 DSDB_CONTROL_CURRENT_PARTITION_OID,
154 false, partition_ctrl->data);
155 if (ret != LDB_SUCCESS) {
156 return ldb_module_done(ac->req, NULL, NULL,
161 if (ares->error != LDB_SUCCESS) {
162 return ldb_module_done(ac->req, ares->controls,
163 ares->response, ares->error);
166 switch (ares->type) {
167 case LDB_REPLY_REFERRAL:
168 return ldb_module_send_referral(ac->req, ares->referral);
170 case LDB_REPLY_ENTRY:
171 if (ac->req->operation != LDB_SEARCH) {
172 ldb_set_errstring(ldb_module_get_ctx(ac->module),
173 "partition_req_callback:"
174 " Unsupported reply type for this request");
175 return ldb_module_done(ac->req, NULL, NULL,
176 LDB_ERR_OPERATIONS_ERROR);
179 return ldb_module_send_entry(ac->req, ares->message, ares->controls);
182 if (ac->req->operation == LDB_EXTENDED) {
183 /* FIXME: check for ares->response, replmd does not fill it ! */
184 if (ares->response) {
185 if (strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID) != 0) {
186 ldb_set_errstring(ldb_module_get_ctx(ac->module),
187 "partition_req_callback:"
188 " Unknown extended reply, "
189 "only supports START_TLS");
191 return ldb_module_done(ac->req, NULL, NULL,
192 LDB_ERR_OPERATIONS_ERROR);
197 ac->finished_requests++;
198 if (ac->finished_requests == ac->num_requests) {
199 /* Send back referrals if they do exist (search ops) */
200 if (ac->referrals != NULL) {
202 for (ref = ac->referrals; *ref != NULL; ++ref) {
203 ret = ldb_module_send_referral(ac->req,
204 talloc_strdup(ac->req, *ref));
205 if (ret != LDB_SUCCESS) {
206 return ldb_module_done(ac->req, NULL, NULL,
212 /* this was the last one, call callback */
213 return ldb_module_done(ac->req, ares->controls,
218 /* not the last, now call the next one */
219 module = ac->part_req[ac->finished_requests].module;
220 nreq = ac->part_req[ac->finished_requests].req;
222 ret = partition_request(module, nreq);
223 if (ret != LDB_SUCCESS) {
225 return ldb_module_done(ac->req, NULL, NULL, ret);
235 static int partition_prep_request(struct partition_context *ac,
236 struct dsdb_partition *partition)
239 struct ldb_request *req;
240 struct ldb_control *partition_ctrl = NULL;
241 void *part_data = NULL;
243 ac->part_req = talloc_realloc(ac, ac->part_req,
245 ac->num_requests + 1);
246 if (ac->part_req == NULL) {
247 return ldb_oom(ldb_module_get_ctx(ac->module));
250 switch (ac->req->operation) {
252 ret = ldb_build_search_req_ex(&req, ldb_module_get_ctx(ac->module),
254 ac->req->op.search.base,
255 ac->req->op.search.scope,
256 ac->req->op.search.tree,
257 ac->req->op.search.attrs,
259 ac, partition_req_callback,
261 LDB_REQ_SET_LOCATION(req);
264 ret = ldb_build_add_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
265 ac->req->op.add.message,
267 ac, partition_req_callback,
269 LDB_REQ_SET_LOCATION(req);
272 ret = ldb_build_mod_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
273 ac->req->op.mod.message,
275 ac, partition_req_callback,
277 LDB_REQ_SET_LOCATION(req);
280 ret = ldb_build_del_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
283 ac, partition_req_callback,
285 LDB_REQ_SET_LOCATION(req);
288 ret = ldb_build_rename_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
289 ac->req->op.rename.olddn,
290 ac->req->op.rename.newdn,
292 ac, partition_req_callback,
294 LDB_REQ_SET_LOCATION(req);
297 ret = ldb_build_extended_req(&req, ldb_module_get_ctx(ac->module),
299 ac->req->op.extended.oid,
300 ac->req->op.extended.data,
302 ac, partition_req_callback,
304 LDB_REQ_SET_LOCATION(req);
307 ldb_set_errstring(ldb_module_get_ctx(ac->module),
308 "Unsupported request type!");
309 ret = LDB_ERR_UNWILLING_TO_PERFORM;
312 if (ret != LDB_SUCCESS) {
316 ac->part_req[ac->num_requests].req = req;
318 if (ac->req->controls) {
319 /* Duplicate everything beside the current partition control */
320 partition_ctrl = ldb_request_get_control(ac->req,
321 DSDB_CONTROL_CURRENT_PARTITION_OID);
322 if (!ldb_save_controls(partition_ctrl, req, NULL)) {
323 return ldb_module_oom(ac->module);
327 part_data = partition->ctrl;
329 ac->part_req[ac->num_requests].module = partition->module;
331 if (partition_ctrl != NULL) {
332 if (partition_ctrl->data != NULL) {
333 part_data = partition_ctrl->data;
337 * If the provided current partition control is without
338 * data then use the calculated one.
340 ret = ldb_request_add_control(req,
341 DSDB_CONTROL_CURRENT_PARTITION_OID,
343 if (ret != LDB_SUCCESS) {
348 if (req->operation == LDB_SEARCH) {
349 /* If the search is for 'more' than this partition,
350 * then change the basedn, so a remote LDAP server
352 if (ldb_dn_compare_base(partition->ctrl->dn,
353 req->op.search.base) != 0) {
354 req->op.search.base = partition->ctrl->dn;
363 static int partition_call_first(struct partition_context *ac)
365 return partition_request(ac->part_req[0].module, ac->part_req[0].req);
369 * Send a request down to all the partitions (but not the sam.ldb file)
371 static int partition_send_all(struct ldb_module *module,
372 struct partition_context *ac,
373 struct ldb_request *req)
376 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
377 struct partition_private_data);
380 for (i=0; data && data->partitions && data->partitions[i]; i++) {
381 ret = partition_prep_request(ac, data->partitions[i]);
382 if (ret != LDB_SUCCESS) {
387 /* fire the first one */
388 return partition_call_first(ac);
391 struct partition_copy_context {
392 struct ldb_module *module;
393 struct partition_context *partition_context;
394 struct ldb_request *request;
399 * A special DN has been updated in the primary partition. Now propagate those
400 * changes to the remaining partitions.
402 * Note: that the operations are asynchronous and this function is called
403 * from partition_copy_all_callback_handler in response to an async
406 static int partition_copy_all_callback_action(
407 struct ldb_module *module,
408 struct partition_context *ac,
409 struct ldb_request *req,
415 struct partition_private_data *data =
417 ldb_module_get_private(module),
418 struct partition_private_data);
420 struct ldb_result *res;
421 /* now fetch the resulting object, and then copy it to all the
422 * other partitions. We need this approach to cope with the
423 * partitions getting out of sync. If for example the
424 * @ATTRIBUTES object exists on one partition but not the
425 * others then just doing each of the partitions in turn will
428 search_ret = dsdb_module_search_dn(module, ac, &res, dn, NULL, DSDB_FLAG_NEXT_MODULE, req);
429 if (search_ret != LDB_SUCCESS) {
433 /* now delete the object in the other partitions, if requried
435 if (search_ret == LDB_ERR_NO_SUCH_OBJECT) {
436 for (i=0; data->partitions && data->partitions[i]; i++) {
438 pret = dsdb_module_del(data->partitions[i]->module,
440 DSDB_FLAG_NEXT_MODULE,
442 if (pret != LDB_SUCCESS && pret != LDB_ERR_NO_SUCH_OBJECT) {
443 /* we should only get success or no
444 such object from the other partitions */
449 return ldb_module_done(req, NULL, NULL, LDB_SUCCESS);
452 /* now add/modify in the other partitions */
453 for (i=0; data->partitions && data->partitions[i]; i++) {
454 struct ldb_message *modify_msg = NULL;
458 pret = dsdb_module_add(data->partitions[i]->module,
460 DSDB_FLAG_NEXT_MODULE,
462 if (pret == LDB_SUCCESS) {
466 if (pret != LDB_ERR_ENTRY_ALREADY_EXISTS) {
470 modify_msg = ldb_msg_copy(req, res->msgs[0]);
471 if (modify_msg == NULL) {
472 return ldb_module_oom(module);
476 * mark all the message elements as
477 * LDB_FLAG_MOD_REPLACE
480 el_idx < modify_msg->num_elements;
482 modify_msg->elements[el_idx].flags
483 = LDB_FLAG_MOD_REPLACE;
486 if (req->operation == LDB_MODIFY) {
487 const struct ldb_message *req_msg = req->op.mod.message;
489 * mark elements to be removed, if there were
490 * deleted entirely above we need to delete
493 for (el_idx=0; el_idx < req_msg->num_elements; el_idx++) {
494 if (req_msg->elements[el_idx].flags & LDB_FLAG_MOD_DELETE
495 || ((req_msg->elements[el_idx].flags & LDB_FLAG_MOD_REPLACE) &&
496 req_msg->elements[el_idx].num_values == 0)) {
497 if (ldb_msg_find_element(modify_msg,
498 req_msg->elements[el_idx].name) != NULL) {
501 pret = ldb_msg_add_empty(
503 req_msg->elements[el_idx].name,
504 LDB_FLAG_MOD_REPLACE,
506 if (pret != LDB_SUCCESS) {
513 pret = dsdb_module_modify(data->partitions[i]->module,
515 DSDB_FLAG_NEXT_MODULE,
518 if (pret != LDB_SUCCESS) {
523 return ldb_module_done(req, NULL, NULL, LDB_SUCCESS);
528 * @brief call back function for the ldb operations on special DN's.
530 * As the LDB operations are async, and we wish to use the result
531 * the operations, a callback needs to be registered to process the results
532 * of the LDB operations.
534 * @param req the ldb request
535 * @param res the result of the operation
537 * @return the LDB_STATUS
539 static int partition_copy_all_callback_handler(
540 struct ldb_request *req,
541 struct ldb_reply *ares)
543 struct partition_copy_context *ac = NULL;
545 ac = talloc_get_type(
547 struct partition_copy_context);
550 return ldb_module_done(
554 LDB_ERR_OPERATIONS_ERROR);
557 /* pass on to the callback */
558 switch (ares->type) {
559 case LDB_REPLY_ENTRY:
560 return ldb_module_send_entry(
565 case LDB_REPLY_REFERRAL:
566 return ldb_module_send_referral(
570 case LDB_REPLY_DONE: {
571 int error = ares->error;
572 if (error == LDB_SUCCESS) {
573 error = partition_copy_all_callback_action(
575 ac->partition_context,
579 return ldb_module_done(
588 return LDB_ERR_OPERATIONS_ERROR;
593 * send an operation to the top partition, then copy the resulting
594 * object to all other partitions.
596 static int partition_copy_all(
597 struct ldb_module *module,
598 struct partition_context *partition_context,
599 struct ldb_request *req,
602 struct ldb_request *new_req = NULL;
603 struct ldb_context *ldb = NULL;
604 struct partition_copy_context *context = NULL;
608 ldb = ldb_module_get_ctx(module);
610 context = talloc_zero(req, struct partition_copy_context);
611 if (context == NULL) {
614 context->module = module;
615 context->request = req;
617 context->partition_context = partition_context;
619 switch (req->operation) {
621 ret = ldb_build_add_req(
628 partition_copy_all_callback_handler,
632 ret = ldb_build_mod_req(
639 partition_copy_all_callback_handler,
643 ret = ldb_build_del_req(
650 partition_copy_all_callback_handler,
654 ret = ldb_build_rename_req(
658 req->op.rename.olddn,
659 req->op.rename.newdn,
662 partition_copy_all_callback_handler,
672 "Unexpected operation type (%d)\n", req->operation);
673 ret = LDB_ERR_OPERATIONS_ERROR;
676 if (ret != LDB_SUCCESS) {
679 return ldb_next_request(module, new_req);
682 * Figure out which backend a request needs to be aimed at. Some
683 * requests must be replicated to all backends
685 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn)
687 struct partition_context *ac;
690 struct dsdb_partition *partition;
691 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
692 struct partition_private_data);
694 /* if we aren't initialised yet go further */
695 if (!data || !data->partitions) {
696 return ldb_next_request(module, req);
699 if (ldb_dn_is_special(dn)) {
700 /* Is this a special DN, we need to replicate to every backend? */
701 for (i=0; data->replicate && data->replicate[i]; i++) {
702 if (ldb_dn_compare(data->replicate[i],
705 ac = partition_init_ctx(module, req);
707 return ldb_operr(ldb_module_get_ctx(module));
710 return partition_copy_all(module, ac, req, dn);
715 /* Otherwise, we need to find the partition to fire it to */
718 partition = find_partition(data, dn, req);
721 * if we haven't found a matching partition
722 * pass the request to the main ldb
724 * TODO: we should maybe return an error here
725 * if it's not a special dn
728 return ldb_next_request(module, req);
731 ac = partition_init_ctx(module, req);
733 return ldb_operr(ldb_module_get_ctx(module));
736 /* we need to add a control but we never touch the original request */
737 ret = partition_prep_request(ac, partition);
738 if (ret != LDB_SUCCESS) {
742 /* fire the first one */
743 return partition_call_first(ac);
747 static int partition_search(struct ldb_module *module, struct ldb_request *req)
750 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
751 struct partition_private_data);
752 struct partition_context *ac;
753 struct ldb_context *ldb;
754 struct loadparm_context *lp_ctx;
756 struct ldb_control *search_control = ldb_request_get_control(req, LDB_CONTROL_SEARCH_OPTIONS_OID);
757 struct ldb_control *domain_scope_control = ldb_request_get_control(req, LDB_CONTROL_DOMAIN_SCOPE_OID);
758 struct ldb_control *no_gc_control = ldb_request_get_control(req, DSDB_CONTROL_NO_GLOBAL_CATALOG);
760 struct ldb_search_options_control *search_options = NULL;
761 struct dsdb_partition *p;
764 bool domain_scope = false, phantom_root = false;
766 p = find_partition(data, NULL, req);
768 /* the caller specified what partition they want the
769 * search - just pass it on
771 return ldb_next_request(p->module, req);
774 /* Get back the search options from the search control, and mark it as
775 * non-critical (to make backends and also dcpromo happy).
777 if (search_control) {
778 search_options = talloc_get_type(search_control->data, struct ldb_search_options_control);
779 search_control->critical = 0;
783 /* if we aren't initialised yet go further */
784 if (!data || !data->partitions) {
785 return ldb_next_request(module, req);
788 /* Special DNs without specified partition should go further */
789 if (ldb_dn_is_special(req->op.search.base)) {
790 return ldb_next_request(module, req);
793 /* Locate the options */
794 domain_scope = (search_options
795 && (search_options->search_options & LDB_SEARCH_OPTION_DOMAIN_SCOPE))
796 || domain_scope_control;
797 phantom_root = search_options
798 && (search_options->search_options & LDB_SEARCH_OPTION_PHANTOM_ROOT);
800 /* Remove handled options from the search control flag */
801 if (search_options) {
802 search_options->search_options = search_options->search_options
803 & ~LDB_SEARCH_OPTION_DOMAIN_SCOPE
804 & ~LDB_SEARCH_OPTION_PHANTOM_ROOT;
807 ac = partition_init_ctx(module, req);
809 return ldb_operr(ldb_module_get_ctx(module));
812 ldb = ldb_module_get_ctx(ac->module);
813 lp_ctx = talloc_get_type(ldb_get_opaque(ldb, "loadparm"),
814 struct loadparm_context);
816 /* Search from the base DN */
817 if (ldb_dn_is_null(req->op.search.base)) {
819 return ldb_error(ldb, LDB_ERR_NO_SUCH_OBJECT, "empty base DN");
821 return partition_send_all(module, ac, req);
824 for (i=0; data->partitions[i]; i++) {
825 bool match = false, stop = false;
827 if (data->partitions[i]->partial_replica && no_gc_control != NULL) {
828 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn,
829 req->op.search.base) == 0) {
830 /* base DN is in a partial replica
831 with the NO_GLOBAL_CATALOG
832 control. This partition is invisible */
833 /* DEBUG(0,("DENYING NON-GC OP: %s\n", ldb_module_call_chain(req, req))); */
839 /* Phantom root: Find all partitions under the
840 * search base. We match if:
842 * 1) the DN we are looking for exactly matches a
843 * certain partition and always stop
844 * 2) the DN we are looking for is a parent of certain
845 * partitions and it isn't a scope base search
846 * 3) the DN we are looking for is a child of a certain
847 * partition and always stop
848 * - we don't need to go any further up in the
851 if (ldb_dn_compare(data->partitions[i]->ctrl->dn,
852 req->op.search.base) == 0) {
857 (ldb_dn_compare_base(req->op.search.base,
858 data->partitions[i]->ctrl->dn) == 0 &&
859 req->op.search.scope != LDB_SCOPE_BASE)) {
863 ldb_dn_compare_base(data->partitions[i]->ctrl->dn,
864 req->op.search.base) == 0) {
866 stop = true; /* note that this relies on partition ordering */
869 /* Domain scope: Find all partitions under the search
872 * We generate referral candidates if we haven't
873 * specified the domain scope control, haven't a base
874 * search* scope and the DN we are looking for is a real
875 * predecessor of certain partitions. When a new
876 * referral candidate is nearer to the DN than an
877 * existing one delete the latter (we want to have only
878 * the closest ones). When we checked this for all
879 * candidates we have the final referrals.
881 * We match if the DN we are looking for is a child of
882 * a certain partition or the partition
883 * DN itself - we don't need to go any further
884 * up in the hierarchy!
886 if ((!domain_scope) &&
887 (req->op.search.scope != LDB_SCOPE_BASE) &&
888 (ldb_dn_compare_base(req->op.search.base,
889 data->partitions[i]->ctrl->dn) == 0) &&
890 (ldb_dn_compare(req->op.search.base,
891 data->partitions[i]->ctrl->dn) != 0)) {
892 const char *scheme = ldb_get_opaque(
893 ldb, LDAP_REFERRAL_SCHEME_OPAQUE);
894 char *ref = talloc_asprintf(
897 scheme == NULL ? "ldap" : scheme,
898 lpcfg_dnsdomain(lp_ctx),
899 ldb_dn_get_linearized(
900 data->partitions[i]->ctrl->dn),
901 req->op.search.scope ==
902 LDB_SCOPE_ONELEVEL ? "??base" : "");
908 /* Initialise the referrals list */
909 if (ac->referrals == NULL) {
910 char **l = str_list_make_empty(ac);
911 ac->referrals = discard_const_p(const char *, l);
912 if (ac->referrals == NULL) {
917 /* Check if the new referral candidate is
918 * closer to the base DN than already
919 * saved ones and delete the latters */
921 while (ac->referrals[j] != NULL) {
922 if (strstr(ac->referrals[j],
923 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn)) != NULL) {
924 str_list_remove(ac->referrals,
931 /* Add our new candidate */
932 ac->referrals = str_list_add(ac->referrals, ref);
936 if (ac->referrals == NULL) {
940 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
942 stop = true; /* note that this relies on partition ordering */
947 ret = partition_prep_request(ac, data->partitions[i]);
948 if (ret != LDB_SUCCESS) {
956 /* Perhaps we didn't match any partitions. Try the main partition */
957 if (ac->num_requests == 0) {
959 return ldb_next_request(module, req);
962 /* fire the first one */
963 return partition_call_first(ac);
967 static int partition_add(struct ldb_module *module, struct ldb_request *req)
969 return partition_replicate(module, req, req->op.add.message->dn);
973 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
975 return partition_replicate(module, req, req->op.mod.message->dn);
979 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
981 return partition_replicate(module, req, req->op.del.dn);
985 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
988 struct dsdb_partition *backend, *backend2;
990 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
991 struct partition_private_data);
993 /* Skip the lot if 'data' isn't here yet (initialisation) */
995 return ldb_operr(ldb_module_get_ctx(module));
998 backend = find_partition(data, req->op.rename.olddn, req);
999 backend2 = find_partition(data, req->op.rename.newdn, req);
1001 if ((backend && !backend2) || (!backend && backend2)) {
1002 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
1005 if (backend != backend2) {
1006 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1007 "Cannot rename from %s in %s to %s in %s: %s",
1008 ldb_dn_get_linearized(req->op.rename.olddn),
1009 ldb_dn_get_linearized(backend->ctrl->dn),
1010 ldb_dn_get_linearized(req->op.rename.newdn),
1011 ldb_dn_get_linearized(backend2->ctrl->dn),
1012 ldb_strerror(LDB_ERR_AFFECTS_MULTIPLE_DSAS));
1013 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
1016 return partition_replicate(module, req, req->op.rename.olddn);
1019 /* start a transaction */
1020 int partition_start_trans(struct ldb_module *module)
1024 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1025 struct partition_private_data);
1026 /* Look at base DN */
1027 /* Figure out which partition it is under */
1028 /* Skip the lot if 'data' isn't here yet (initialization) */
1029 if (ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING) {
1030 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_start_trans() -> (metadata partition)");
1034 * We start a transaction on metadata.tdb first and end it last in
1035 * end_trans. This makes locking semantics follow TDB rather than MDB,
1036 * and effectively locks all partitions at once.
1038 * Samba AD is special in that the partitions module (this file)
1039 * combines multiple independently locked databases into one overall
1040 * transaction. Changes across multiple partition DBs in a single
1041 * transaction must ALL be either visible or invisible.
1042 * The way this is achieved is by taking out a write lock on
1043 * metadata.tdb at the start of prepare_commit, while unlocking it at
1044 * the end of end_trans. This is matched by read_lock, ensuring it
1045 * can't progress until that write lock is released.
1047 * metadata.tdb needs to be a TDB file because MDB uses independent
1048 * locks, which means a read lock and a write lock can be held at the
1049 * same time, whereas in TDB, the two locks block each other. The TDB
1050 * behaviour is required to implement the functionality described
1053 * An important additional detail here is that if prepare_commit is
1054 * called on a TDB without any changes being made, no write lock is
1055 * taken. We address this by storing a sequence number in metadata.tdb
1056 * which is updated every time a replicated attribute is modified.
1057 * The possibility of a few unreplicated attributes being out of date
1058 * turns out not to be a problem.
1059 * For this reason, a lock on sam.ldb (which is a TDB) won't achieve
1060 * the same end as locking metadata.tdb, unless we made a modification
1061 * to the @ records found there before every prepare_commit.
1063 ret = partition_metadata_start_trans(module);
1064 if (ret != LDB_SUCCESS) {
1068 ret = ldb_next_start_trans(module);
1069 if (ret != LDB_SUCCESS) {
1070 partition_metadata_del_trans(module);
1074 ret = partition_reload_if_required(module, data, NULL);
1075 if (ret != LDB_SUCCESS) {
1076 ldb_next_del_trans(module);
1077 partition_metadata_del_trans(module);
1082 * The following per partition locks are required mostly because TDB
1083 * and MDB require locks before read and write ops are permitted.
1085 for (i=0; data && data->partitions && data->partitions[i]; i++) {
1086 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
1087 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_start_trans() -> %s",
1088 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
1090 ret = ldb_next_start_trans(data->partitions[i]->module);
1091 if (ret != LDB_SUCCESS) {
1092 /* Back it out, if it fails on one */
1093 for (i--; i >= 0; i--) {
1094 ldb_next_del_trans(data->partitions[i]->module);
1096 ldb_next_del_trans(module);
1097 partition_metadata_del_trans(module);
1102 data->in_transaction++;
1107 /* prepare for a commit */
1108 int partition_prepare_commit(struct ldb_module *module)
1111 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1112 struct partition_private_data);
1116 * Order of prepare_commit calls must match that in
1117 * partition_start_trans. See comment in that function for detail.
1119 ret = partition_metadata_prepare_commit(module);
1120 if (ret != LDB_SUCCESS) {
1124 ret = ldb_next_prepare_commit(module);
1125 if (ret != LDB_SUCCESS) {
1129 for (i=0; data && data->partitions && data->partitions[i]; i++) {
1130 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
1131 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_prepare_commit() -> %s",
1132 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
1134 ret = ldb_next_prepare_commit(data->partitions[i]->module);
1135 if (ret != LDB_SUCCESS) {
1136 ldb_asprintf_errstring(ldb_module_get_ctx(module), "prepare_commit error on %s: %s",
1137 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
1138 ldb_errstring(ldb_module_get_ctx(module)));
1143 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
1144 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_prepare_commit() -> (metadata partition)");
1151 /* end a transaction */
1152 int partition_end_trans(struct ldb_module *module)
1156 struct ldb_context *ldb = ldb_module_get_ctx(module);
1157 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1158 struct partition_private_data);
1159 bool trace = module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING;
1163 if (data->in_transaction == 0) {
1164 DEBUG(0,("partition end transaction mismatch\n"));
1165 ret = LDB_ERR_OPERATIONS_ERROR;
1167 data->in_transaction--;
1171 * Order of end_trans calls must be the reverse of that in
1172 * partition_start_trans. See comment in that function for detail.
1174 if (data && data->partitions) {
1175 /* Just counting the partitions */
1176 for (i=0; data->partitions[i]; i++) {}
1178 /* now walk them backwards */
1179 for (i--; i>=0; i--) {
1180 struct dsdb_partition *p = data->partitions[i];
1184 "partition_end_trans() -> %s",
1185 ldb_dn_get_linearized(p->ctrl->dn));
1187 ret2 = ldb_next_end_trans(p->module);
1188 if (ret2 != LDB_SUCCESS) {
1189 ldb_asprintf_errstring(ldb,
1190 "end_trans error on %s: %s",
1191 ldb_dn_get_linearized(p->ctrl->dn),
1192 ldb_errstring(ldb));
1199 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_end_trans() -> (metadata partition)");
1201 ret2 = ldb_next_end_trans(module);
1202 if (ret2 != LDB_SUCCESS) {
1206 ret2 = partition_metadata_end_trans(module);
1207 if (ret2 != LDB_SUCCESS) {
1214 /* delete a transaction */
1215 int partition_del_trans(struct ldb_module *module)
1217 int ret, final_ret = LDB_SUCCESS;
1219 struct ldb_context *ldb = ldb_module_get_ctx(module);
1220 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1221 struct partition_private_data);
1222 bool trace = module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING;
1225 DEBUG(0,("partion delete transaction with no private data\n"));
1226 return ldb_operr(ldb);
1230 * Order of del_trans calls must be the reverse of that in
1231 * partition_start_trans. See comment in that function for detail.
1233 if (data->partitions) {
1234 /* Just counting the partitions */
1235 for (i=0; data->partitions[i]; i++) {}
1237 /* now walk them backwards */
1238 for (i--; i>=0; i--) {
1239 struct dsdb_partition *p = data->partitions[i];
1243 "partition_del_trans() -> %s",
1244 ldb_dn_get_linearized(p->ctrl->dn));
1246 ret = ldb_next_del_trans(p->module);
1247 if (ret != LDB_SUCCESS) {
1248 ldb_asprintf_errstring(ldb,
1249 "del_trans error on %s: %s",
1250 ldb_dn_get_linearized(p->ctrl->dn),
1251 ldb_errstring(ldb));
1258 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_del_trans() -> (metadata partition)");
1260 ret = ldb_next_del_trans(module);
1261 if (ret != LDB_SUCCESS) {
1265 ret = partition_metadata_del_trans(module);
1266 if (ret != LDB_SUCCESS) {
1270 if (data->in_transaction == 0) {
1271 DEBUG(0,("partition del transaction mismatch\n"));
1272 return ldb_operr(ldb_module_get_ctx(module));
1274 data->in_transaction--;
1279 int partition_primary_sequence_number(struct ldb_module *module, TALLOC_CTX *mem_ctx,
1280 uint64_t *seq_number,
1281 struct ldb_request *parent)
1284 struct ldb_result *res;
1285 struct ldb_seqnum_request *tseq;
1286 struct ldb_seqnum_result *seqr;
1288 tseq = talloc_zero(mem_ctx, struct ldb_seqnum_request);
1290 return ldb_oom(ldb_module_get_ctx(module));
1292 tseq->type = LDB_SEQ_HIGHEST_SEQ;
1294 ret = dsdb_module_extended(module, tseq, &res,
1295 LDB_EXTENDED_SEQUENCE_NUMBER,
1297 DSDB_FLAG_NEXT_MODULE,
1299 if (ret != LDB_SUCCESS) {
1304 seqr = talloc_get_type_abort(res->extended->data,
1305 struct ldb_seqnum_result);
1306 if (seqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
1308 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
1309 "Primary backend in partition module returned a timestamp based seq");
1312 *seq_number = seqr->seq_num;
1319 * Older version of sequence number as sum of sequence numbers for each partition
1321 int partition_sequence_number_from_partitions(struct ldb_module *module,
1326 uint64_t seq_number = 0;
1327 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1328 struct partition_private_data);
1330 ret = partition_primary_sequence_number(module, data, &seq_number, NULL);
1331 if (ret != LDB_SUCCESS) {
1335 /* Skip the lot if 'data' isn't here yet (initialisation) */
1336 for (i=0; data && data->partitions && data->partitions[i]; i++) {
1337 struct ldb_seqnum_request *tseq;
1338 struct ldb_seqnum_result *tseqr;
1339 struct ldb_request *treq;
1340 struct ldb_result *res = talloc_zero(data, struct ldb_result);
1342 return ldb_oom(ldb_module_get_ctx(module));
1344 tseq = talloc_zero(res, struct ldb_seqnum_request);
1347 return ldb_oom(ldb_module_get_ctx(module));
1349 tseq->type = LDB_SEQ_HIGHEST_SEQ;
1351 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
1352 LDB_EXTENDED_SEQUENCE_NUMBER,
1356 ldb_extended_default_callback,
1358 LDB_REQ_SET_LOCATION(treq);
1359 if (ret != LDB_SUCCESS) {
1364 ret = partition_request(data->partitions[i]->module, treq);
1365 if (ret != LDB_SUCCESS) {
1369 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
1370 if (ret != LDB_SUCCESS) {
1374 tseqr = talloc_get_type(res->extended->data,
1375 struct ldb_seqnum_result);
1376 seq_number += tseqr->seq_num;
1386 * Newer version of sequence number using metadata tdb
1388 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
1390 struct ldb_extended *ext;
1391 struct ldb_seqnum_request *seq;
1392 struct ldb_seqnum_result *seqr;
1393 uint64_t seq_number;
1396 seq = talloc_get_type_abort(req->op.extended.data, struct ldb_seqnum_request);
1397 switch (seq->type) {
1399 ret = partition_metadata_sequence_number_increment(module, &seq_number);
1400 if (ret != LDB_SUCCESS) {
1405 case LDB_SEQ_HIGHEST_SEQ:
1406 ret = partition_metadata_sequence_number(module, &seq_number);
1407 if (ret != LDB_SUCCESS) {
1412 case LDB_SEQ_HIGHEST_TIMESTAMP:
1413 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
1414 "LDB_SEQ_HIGHEST_TIMESTAMP not supported");
1417 ext = talloc_zero(req, struct ldb_extended);
1419 return ldb_module_oom(module);
1421 seqr = talloc_zero(ext, struct ldb_seqnum_result);
1424 return ldb_module_oom(module);
1426 ext->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1429 seqr->seq_num = seq_number;
1430 seqr->flags |= LDB_SEQ_GLOBAL_SEQUENCE;
1432 /* send request done */
1433 return ldb_module_done(req, NULL, ext, LDB_SUCCESS);
1436 /* lock all the backends */
1437 int partition_read_lock(struct ldb_module *module)
1442 struct ldb_context *ldb = ldb_module_get_ctx(module);
1443 struct partition_private_data *data = \
1444 talloc_get_type(ldb_module_get_private(module),
1445 struct partition_private_data);
1447 if (ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING) {
1448 ldb_debug(ldb, LDB_DEBUG_TRACE,
1449 "partition_read_lock() -> (metadata partition)");
1453 * It is important to only do this for LOCK because:
1454 * - we don't want to unlock what we did not lock
1456 * - we don't want to make a new lock on the sam.ldb
1457 * (triggered inside this routine due to the seq num check)
1458 * during an unlock phase as that will violate the lock
1463 TALLOC_CTX *mem_ctx = talloc_new(module);
1465 data = talloc_zero(mem_ctx, struct partition_private_data);
1467 talloc_free(mem_ctx);
1468 return ldb_operr(ldb);
1472 * When used from Samba4, this message is set by the
1473 * samba4 module, as a fixed value not read from the
1474 * DB. This avoids listing modules in the DB
1476 data->forced_module_msg = talloc_get_type(
1478 DSDB_OPAQUE_PARTITION_MODULE_MSG_OPAQUE_NAME),
1479 struct ldb_message);
1481 ldb_module_set_private(module, talloc_steal(module,
1483 talloc_free(mem_ctx);
1487 * This will lock sam.ldb and will also call event loops,
1488 * so we do it before we get the whole db lock.
1490 ret = partition_reload_if_required(module, data, NULL);
1491 if (ret != LDB_SUCCESS) {
1496 * Order of read_lock calls must match that in partition_start_trans.
1497 * See comment in that function for detail.
1499 ret = partition_metadata_read_lock(module);
1500 if (ret != LDB_SUCCESS) {
1505 * The top level DB (sam.ldb) lock is not enough to block another
1506 * process in prepare_commit(), because if nothing was changed in the
1507 * specific backend, then prepare_commit() is a no-op. Therefore the
1508 * metadata.tdb lock is taken out above, as it is the best we can do
1511 ret = ldb_next_read_lock(module);
1512 if (ret != LDB_SUCCESS) {
1515 "Failed to lock db: %s / %s for metadata partition",
1523 * The following per partition locks are required mostly because TDB
1524 * and MDB require locks before reads are permitted.
1526 for (i=0; data && data->partitions && data->partitions[i]; i++) {
1527 if ((module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING)) {
1528 ldb_debug(ldb, LDB_DEBUG_TRACE,
1529 "partition_read_lock() -> %s",
1530 ldb_dn_get_linearized(
1531 data->partitions[i]->ctrl->dn));
1533 ret = ldb_next_read_lock(data->partitions[i]->module);
1534 if (ret == LDB_SUCCESS) {
1540 "Failed to lock db: %s / %s for %s",
1543 ldb_dn_get_linearized(
1544 data->partitions[i]->ctrl->dn));
1552 /* Back it out, if it fails on one */
1553 for (i--; i >= 0; i--) {
1554 ret2 = ldb_next_read_unlock(data->partitions[i]->module);
1555 if (ret2 != LDB_SUCCESS) {
1558 "Failed to unlock db: %s / %s",
1560 ldb_strerror(ret2));
1563 ret2 = ldb_next_read_unlock(module);
1564 if (ret2 != LDB_SUCCESS) {
1567 "Failed to unlock db: %s / %s",
1569 ldb_strerror(ret2));
1574 /* unlock all the backends */
1575 int partition_read_unlock(struct ldb_module *module)
1578 int ret = LDB_SUCCESS;
1580 struct ldb_context *ldb = ldb_module_get_ctx(module);
1581 struct partition_private_data *data = \
1582 talloc_get_type(ldb_module_get_private(module),
1583 struct partition_private_data);
1584 bool trace = module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING;
1587 * Order of read_unlock calls must be the reverse of that in
1588 * partition_start_trans. See comment in that function for detail.
1590 if (data && data->partitions) {
1591 /* Just counting the partitions */
1592 for (i=0; data->partitions[i]; i++) {}
1594 /* now walk them backwards */
1595 for (i--; i>=0; i--) {
1596 struct dsdb_partition *p = data->partitions[i];
1598 ldb_debug(ldb, LDB_DEBUG_TRACE,
1599 "partition_read_unlock() -> %s",
1600 ldb_dn_get_linearized(p->ctrl->dn));
1602 ret2 = ldb_next_read_unlock(p->module);
1603 if (ret2 != LDB_SUCCESS) {
1606 "Failed to lock db: %s / %s for %s",
1609 ldb_dn_get_linearized(p->ctrl->dn));
1612 * Don't overwrite the original failure code
1615 if (ret == LDB_SUCCESS) {
1623 ldb_debug(ldb, LDB_DEBUG_TRACE,
1624 "partition_read_unlock() -> (metadata partition)");
1627 ret2 = ldb_next_read_unlock(module);
1628 if (ret2 != LDB_SUCCESS) {
1631 "Failed to unlock db: %s / %s for metadata partition",
1633 ldb_strerror(ret2));
1636 * Don't overwrite the original failure code
1639 if (ret == LDB_SUCCESS) {
1644 ret = partition_metadata_read_unlock(module);
1647 * Don't overwrite the original failure code
1650 if (ret == LDB_SUCCESS) {
1658 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
1660 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1661 struct partition_private_data);
1662 struct partition_context *ac;
1665 /* if we aren't initialised yet go further */
1667 return ldb_next_request(module, req);
1670 if (strcmp(req->op.extended.oid, DSDB_EXTENDED_SCHEMA_UPDATE_NOW_OID) == 0) {
1671 /* Update the metadata.tdb to increment the schema version if needed*/
1672 DEBUG(10, ("Incrementing the sequence_number after schema_update_now\n"));
1673 ret = partition_metadata_inc_schema_sequence(module);
1674 return ldb_module_done(req, NULL, NULL, ret);
1677 if (strcmp(req->op.extended.oid, LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1678 return partition_sequence_number(module, req);
1681 if (strcmp(req->op.extended.oid, DSDB_EXTENDED_CREATE_PARTITION_OID) == 0) {
1682 return partition_create(module, req);
1686 * as the extended operation has no dn
1687 * we need to send it to all partitions
1690 ac = partition_init_ctx(module, req);
1692 return ldb_operr(ldb_module_get_ctx(module));
1695 return partition_send_all(module, ac, req);
1698 static const struct ldb_module_ops ldb_partition_module_ops = {
1699 .name = "partition",
1700 .init_context = partition_init,
1701 .search = partition_search,
1702 .add = partition_add,
1703 .modify = partition_modify,
1704 .del = partition_delete,
1705 .rename = partition_rename,
1706 .extended = partition_extended,
1707 .start_transaction = partition_start_trans,
1708 .prepare_commit = partition_prepare_commit,
1709 .end_transaction = partition_end_trans,
1710 .del_transaction = partition_del_trans,
1711 .read_lock = partition_read_lock,
1712 .read_unlock = partition_read_unlock
1715 int ldb_partition_module_init(const char *version)
1717 LDB_MODULE_CHECK_VERSION(version);
1718 return ldb_register_module(&ldb_partition_module_ops);