f75085e8a2f3d8aed9afe1e15427da811012f8fc
[nivanova/samba-autobuild/.git] / source4 / dsdb / samdb / ldb_modules / partition.c
1 /* 
2    Partitions ldb module
3
4    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 /*
22  *  Name: ldb
23  *
24  *  Component: ldb partitions module
25  *
26  *  Description: Implement LDAP partitions
27  *
28  *  Author: Andrew Bartlett
29  *  Author: Stefan Metzmacher
30  */
31
32 #include "dsdb/samdb/ldb_modules/partition.h"
33
34 struct part_request {
35         struct ldb_module *module;
36         struct ldb_request *req;
37 };
38
39 struct partition_context {
40         struct ldb_module *module;
41         struct ldb_request *req;
42
43         struct part_request *part_req;
44         unsigned int num_requests;
45         unsigned int finished_requests;
46
47         const char **referrals;
48 };
49
50 static struct partition_context *partition_init_ctx(struct ldb_module *module, struct ldb_request *req)
51 {
52         struct partition_context *ac;
53
54         ac = talloc_zero(req, struct partition_context);
55         if (ac == NULL) {
56                 ldb_set_errstring(ldb_module_get_ctx(module), "Out of Memory");
57                 return NULL;
58         }
59
60         ac->module = module;
61         ac->req = req;
62
63         return ac;
64 }
65
66 /*
67  * helper functions to call the next module in chain
68  */
69 int partition_request(struct ldb_module *module, struct ldb_request *request)
70 {
71         if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) { \
72                 const struct dsdb_control_current_partition *partition = NULL;
73                 struct ldb_control *partition_ctrl = ldb_request_get_control(request, DSDB_CONTROL_CURRENT_PARTITION_OID);
74                 if (partition_ctrl) {
75                         partition = talloc_get_type(partition_ctrl->data,
76                                                     struct dsdb_control_current_partition);
77                 }
78
79                 if (partition != NULL) {
80                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_request() -> %s",
81                                   ldb_dn_get_linearized(partition->dn));                        
82                 } else {
83                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_request() -> (metadata partition)");
84                 }
85         }
86
87         return ldb_next_request(module, request);
88 }
89
90 static struct dsdb_partition *find_partition(struct partition_private_data *data,
91                                              struct ldb_dn *dn,
92                                              struct ldb_request *req)
93 {
94         unsigned int i;
95         struct ldb_control *partition_ctrl;
96
97         /* see if the request has the partition DN specified in a
98          * control. The repl_meta_data module can specify this to
99          * ensure that replication happens to the right partition
100          */
101         partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
102         if (partition_ctrl) {
103                 const struct dsdb_control_current_partition *partition;
104                 partition = talloc_get_type(partition_ctrl->data,
105                                             struct dsdb_control_current_partition);
106                 if (partition != NULL) {
107                         dn = partition->dn;
108                 }
109         }
110
111         if (dn == NULL) {
112                 return NULL;
113         }
114
115         /* Look at base DN */
116         /* Figure out which partition it is under */
117         /* Skip the lot if 'data' isn't here yet (initialisation) */
118         for (i=0; data && data->partitions && data->partitions[i]; i++) {
119                 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, dn) == 0) {
120                         return data->partitions[i];
121                 }
122         }
123
124         return NULL;
125 }
126
127 /**
128  * fire the caller's callback for every entry, but only send 'done' once.
129  */
130 static int partition_req_callback(struct ldb_request *req,
131                                   struct ldb_reply *ares)
132 {
133         struct partition_context *ac;
134         struct ldb_module *module;
135         struct ldb_request *nreq;
136         int ret;
137         struct partition_private_data *data;
138         struct ldb_control *partition_ctrl;
139
140         ac = talloc_get_type(req->context, struct partition_context);
141         data = talloc_get_type(ldb_module_get_private(ac->module), struct partition_private_data);
142
143         if (!ares) {
144                 return ldb_module_done(ac->req, NULL, NULL,
145                                         LDB_ERR_OPERATIONS_ERROR);
146         }
147
148         partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
149         if (partition_ctrl && (ac->num_requests == 1 || ares->type == LDB_REPLY_ENTRY)) {
150                 /* If we didn't fan this request out to mulitple partitions,
151                  * or this is an individual search result, we can
152                  * deterministically tell the caller what partition this was
153                  * written to (repl_meta_data likes to know) */
154                 ret = ldb_reply_add_control(ares,
155                                             DSDB_CONTROL_CURRENT_PARTITION_OID,
156                                             false, partition_ctrl->data);
157                 if (ret != LDB_SUCCESS) {
158                         return ldb_module_done(ac->req, NULL, NULL,
159                                                ret);
160                 }
161         }
162
163         if (ares->error != LDB_SUCCESS) {
164                 return ldb_module_done(ac->req, ares->controls,
165                                         ares->response, ares->error);
166         }
167
168         switch (ares->type) {
169         case LDB_REPLY_REFERRAL:
170                 return ldb_module_send_referral(ac->req, ares->referral);
171
172         case LDB_REPLY_ENTRY:
173                 if (ac->req->operation != LDB_SEARCH) {
174                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
175                                 "partition_req_callback:"
176                                 " Unsupported reply type for this request");
177                         return ldb_module_done(ac->req, NULL, NULL,
178                                                 LDB_ERR_OPERATIONS_ERROR);
179                 }
180                 
181                 return ldb_module_send_entry(ac->req, ares->message, ares->controls);
182
183         case LDB_REPLY_DONE:
184                 if (ac->req->operation == LDB_EXTENDED) {
185                         /* FIXME: check for ares->response, replmd does not fill it ! */
186                         if (ares->response) {
187                                 if (strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID) != 0) {
188                                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
189                                                           "partition_req_callback:"
190                                                           " Unknown extended reply, "
191                                                           "only supports START_TLS");
192                                         talloc_free(ares);
193                                         return ldb_module_done(ac->req, NULL, NULL,
194                                                                 LDB_ERR_OPERATIONS_ERROR);
195                                 }
196                         }
197                 }
198
199                 ac->finished_requests++;
200                 if (ac->finished_requests == ac->num_requests) {
201                         /* Send back referrals if they do exist (search ops) */
202                         if (ac->referrals != NULL) {
203                                 const char **ref;
204                                 for (ref = ac->referrals; *ref != NULL; ++ref) {
205                                         ret = ldb_module_send_referral(ac->req,
206                                                                        talloc_strdup(ac->req, *ref));
207                                         if (ret != LDB_SUCCESS) {
208                                                 return ldb_module_done(ac->req, NULL, NULL,
209                                                                        ret);
210                                         }
211                                 }
212                         }
213
214                         /* this was the last one, call callback */
215                         return ldb_module_done(ac->req, ares->controls,
216                                                ares->response, 
217                                                ares->error);
218                 }
219
220                 /* not the last, now call the next one */
221                 module = ac->part_req[ac->finished_requests].module;
222                 nreq = ac->part_req[ac->finished_requests].req;
223
224                 ret = partition_request(module, nreq);
225                 if (ret != LDB_SUCCESS) {
226                         talloc_free(ares);
227                         return ldb_module_done(ac->req, NULL, NULL, ret);
228                 }
229
230                 break;
231         }
232
233         talloc_free(ares);
234         return LDB_SUCCESS;
235 }
236
237 static int partition_prep_request(struct partition_context *ac,
238                                   struct dsdb_partition *partition)
239 {
240         int ret;
241         struct ldb_request *req;
242
243         ac->part_req = talloc_realloc(ac, ac->part_req,
244                                         struct part_request,
245                                         ac->num_requests + 1);
246         if (ac->part_req == NULL) {
247                 return ldb_oom(ldb_module_get_ctx(ac->module));
248         }
249
250         switch (ac->req->operation) {
251         case LDB_SEARCH:
252                 ret = ldb_build_search_req_ex(&req, ldb_module_get_ctx(ac->module),
253                                         ac->part_req,
254                                         ac->req->op.search.base,
255                                         ac->req->op.search.scope,
256                                         ac->req->op.search.tree,
257                                         ac->req->op.search.attrs,
258                                         ac->req->controls,
259                                         ac, partition_req_callback,
260                                         ac->req);
261                 LDB_REQ_SET_LOCATION(req);
262                 break;
263         case LDB_ADD:
264                 ret = ldb_build_add_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
265                                         ac->req->op.add.message,
266                                         ac->req->controls,
267                                         ac, partition_req_callback,
268                                         ac->req);
269                 LDB_REQ_SET_LOCATION(req);
270                 break;
271         case LDB_MODIFY:
272                 ret = ldb_build_mod_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
273                                         ac->req->op.mod.message,
274                                         ac->req->controls,
275                                         ac, partition_req_callback,
276                                         ac->req);
277                 LDB_REQ_SET_LOCATION(req);
278                 break;
279         case LDB_DELETE:
280                 ret = ldb_build_del_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
281                                         ac->req->op.del.dn,
282                                         ac->req->controls,
283                                         ac, partition_req_callback,
284                                         ac->req);
285                 LDB_REQ_SET_LOCATION(req);
286                 break;
287         case LDB_RENAME:
288                 ret = ldb_build_rename_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
289                                         ac->req->op.rename.olddn,
290                                         ac->req->op.rename.newdn,
291                                         ac->req->controls,
292                                         ac, partition_req_callback,
293                                         ac->req);
294                 LDB_REQ_SET_LOCATION(req);
295                 break;
296         case LDB_EXTENDED:
297                 ret = ldb_build_extended_req(&req, ldb_module_get_ctx(ac->module),
298                                         ac->part_req,
299                                         ac->req->op.extended.oid,
300                                         ac->req->op.extended.data,
301                                         ac->req->controls,
302                                         ac, partition_req_callback,
303                                         ac->req);
304                 LDB_REQ_SET_LOCATION(req);
305                 break;
306         default:
307                 ldb_set_errstring(ldb_module_get_ctx(ac->module),
308                                   "Unsupported request type!");
309                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
310         }
311
312         if (ret != LDB_SUCCESS) {
313                 return ret;
314         }
315
316         ac->part_req[ac->num_requests].req = req;
317
318         if (ac->req->controls) {
319                 req->controls = talloc_memdup(req, ac->req->controls,
320                                         talloc_get_size(ac->req->controls));
321                 if (req->controls == NULL) {
322                         return ldb_oom(ldb_module_get_ctx(ac->module));
323                 }
324         }
325
326         if (partition) {
327                 ac->part_req[ac->num_requests].module = partition->module;
328
329                 if (!ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
330                         ret = ldb_request_add_control(req,
331                                                       DSDB_CONTROL_CURRENT_PARTITION_OID,
332                                                       false, partition->ctrl);
333                         if (ret != LDB_SUCCESS) {
334                                 return ret;
335                         }
336                 }
337
338                 if (req->operation == LDB_SEARCH) {
339                         /* If the search is for 'more' than this partition,
340                          * then change the basedn, so a remote LDAP server
341                          * doesn't object */
342                         if (ldb_dn_compare_base(partition->ctrl->dn,
343                                                 req->op.search.base) != 0) {
344                                 req->op.search.base = partition->ctrl->dn;
345                         }
346                 }
347
348         } else {
349                 /* make sure you put the module here, or
350                  * or ldb_next_request() will skip a module */
351                 ac->part_req[ac->num_requests].module = ac->module;
352         }
353
354         ac->num_requests++;
355
356         return LDB_SUCCESS;
357 }
358
359 static int partition_call_first(struct partition_context *ac)
360 {
361         return partition_request(ac->part_req[0].module, ac->part_req[0].req);
362 }
363
364 /**
365  * Send a request down to all the partitions
366  */
367 static int partition_send_all(struct ldb_module *module, 
368                               struct partition_context *ac, 
369                               struct ldb_request *req) 
370 {
371         unsigned int i;
372         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
373                                                               struct partition_private_data);
374         int ret = partition_prep_request(ac, NULL);
375         if (ret != LDB_SUCCESS) {
376                 return ret;
377         }
378         for (i=0; data && data->partitions && data->partitions[i]; i++) {
379                 ret = partition_prep_request(ac, data->partitions[i]);
380                 if (ret != LDB_SUCCESS) {
381                         return ret;
382                 }
383         }
384
385         /* fire the first one */
386         return partition_call_first(ac);
387 }
388
389 /**
390  * Figure out which backend a request needs to be aimed at.  Some
391  * requests must be replicated to all backends
392  */
393 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn) 
394 {
395         struct partition_context *ac;
396         unsigned int i;
397         int ret;
398         struct dsdb_partition *partition;
399         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
400                                                               struct partition_private_data);
401
402         /* if we aren't initialised yet go further */
403         if (!data || !data->partitions) {
404                 return ldb_next_request(module, req);
405         }
406
407         if (ldb_dn_is_special(dn)) {
408                 /* Is this a special DN, we need to replicate to every backend? */
409                 for (i=0; data->replicate && data->replicate[i]; i++) {
410                         if (ldb_dn_compare(data->replicate[i], 
411                                            dn) == 0) {
412                                 
413                                 ac = partition_init_ctx(module, req);
414                                 if (!ac) {
415                                         return ldb_operr(ldb_module_get_ctx(module));
416                                 }
417                                 
418                                 return partition_send_all(module, ac, req);
419                         }
420                 }
421         }
422
423         /* Otherwise, we need to find the partition to fire it to */
424
425         /* Find partition */
426         partition = find_partition(data, dn, req);
427         if (!partition) {
428                 /*
429                  * if we haven't found a matching partition
430                  * pass the request to the main ldb
431                  *
432                  * TODO: we should maybe return an error here
433                  *       if it's not a special dn
434                  */
435
436                 return ldb_next_request(module, req);
437         }
438
439         ac = partition_init_ctx(module, req);
440         if (!ac) {
441                 return ldb_operr(ldb_module_get_ctx(module));
442         }
443
444         /* we need to add a control but we never touch the original request */
445         ret = partition_prep_request(ac, partition);
446         if (ret != LDB_SUCCESS) {
447                 return ret;
448         }
449
450         /* fire the first one */
451         return partition_call_first(ac);
452 }
453
454 /* search */
455 static int partition_search(struct ldb_module *module, struct ldb_request *req)
456 {
457         struct ldb_control **saved_controls;
458         /* Find backend */
459         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
460                                                               struct partition_private_data);
461         struct partition_context *ac;
462         struct ldb_context *ldb;
463         struct loadparm_context *lp_ctx;
464
465         struct ldb_control *search_control = ldb_request_get_control(req, LDB_CONTROL_SEARCH_OPTIONS_OID);
466         struct ldb_control *domain_scope_control = ldb_request_get_control(req, LDB_CONTROL_DOMAIN_SCOPE_OID);
467         
468         struct ldb_search_options_control *search_options = NULL;
469         struct dsdb_partition *p;
470         unsigned int i, j;
471         int ret;
472         bool domain_scope = false, phantom_root = false;
473
474         /* see if we are still up-to-date */
475         ret = partition_reload_if_required(module, data, req);
476         if (ret != LDB_SUCCESS) {
477                 return ret;
478         }
479
480         p = find_partition(data, NULL, req);
481         if (p != NULL) {
482                 /* the caller specified what partition they want the
483                  * search - just pass it on
484                  */
485                 return ldb_next_request(p->module, req);
486         }
487
488         /* Get back the search options from the search control, and mark it as
489          * non-critical (to make backends and also dcpromo happy).
490          */
491         if (search_control) {
492                 search_options = talloc_get_type(search_control->data, struct ldb_search_options_control);
493                 search_control->critical = 0;
494
495         }
496
497         /* Remove the "domain_scope" control, so we don't confuse a backend
498          * server */
499         if (domain_scope_control && !ldb_save_controls(domain_scope_control, req, &saved_controls)) {
500                 return ldb_oom(ldb_module_get_ctx(module));
501         }
502
503         /* if we aren't initialised yet go further */
504         if (!data || !data->partitions) {
505                 return ldb_next_request(module, req);
506         }
507
508         /* Locate the options */
509         domain_scope = (search_options
510                 && (search_options->search_options & LDB_SEARCH_OPTION_DOMAIN_SCOPE))
511                 || domain_scope_control;
512         phantom_root = search_options
513                 && (search_options->search_options & LDB_SEARCH_OPTION_PHANTOM_ROOT);
514
515         /* Remove handled options from the search control flag */
516         if (search_options) {
517                 search_options->search_options = search_options->search_options
518                         & ~LDB_SEARCH_OPTION_DOMAIN_SCOPE
519                         & ~LDB_SEARCH_OPTION_PHANTOM_ROOT;
520         }
521
522         ac = partition_init_ctx(module, req);
523         if (!ac) {
524                 return ldb_operr(ldb_module_get_ctx(module));
525         }
526
527         ldb = ldb_module_get_ctx(ac->module);
528         lp_ctx = talloc_get_type(ldb_get_opaque(ldb, "loadparm"),
529                                                 struct loadparm_context);
530
531         /* Search from the base DN */
532         if (ldb_dn_is_null(req->op.search.base)) {
533                 return partition_send_all(module, ac, req);
534         }
535
536         for (i=0; data->partitions[i]; i++) {
537                 bool match = false, stop = false;
538
539                 if (phantom_root) {
540                         /* Phantom root: Find all partitions under the
541                          * search base. We match if:
542                          *
543                          * 1) the DN we are looking for exactly matches a
544                          *    certain partition and always stop
545                          * 2) the DN we are looking for is a parent of certain
546                          *    partitions and it isn't a scope base search
547                          * 3) the DN we are looking for is a child of a certain
548                          *    partition and always stop
549                          *    - we don't need to go any further up in the
550                          *    hierarchy!
551                          */
552                         if (ldb_dn_compare(data->partitions[i]->ctrl->dn,
553                                            req->op.search.base) == 0) {
554                                 match = true;
555                                 stop = true;
556                         }
557                         if (!match &&
558                             (ldb_dn_compare_base(req->op.search.base,
559                                                  data->partitions[i]->ctrl->dn) == 0 &&
560                              req->op.search.scope != LDB_SCOPE_BASE)) {
561                                 match = true;
562                         }
563                         if (!match &&
564                             ldb_dn_compare_base(data->partitions[i]->ctrl->dn,
565                                                 req->op.search.base) == 0) {
566                                 match = true;
567                                 stop = true; /* note that this relies on partition ordering */
568                         }
569                 } else {
570                         /* Domain scope: Find all partitions under the search
571                          * base.
572                          *
573                          * We generate referral candidates if we haven't
574                          * specified the domain scope control, haven't a base
575                          * search* scope and the DN we are looking for is a real
576                          * predecessor of certain partitions. When a new
577                          * referral candidate is nearer to the DN than an
578                          * existing one delete the latter (we want to have only
579                          * the closest ones). When we checked this for all
580                          * candidates we have the final referrals.
581                          *
582                          * We match if the DN we are looking for is a child of
583                          * a certain partition or the partition
584                          * DN itself - we don't need to go any further
585                          * up in the hierarchy!
586                          */
587                         if ((!domain_scope) &&
588                             (req->op.search.scope != LDB_SCOPE_BASE) &&
589                             (ldb_dn_compare_base(req->op.search.base,
590                                                  data->partitions[i]->ctrl->dn) == 0) &&
591                             (ldb_dn_compare(req->op.search.base,
592                                             data->partitions[i]->ctrl->dn) != 0)) {
593                                 char *ref = talloc_asprintf(ac,
594                                                             "ldap://%s/%s%s",
595                                                             lpcfg_dnsdomain(lp_ctx),
596                                                             ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
597                                                             req->op.search.scope == LDB_SCOPE_ONELEVEL ? "??base" : "");
598
599                                 if (ref == NULL) {
600                                         return ldb_oom(ldb);
601                                 }
602
603                                 /* Initialise the referrals list */
604                                 if (ac->referrals == NULL) {
605                                         ac->referrals = (const char **) str_list_make_empty(ac);
606                                         if (ac->referrals == NULL) {
607                                                 return ldb_oom(ldb);
608                                         }
609                                 }
610
611                                 /* Check if the new referral candidate is
612                                  * closer to the base DN than already
613                                  * saved ones and delete the latters */
614                                 j = 0;
615                                 while (ac->referrals[j] != NULL) {
616                                         if (strstr(ac->referrals[j],
617                                                    ldb_dn_get_linearized(data->partitions[i]->ctrl->dn)) != NULL) {
618                                                 str_list_remove(ac->referrals,
619                                                                 ac->referrals[j]);
620                                         } else {
621                                                 ++j;
622                                         }
623                                 }
624
625                                 /* Add our new candidate */
626                                 ac->referrals = str_list_add(ac->referrals, ref);
627
628                                 talloc_free(ref);
629
630                                 if (ac->referrals == NULL) {
631                                         return ldb_oom(ldb);
632                                 }
633                         }
634                         if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
635                                 match = true;
636                                 stop = true; /* note that this relies on partition ordering */
637                         }
638                 }
639
640                 if (match) {
641                         ret = partition_prep_request(ac, data->partitions[i]);
642                         if (ret != LDB_SUCCESS) {
643                                 return ret;
644                         }
645                 }
646
647                 if (stop) break;
648         }
649
650         /* Perhaps we didn't match any partitions. Try the main partition */
651         if (ac->num_requests == 0) {
652                 talloc_free(ac);
653                 return ldb_next_request(module, req);
654         }
655
656         /* fire the first one */
657         return partition_call_first(ac);
658 }
659
660 /* add */
661 static int partition_add(struct ldb_module *module, struct ldb_request *req)
662 {
663         return partition_replicate(module, req, req->op.add.message->dn);
664 }
665
666 /* modify */
667 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
668 {
669         return partition_replicate(module, req, req->op.mod.message->dn);
670 }
671
672 /* delete */
673 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
674 {
675         return partition_replicate(module, req, req->op.del.dn);
676 }
677
678 /* rename */
679 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
680 {
681         /* Find backend */
682         struct dsdb_partition *backend, *backend2;
683         
684         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
685                                                               struct partition_private_data);
686
687         /* Skip the lot if 'data' isn't here yet (initialisation) */
688         if (!data) {
689                 return ldb_operr(ldb_module_get_ctx(module));
690         }
691
692         backend = find_partition(data, req->op.rename.olddn, req);
693         backend2 = find_partition(data, req->op.rename.newdn, req);
694
695         if ((backend && !backend2) || (!backend && backend2)) {
696                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
697         }
698
699         if (backend != backend2) {
700                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
701                                        "Cannot rename from %s in %s to %s in %s: %s",
702                                        ldb_dn_get_linearized(req->op.rename.olddn),
703                                        ldb_dn_get_linearized(backend->ctrl->dn),
704                                        ldb_dn_get_linearized(req->op.rename.newdn),
705                                        ldb_dn_get_linearized(backend2->ctrl->dn),
706                                        ldb_strerror(LDB_ERR_AFFECTS_MULTIPLE_DSAS));
707                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
708         }
709
710         return partition_replicate(module, req, req->op.rename.olddn);
711 }
712
713 /* start a transaction */
714 static int partition_start_trans(struct ldb_module *module)
715 {
716         unsigned int i;
717         int ret;
718         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
719                                                               struct partition_private_data);
720         /* Look at base DN */
721         /* Figure out which partition it is under */
722         /* Skip the lot if 'data' isn't here yet (initialization) */
723         if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
724                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_start_trans() -> (metadata partition)");
725         }
726         ret = ldb_next_start_trans(module);
727         if (ret != LDB_SUCCESS) {
728                 return ret;
729         }
730
731         ret = partition_reload_if_required(module, data, NULL);
732         if (ret != LDB_SUCCESS) {
733                 return ret;
734         }
735
736         for (i=0; data && data->partitions && data->partitions[i]; i++) {
737                 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
738                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_start_trans() -> %s",
739                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
740                 }
741                 ret = ldb_next_start_trans(data->partitions[i]->module);
742                 if (ret != LDB_SUCCESS) {
743                         /* Back it out, if it fails on one */
744                         for (i--; i >= 0; i--) {
745                                 ldb_next_del_trans(data->partitions[i]->module);
746                         }
747                         ldb_next_del_trans(module);
748                         return ret;
749                 }
750         }
751
752         data->in_transaction++;
753
754         return LDB_SUCCESS;
755 }
756
757 /* prepare for a commit */
758 static int partition_prepare_commit(struct ldb_module *module)
759 {
760         unsigned int i;
761         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
762                                                               struct partition_private_data);
763
764         for (i=0; data && data->partitions && data->partitions[i]; i++) {
765                 int ret;
766
767                 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
768                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_prepare_commit() -> %s",
769                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
770                 }
771                 ret = ldb_next_prepare_commit(data->partitions[i]->module);
772                 if (ret != LDB_SUCCESS) {
773                         ldb_asprintf_errstring(ldb_module_get_ctx(module), "prepare_commit error on %s: %s",
774                                                ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
775                                                ldb_errstring(ldb_module_get_ctx(module)));
776                         return ret;
777                 }
778         }
779
780         if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
781                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_prepare_commit() -> (metadata partition)");
782         }
783         return ldb_next_prepare_commit(module);
784 }
785
786
787 /* end a transaction */
788 static int partition_end_trans(struct ldb_module *module)
789 {
790         int ret, ret2;
791         unsigned int i;
792         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
793                                                               struct partition_private_data);
794
795         ret = LDB_SUCCESS;
796
797         if (data->in_transaction == 0) {
798                 DEBUG(0,("partition end transaction mismatch\n"));
799                 ret = LDB_ERR_OPERATIONS_ERROR;
800         } else {
801                 data->in_transaction--;
802         }
803
804         for (i=0; data && data->partitions && data->partitions[i]; i++) {
805                 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
806                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_end_trans() -> %s",
807                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
808                 }
809                 ret2 = ldb_next_end_trans(data->partitions[i]->module);
810                 if (ret2 != LDB_SUCCESS) {
811                         ldb_asprintf_errstring(ldb_module_get_ctx(module), "end_trans error on %s: %s",
812                                                ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
813                                                ldb_errstring(ldb_module_get_ctx(module)));
814                         ret = ret2;
815                 }
816         }
817
818         if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
819                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_end_trans() -> (metadata partition)");
820         }
821         ret2 = ldb_next_end_trans(module);
822         if (ret2 != LDB_SUCCESS) {
823                 ret = ret2;
824         }
825         return ret;
826 }
827
828 /* delete a transaction */
829 static int partition_del_trans(struct ldb_module *module)
830 {
831         int ret, final_ret = LDB_SUCCESS;
832         unsigned int i;
833         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
834                                                               struct partition_private_data);
835         for (i=0; data && data->partitions && data->partitions[i]; i++) {
836                 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
837                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_del_trans() -> %s",
838                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
839                 }
840                 ret = ldb_next_del_trans(data->partitions[i]->module);
841                 if (ret != LDB_SUCCESS) {
842                         ldb_asprintf_errstring(ldb_module_get_ctx(module), "del_trans error on %s: %s",
843                                                ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
844                                                ldb_errstring(ldb_module_get_ctx(module)));
845                         final_ret = ret;
846                 }
847         }       
848
849         if (data->in_transaction == 0) {
850                 DEBUG(0,("partition del transaction mismatch\n"));
851                 return ldb_operr(ldb_module_get_ctx(module));
852         }
853         data->in_transaction--;
854
855         if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
856                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_del_trans() -> (metadata partition)");
857         }
858         ret = ldb_next_del_trans(module);
859         if (ret != LDB_SUCCESS) {
860                 final_ret = ret;
861         }
862         return final_ret;
863 }
864
865 int partition_primary_sequence_number(struct ldb_module *module, TALLOC_CTX *mem_ctx, 
866                                      enum ldb_sequence_type type, uint64_t *seq_number) 
867 {
868         int ret;
869         struct ldb_result *res;
870         struct ldb_seqnum_request *tseq;
871         struct ldb_request *treq;
872         struct ldb_seqnum_result *seqr;
873         res = talloc_zero(mem_ctx, struct ldb_result);
874         if (res == NULL) {
875                 return ldb_oom(ldb_module_get_ctx(module));
876         }
877         tseq = talloc_zero(res, struct ldb_seqnum_request);
878         if (tseq == NULL) {
879                 talloc_free(res);
880                 return ldb_oom(ldb_module_get_ctx(module));
881         }
882         tseq->type = type;
883         
884         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
885                                      LDB_EXTENDED_SEQUENCE_NUMBER,
886                                      tseq,
887                                      NULL,
888                                      res,
889                                      ldb_extended_default_callback,
890                                      NULL);
891         LDB_REQ_SET_LOCATION(treq);
892         if (ret != LDB_SUCCESS) {
893                 talloc_free(res);
894                 return ret;
895         }
896         
897         ret = ldb_next_request(module, treq);
898         if (ret != LDB_SUCCESS) {
899                 talloc_free(res);
900                 return ret;
901         }
902         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
903         if (ret != LDB_SUCCESS) {
904                 talloc_free(res);
905                 return ret;
906         }
907         
908         seqr = talloc_get_type(res->extended->data,
909                                struct ldb_seqnum_result);
910         if (seqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
911                 ret = LDB_ERR_OPERATIONS_ERROR;
912                 ldb_set_errstring(ldb_module_get_ctx(module), "Primary backend in partitions module returned a timestamp based seq number (must return a normal number)");
913                 talloc_free(res);
914                 return ret;
915         } else {
916                 *seq_number = seqr->seq_num;
917         }
918         talloc_free(res);
919         return LDB_SUCCESS;
920 }
921
922 /* FIXME: This function is still semi-async */
923 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
924 {
925         int ret;
926         unsigned int i;
927         uint64_t seq_number = 0;
928         uint64_t timestamp_sequence = 0;
929         uint64_t timestamp = 0;
930         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
931                                                               struct partition_private_data);
932         struct ldb_seqnum_request *seq;
933         struct ldb_seqnum_result *seqr;
934         struct ldb_request *treq;
935         struct ldb_seqnum_request *tseq;
936         struct ldb_seqnum_result *tseqr;
937         struct ldb_extended *ext;
938         struct ldb_result *res;
939         struct dsdb_partition *p;
940
941         p = find_partition(data, NULL, req);
942         if (p != NULL) {
943                 /* the caller specified what partition they want the
944                  * sequence number operation on - just pass it on
945                  */
946                 return ldb_next_request(p->module, req);                
947         }
948
949         seq = talloc_get_type(req->op.extended.data, struct ldb_seqnum_request);
950
951         switch (seq->type) {
952         case LDB_SEQ_NEXT:
953         case LDB_SEQ_HIGHEST_SEQ:
954
955                 ret = partition_primary_sequence_number(module, req, seq->type, &seq_number);
956                 if (ret != LDB_SUCCESS) {
957                         return ret;
958                 }
959
960                 /* Skip the lot if 'data' isn't here yet (initialisation) */
961                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
962
963                         res = talloc_zero(req, struct ldb_result);
964                         if (res == NULL) {
965                                 return ldb_oom(ldb_module_get_ctx(module));
966                         }
967                         tseq = talloc_zero(res, struct ldb_seqnum_request);
968                         if (tseq == NULL) {
969                                 talloc_free(res);
970                                 return ldb_oom(ldb_module_get_ctx(module));
971                         }
972                         tseq->type = seq->type;
973
974                         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
975                                                      LDB_EXTENDED_SEQUENCE_NUMBER,
976                                                      tseq,
977                                                      NULL,
978                                                      res,
979                                                      ldb_extended_default_callback,
980                                                      req);
981                         LDB_REQ_SET_LOCATION(treq);
982                         if (ret != LDB_SUCCESS) {
983                                 talloc_free(res);
984                                 return ret;
985                         }
986
987                         ret = ldb_request_add_control(treq,
988                                                       DSDB_CONTROL_CURRENT_PARTITION_OID,
989                                                       false, data->partitions[i]->ctrl);
990                         if (ret != LDB_SUCCESS) {
991                                 talloc_free(res);
992                                 return ret;
993                         }
994
995                         ret = partition_request(data->partitions[i]->module, treq);
996                         if (ret != LDB_SUCCESS) {
997                                 talloc_free(res);
998                                 return ret;
999                         }
1000                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
1001                         if (ret != LDB_SUCCESS) {
1002                                 talloc_free(res);
1003                                 return ret;
1004                         }
1005                         tseqr = talloc_get_type(res->extended->data,
1006                                                 struct ldb_seqnum_result);
1007                         if (tseqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
1008                                 timestamp_sequence = MAX(timestamp_sequence,
1009                                                          tseqr->seq_num);
1010                         } else {
1011                                 seq_number += tseqr->seq_num;
1012                         }
1013                         talloc_free(res);
1014                 }
1015                 /* fall through */
1016         case LDB_SEQ_HIGHEST_TIMESTAMP:
1017
1018                 res = talloc_zero(req, struct ldb_result);
1019                 if (res == NULL) {
1020                         return ldb_oom(ldb_module_get_ctx(module));
1021                 }
1022
1023                 tseq = talloc_zero(res, struct ldb_seqnum_request);
1024                 if (tseq == NULL) {
1025                         talloc_free(res);
1026                         return ldb_oom(ldb_module_get_ctx(module));
1027                 }
1028                 tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
1029
1030                 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
1031                                              LDB_EXTENDED_SEQUENCE_NUMBER,
1032                                              tseq,
1033                                              NULL,
1034                                              res,
1035                                              ldb_extended_default_callback,
1036                                              req);
1037                 LDB_REQ_SET_LOCATION(treq);
1038                 if (ret != LDB_SUCCESS) {
1039                         talloc_free(res);
1040                         return ret;
1041                 }
1042
1043                 ret = ldb_next_request(module, treq);
1044                 if (ret != LDB_SUCCESS) {
1045                         talloc_free(res);
1046                         return ret;
1047                 }
1048                 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
1049                 if (ret != LDB_SUCCESS) {
1050                         talloc_free(res);
1051                         return ret;
1052                 }
1053
1054                 tseqr = talloc_get_type(res->extended->data,
1055                                            struct ldb_seqnum_result);
1056                 timestamp = tseqr->seq_num;
1057
1058                 talloc_free(res);
1059
1060                 /* Skip the lot if 'data' isn't here yet (initialisation) */
1061                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
1062
1063                         res = talloc_zero(req, struct ldb_result);
1064                         if (res == NULL) {
1065                                 return ldb_oom(ldb_module_get_ctx(module));
1066                         }
1067
1068                         tseq = talloc_zero(res, struct ldb_seqnum_request);
1069                         if (tseq == NULL) {
1070                                 talloc_free(res);
1071                                 return ldb_oom(ldb_module_get_ctx(module));
1072                         }
1073                         tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
1074
1075                         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
1076                                                      LDB_EXTENDED_SEQUENCE_NUMBER,
1077                                                      tseq,
1078                                                      NULL,
1079                                                      res,
1080                                                      ldb_extended_default_callback,
1081                                                      req);
1082                         LDB_REQ_SET_LOCATION(treq);
1083                         if (ret != LDB_SUCCESS) {
1084                                 talloc_free(res);
1085                                 return ret;
1086                         }
1087
1088                         ret = ldb_request_add_control(treq,
1089                                                       DSDB_CONTROL_CURRENT_PARTITION_OID,
1090                                                       false, data->partitions[i]->ctrl);
1091                         if (ret != LDB_SUCCESS) {
1092                                 talloc_free(res);
1093                                 return ret;
1094                         }
1095
1096                         ret = partition_request(data->partitions[i]->module, treq);
1097                         if (ret != LDB_SUCCESS) {
1098                                 talloc_free(res);
1099                                 return ret;
1100                         }
1101                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
1102                         if (ret != LDB_SUCCESS) {
1103                                 talloc_free(res);
1104                                 return ret;
1105                         }
1106
1107                         tseqr = talloc_get_type(res->extended->data,
1108                                                   struct ldb_seqnum_result);
1109                         timestamp = MAX(timestamp, tseqr->seq_num);
1110
1111                         talloc_free(res);
1112                 }
1113
1114                 break;
1115         }
1116
1117         ext = talloc_zero(req, struct ldb_extended);
1118         if (!ext) {
1119                 return ldb_oom(ldb_module_get_ctx(module));
1120         }
1121         seqr = talloc_zero(ext, struct ldb_seqnum_result);
1122         if (seqr == NULL) {
1123                 talloc_free(ext);
1124                 return ldb_oom(ldb_module_get_ctx(module));
1125         }
1126         ext->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1127         ext->data = seqr;
1128
1129         switch (seq->type) {
1130         case LDB_SEQ_NEXT:
1131         case LDB_SEQ_HIGHEST_SEQ:
1132
1133                 /* Has someone above set a timebase sequence? */
1134                 if (timestamp_sequence) {
1135                         seqr->seq_num = (((unsigned long long)timestamp << 24) | (seq_number & 0xFFFFFF));
1136                 } else {
1137                         seqr->seq_num = seq_number;
1138                 }
1139
1140                 if (timestamp_sequence > seqr->seq_num) {
1141                         seqr->seq_num = timestamp_sequence;
1142                         seqr->flags |= LDB_SEQ_TIMESTAMP_SEQUENCE;
1143                 }
1144
1145                 seqr->flags |= LDB_SEQ_GLOBAL_SEQUENCE;
1146                 break;
1147         case LDB_SEQ_HIGHEST_TIMESTAMP:
1148                 seqr->seq_num = timestamp;
1149                 break;
1150         }
1151
1152         if (seq->type == LDB_SEQ_NEXT) {
1153                 seqr->seq_num++;
1154         }
1155
1156         /* send request done */
1157         return ldb_module_done(req, NULL, ext, LDB_SUCCESS);
1158 }
1159
1160 /* extended */
1161 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
1162 {
1163         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1164                                                               struct partition_private_data);
1165         struct partition_context *ac;
1166         int ret;
1167
1168         /* if we aren't initialised yet go further */
1169         if (!data) {
1170                 return ldb_next_request(module, req);
1171         }
1172
1173         /* see if we are still up-to-date */
1174         ret = partition_reload_if_required(module, data, req);
1175         if (ret != LDB_SUCCESS) {
1176                 return ret;
1177         }
1178         
1179         if (strcmp(req->op.extended.oid, LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1180                 return partition_sequence_number(module, req);
1181         }
1182
1183         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_CREATE_PARTITION_OID) == 0) {
1184                 return partition_create(module, req);
1185         }
1186
1187         /* 
1188          * as the extended operation has no dn
1189          * we need to send it to all partitions
1190          */
1191
1192         ac = partition_init_ctx(module, req);
1193         if (!ac) {
1194                 return ldb_operr(ldb_module_get_ctx(module));
1195         }
1196
1197         return partition_send_all(module, ac, req);
1198 }
1199
1200 static const struct ldb_module_ops ldb_partition_module_ops = {
1201         .name              = "partition",
1202         .init_context      = partition_init,
1203         .search            = partition_search,
1204         .add               = partition_add,
1205         .modify            = partition_modify,
1206         .del               = partition_delete,
1207         .rename            = partition_rename,
1208         .extended          = partition_extended,
1209         .start_transaction = partition_start_trans,
1210         .prepare_commit    = partition_prepare_commit,
1211         .end_transaction   = partition_end_trans,
1212         .del_transaction   = partition_del_trans,
1213 };
1214
1215 int ldb_partition_module_init(const char *version)
1216 {
1217         LDB_MODULE_CHECK_VERSION(version);
1218         return ldb_register_module(&ldb_partition_module_ops);
1219 }