4 Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5 Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program. If not, see <http://www.gnu.org/licenses/>.
24 * Component: ldb partitions module
26 * Description: Implement LDAP partitions
28 * Author: Andrew Bartlett
29 * Author: Stefan Metzmacher
32 #include "dsdb/samdb/ldb_modules/partition.h"
35 struct ldb_module *module;
36 struct ldb_request *req;
39 struct partition_context {
40 struct ldb_module *module;
41 struct ldb_request *req;
43 struct part_request *part_req;
44 unsigned int num_requests;
45 unsigned int finished_requests;
47 const char **referrals;
50 static struct partition_context *partition_init_ctx(struct ldb_module *module, struct ldb_request *req)
52 struct partition_context *ac;
54 ac = talloc_zero(req, struct partition_context);
56 ldb_set_errstring(ldb_module_get_ctx(module), "Out of Memory");
67 * helper functions to call the next module in chain
69 int partition_request(struct ldb_module *module, struct ldb_request *request)
71 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) { \
72 const struct dsdb_control_current_partition *partition = NULL;
73 struct ldb_control *partition_ctrl = ldb_request_get_control(request, DSDB_CONTROL_CURRENT_PARTITION_OID);
75 partition = talloc_get_type(partition_ctrl->data,
76 struct dsdb_control_current_partition);
79 if (partition != NULL) {
80 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_request() -> %s",
81 ldb_dn_get_linearized(partition->dn));
83 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_request() -> (metadata partition)");
87 return ldb_next_request(module, request);
90 static struct dsdb_partition *find_partition(struct partition_private_data *data,
92 struct ldb_request *req)
95 struct ldb_control *partition_ctrl;
97 /* see if the request has the partition DN specified in a
98 * control. The repl_meta_data module can specify this to
99 * ensure that replication happens to the right partition
101 partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
102 if (partition_ctrl) {
103 const struct dsdb_control_current_partition *partition;
104 partition = talloc_get_type(partition_ctrl->data,
105 struct dsdb_control_current_partition);
106 if (partition != NULL) {
115 /* Look at base DN */
116 /* Figure out which partition it is under */
117 /* Skip the lot if 'data' isn't here yet (initialisation) */
118 for (i=0; data && data->partitions && data->partitions[i]; i++) {
119 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, dn) == 0) {
120 return data->partitions[i];
128 * fire the caller's callback for every entry, but only send 'done' once.
130 static int partition_req_callback(struct ldb_request *req,
131 struct ldb_reply *ares)
133 struct partition_context *ac;
134 struct ldb_module *module;
135 struct ldb_request *nreq;
137 struct ldb_control *partition_ctrl;
139 ac = talloc_get_type(req->context, struct partition_context);
142 return ldb_module_done(ac->req, NULL, NULL,
143 LDB_ERR_OPERATIONS_ERROR);
146 partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
147 if (partition_ctrl && (ac->num_requests == 1 || ares->type == LDB_REPLY_ENTRY)) {
148 /* If we didn't fan this request out to mulitple partitions,
149 * or this is an individual search result, we can
150 * deterministically tell the caller what partition this was
151 * written to (repl_meta_data likes to know) */
152 ret = ldb_reply_add_control(ares,
153 DSDB_CONTROL_CURRENT_PARTITION_OID,
154 false, partition_ctrl->data);
155 if (ret != LDB_SUCCESS) {
156 return ldb_module_done(ac->req, NULL, NULL,
161 if (ares->error != LDB_SUCCESS) {
162 return ldb_module_done(ac->req, ares->controls,
163 ares->response, ares->error);
166 switch (ares->type) {
167 case LDB_REPLY_REFERRAL:
168 return ldb_module_send_referral(ac->req, ares->referral);
170 case LDB_REPLY_ENTRY:
171 if (ac->req->operation != LDB_SEARCH) {
172 ldb_set_errstring(ldb_module_get_ctx(ac->module),
173 "partition_req_callback:"
174 " Unsupported reply type for this request");
175 return ldb_module_done(ac->req, NULL, NULL,
176 LDB_ERR_OPERATIONS_ERROR);
179 return ldb_module_send_entry(ac->req, ares->message, ares->controls);
182 if (ac->req->operation == LDB_EXTENDED) {
183 /* FIXME: check for ares->response, replmd does not fill it ! */
184 if (ares->response) {
185 if (strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID) != 0) {
186 ldb_set_errstring(ldb_module_get_ctx(ac->module),
187 "partition_req_callback:"
188 " Unknown extended reply, "
189 "only supports START_TLS");
191 return ldb_module_done(ac->req, NULL, NULL,
192 LDB_ERR_OPERATIONS_ERROR);
197 ac->finished_requests++;
198 if (ac->finished_requests == ac->num_requests) {
199 /* Send back referrals if they do exist (search ops) */
200 if (ac->referrals != NULL) {
202 for (ref = ac->referrals; *ref != NULL; ++ref) {
203 ret = ldb_module_send_referral(ac->req,
204 talloc_strdup(ac->req, *ref));
205 if (ret != LDB_SUCCESS) {
206 return ldb_module_done(ac->req, NULL, NULL,
212 /* this was the last one, call callback */
213 return ldb_module_done(ac->req, ares->controls,
218 /* not the last, now call the next one */
219 module = ac->part_req[ac->finished_requests].module;
220 nreq = ac->part_req[ac->finished_requests].req;
222 ret = partition_request(module, nreq);
223 if (ret != LDB_SUCCESS) {
225 return ldb_module_done(ac->req, NULL, NULL, ret);
235 static int partition_prep_request(struct partition_context *ac,
236 struct dsdb_partition *partition)
239 struct ldb_request *req;
240 struct ldb_control *partition_ctrl = NULL;
242 ac->part_req = talloc_realloc(ac, ac->part_req,
244 ac->num_requests + 1);
245 if (ac->part_req == NULL) {
246 return ldb_oom(ldb_module_get_ctx(ac->module));
249 switch (ac->req->operation) {
251 ret = ldb_build_search_req_ex(&req, ldb_module_get_ctx(ac->module),
253 ac->req->op.search.base,
254 ac->req->op.search.scope,
255 ac->req->op.search.tree,
256 ac->req->op.search.attrs,
258 ac, partition_req_callback,
260 LDB_REQ_SET_LOCATION(req);
263 ret = ldb_build_add_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
264 ac->req->op.add.message,
266 ac, partition_req_callback,
268 LDB_REQ_SET_LOCATION(req);
271 ret = ldb_build_mod_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
272 ac->req->op.mod.message,
274 ac, partition_req_callback,
276 LDB_REQ_SET_LOCATION(req);
279 ret = ldb_build_del_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
282 ac, partition_req_callback,
284 LDB_REQ_SET_LOCATION(req);
287 ret = ldb_build_rename_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
288 ac->req->op.rename.olddn,
289 ac->req->op.rename.newdn,
291 ac, partition_req_callback,
293 LDB_REQ_SET_LOCATION(req);
296 ret = ldb_build_extended_req(&req, ldb_module_get_ctx(ac->module),
298 ac->req->op.extended.oid,
299 ac->req->op.extended.data,
301 ac, partition_req_callback,
303 LDB_REQ_SET_LOCATION(req);
306 ldb_set_errstring(ldb_module_get_ctx(ac->module),
307 "Unsupported request type!");
308 ret = LDB_ERR_UNWILLING_TO_PERFORM;
311 if (ret != LDB_SUCCESS) {
315 ac->part_req[ac->num_requests].req = req;
317 if (ac->req->controls) {
318 /* Duplicate everything beside the current partition control */
319 partition_ctrl = ldb_request_get_control(ac->req,
320 DSDB_CONTROL_CURRENT_PARTITION_OID);
321 if (!ldb_save_controls(partition_ctrl, req, NULL)) {
322 return ldb_module_oom(ac->module);
327 void *part_data = partition->ctrl;
329 ac->part_req[ac->num_requests].module = partition->module;
331 if (partition_ctrl != NULL) {
332 if (partition_ctrl->data != NULL) {
333 part_data = partition_ctrl->data;
337 * If the provided current partition control is without
338 * data then use the calculated one.
340 ret = ldb_request_add_control(req,
341 DSDB_CONTROL_CURRENT_PARTITION_OID,
343 if (ret != LDB_SUCCESS) {
348 if (req->operation == LDB_SEARCH) {
349 /* If the search is for 'more' than this partition,
350 * then change the basedn, so a remote LDAP server
352 if (ldb_dn_compare_base(partition->ctrl->dn,
353 req->op.search.base) != 0) {
354 req->op.search.base = partition->ctrl->dn;
359 /* make sure you put the module here, or
360 * or ldb_next_request() will skip a module */
361 ac->part_req[ac->num_requests].module = ac->module;
369 static int partition_call_first(struct partition_context *ac)
371 return partition_request(ac->part_req[0].module, ac->part_req[0].req);
375 * Send a request down to all the partitions (but not the sam.ldb file)
377 static int partition_send_all(struct ldb_module *module,
378 struct partition_context *ac,
379 struct ldb_request *req)
382 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
383 struct partition_private_data);
386 for (i=0; data && data->partitions && data->partitions[i]; i++) {
387 ret = partition_prep_request(ac, data->partitions[i]);
388 if (ret != LDB_SUCCESS) {
393 /* fire the first one */
394 return partition_call_first(ac);
397 struct partition_copy_context {
398 struct ldb_module *module;
399 struct partition_context *partition_context;
400 struct ldb_request *request;
405 * A special DN has been updated in the primary partition. Now propagate those
406 * changes to the remaining partitions.
408 * Note: that the operations are asynchronous and this function is called
409 * from partition_copy_all_callback_handler in response to an async
412 static int partition_copy_all_callback_action(
413 struct ldb_module *module,
414 struct partition_context *ac,
415 struct ldb_request *req,
421 struct partition_private_data *data =
423 ldb_module_get_private(module),
424 struct partition_private_data);
426 struct ldb_result *res;
427 /* now fetch the resulting object, and then copy it to all the
428 * other partitions. We need this approach to cope with the
429 * partitions getting out of sync. If for example the
430 * @ATTRIBUTES object exists on one partition but not the
431 * others then just doing each of the partitions in turn will
434 search_ret = dsdb_module_search_dn(module, ac, &res, dn, NULL, DSDB_FLAG_NEXT_MODULE, req);
435 if (search_ret != LDB_SUCCESS) {
439 /* now delete the object in the other partitions, if requried
441 if (search_ret == LDB_ERR_NO_SUCH_OBJECT) {
442 for (i=0; data->partitions && data->partitions[i]; i++) {
444 pret = dsdb_module_del(data->partitions[i]->module,
446 DSDB_FLAG_NEXT_MODULE,
448 if (pret != LDB_SUCCESS && pret != LDB_ERR_NO_SUCH_OBJECT) {
449 /* we should only get success or no
450 such object from the other partitions */
455 return ldb_module_done(req, NULL, NULL, LDB_SUCCESS);
458 /* now add/modify in the other partitions */
459 for (i=0; data->partitions && data->partitions[i]; i++) {
460 struct ldb_message *modify_msg = NULL;
464 pret = dsdb_module_add(data->partitions[i]->module,
466 DSDB_FLAG_NEXT_MODULE,
468 if (pret == LDB_SUCCESS) {
472 if (pret != LDB_ERR_ENTRY_ALREADY_EXISTS) {
476 modify_msg = ldb_msg_copy(req, res->msgs[0]);
477 if (modify_msg == NULL) {
478 return ldb_module_oom(module);
482 * mark all the message elements as
483 * LDB_FLAG_MOD_REPLACE
486 el_idx < modify_msg->num_elements;
488 modify_msg->elements[el_idx].flags
489 = LDB_FLAG_MOD_REPLACE;
492 if (req->operation == LDB_MODIFY) {
493 const struct ldb_message *req_msg = req->op.mod.message;
495 * mark elements to be removed, if there were
496 * deleted entirely above we need to delete
499 for (el_idx=0; el_idx < req_msg->num_elements; el_idx++) {
500 if (req_msg->elements[el_idx].flags & LDB_FLAG_MOD_DELETE
501 || ((req_msg->elements[el_idx].flags & LDB_FLAG_MOD_REPLACE) &&
502 req_msg->elements[el_idx].num_values == 0)) {
503 if (ldb_msg_find_element(modify_msg,
504 req_msg->elements[el_idx].name) != NULL) {
507 ldb_msg_add_empty(modify_msg,
508 req_msg->elements[el_idx].name,
509 LDB_FLAG_MOD_REPLACE,
515 pret = dsdb_module_modify(data->partitions[i]->module,
517 DSDB_FLAG_NEXT_MODULE,
520 if (pret != LDB_SUCCESS) {
525 return ldb_module_done(req, NULL, NULL, LDB_SUCCESS);
530 * @brief call back function for the ldb operations on special DN's.
532 * As the LDB operations are async, and we wish to use the result
533 * the operations, a callback needs to be registered to process the results
534 * of the LDB operations.
536 * @param req the ldb request
537 * @param res the result of the operation
539 * @return the LDB_STATUS
541 static int partition_copy_all_callback_handler(
542 struct ldb_request *req,
543 struct ldb_reply *ares)
545 struct partition_copy_context *ac = NULL;
546 int error = ares->error;
548 ac = talloc_get_type(
550 struct partition_copy_context);
553 return ldb_module_done(
557 LDB_ERR_OPERATIONS_ERROR);
560 /* pass on to the callback */
561 switch (ares->type) {
562 case LDB_REPLY_ENTRY:
563 return ldb_module_send_entry(
568 case LDB_REPLY_REFERRAL:
569 return ldb_module_send_referral(
575 if (error == LDB_SUCCESS) {
576 error = partition_copy_all_callback_action(
578 ac->partition_context,
582 return ldb_module_done(
590 return LDB_ERR_OPERATIONS_ERROR;
595 * send an operation to the top partition, then copy the resulting
596 * object to all other partitions.
598 static int partition_copy_all(
599 struct ldb_module *module,
600 struct partition_context *partition_context,
601 struct ldb_request *req,
604 struct ldb_request *new_req = NULL;
605 struct ldb_context *ldb = NULL;
606 struct partition_copy_context *context = NULL;
610 ldb = ldb_module_get_ctx(module);
612 context = talloc_zero(req, struct partition_copy_context);
613 if (context == NULL) {
616 context->module = module;
617 context->request = req;
619 context->partition_context = partition_context;
621 switch (req->operation) {
623 ret = ldb_build_add_req(
630 partition_copy_all_callback_handler,
634 ret = ldb_build_mod_req(
641 partition_copy_all_callback_handler,
645 ret = ldb_build_del_req(
652 partition_copy_all_callback_handler,
656 ret = ldb_build_rename_req(
660 req->op.rename.olddn,
661 req->op.rename.newdn,
664 partition_copy_all_callback_handler,
674 "Unexpected operation type (%d)\n", req->operation);
675 ret = LDB_ERR_OPERATIONS_ERROR;
678 if (ret != LDB_SUCCESS) {
681 return ldb_next_request(module, new_req);
684 * Figure out which backend a request needs to be aimed at. Some
685 * requests must be replicated to all backends
687 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn)
689 struct partition_context *ac;
692 struct dsdb_partition *partition;
693 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
694 struct partition_private_data);
696 /* if we aren't initialised yet go further */
697 if (!data || !data->partitions) {
698 return ldb_next_request(module, req);
701 if (ldb_dn_is_special(dn)) {
702 /* Is this a special DN, we need to replicate to every backend? */
703 for (i=0; data->replicate && data->replicate[i]; i++) {
704 if (ldb_dn_compare(data->replicate[i],
707 ac = partition_init_ctx(module, req);
709 return ldb_operr(ldb_module_get_ctx(module));
712 return partition_copy_all(module, ac, req, dn);
717 /* Otherwise, we need to find the partition to fire it to */
720 partition = find_partition(data, dn, req);
723 * if we haven't found a matching partition
724 * pass the request to the main ldb
726 * TODO: we should maybe return an error here
727 * if it's not a special dn
730 return ldb_next_request(module, req);
733 ac = partition_init_ctx(module, req);
735 return ldb_operr(ldb_module_get_ctx(module));
738 /* we need to add a control but we never touch the original request */
739 ret = partition_prep_request(ac, partition);
740 if (ret != LDB_SUCCESS) {
744 /* fire the first one */
745 return partition_call_first(ac);
749 static int partition_search(struct ldb_module *module, struct ldb_request *req)
751 struct ldb_control **saved_controls;
753 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
754 struct partition_private_data);
755 struct partition_context *ac;
756 struct ldb_context *ldb;
757 struct loadparm_context *lp_ctx;
759 struct ldb_control *search_control = ldb_request_get_control(req, LDB_CONTROL_SEARCH_OPTIONS_OID);
760 struct ldb_control *domain_scope_control = ldb_request_get_control(req, LDB_CONTROL_DOMAIN_SCOPE_OID);
761 struct ldb_control *no_gc_control = ldb_request_get_control(req, DSDB_CONTROL_NO_GLOBAL_CATALOG);
763 struct ldb_search_options_control *search_options = NULL;
764 struct dsdb_partition *p;
767 bool domain_scope = false, phantom_root = false;
769 p = find_partition(data, NULL, req);
771 /* the caller specified what partition they want the
772 * search - just pass it on
774 return ldb_next_request(p->module, req);
777 /* Get back the search options from the search control, and mark it as
778 * non-critical (to make backends and also dcpromo happy).
780 if (search_control) {
781 search_options = talloc_get_type(search_control->data, struct ldb_search_options_control);
782 search_control->critical = 0;
786 /* Remove the "domain_scope" control, so we don't confuse a backend
788 if (domain_scope_control && !ldb_save_controls(domain_scope_control, req, &saved_controls)) {
789 return ldb_oom(ldb_module_get_ctx(module));
792 /* if we aren't initialised yet go further */
793 if (!data || !data->partitions) {
794 return ldb_next_request(module, req);
797 /* Special DNs without specified partition should go further */
798 if (ldb_dn_is_special(req->op.search.base)) {
799 return ldb_next_request(module, req);
802 /* Locate the options */
803 domain_scope = (search_options
804 && (search_options->search_options & LDB_SEARCH_OPTION_DOMAIN_SCOPE))
805 || domain_scope_control;
806 phantom_root = search_options
807 && (search_options->search_options & LDB_SEARCH_OPTION_PHANTOM_ROOT);
809 /* Remove handled options from the search control flag */
810 if (search_options) {
811 search_options->search_options = search_options->search_options
812 & ~LDB_SEARCH_OPTION_DOMAIN_SCOPE
813 & ~LDB_SEARCH_OPTION_PHANTOM_ROOT;
816 ac = partition_init_ctx(module, req);
818 return ldb_operr(ldb_module_get_ctx(module));
821 ldb = ldb_module_get_ctx(ac->module);
822 lp_ctx = talloc_get_type(ldb_get_opaque(ldb, "loadparm"),
823 struct loadparm_context);
825 /* Search from the base DN */
826 if (ldb_dn_is_null(req->op.search.base)) {
828 return ldb_error(ldb, LDB_ERR_NO_SUCH_OBJECT, "empty base DN");
830 return partition_send_all(module, ac, req);
833 for (i=0; data->partitions[i]; i++) {
834 bool match = false, stop = false;
836 if (data->partitions[i]->partial_replica && no_gc_control != NULL) {
837 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn,
838 req->op.search.base) == 0) {
839 /* base DN is in a partial replica
840 with the NO_GLOBAL_CATALOG
841 control. This partition is invisible */
842 /* DEBUG(0,("DENYING NON-GC OP: %s\n", ldb_module_call_chain(req, req))); */
848 /* Phantom root: Find all partitions under the
849 * search base. We match if:
851 * 1) the DN we are looking for exactly matches a
852 * certain partition and always stop
853 * 2) the DN we are looking for is a parent of certain
854 * partitions and it isn't a scope base search
855 * 3) the DN we are looking for is a child of a certain
856 * partition and always stop
857 * - we don't need to go any further up in the
860 if (ldb_dn_compare(data->partitions[i]->ctrl->dn,
861 req->op.search.base) == 0) {
866 (ldb_dn_compare_base(req->op.search.base,
867 data->partitions[i]->ctrl->dn) == 0 &&
868 req->op.search.scope != LDB_SCOPE_BASE)) {
872 ldb_dn_compare_base(data->partitions[i]->ctrl->dn,
873 req->op.search.base) == 0) {
875 stop = true; /* note that this relies on partition ordering */
878 /* Domain scope: Find all partitions under the search
881 * We generate referral candidates if we haven't
882 * specified the domain scope control, haven't a base
883 * search* scope and the DN we are looking for is a real
884 * predecessor of certain partitions. When a new
885 * referral candidate is nearer to the DN than an
886 * existing one delete the latter (we want to have only
887 * the closest ones). When we checked this for all
888 * candidates we have the final referrals.
890 * We match if the DN we are looking for is a child of
891 * a certain partition or the partition
892 * DN itself - we don't need to go any further
893 * up in the hierarchy!
895 if ((!domain_scope) &&
896 (req->op.search.scope != LDB_SCOPE_BASE) &&
897 (ldb_dn_compare_base(req->op.search.base,
898 data->partitions[i]->ctrl->dn) == 0) &&
899 (ldb_dn_compare(req->op.search.base,
900 data->partitions[i]->ctrl->dn) != 0)) {
901 char *ref = talloc_asprintf(ac,
903 lpcfg_dnsdomain(lp_ctx),
904 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
905 req->op.search.scope == LDB_SCOPE_ONELEVEL ? "??base" : "");
911 /* Initialise the referrals list */
912 if (ac->referrals == NULL) {
913 char **l = str_list_make_empty(ac);
914 ac->referrals = discard_const_p(const char *, l);
915 if (ac->referrals == NULL) {
920 /* Check if the new referral candidate is
921 * closer to the base DN than already
922 * saved ones and delete the latters */
924 while (ac->referrals[j] != NULL) {
925 if (strstr(ac->referrals[j],
926 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn)) != NULL) {
927 str_list_remove(ac->referrals,
934 /* Add our new candidate */
935 ac->referrals = str_list_add(ac->referrals, ref);
939 if (ac->referrals == NULL) {
943 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
945 stop = true; /* note that this relies on partition ordering */
950 ret = partition_prep_request(ac, data->partitions[i]);
951 if (ret != LDB_SUCCESS) {
959 /* Perhaps we didn't match any partitions. Try the main partition */
960 if (ac->num_requests == 0) {
962 return ldb_next_request(module, req);
965 /* fire the first one */
966 return partition_call_first(ac);
970 static int partition_add(struct ldb_module *module, struct ldb_request *req)
972 return partition_replicate(module, req, req->op.add.message->dn);
976 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
978 return partition_replicate(module, req, req->op.mod.message->dn);
982 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
984 return partition_replicate(module, req, req->op.del.dn);
988 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
991 struct dsdb_partition *backend, *backend2;
993 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
994 struct partition_private_data);
996 /* Skip the lot if 'data' isn't here yet (initialisation) */
998 return ldb_operr(ldb_module_get_ctx(module));
1001 backend = find_partition(data, req->op.rename.olddn, req);
1002 backend2 = find_partition(data, req->op.rename.newdn, req);
1004 if ((backend && !backend2) || (!backend && backend2)) {
1005 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
1008 if (backend != backend2) {
1009 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1010 "Cannot rename from %s in %s to %s in %s: %s",
1011 ldb_dn_get_linearized(req->op.rename.olddn),
1012 ldb_dn_get_linearized(backend->ctrl->dn),
1013 ldb_dn_get_linearized(req->op.rename.newdn),
1014 ldb_dn_get_linearized(backend2->ctrl->dn),
1015 ldb_strerror(LDB_ERR_AFFECTS_MULTIPLE_DSAS));
1016 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
1019 return partition_replicate(module, req, req->op.rename.olddn);
1022 /* start a transaction */
1023 int partition_start_trans(struct ldb_module *module)
1027 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1028 struct partition_private_data);
1029 /* Look at base DN */
1030 /* Figure out which partition it is under */
1031 /* Skip the lot if 'data' isn't here yet (initialization) */
1032 if (ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING) {
1033 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_start_trans() -> (metadata partition)");
1036 /* This order must match that in prepare_commit() and read_lock() */
1037 ret = ldb_next_start_trans(module);
1038 if (ret != LDB_SUCCESS) {
1042 ret = partition_reload_if_required(module, data, NULL);
1043 if (ret != LDB_SUCCESS) {
1044 ldb_next_del_trans(module);
1048 for (i=0; data && data->partitions && data->partitions[i]; i++) {
1049 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
1050 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_start_trans() -> %s",
1051 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
1053 ret = ldb_next_start_trans(data->partitions[i]->module);
1054 if (ret != LDB_SUCCESS) {
1055 /* Back it out, if it fails on one */
1056 for (i--; i >= 0; i--) {
1057 ldb_next_del_trans(data->partitions[i]->module);
1059 ldb_next_del_trans(module);
1060 partition_metadata_del_trans(module);
1066 * Because in prepare_commit this must come last, to ensure
1067 * lock ordering we have to do this last here also
1069 ret = partition_metadata_start_trans(module);
1070 if (ret != LDB_SUCCESS) {
1071 /* Back it out, if it fails on one */
1072 for (i--; i >= 0; i--) {
1073 ldb_next_del_trans(data->partitions[i]->module);
1075 ldb_next_del_trans(module);
1079 data->in_transaction++;
1084 /* prepare for a commit */
1085 int partition_prepare_commit(struct ldb_module *module)
1088 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1089 struct partition_private_data);
1092 ret = ldb_next_prepare_commit(module);
1093 if (ret != LDB_SUCCESS) {
1097 for (i=0; data && data->partitions && data->partitions[i]; i++) {
1098 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
1099 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_prepare_commit() -> %s",
1100 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
1102 ret = ldb_next_prepare_commit(data->partitions[i]->module);
1103 if (ret != LDB_SUCCESS) {
1104 ldb_asprintf_errstring(ldb_module_get_ctx(module), "prepare_commit error on %s: %s",
1105 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
1106 ldb_errstring(ldb_module_get_ctx(module)));
1111 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
1112 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_prepare_commit() -> (metadata partition)");
1115 /* metadata prepare commit must come last, as other partitions could modify
1116 * the database inside the prepare commit method of a module */
1117 return partition_metadata_prepare_commit(module);
1121 /* end a transaction */
1122 int partition_end_trans(struct ldb_module *module)
1126 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1127 struct partition_private_data);
1131 if (data->in_transaction == 0) {
1132 DEBUG(0,("partition end transaction mismatch\n"));
1133 ret = LDB_ERR_OPERATIONS_ERROR;
1135 data->in_transaction--;
1139 for (i=0; data && data->partitions && data->partitions[i]; i++) {
1140 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
1141 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_end_trans() -> %s",
1142 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
1144 ret2 = ldb_next_end_trans(data->partitions[i]->module);
1145 if (ret2 != LDB_SUCCESS) {
1146 ldb_asprintf_errstring(ldb_module_get_ctx(module), "end_trans error on %s: %s",
1147 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
1148 ldb_errstring(ldb_module_get_ctx(module)));
1153 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
1154 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_end_trans() -> (metadata partition)");
1156 ret2 = ldb_next_end_trans(module);
1157 if (ret2 != LDB_SUCCESS) {
1161 ret2 = partition_metadata_end_trans(module);
1162 if (ret2 != LDB_SUCCESS) {
1169 /* delete a transaction */
1170 int partition_del_trans(struct ldb_module *module)
1172 int ret, final_ret = LDB_SUCCESS;
1174 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1175 struct partition_private_data);
1177 for (i=0; data && data->partitions && data->partitions[i]; i++) {
1178 if (ldb_module_flags(ldb_module_get_ctx(module)) &
1179 LDB_FLG_ENABLE_TRACING) {
1180 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_del_trans() -> %s",
1181 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
1183 ret = ldb_next_del_trans(data->partitions[i]->module);
1184 if (ret != LDB_SUCCESS) {
1185 ldb_asprintf_errstring(ldb_module_get_ctx(module), "del_trans error on %s: %s",
1186 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
1187 ldb_errstring(ldb_module_get_ctx(module)));
1192 if (data->in_transaction == 0) {
1193 DEBUG(0,("partition del transaction mismatch\n"));
1194 return ldb_operr(ldb_module_get_ctx(module));
1196 data->in_transaction--;
1198 if (ldb_module_flags(ldb_module_get_ctx(module)) &
1199 LDB_FLG_ENABLE_TRACING) {
1200 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_del_trans() -> (metadata partition)");
1202 ret = ldb_next_del_trans(module);
1203 if (ret != LDB_SUCCESS) {
1207 ret = partition_metadata_del_trans(module);
1208 if (ret != LDB_SUCCESS) {
1215 int partition_primary_sequence_number(struct ldb_module *module, TALLOC_CTX *mem_ctx,
1216 uint64_t *seq_number,
1217 struct ldb_request *parent)
1220 struct ldb_result *res;
1221 struct ldb_seqnum_request *tseq;
1222 struct ldb_seqnum_result *seqr;
1224 tseq = talloc_zero(mem_ctx, struct ldb_seqnum_request);
1226 return ldb_oom(ldb_module_get_ctx(module));
1228 tseq->type = LDB_SEQ_HIGHEST_SEQ;
1230 ret = dsdb_module_extended(module, tseq, &res,
1231 LDB_EXTENDED_SEQUENCE_NUMBER,
1233 DSDB_FLAG_NEXT_MODULE,
1235 if (ret != LDB_SUCCESS) {
1240 seqr = talloc_get_type_abort(res->extended->data,
1241 struct ldb_seqnum_result);
1242 if (seqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
1244 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
1245 "Primary backend in partition module returned a timestamp based seq");
1248 *seq_number = seqr->seq_num;
1255 * Older version of sequence number as sum of sequence numbers for each partition
1257 int partition_sequence_number_from_partitions(struct ldb_module *module,
1262 uint64_t seq_number = 0;
1263 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1264 struct partition_private_data);
1266 ret = partition_primary_sequence_number(module, data, &seq_number, NULL);
1267 if (ret != LDB_SUCCESS) {
1271 /* Skip the lot if 'data' isn't here yet (initialisation) */
1272 for (i=0; data && data->partitions && data->partitions[i]; i++) {
1273 struct ldb_seqnum_request *tseq;
1274 struct ldb_seqnum_result *tseqr;
1275 struct ldb_request *treq;
1276 struct ldb_result *res = talloc_zero(data, struct ldb_result);
1278 return ldb_oom(ldb_module_get_ctx(module));
1280 tseq = talloc_zero(res, struct ldb_seqnum_request);
1283 return ldb_oom(ldb_module_get_ctx(module));
1285 tseq->type = LDB_SEQ_HIGHEST_SEQ;
1287 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
1288 LDB_EXTENDED_SEQUENCE_NUMBER,
1292 ldb_extended_default_callback,
1294 LDB_REQ_SET_LOCATION(treq);
1295 if (ret != LDB_SUCCESS) {
1300 ret = partition_request(data->partitions[i]->module, treq);
1301 if (ret != LDB_SUCCESS) {
1305 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
1306 if (ret != LDB_SUCCESS) {
1310 tseqr = talloc_get_type(res->extended->data,
1311 struct ldb_seqnum_result);
1312 seq_number += tseqr->seq_num;
1322 * Newer version of sequence number using metadata tdb
1324 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
1326 struct ldb_extended *ext;
1327 struct ldb_seqnum_request *seq;
1328 struct ldb_seqnum_result *seqr;
1329 uint64_t seq_number;
1332 seq = talloc_get_type_abort(req->op.extended.data, struct ldb_seqnum_request);
1333 switch (seq->type) {
1335 ret = partition_metadata_sequence_number_increment(module, &seq_number);
1336 if (ret != LDB_SUCCESS) {
1341 case LDB_SEQ_HIGHEST_SEQ:
1342 ret = partition_metadata_sequence_number(module, &seq_number);
1343 if (ret != LDB_SUCCESS) {
1348 case LDB_SEQ_HIGHEST_TIMESTAMP:
1349 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
1350 "LDB_SEQ_HIGHEST_TIMESTAMP not supported");
1353 ext = talloc_zero(req, struct ldb_extended);
1355 return ldb_module_oom(module);
1357 seqr = talloc_zero(ext, struct ldb_seqnum_result);
1360 return ldb_module_oom(module);
1362 ext->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1365 seqr->seq_num = seq_number;
1366 seqr->flags |= LDB_SEQ_GLOBAL_SEQUENCE;
1368 /* send request done */
1369 return ldb_module_done(req, NULL, ext, LDB_SUCCESS);
1372 /* lock all the backends */
1373 int partition_read_lock(struct ldb_module *module)
1378 struct ldb_context *ldb = ldb_module_get_ctx(module);
1379 struct partition_private_data *data = \
1380 talloc_get_type(ldb_module_get_private(module),
1381 struct partition_private_data);
1383 if (ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING) {
1384 ldb_debug(ldb, LDB_DEBUG_TRACE,
1385 "partition_read_lock() -> (metadata partition)");
1389 * It is important to only do this for LOCK because:
1390 * - we don't want to unlock what we did not lock
1392 * - we don't want to make a new lock on the sam.ldb
1393 * (triggered inside this routine due to the seq num check)
1394 * during an unlock phase as that will violate the lock
1399 TALLOC_CTX *mem_ctx = talloc_new(module);
1401 data = talloc_zero(mem_ctx, struct partition_private_data);
1403 talloc_free(mem_ctx);
1404 return ldb_operr(ldb);
1408 * When used from Samba4, this message is set by the
1409 * samba4 module, as a fixed value not read from the
1410 * DB. This avoids listing modules in the DB
1412 data->forced_module_msg = talloc_get_type(
1414 DSDB_OPAQUE_PARTITION_MODULE_MSG_OPAQUE_NAME),
1415 struct ldb_message);
1417 ldb_module_set_private(module, talloc_steal(module,
1419 talloc_free(mem_ctx);
1423 * This will lock the metadata partition (sam.ldb) and
1424 * will also call event loops, so we do it before we
1425 * get the whole db lock.
1427 ret = partition_reload_if_required(module, data, NULL);
1428 if (ret != LDB_SUCCESS) {
1433 * This order must match that in prepare_commit(), start with
1434 * the metadata partition (sam.ldb) lock
1436 ret = ldb_next_read_lock(module);
1437 if (ret != LDB_SUCCESS) {
1440 "Failed to lock db: %s / %s for metadata partition",
1448 * The metadata partition (sam.ldb) lock is not
1449 * enough to block another process in prepare_commit(),
1450 * because prepare_commit() is a no-op, if nothing
1451 * was changed in the specific backend.
1453 * That means the following per partition locks are required.
1455 for (i=0; data && data->partitions && data->partitions[i]; i++) {
1456 if ((module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING)) {
1457 ldb_debug(ldb, LDB_DEBUG_TRACE,
1458 "partition_read_lock() -> %s",
1459 ldb_dn_get_linearized(
1460 data->partitions[i]->ctrl->dn));
1462 ret = ldb_next_read_lock(data->partitions[i]->module);
1463 if (ret == LDB_SUCCESS) {
1469 "Failed to lock db: %s / %s for %s",
1472 ldb_dn_get_linearized(
1473 data->partitions[i]->ctrl->dn));
1475 /* Back it out, if it fails on one */
1476 for (i--; i >= 0; i--) {
1477 ret2 = ldb_next_read_unlock(data->partitions[i]->module);
1478 if (ret2 != LDB_SUCCESS) {
1481 "Failed to unlock db: %s / %s",
1483 ldb_strerror(ret2));
1486 ret2 = ldb_next_read_unlock(module);
1487 if (ret2 != LDB_SUCCESS) {
1490 "Failed to unlock db: %s / %s",
1492 ldb_strerror(ret2));
1500 /* unlock all the backends */
1501 int partition_read_unlock(struct ldb_module *module)
1504 int ret = LDB_SUCCESS;
1506 struct ldb_context *ldb = ldb_module_get_ctx(module);
1507 struct partition_private_data *data = \
1508 talloc_get_type(ldb_module_get_private(module),
1509 struct partition_private_data);
1512 * This order must be similar to partition_{end,del}_trans()
1513 * the metadata partition (sam.ldb) unlock must be at the end.
1516 for (i=0; data && data->partitions && data->partitions[i]; i++) {
1517 if ((module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING)) {
1518 ldb_debug(ldb, LDB_DEBUG_TRACE,
1519 "partition_read_unlock() -> %s",
1520 ldb_dn_get_linearized(
1521 data->partitions[i]->ctrl->dn));
1523 ret2 = ldb_next_read_unlock(data->partitions[i]->module);
1524 if (ret2 != LDB_SUCCESS) {
1527 "Failed to lock db: %s / %s for %s",
1530 ldb_dn_get_linearized(
1531 data->partitions[i]->ctrl->dn));
1534 * Don't overwrite the original failure code
1537 if (ret == LDB_SUCCESS) {
1543 if (ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING) {
1544 ldb_debug(ldb, LDB_DEBUG_TRACE,
1545 "partition_read_unlock() -> (metadata partition)");
1548 ret2 = ldb_next_read_unlock(module);
1549 if (ret2 != LDB_SUCCESS) {
1552 "Failed to unlock db: %s / %s for metadata partition",
1554 ldb_strerror(ret2));
1557 * Don't overwrite the original failure code
1560 if (ret == LDB_SUCCESS) {
1569 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
1571 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1572 struct partition_private_data);
1573 struct partition_context *ac;
1576 /* if we aren't initialised yet go further */
1578 return ldb_next_request(module, req);
1581 if (strcmp(req->op.extended.oid, DSDB_EXTENDED_SCHEMA_UPDATE_NOW_OID) == 0) {
1582 /* Update the metadata.tdb to increment the schema version if needed*/
1583 DEBUG(10, ("Incrementing the sequence_number after schema_update_now\n"));
1584 ret = partition_metadata_inc_schema_sequence(module);
1585 return ldb_module_done(req, NULL, NULL, ret);
1588 if (strcmp(req->op.extended.oid, LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1589 return partition_sequence_number(module, req);
1592 if (strcmp(req->op.extended.oid, DSDB_EXTENDED_CREATE_PARTITION_OID) == 0) {
1593 return partition_create(module, req);
1597 * as the extended operation has no dn
1598 * we need to send it to all partitions
1601 ac = partition_init_ctx(module, req);
1603 return ldb_operr(ldb_module_get_ctx(module));
1606 return partition_send_all(module, ac, req);
1609 static const struct ldb_module_ops ldb_partition_module_ops = {
1610 .name = "partition",
1611 .init_context = partition_init,
1612 .search = partition_search,
1613 .add = partition_add,
1614 .modify = partition_modify,
1615 .del = partition_delete,
1616 .rename = partition_rename,
1617 .extended = partition_extended,
1618 .start_transaction = partition_start_trans,
1619 .prepare_commit = partition_prepare_commit,
1620 .end_transaction = partition_end_trans,
1621 .del_transaction = partition_del_trans,
1622 .read_lock = partition_read_lock,
1623 .read_unlock = partition_read_unlock
1626 int ldb_partition_module_init(const char *version)
1628 LDB_MODULE_CHECK_VERSION(version);
1629 return ldb_register_module(&ldb_partition_module_ops);