4 Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5 Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program. If not, see <http://www.gnu.org/licenses/>.
24 * Component: ldb partitions module
26 * Description: Implement LDAP partitions
28 * Author: Andrew Bartlett
29 * Author: Stefan Metzmacher
32 #include "dsdb/samdb/ldb_modules/partition.h"
/* Fields of one queued sub-request: the module to hand the request to and
 * the request itself.
 * NOTE(review): the enclosing struct header is not visible in this listing;
 * presumably this is struct part_request, given the part_req field below and
 * the part_req[i].module / part_req[i].req accesses elsewhere in the file. */
35 struct ldb_module *module;
36 struct ldb_request *req;
/* Per-request state used while one incoming request is split into
 * sub-requests that are sent to individual partitions. */
39 struct partition_context {
/* Presumably the module and original request passed to partition_init_ctx()
 * (the assignment is not visible in this listing). Replies are reported
 * against req, see the ldb_module_done(ac->req, ...) calls. */
40 struct ldb_module *module;
41 struct ldb_request *req;
/* Array of sub-requests, grown one entry at a time by
 * partition_prep_request(). */
43 struct part_request *part_req;
/* num_requests is the index used for the next part_req entry and the total
 * that finished_requests is compared against. finished_requests is bumped by
 * partition_req_callback() each time a sub-request completes. */
44 unsigned int num_requests;
45 unsigned int finished_requests;
/* NULL-terminated list of referral strings collected by partition_search()
 * and sent once the last sub-request has finished. */
47 const char **referrals;
/* Allocate a zero-initialised partition_context as a talloc child of req.
 * On the failure path the ldb error string is set to Out of Memory.
 * NOTE(review): the lines that fill in the context and return it are not
 * visible in this listing; callers appear to treat a NULL result as an
 * error. */
50 static struct partition_context *partition_init_ctx(struct ldb_module *module, struct ldb_request *req)
52 struct partition_context *ac;
54 ac = talloc_zero(req, struct partition_context);
56 ldb_set_errstring(ldb_module_get_ctx(module), "Out of Memory");
67 * helper functions to call the next module in chain
/* Hand request to the next module in the chain via ldb_next_request().
 *
 * When LDB_FLG_ENABLE_TRACING is set on the ldb context, a trace line is
 * emitted first: it names the partition DN taken from the
 * DSDB_CONTROL_CURRENT_PARTITION_OID control on the request, or reports the
 * metadata partition when no partition could be read from the request.
 *
 * NOTE(review): the tracing condition line ends in a backslash line
 * continuation, which looks like a leftover from a macro definition.
 *
 * Returns whatever ldb_next_request() returns. */
69 int partition_request(struct ldb_module *module, struct ldb_request *request)
71 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) { \
72 const struct dsdb_control_current_partition *partition = NULL;
73 struct ldb_control *partition_ctrl = ldb_request_get_control(request, DSDB_CONTROL_CURRENT_PARTITION_OID);
75 partition = talloc_get_type(partition_ctrl->data,
76 struct dsdb_control_current_partition);
79 if (partition != NULL) {
80 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_request() -> %s",
81 ldb_dn_get_linearized(partition->dn));
83 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_request() -> (metadata partition)");
87 return ldb_next_request(module, request);
/* Work out which partition a request belongs to.
 *
 * Two sources are consulted, in this order:
 *  1. a DSDB_CONTROL_CURRENT_PARTITION_OID control on req (what is done with
 *     the decoded control is not visible in this listing);
 *  2. the partition list in data: the first entry whose DN is a base of dn
 *     (ldb_dn_compare_base() == 0) is returned.
 *
 * data may be NULL during initialisation; the scan is skipped in that case.
 * NOTE(review): the dn parameter line and the no-match return are missing
 * from this listing; callers test the result, so presumably NULL is returned
 * when nothing matches. */
90 static struct dsdb_partition *find_partition(struct partition_private_data *data,
92 struct ldb_request *req)
95 struct ldb_control *partition_ctrl;
97 /* see if the request has the partition DN specified in a
98 * control. The repl_meta_data module can specify this to
99 * ensure that replication happens to the right partition
101 partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
102 if (partition_ctrl) {
103 const struct dsdb_control_current_partition *partition;
104 partition = talloc_get_type(partition_ctrl->data,
105 struct dsdb_control_current_partition);
106 if (partition != NULL) {
115 /* Look at base DN */
116 /* Figure out which partition it is under */
117 /* Skip the lot if 'data' isn't here yet (initialisation) */
118 for (i=0; data && data->partitions && data->partitions[i]; i++) {
119 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, dn) == 0) {
120 return data->partitions[i];
128 * fire the caller's callback for every entry, but only send 'done' once.
130 static int partition_req_callback(struct ldb_request *req,
131 struct ldb_reply *ares)
133 struct partition_context *ac;
134 struct ldb_module *module;
135 struct ldb_request *nreq;
137 struct partition_private_data *data;
138 struct ldb_control *partition_ctrl;
140 ac = talloc_get_type(req->context, struct partition_context);
141 data = talloc_get_type(ac->module->private_data, struct partition_private_data);
144 return ldb_module_done(ac->req, NULL, NULL,
145 LDB_ERR_OPERATIONS_ERROR);
148 partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
149 if (partition_ctrl && (ac->num_requests == 1 || ares->type == LDB_REPLY_ENTRY)) {
150 /* If we didn't fan this request out to mulitple partitions,
151 * or this is an individual search result, we can
152 * deterministily tell the caller what partition this was
153 * written to (repl_meta_data likes to know) */
154 ret = ldb_reply_add_control(ares,
155 DSDB_CONTROL_CURRENT_PARTITION_OID,
156 false, partition_ctrl->data);
157 if (ret != LDB_SUCCESS) {
158 return ldb_module_done(ac->req, NULL, NULL,
163 if (ares->error != LDB_SUCCESS) {
164 return ldb_module_done(ac->req, ares->controls,
165 ares->response, ares->error);
168 switch (ares->type) {
169 case LDB_REPLY_REFERRAL:
170 return ldb_module_send_referral(ac->req, ares->referral);
172 case LDB_REPLY_ENTRY:
173 if (ac->req->operation != LDB_SEARCH) {
174 ldb_set_errstring(ldb_module_get_ctx(ac->module),
175 "partition_req_callback:"
176 " Unsupported reply type for this request");
177 return ldb_module_done(ac->req, NULL, NULL,
178 LDB_ERR_OPERATIONS_ERROR);
181 return ldb_module_send_entry(ac->req, ares->message, ares->controls);
184 if (ac->req->operation == LDB_EXTENDED) {
185 /* FIXME: check for ares->response, replmd does not fill it ! */
186 if (ares->response) {
187 if (strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID) != 0) {
188 ldb_set_errstring(ldb_module_get_ctx(ac->module),
189 "partition_req_callback:"
190 " Unknown extended reply, "
191 "only supports START_TLS");
193 return ldb_module_done(ac->req, NULL, NULL,
194 LDB_ERR_OPERATIONS_ERROR);
199 ac->finished_requests++;
200 if (ac->finished_requests == ac->num_requests) {
201 /* Send back referrals if they do exist (search ops) */
202 if (ac->referrals != NULL) {
204 for (ref = ac->referrals; *ref != NULL; ++ref) {
205 ret = ldb_module_send_referral(ac->req,
206 talloc_strdup(ac->req, *ref));
207 if (ret != LDB_SUCCESS) {
208 return ldb_module_done(ac->req, NULL, NULL,
214 /* this was the last one, call callback */
215 return ldb_module_done(ac->req, ares->controls,
220 /* not the last, now call the next one */
221 module = ac->part_req[ac->finished_requests].module;
222 nreq = ac->part_req[ac->finished_requests].req;
224 ret = partition_request(module, nreq);
225 if (ret != LDB_SUCCESS) {
227 return ldb_module_done(ac->req, NULL, NULL, ret);
237 static int partition_prep_request(struct partition_context *ac,
238 struct dsdb_partition *partition)
241 struct ldb_request *req;
243 ac->part_req = talloc_realloc(ac, ac->part_req,
245 ac->num_requests + 1);
246 if (ac->part_req == NULL) {
247 return ldb_oom(ldb_module_get_ctx(ac->module));
250 switch (ac->req->operation) {
252 ret = ldb_build_search_req_ex(&req, ldb_module_get_ctx(ac->module),
254 ac->req->op.search.base,
255 ac->req->op.search.scope,
256 ac->req->op.search.tree,
257 ac->req->op.search.attrs,
259 ac, partition_req_callback,
261 LDB_REQ_SET_LOCATION(req);
264 ret = ldb_build_add_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
265 ac->req->op.add.message,
267 ac, partition_req_callback,
269 LDB_REQ_SET_LOCATION(req);
272 ret = ldb_build_mod_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
273 ac->req->op.mod.message,
275 ac, partition_req_callback,
277 LDB_REQ_SET_LOCATION(req);
280 ret = ldb_build_del_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
283 ac, partition_req_callback,
285 LDB_REQ_SET_LOCATION(req);
288 ret = ldb_build_rename_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
289 ac->req->op.rename.olddn,
290 ac->req->op.rename.newdn,
292 ac, partition_req_callback,
294 LDB_REQ_SET_LOCATION(req);
297 ret = ldb_build_extended_req(&req, ldb_module_get_ctx(ac->module),
299 ac->req->op.extended.oid,
300 ac->req->op.extended.data,
302 ac, partition_req_callback,
304 LDB_REQ_SET_LOCATION(req);
307 ldb_set_errstring(ldb_module_get_ctx(ac->module),
308 "Unsupported request type!");
309 ret = LDB_ERR_UNWILLING_TO_PERFORM;
312 if (ret != LDB_SUCCESS) {
316 ac->part_req[ac->num_requests].req = req;
318 if (ac->req->controls) {
319 req->controls = talloc_memdup(req, ac->req->controls,
320 talloc_get_size(ac->req->controls));
321 if (req->controls == NULL) {
322 return ldb_oom(ldb_module_get_ctx(ac->module));
327 ac->part_req[ac->num_requests].module = partition->module;
329 if (!ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
330 ret = ldb_request_add_control(req,
331 DSDB_CONTROL_CURRENT_PARTITION_OID,
332 false, partition->ctrl);
333 if (ret != LDB_SUCCESS) {
338 if (req->operation == LDB_SEARCH) {
339 /* If the search is for 'more' than this partition,
340 * then change the basedn, so a remote LDAP server
342 if (ldb_dn_compare_base(partition->ctrl->dn,
343 req->op.search.base) != 0) {
344 req->op.search.base = partition->ctrl->dn;
349 /* make sure you put the module here, or
350 * or ldb_next_request() will skip a module */
351 ac->part_req[ac->num_requests].module = ac->module;
/* Start the first queued sub-request (part_req[0]). The remaining ones are
 * started one by one from partition_req_callback() as each one completes.
 * Returns the result of partition_request(). */
359 static int partition_call_first(struct partition_context *ac)
361 return partition_request(ac->part_req[0].module, ac->part_req[0].req);
365 * Send a request down to all the partitions
/* Queue the request for every backend and start processing.
 *
 * A sub-request is prepared first with a NULL partition (presumably the
 * primary / metadata backend reached through the module chain), then one per
 * entry of data->partitions; finally the first queued sub-request is fired.
 * data may be NULL, in which case only the NULL-partition sub-request is
 * queued.
 * NOTE(review): the error-return lines after the LDB_SUCCESS checks are not
 * visible in this listing. */
367 static int partition_send_all(struct ldb_module *module,
368 struct partition_context *ac,
369 struct ldb_request *req)
372 struct partition_private_data *data = talloc_get_type(module->private_data,
373 struct partition_private_data);
374 int ret = partition_prep_request(ac, NULL);
375 if (ret != LDB_SUCCESS) {
378 for (i=0; data && data->partitions && data->partitions[i]; i++) {
379 ret = partition_prep_request(ac, data->partitions[i]);
380 if (ret != LDB_SUCCESS) {
385 /* fire the first one */
386 return partition_call_first(ac);
390 * Figure out which backend a request needs to be aimed at. Some
391 * requests must be replicated to all backends
/* Route a request that targets dn to the right backend(s).
 *
 * Visible behaviour:
 *  - with no private data or no partition list, the request simply goes to
 *    the next module;
 *  - for non-search operations on a special DN, dn is compared against the
 *    data->replicate list and a match sends the request to all backends via
 *    partition_send_all();
 *  - otherwise find_partition() picks the partition; a single sub-request is
 *    prepared for it and fired.
 *
 * NOTE(review): the test for a missing partition and several error returns
 * are not visible in this listing. */
393 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn)
395 struct partition_context *ac;
398 struct dsdb_partition *partition;
399 struct partition_private_data *data = talloc_get_type(module->private_data,
400 struct partition_private_data);
401 if (!data || !data->partitions) {
402 return ldb_next_request(module, req);
405 if (req->operation != LDB_SEARCH && ldb_dn_is_special(dn)) {
406 /* Is this a special DN, we need to replicate to every backend? */
407 for (i=0; data->replicate && data->replicate[i]; i++) {
408 if (ldb_dn_compare(data->replicate[i],
411 ac = partition_init_ctx(module, req);
413 return ldb_operr(ldb_module_get_ctx(module));
416 return partition_send_all(module, ac, req);
421 /* Otherwise, we need to find the partition to fire it to */
424 partition = find_partition(data, dn, req);
427 * if we haven't found a matching partition
428 * pass the request to the main ldb
430 * TODO: we should maybe return an error here
431 * if it's not a special dn
434 return ldb_next_request(module, req);
437 ac = partition_init_ctx(module, req);
439 return ldb_operr(ldb_module_get_ctx(module));
442 /* we need to add a control but we never touch the original request */
443 ret = partition_prep_request(ac, partition);
444 if (ret != LDB_SUCCESS) {
448 /* fire the first one */
449 return partition_call_first(ac);
/* Search entry point of the partition module.
 *
 * Visible flow:
 *  1. reload the partition list if required;
 *  2. if the request names a partition explicitly (find_partition() with a
 *     NULL dn), pass it straight to that partition module;
 *  3. read the search-options control (marking it non-critical) and the
 *     domain-scope control, derive the domain_scope and phantom_root flags,
 *     and clear those two option bits from the control;
 *  4. with no partition list, pass the request to the next module;
 *  5. a search with a null base DN is sent to every backend;
 *  6. otherwise walk data->partitions and queue a sub-request for each
 *     partition that matches the search base, using the phantom-root rules
 *     or the domain-scope rules described in the comments below; without
 *     domain scope, referral strings are collected for partitions strictly
 *     below the search base;
 *  7. if nothing was queued the request goes to the next module, otherwise
 *     the first queued sub-request is fired.
 *
 * NOTE(review): several branch conditions, the match / stop handling and the
 * error returns are not visible in this listing. */
453 static int partition_search(struct ldb_module *module, struct ldb_request *req)
455 struct ldb_control **saved_controls;
457 struct partition_private_data *data = talloc_get_type(module->private_data,
458 struct partition_private_data);
459 struct partition_context *ac;
460 struct ldb_context *ldb;
461 struct loadparm_context *lp_ctx;
463 struct ldb_control *search_control = ldb_request_get_control(req, LDB_CONTROL_SEARCH_OPTIONS_OID);
464 struct ldb_control *domain_scope_control = ldb_request_get_control(req, LDB_CONTROL_DOMAIN_SCOPE_OID);
466 struct ldb_search_options_control *search_options = NULL;
467 struct dsdb_partition *p;
470 bool domain_scope = false, phantom_root = false;
472 ret = partition_reload_if_required(module, data);
473 if (ret != LDB_SUCCESS) {
477 p = find_partition(data, NULL, req);
479 /* the caller specified what partition they want the
480 * search - just pass it on
482 return ldb_next_request(p->module, req);
485 /* Get back the search options from the search control, and mark it as
486 * non-critical (to make backends and also dcpromo happy).
488 if (search_control) {
489 search_options = talloc_get_type(search_control->data, struct ldb_search_options_control);
490 search_control->critical = 0;
494 /* Remove the "domain_scope" control, so we don't confuse a backend
496 if (domain_scope_control && !save_controls(domain_scope_control, req, &saved_controls)) {
497 return ldb_oom(ldb_module_get_ctx(module));
500 /* Locate the options */
501 domain_scope = (search_options
502 && (search_options->search_options & LDB_SEARCH_OPTION_DOMAIN_SCOPE))
503 || domain_scope_control;
504 phantom_root = search_options
505 && (search_options->search_options & LDB_SEARCH_OPTION_PHANTOM_ROOT);
507 /* Remove handled options from the search control flag */
508 if (search_options) {
509 search_options->search_options = search_options->search_options
510 & ~LDB_SEARCH_OPTION_DOMAIN_SCOPE
511 & ~LDB_SEARCH_OPTION_PHANTOM_ROOT;
514 if (!data || !data->partitions) {
515 return ldb_next_request(module, req);
518 ac = partition_init_ctx(module, req);
520 return ldb_operr(ldb_module_get_ctx(module));
523 ldb = ldb_module_get_ctx(ac->module);
524 lp_ctx = talloc_get_type(ldb_get_opaque(ldb, "loadparm"),
525 struct loadparm_context);
527 /* Search from the base DN */
528 if (ldb_dn_is_null(req->op.search.base)) {
529 return partition_send_all(module, ac, req);
532 for (i=0; data->partitions[i]; i++) {
533 bool match = false, stop = false;
536 /* Phantom root: Find all partitions under the
537 * search base. We match if:
539 * 1) the DN we are looking for exactly matches a
540 * certain partition and always stop
541 * 2) the DN we are looking for is a parent of certain
542 * partitions and it isn't a scope base search
543 * 3) the DN we are looking for is a child of a certain
544 * partition and always stop
545 * - we don't need to go any further up in the
548 if (ldb_dn_compare(data->partitions[i]->ctrl->dn,
549 req->op.search.base) == 0) {
554 (ldb_dn_compare_base(req->op.search.base,
555 data->partitions[i]->ctrl->dn) == 0 &&
556 req->op.search.scope != LDB_SCOPE_BASE)) {
560 ldb_dn_compare_base(data->partitions[i]->ctrl->dn,
561 req->op.search.base) == 0) {
563 stop = true; /* note that this relies on partition ordering */
566 /* Domain scope: Find all partitions under the search
569 * We generate referral candidates if we haven't
570 * specified the domain scope control, haven't a base
571 * search scope and the DN we are looking for is a real
572 * predecessor of certain partitions. When a new
573 * referral candidate is nearer to the DN than an
574 * existing one delete the latter (we want to have only
575 * the closest ones). When we checked this for all
576 * candidates we have the final referrals.
578 * We match if the DN we are looking for is a child of
579 * a certain partition or the partition
580 * DN itself - we don't need to go any further
581 * up in the hierarchy!
583 if ((!domain_scope) &&
584 (req->op.search.scope != LDB_SCOPE_BASE) &&
585 (ldb_dn_compare_base(req->op.search.base,
586 data->partitions[i]->ctrl->dn) == 0) &&
587 (ldb_dn_compare(req->op.search.base,
588 data->partitions[i]->ctrl->dn) != 0)) {
589 char *ref = talloc_asprintf(ac,
591 lpcfg_dnsdomain(lp_ctx),
592 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
593 req->op.search.scope == LDB_SCOPE_ONELEVEL ? "??base" : "");
599 /* Initialise the referrals list */
600 if (ac->referrals == NULL) {
601 ac->referrals = (const char **) str_list_make_empty(ac);
602 if (ac->referrals == NULL) {
607 /* Check if the new referral candidate is
608 * closer to the base DN than already
609 * saved ones and delete the latter */
611 while (ac->referrals[j] != NULL) {
612 if (strstr(ac->referrals[j],
613 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn)) != NULL) {
614 str_list_remove(ac->referrals,
621 /* Add our new candidate */
622 ac->referrals = str_list_add(ac->referrals, ref);
626 if (ac->referrals == NULL) {
630 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
632 stop = true; /* note that this relies on partition ordering */
637 ret = partition_prep_request(ac, data->partitions[i]);
638 if (ret != LDB_SUCCESS) {
646 /* Perhaps we didn't match any partitions. Try the main partition */
647 if (ac->num_requests == 0) {
649 return ldb_next_request(module, req);
652 /* fire the first one */
653 return partition_call_first(ac);
/* Add entry point: route the request by the DN of the message being added,
 * using partition_replicate(). */
657 static int partition_add(struct ldb_module *module, struct ldb_request *req)
659 return partition_replicate(module, req, req->op.add.message->dn);
/* Modify entry point: route the request by the DN of the message being
 * modified, using partition_replicate(). */
663 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
665 return partition_replicate(module, req, req->op.mod.message->dn);
/* Delete entry point: route the request by the DN being deleted, using
 * partition_replicate(). */
669 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
671 return partition_replicate(module, req, req->op.del.dn);
/* Rename entry point. A rename is only allowed inside a single partition.
 *
 * The partitions of the old and the new DN are looked up separately:
 *  - if exactly one of the two lookups finds a partition, the rename is
 *    rejected with LDB_ERR_AFFECTS_MULTIPLE_DSAS;
 *  - if both are found but differ, an error string naming both DNs and both
 *    partitions is set and LDB_ERR_AFFECTS_MULTIPLE_DSAS is returned;
 *  - otherwise the request is routed by the old DN via
 *    partition_replicate().
 *
 * NOTE(review): the condition guarding the early ldb_operr() return
 * (presumably a missing private data pointer) is not visible in this
 * listing. */
675 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
678 struct dsdb_partition *backend, *backend2;
680 struct partition_private_data *data = talloc_get_type(module->private_data,
681 struct partition_private_data);
683 /* Skip the lot if 'data' isn't here yet (initialisation) */
685 return ldb_operr(ldb_module_get_ctx(module));
688 backend = find_partition(data, req->op.rename.olddn, req);
689 backend2 = find_partition(data, req->op.rename.newdn, req);
691 if ((backend && !backend2) || (!backend && backend2)) {
692 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
695 if (backend != backend2) {
696 ldb_asprintf_errstring(ldb_module_get_ctx(module),
697 "Cannot rename from %s in %s to %s in %s: %s",
698 ldb_dn_get_linearized(req->op.rename.olddn),
699 ldb_dn_get_linearized(backend->ctrl->dn),
700 ldb_dn_get_linearized(req->op.rename.newdn),
701 ldb_dn_get_linearized(backend2->ctrl->dn),
702 ldb_strerror(LDB_ERR_AFFECTS_MULTIPLE_DSAS));
703 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
706 return partition_replicate(module, req, req->op.rename.olddn);
709 /* start a transaction */
/* Start a transaction on every backend.
 *
 * Order: the next module in the chain first (traced as the metadata
 * partition), then the partition list is reloaded if required, then each
 * partition in list order. If a partition fails to start, the partitions
 * already started are cancelled in reverse order and the transaction on the
 * next module is cancelled as well. On success the in_transaction counter
 * is incremented.
 *
 * NOTE(review): the partition loop guards against a NULL data pointer but
 * the counter increment dereferences data without a visible check. Whether
 * the first transaction is cancelled when the reload fails cannot be told
 * from this listing, because the return lines are missing. */
710 static int partition_start_trans(struct ldb_module *module)
714 struct partition_private_data *data = talloc_get_type(module->private_data,
715 struct partition_private_data);
716 /* Look at base DN */
717 /* Figure out which partition it is under */
718 /* Skip the lot if 'data' isn't here yet (initialization) */
719 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
720 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_start_trans() -> (metadata partition)");
722 ret = ldb_next_start_trans(module);
723 if (ret != LDB_SUCCESS) {
727 ret = partition_reload_if_required(module, data);
728 if (ret != LDB_SUCCESS) {
732 for (i=0; data && data->partitions && data->partitions[i]; i++) {
733 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
734 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_start_trans() -> %s",
735 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
737 ret = ldb_next_start_trans(data->partitions[i]->module);
738 if (ret != LDB_SUCCESS) {
739 /* Back it out, if it fails on one */
740 for (i--; i >= 0; i--) {
741 ldb_next_del_trans(data->partitions[i]->module);
743 ldb_next_del_trans(module);
748 data->in_transaction++;
753 /* prepare for a commit */
/* Run the prepare-commit phase on every backend.
 *
 * Each partition in data->partitions is prepared first; a failure sets an
 * error string that names the partition DN and appends the current ldb
 * error string. The next module in the chain (traced as the metadata
 * partition) is prepared last and its result is returned.
 *
 * NOTE(review): the return after a partition failure is not visible in this
 * listing. */
754 static int partition_prepare_commit(struct ldb_module *module)
757 struct partition_private_data *data = talloc_get_type(module->private_data,
758 struct partition_private_data);
760 for (i=0; data && data->partitions && data->partitions[i]; i++) {
763 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
764 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_prepare_commit() -> %s",
765 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
767 ret = ldb_next_prepare_commit(data->partitions[i]->module);
768 if (ret != LDB_SUCCESS) {
769 ldb_asprintf_errstring(module->ldb, "prepare_commit error on %s: %s",
770 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
771 ldb_errstring(module->ldb));
776 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
777 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_prepare_commit() -> (metadata partition)");
779 return ldb_next_prepare_commit(module);
783 /* end a transaction */
/* Commit (end) the transaction on every backend.
 *
 * The in_transaction counter is checked first: a value of zero is logged as
 * a mismatch and recorded as LDB_ERR_OPERATIONS_ERROR in ret, otherwise the
 * counter is decremented. The transaction is then ended on each partition
 * and finally on the next module in the chain (traced as the metadata
 * partition). Partition failures set an error string naming the partition.
 *
 * NOTE(review): data->in_transaction is read without a visible NULL check
 * on data, although the partition loop does guard against NULL. How ret and
 * ret2 are combined into the return value is not visible in this listing. */
784 static int partition_end_trans(struct ldb_module *module)
788 struct partition_private_data *data = talloc_get_type(module->private_data,
789 struct partition_private_data);
793 if (data->in_transaction == 0) {
794 DEBUG(0,("partition end transaction mismatch\n"));
795 ret = LDB_ERR_OPERATIONS_ERROR;
797 data->in_transaction--;
800 for (i=0; data && data->partitions && data->partitions[i]; i++) {
801 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
802 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_end_trans() -> %s",
803 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
805 ret2 = ldb_next_end_trans(data->partitions[i]->module);
806 if (ret2 != LDB_SUCCESS) {
807 ldb_asprintf_errstring(module->ldb, "end_trans error on %s: %s",
808 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
809 ldb_errstring(module->ldb));
814 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
815 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_end_trans() -> (metadata partition)");
817 ret2 = ldb_next_end_trans(module);
818 if (ret2 != LDB_SUCCESS) {
824 /* delete a transaction */
/* Cancel (delete) the transaction on every backend.
 *
 * The transaction is cancelled on each partition first; failures set an
 * error string naming the partition. Then the in_transaction counter is
 * checked and decremented, and finally the transaction is cancelled on the
 * next module in the chain (traced as the metadata partition).
 *
 * NOTE(review): when the counter is already zero the function returns
 * ldb_operr() before reaching the ldb_next_del_trans(module) call, so the
 * next module is not asked to cancel in that case. data->in_transaction is
 * also read without a visible NULL check on data. How final_ret is updated
 * and returned is not visible in this listing. */
825 static int partition_del_trans(struct ldb_module *module)
827 int ret, final_ret = LDB_SUCCESS;
829 struct partition_private_data *data = talloc_get_type(module->private_data,
830 struct partition_private_data);
831 for (i=0; data && data->partitions && data->partitions[i]; i++) {
832 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
833 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_del_trans() -> %s",
834 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
836 ret = ldb_next_del_trans(data->partitions[i]->module);
837 if (ret != LDB_SUCCESS) {
838 ldb_asprintf_errstring(module->ldb, "del_trans error on %s: %s",
839 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
840 ldb_errstring(module->ldb));
845 if (data->in_transaction == 0) {
846 DEBUG(0,("partition del transaction mismatch\n"));
847 return ldb_operr(ldb_module_get_ctx(module));
849 data->in_transaction--;
851 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
852 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_del_trans() -> (metadata partition)");
854 ret = ldb_next_del_trans(module);
855 if (ret != LDB_SUCCESS) {
/* Fetch a sequence number from the backend behind the next module.
 *
 * An LDB_EXTENDED_SEQUENCE_NUMBER extended request is built, sent with
 * ldb_next_request() and waited for synchronously (ldb_wait() with
 * LDB_WAIT_ALL). A reply flagged LDB_SEQ_TIMESTAMP_SEQUENCE is rejected with
 * LDB_ERR_OPERATIONS_ERROR and an error string; otherwise the returned
 * number is stored in *seq_number.
 *
 * The result structure is allocated on mem_ctx and the request data as its
 * child.
 * NOTE(review): how type is copied into the request, and the cleanup and
 * return lines, are not visible in this listing. */
861 int partition_primary_sequence_number(struct ldb_module *module, TALLOC_CTX *mem_ctx,
862 enum ldb_sequence_type type, uint64_t *seq_number)
865 struct ldb_result *res;
866 struct ldb_seqnum_request *tseq;
867 struct ldb_request *treq;
868 struct ldb_seqnum_result *seqr;
869 res = talloc_zero(mem_ctx, struct ldb_result);
871 return ldb_oom(ldb_module_get_ctx(module));
873 tseq = talloc_zero(res, struct ldb_seqnum_request);
876 return ldb_oom(ldb_module_get_ctx(module));
880 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
881 LDB_EXTENDED_SEQUENCE_NUMBER,
885 ldb_extended_default_callback,
887 LDB_REQ_SET_LOCATION(treq);
888 if (ret != LDB_SUCCESS) {
893 ret = ldb_next_request(module, treq);
894 if (ret != LDB_SUCCESS) {
898 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
899 if (ret != LDB_SUCCESS) {
904 seqr = talloc_get_type(res->extended->data,
905 struct ldb_seqnum_result);
906 if (seqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
907 ret = LDB_ERR_OPERATIONS_ERROR;
908 ldb_set_errstring(ldb_module_get_ctx(module), "Primary backend in partitions module returned a timestamp based seq number (must return a normal number)");
912 *seq_number = seqr->seq_num;
918 /* FIXME: This function is still semi-async */
/* Answer an LDB_EXTENDED_SEQUENCE_NUMBER request by combining the values of
 * all backends.
 *
 * Visible flow:
 *  - if the request names a partition explicitly (find_partition() with a
 *    NULL dn), it is passed straight to that partition module;
 *  - for the highest-sequence case the primary backend is queried through
 *    partition_primary_sequence_number(), then every partition is queried
 *    with a synchronous extended request; replies flagged as timestamp
 *    based are folded into timestamp_sequence with MAX, the others are added
 *    to seq_number;
 *  - for LDB_SEQ_HIGHEST_TIMESTAMP the next module and then every partition
 *    are queried and the maximum is kept in timestamp;
 *  - a new ldb_extended reply carrying an ldb_seqnum_result is allocated on
 *    req, filled in according to the request type, and sent with
 *    ldb_module_done().
 *
 * Every sub-request is waited for with ldb_wait(), so the function blocks
 * (see the FIXME above).
 * NOTE(review): the switch header, some case labels, the handling of
 * LDB_SEQ_NEXT and the error returns are not visible in this listing. */
919 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
923 uint64_t seq_number = 0;
924 uint64_t timestamp_sequence = 0;
925 uint64_t timestamp = 0;
926 struct partition_private_data *data = talloc_get_type(module->private_data,
927 struct partition_private_data);
928 struct ldb_seqnum_request *seq;
929 struct ldb_seqnum_result *seqr;
930 struct ldb_request *treq;
931 struct ldb_seqnum_request *tseq;
932 struct ldb_seqnum_result *tseqr;
933 struct ldb_extended *ext;
934 struct ldb_result *res;
935 struct dsdb_partition *p;
937 p = find_partition(data, NULL, req);
939 /* the caller specified what partition they want the
940 * sequence number operation on - just pass it on
942 return ldb_next_request(p->module, req);
945 seq = talloc_get_type(req->op.extended.data, struct ldb_seqnum_request);
949 case LDB_SEQ_HIGHEST_SEQ:
951 ret = partition_primary_sequence_number(module, req, seq->type, &seq_number);
952 if (ret != LDB_SUCCESS) {
956 /* Skip the lot if 'data' isn't here yet (initialisation) */
957 for (i=0; data && data->partitions && data->partitions[i]; i++) {
959 res = talloc_zero(req, struct ldb_result);
961 return ldb_oom(ldb_module_get_ctx(module));
963 tseq = talloc_zero(res, struct ldb_seqnum_request);
966 return ldb_oom(ldb_module_get_ctx(module));
968 tseq->type = seq->type;
970 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
971 LDB_EXTENDED_SEQUENCE_NUMBER,
975 ldb_extended_default_callback,
977 LDB_REQ_SET_LOCATION(treq);
978 if (ret != LDB_SUCCESS) {
983 if (!ldb_request_get_control(treq, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
984 ret = ldb_request_add_control(treq,
985 DSDB_CONTROL_CURRENT_PARTITION_OID,
986 false, data->partitions[i]->ctrl);
987 if (ret != LDB_SUCCESS) {
993 ret = partition_request(data->partitions[i]->module, treq);
994 if (ret != LDB_SUCCESS) {
998 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
999 if (ret != LDB_SUCCESS) {
1003 tseqr = talloc_get_type(res->extended->data,
1004 struct ldb_seqnum_result);
1005 if (tseqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
1006 timestamp_sequence = MAX(timestamp_sequence,
1009 seq_number += tseqr->seq_num;
1014 case LDB_SEQ_HIGHEST_TIMESTAMP:
1016 res = talloc_zero(req, struct ldb_result);
1018 return ldb_oom(ldb_module_get_ctx(module));
1021 tseq = talloc_zero(res, struct ldb_seqnum_request);
1024 return ldb_oom(ldb_module_get_ctx(module));
1026 tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
1028 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
1029 LDB_EXTENDED_SEQUENCE_NUMBER,
1033 ldb_extended_default_callback,
1035 LDB_REQ_SET_LOCATION(treq);
1036 if (ret != LDB_SUCCESS) {
1041 ret = ldb_next_request(module, treq);
1042 if (ret != LDB_SUCCESS) {
1046 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
1047 if (ret != LDB_SUCCESS) {
1052 tseqr = talloc_get_type(res->extended->data,
1053 struct ldb_seqnum_result);
1054 timestamp = tseqr->seq_num;
1058 /* Skip the lot if 'data' isn't here yet (initialisation) */
1059 for (i=0; data && data->partitions && data->partitions[i]; i++) {
1061 res = talloc_zero(req, struct ldb_result);
1063 return ldb_oom(ldb_module_get_ctx(module));
1066 tseq = talloc_zero(res, struct ldb_seqnum_request);
1069 return ldb_oom(ldb_module_get_ctx(module));
1071 tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
1073 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
1074 LDB_EXTENDED_SEQUENCE_NUMBER,
1078 ldb_extended_default_callback,
1080 LDB_REQ_SET_LOCATION(treq);
1081 if (ret != LDB_SUCCESS) {
1086 if (!ldb_request_get_control(treq, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
1087 ret = ldb_request_add_control(treq,
1088 DSDB_CONTROL_CURRENT_PARTITION_OID,
1089 false, data->partitions[i]->ctrl);
1090 if (ret != LDB_SUCCESS) {
1096 ret = partition_request(data->partitions[i]->module, treq);
1097 if (ret != LDB_SUCCESS) {
1101 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
1102 if (ret != LDB_SUCCESS) {
1107 tseqr = talloc_get_type(res->extended->data,
1108 struct ldb_seqnum_result);
1109 timestamp = MAX(timestamp, tseqr->seq_num);
1117 ext = talloc_zero(req, struct ldb_extended);
1119 return ldb_oom(ldb_module_get_ctx(module));
1121 seqr = talloc_zero(ext, struct ldb_seqnum_result);
1124 return ldb_oom(ldb_module_get_ctx(module));
1126 ext->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1129 switch (seq->type) {
1131 case LDB_SEQ_HIGHEST_SEQ:
1133 /* Has someone above set a timebase sequence? */
1134 if (timestamp_sequence) {
/* NOTE(review): this shifts timestamp, not timestamp_sequence. In the
 * visible code timestamp is only assigned in the LDB_SEQ_HIGHEST_TIMESTAMP
 * branch, so it appears to still be 0 here - confirm the intended variable. */
1135 seqr->seq_num = (((unsigned long long)timestamp << 24) | (seq_number & 0xFFFFFF));
1137 seqr->seq_num = seq_number;
1140 if (timestamp_sequence > seqr->seq_num) {
1141 seqr->seq_num = timestamp_sequence;
1142 seqr->flags |= LDB_SEQ_TIMESTAMP_SEQUENCE;
1145 seqr->flags |= LDB_SEQ_GLOBAL_SEQUENCE;
1147 case LDB_SEQ_HIGHEST_TIMESTAMP:
1148 seqr->seq_num = timestamp;
1152 if (seq->type == LDB_SEQ_NEXT) {
1156 /* send request done */
1157 return ldb_module_done(req, NULL, ext, LDB_SUCCESS);
/* Extended-operation entry point.
 *
 * Visible dispatch:
 *  - an early path passes the request to the next module (its condition,
 *    presumably missing private data, is not visible in this listing);
 *  - the partition list is reloaded if required;
 *  - LDB_EXTENDED_SEQUENCE_NUMBER goes to partition_sequence_number();
 *  - DSDB_EXTENDED_CREATE_PARTITION_OID goes to partition_create();
 *  - any other extended operation has no DN to route by, so it is sent to
 *    every backend through partition_send_all(). */
1161 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
1163 struct partition_private_data *data;
1164 struct partition_context *ac;
1167 data = talloc_get_type(module->private_data, struct partition_private_data);
1169 return ldb_next_request(module, req);
1172 ret = partition_reload_if_required(module, data);
1173 if (ret != LDB_SUCCESS) {
1177 if (strcmp(req->op.extended.oid, LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1178 return partition_sequence_number(module, req);
1181 if (strcmp(req->op.extended.oid, DSDB_EXTENDED_CREATE_PARTITION_OID) == 0) {
1182 return partition_create(module, req);
1186 * as the extended operation has no dn
1187 * we need to send it to all partitions
1190 ac = partition_init_ctx(module, req);
1192 return ldb_operr(ldb_module_get_ctx(module));
1195 return partition_send_all(module, ac, req);
/* Operations table that registers this file as the ldb module named
 * partition: it maps each ldb operation and transaction hook to the
 * corresponding handler. partition_init is not defined in the visible part
 * of this file.
 * NOTE(review): the end of the initializer is not visible in this listing. */
1198 _PUBLIC_ const struct ldb_module_ops ldb_partition_module_ops = {
1199 .name = "partition",
1200 .init_context = partition_init,
1201 .search = partition_search,
1202 .add = partition_add,
1203 .modify = partition_modify,
1204 .del = partition_delete,
1205 .rename = partition_rename,
1206 .extended = partition_extended,
1207 .start_transaction = partition_start_trans,
1208 .prepare_commit = partition_prepare_commit,
1209 .end_transaction = partition_end_trans,
1210 .del_transaction = partition_del_trans,