c5bbdf8dce35b9c3fff78735026797b6ed5bd887
[ira/wip.git] / source4 / dsdb / samdb / ldb_modules / partition.c
1 /* 
2    Partitions ldb module
3
4    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 /*
22  *  Name: ldb
23  *
24  *  Component: ldb partitions module
25  *
26  *  Description: Implement LDAP partitions
27  *
28  *  Author: Andrew Bartlett
29  *  Author: Stefan Metzmacher
30  */
31
32 #include "includes.h"
33 #include "lib/ldb/include/ldb.h"
34 #include "lib/ldb/include/ldb_errors.h"
35 #include "lib/ldb/include/ldb_module.h"
36 #include "lib/ldb/include/ldb_private.h"
37 #include "dsdb/samdb/samdb.h"
38
39 struct dsdb_partition {
40         struct ldb_module *module;
41         struct dsdb_control_current_partition *ctrl;
42 };
43
44 struct partition_private_data {
45         struct dsdb_partition **partitions;
46         struct ldb_dn **replicate;
47 };
48
49 struct part_request {
50         struct ldb_module *module;
51         struct ldb_request *req;
52 };
53
54 struct partition_context {
55         struct ldb_module *module;
56         struct ldb_request *req;
57         bool got_success;
58
59         struct part_request *part_req;
60         int num_requests;
61         int finished_requests;
62 };
63
64 static struct partition_context *partition_init_ctx(struct ldb_module *module, struct ldb_request *req)
65 {
66         struct partition_context *ac;
67
68         ac = talloc_zero(req, struct partition_context);
69         if (ac == NULL) {
70                 ldb_set_errstring(ldb_module_get_ctx(module), "Out of Memory");
71                 return NULL;
72         }
73
74         ac->module = module;
75         ac->req = req;
76
77         return ac;
78 }
79
80 #define PARTITION_FIND_OP_NOERROR(module, op) do { \
81         while (module && module->ops->op == NULL) module = module->next; \
82 } while (0)
83
84 #define PARTITION_FIND_OP(module, op) do { \
85         PARTITION_FIND_OP_NOERROR(module, op); \
86         if (module == NULL) { \
87                 ldb_asprintf_errstring(ldb_module_get_ctx(module), \
88                         "Unable to find backend operation for " #op ); \
89                 return LDB_ERR_OPERATIONS_ERROR; \
90         } \
91 } while (0)
92
93 /*
94  *    helper functions to call the next module in chain
95  *    */
96
97 static int partition_request(struct ldb_module *module, struct ldb_request *request)
98 {
99         int ret;
100         switch (request->operation) {
101         case LDB_SEARCH:
102                 PARTITION_FIND_OP(module, search);
103                 ret = module->ops->search(module, request);
104                 break;
105         case LDB_ADD:
106                 PARTITION_FIND_OP(module, add);
107                 ret = module->ops->add(module, request);
108                 break;
109         case LDB_MODIFY:
110                 PARTITION_FIND_OP(module, modify);
111                 ret = module->ops->modify(module, request);
112                 break;
113         case LDB_DELETE:
114                 PARTITION_FIND_OP(module, del);
115                 ret = module->ops->del(module, request);
116                 break;
117         case LDB_RENAME:
118                 PARTITION_FIND_OP(module, rename);
119                 ret = module->ops->rename(module, request);
120                 break;
121         case LDB_EXTENDED:
122                 PARTITION_FIND_OP(module, extended);
123                 ret = module->ops->extended(module, request);
124                 break;
125         default:
126                 PARTITION_FIND_OP(module, request);
127                 ret = module->ops->request(module, request);
128                 break;
129         }
130         if (ret == LDB_SUCCESS) {
131                 return ret;
132         }
133         if (!ldb_errstring(ldb_module_get_ctx(module))) {
134                 /* Set a default error string, to place the blame somewhere */
135                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
136                                         "error in module %s: %s (%d)",
137                                         module->ops->name,
138                                         ldb_strerror(ret), ret);
139         }
140         return ret;
141 }
142
143 static struct dsdb_partition *find_partition(struct partition_private_data *data,
144                                              struct ldb_dn *dn,
145                                              struct ldb_request *req)
146 {
147         int i;
148         struct ldb_control *partition_ctrl;
149
150         /* see if the request has the partition DN specified in a
151          * control. The repl_meta_data module can specify this to
152          * ensure that replication happens to the right partition
153          */
154         partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
155         if (partition_ctrl) {
156                 const struct dsdb_control_current_partition *partition;
157                 partition = talloc_get_type(partition_ctrl->data,
158                                             struct dsdb_control_current_partition);
159                 if (partition != NULL) {
160                         dn = partition->dn;
161                 }
162         }
163
164         if (dn == NULL) {
165                 return NULL;
166         }
167
168         /* Look at base DN */
169         /* Figure out which partition it is under */
170         /* Skip the lot if 'data' isn't here yet (initialisation) */
171         for (i=0; data && data->partitions && data->partitions[i]; i++) {
172                 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, dn) == 0) {
173                         return data->partitions[i];
174                 }
175         }
176
177         return NULL;
178 }
179
180 /**
181  * fire the caller's callback for every entry, but only send 'done' once.
182  */
183 static int partition_req_callback(struct ldb_request *req,
184                                   struct ldb_reply *ares)
185 {
186         struct partition_context *ac;
187         struct ldb_module *module;
188         struct ldb_request *nreq;
189         int ret, i;
190         struct partition_private_data *data;
191
192         ac = talloc_get_type(req->context, struct partition_context);
193         data = talloc_get_type(ac->module->private_data, struct partition_private_data);
194
195         if (!ares) {
196                 return ldb_module_done(ac->req, NULL, NULL,
197                                         LDB_ERR_OPERATIONS_ERROR);
198         }
199
200         if (ares->error != LDB_SUCCESS && !ac->got_success) {
201                 return ldb_module_done(ac->req, ares->controls,
202                                         ares->response, ares->error);
203         }
204
205         switch (ares->type) {
206         case LDB_REPLY_REFERRAL:
207                 /* ignore referrals for now */
208                 break;
209
210         case LDB_REPLY_ENTRY:
211                 if (ac->req->operation != LDB_SEARCH) {
212                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
213                                 "partition_req_callback:"
214                                 " Unsupported reply type for this request");
215                         return ldb_module_done(ac->req, NULL, NULL,
216                                                 LDB_ERR_OPERATIONS_ERROR);
217                 }
218                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
219                         if (ldb_dn_compare(ares->message->dn, data->partitions[i]->ctrl->dn) == 0) {
220                                 struct ldb_control *part_control;
221                                 /* this is a partition root message - make
222                                    sure it isn't one of our fake root
223                                    entries from a parent partition */
224                                 part_control = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
225                                 if (part_control && part_control->data != data->partitions[i]->ctrl) {
226                                         DEBUG(6,(__location__ ": Discarding partition mount object %s\n",
227                                                  ldb_dn_get_linearized(ares->message->dn)));
228                                         talloc_free(ares);
229                                         return LDB_SUCCESS;
230                                 }
231                         }
232                 }
233                 
234                 return ldb_module_send_entry(ac->req, ares->message, ares->controls);
235
236         case LDB_REPLY_DONE:
237                 if (ares->error == LDB_SUCCESS) {
238                         ac->got_success = true;
239                 }
240                 if (ac->req->operation == LDB_EXTENDED) {
241                         /* FIXME: check for ares->response, replmd does not fill it ! */
242                         if (ares->response) {
243                                 if (strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID) != 0) {
244                                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
245                                                           "partition_req_callback:"
246                                                           " Unknown extended reply, "
247                                                           "only supports START_TLS");
248                                         talloc_free(ares);
249                                         return ldb_module_done(ac->req, NULL, NULL,
250                                                                 LDB_ERR_OPERATIONS_ERROR);
251                                 }
252                         }
253                 }
254
255                 ac->finished_requests++;
256                 if (ac->finished_requests == ac->num_requests) {
257                         /* this was the last one, call callback */
258                         return ldb_module_done(ac->req, ares->controls,
259                                                ares->response, 
260                                                ac->got_success?LDB_SUCCESS:ares->error);
261                 }
262
263                 /* not the last, now call the next one */
264                 module = ac->part_req[ac->finished_requests].module;
265                 nreq = ac->part_req[ac->finished_requests].req;
266
267                 ret = partition_request(module, nreq);
268                 if (ret != LDB_SUCCESS) {
269                         talloc_free(ares);
270                         return ldb_module_done(ac->req, NULL, NULL, ret);
271                 }
272
273                 break;
274         }
275
276         talloc_free(ares);
277         return LDB_SUCCESS;
278 }
279
280 static int partition_prep_request(struct partition_context *ac,
281                                   struct dsdb_partition *partition)
282 {
283         int ret;
284         struct ldb_request *req;
285
286         ac->part_req = talloc_realloc(ac, ac->part_req,
287                                         struct part_request,
288                                         ac->num_requests + 1);
289         if (ac->part_req == NULL) {
290                 ldb_oom(ldb_module_get_ctx(ac->module));
291                 return LDB_ERR_OPERATIONS_ERROR;
292         }
293
294         switch (ac->req->operation) {
295         case LDB_SEARCH:
296                 ret = ldb_build_search_req_ex(&req, ldb_module_get_ctx(ac->module),
297                                         ac->part_req,
298                                         ac->req->op.search.base,
299                                         ac->req->op.search.scope,
300                                         ac->req->op.search.tree,
301                                         ac->req->op.search.attrs,
302                                         ac->req->controls,
303                                         ac, partition_req_callback,
304                                         ac->req);
305                 break;
306         case LDB_ADD:
307                 ret = ldb_build_add_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
308                                         ac->req->op.add.message,
309                                         ac->req->controls,
310                                         ac, partition_req_callback,
311                                         ac->req);
312                 break;
313         case LDB_MODIFY:
314                 ret = ldb_build_mod_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
315                                         ac->req->op.mod.message,
316                                         ac->req->controls,
317                                         ac, partition_req_callback,
318                                         ac->req);
319                 break;
320         case LDB_DELETE:
321                 ret = ldb_build_del_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
322                                         ac->req->op.del.dn,
323                                         ac->req->controls,
324                                         ac, partition_req_callback,
325                                         ac->req);
326                 break;
327         case LDB_RENAME:
328                 ret = ldb_build_rename_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
329                                         ac->req->op.rename.olddn,
330                                         ac->req->op.rename.newdn,
331                                         ac->req->controls,
332                                         ac, partition_req_callback,
333                                         ac->req);
334                 break;
335         case LDB_EXTENDED:
336                 ret = ldb_build_extended_req(&req, ldb_module_get_ctx(ac->module),
337                                         ac->part_req,
338                                         ac->req->op.extended.oid,
339                                         ac->req->op.extended.data,
340                                         ac->req->controls,
341                                         ac, partition_req_callback,
342                                         ac->req);
343                 break;
344         default:
345                 ldb_set_errstring(ldb_module_get_ctx(ac->module),
346                                   "Unsupported request type!");
347                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
348         }
349
350         if (ret != LDB_SUCCESS) {
351                 return ret;
352         }
353
354         ac->part_req[ac->num_requests].req = req;
355
356         if (ac->req->controls) {
357                 req->controls = talloc_memdup(req, ac->req->controls,
358                                         talloc_get_size(ac->req->controls));
359                 if (req->controls == NULL) {
360                         ldb_oom(ldb_module_get_ctx(ac->module));
361                         return LDB_ERR_OPERATIONS_ERROR;
362                 }
363         }
364
365         if (partition) {
366                 ac->part_req[ac->num_requests].module = partition->module;
367
368                 if (!ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
369                         ret = ldb_request_add_control(req,
370                                                       DSDB_CONTROL_CURRENT_PARTITION_OID,
371                                                       false, partition->ctrl);
372                         if (ret != LDB_SUCCESS) {
373                                 return ret;
374                         }
375                 }
376
377                 if (req->operation == LDB_SEARCH) {
378                         /* If the search is for 'more' than this partition,
379                          * then change the basedn, so a remote LDAP server
380                          * doesn't object */
381                         if (ldb_dn_compare_base(partition->ctrl->dn,
382                                                 req->op.search.base) != 0) {
383                                 req->op.search.base = partition->ctrl->dn;
384                         }
385                 }
386
387         } else {
388                 /* make sure you put the NEXT module here, or
389                  * partition_request() will simply loop forever on itself */
390                 ac->part_req[ac->num_requests].module = ac->module->next;
391         }
392
393         ac->num_requests++;
394
395         return LDB_SUCCESS;
396 }
397
398 static int partition_call_first(struct partition_context *ac)
399 {
400         return partition_request(ac->part_req[0].module, ac->part_req[0].req);
401 }
402
403 /**
404  * Send a request down to all the partitions
405  */
406 static int partition_send_all(struct ldb_module *module, 
407                               struct partition_context *ac, 
408                               struct ldb_request *req) 
409 {
410         int i;
411         struct partition_private_data *data = talloc_get_type(module->private_data, 
412                                                               struct partition_private_data);
413         int ret = partition_prep_request(ac, NULL);
414         if (ret != LDB_SUCCESS) {
415                 return ret;
416         }
417         for (i=0; data && data->partitions && data->partitions[i]; i++) {
418                 ret = partition_prep_request(ac, data->partitions[i]);
419                 if (ret != LDB_SUCCESS) {
420                         return ret;
421                 }
422         }
423
424         /* fire the first one */
425         return partition_call_first(ac);
426 }
427
428 /**
429  * Figure out which backend a request needs to be aimed at.  Some
430  * requests must be replicated to all backends
431  */
432 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn) 
433 {
434         struct partition_context *ac;
435         unsigned i;
436         int ret;
437         struct dsdb_partition *partition;
438         struct partition_private_data *data = talloc_get_type(module->private_data, 
439                                                               struct partition_private_data);
440         if (!data || !data->partitions) {
441                 return ldb_next_request(module, req);
442         }
443         
444         if (req->operation != LDB_SEARCH) {
445                 /* Is this a special DN, we need to replicate to every backend? */
446                 for (i=0; data->replicate && data->replicate[i]; i++) {
447                         if (ldb_dn_compare(data->replicate[i], 
448                                            dn) == 0) {
449                                 
450                                 ac = partition_init_ctx(module, req);
451                                 if (!ac) {
452                                         return LDB_ERR_OPERATIONS_ERROR;
453                                 }
454                                 
455                                 return partition_send_all(module, ac, req);
456                         }
457                 }
458         }
459
460         /* Otherwise, we need to find the partition to fire it to */
461
462         /* Find partition */
463         partition = find_partition(data, dn, req);
464         if (!partition) {
465                 /*
466                  * if we haven't found a matching partition
467                  * pass the request to the main ldb
468                  *
469                  * TODO: we should maybe return an error here
470                  *       if it's not a special dn
471                  */
472
473                 return ldb_next_request(module, req);
474         }
475
476         ac = partition_init_ctx(module, req);
477         if (!ac) {
478                 return LDB_ERR_OPERATIONS_ERROR;
479         }
480
481         /* we need to add a control but we never touch the original request */
482         ret = partition_prep_request(ac, partition);
483         if (ret != LDB_SUCCESS) {
484                 return ret;
485         }
486
487         /* fire the first one */
488         return partition_call_first(ac);
489 }
490
491 /* search */
492 static int partition_search(struct ldb_module *module, struct ldb_request *req)
493 {
494         struct ldb_control **saved_controls;
495         /* Find backend */
496         struct partition_private_data *data = talloc_get_type(module->private_data, 
497                                                               struct partition_private_data);
498
499         /* issue request */
500
501         /* (later) consider if we should be searching multiple
502          * partitions (for 'invisible' partition behaviour */
503
504         struct ldb_control *search_control = ldb_request_get_control(req, LDB_CONTROL_SEARCH_OPTIONS_OID);
505         struct ldb_control *domain_scope_control = ldb_request_get_control(req, LDB_CONTROL_DOMAIN_SCOPE_OID);
506         
507         struct ldb_search_options_control *search_options = NULL;
508         struct dsdb_partition *p;
509         
510         p = find_partition(data, NULL, req);
511         if (p != NULL) {
512                 /* the caller specified what partition they want the
513                  * search - just pass it on
514                  */
515                 return ldb_next_request(p->module, req);                
516         }
517
518
519         if (search_control) {
520                 search_options = talloc_get_type(search_control->data, struct ldb_search_options_control);
521         }
522
523         /* Remove the domain_scope control, so we don't confuse a backend server */
524         if (domain_scope_control && !save_controls(domain_scope_control, req, &saved_controls)) {
525                 ldb_oom(ldb_module_get_ctx(module));
526                 return LDB_ERR_OPERATIONS_ERROR;
527         }
528
529         /*
530          * for now pass down the LDB_CONTROL_SEARCH_OPTIONS_OID control
531          * down as uncritical to make windows 2008 dcpromo happy.
532          */
533         if (search_control) {
534                 search_control->critical = 0;
535         }
536
537         /* TODO:
538            Generate referrals (look for a partition under this DN) if we don't have the above control specified
539         */
540         
541         if (search_options && (search_options->search_options & LDB_SEARCH_OPTION_PHANTOM_ROOT)) {
542                 int ret, i;
543                 struct partition_context *ac;
544                 if ((search_options->search_options & ~LDB_SEARCH_OPTION_PHANTOM_ROOT) == 0) {
545                         /* We have processed this flag, so we are done with this control now */
546
547                         /* Remove search control, so we don't confuse a backend server */
548                         if (search_control && !save_controls(search_control, req, &saved_controls)) {
549                                 ldb_oom(ldb_module_get_ctx(module));
550                                 return LDB_ERR_OPERATIONS_ERROR;
551                         }
552                 }
553                 ac = partition_init_ctx(module, req);
554                 if (!ac) {
555                         return LDB_ERR_OPERATIONS_ERROR;
556                 }
557
558                 /* Search from the base DN */
559                 if (!req->op.search.base || ldb_dn_is_null(req->op.search.base)) {
560                         return partition_send_all(module, ac, req);
561                 }
562                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
563                         bool match = false, stop = false;
564                         /* Find all partitions under the search base 
565                            
566                            we match if:
567
568                               1) the DN we are looking for exactly matches the partition
569                              or
570                               2) the DN we are looking for is a parent of the partition and it isn't
571                                  a scope base search
572                              or
573                               3) the DN we are looking for is a child of the partition
574                          */
575                         if (ldb_dn_compare(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
576                                 match = true;
577                                 if (req->op.search.scope == LDB_SCOPE_BASE) {
578                                         stop = true;
579                                 }
580                         }
581                         if (!match && 
582                             (ldb_dn_compare_base(req->op.search.base, data->partitions[i]->ctrl->dn) == 0 &&
583                              req->op.search.scope != LDB_SCOPE_BASE)) {
584                                 match = true;
585                         }
586                         if (!match &&
587                             ldb_dn_compare_base(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
588                                 match = true;
589                                 stop = true; /* note that this relies on partition ordering */
590                         }
591                         if (match) {
592                                 ret = partition_prep_request(ac, data->partitions[i]);
593                                 if (ret != LDB_SUCCESS) {
594                                         return ret;
595                                 }
596                         }
597                         if (stop) break;
598                 }
599
600                 /* Perhaps we didn't match any partitions.  Try the main partition, only */
601                 if (ac->num_requests == 0) {
602                         talloc_free(ac);
603                         return ldb_next_request(module, req);
604                 }
605
606                 /* fire the first one */
607                 return partition_call_first(ac);
608
609         } else {
610                 /* Handle this like all other requests */
611                 if (search_control && (search_options->search_options & ~LDB_SEARCH_OPTION_PHANTOM_ROOT) == 0) {
612                         /* We have processed this flag, so we are done with this control now */
613
614                         /* Remove search control, so we don't confuse a backend server */
615                         if (search_control && !save_controls(search_control, req, &saved_controls)) {
616                                 ldb_oom(ldb_module_get_ctx(module));
617                                 return LDB_ERR_OPERATIONS_ERROR;
618                         }
619                 }
620
621                 return partition_replicate(module, req, req->op.search.base);
622         }
623 }
624
625 /* add */
626 static int partition_add(struct ldb_module *module, struct ldb_request *req)
627 {
628         return partition_replicate(module, req, req->op.add.message->dn);
629 }
630
631 /* modify */
632 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
633 {
634         return partition_replicate(module, req, req->op.mod.message->dn);
635 }
636
637 /* delete */
638 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
639 {
640         return partition_replicate(module, req, req->op.del.dn);
641 }
642
643 /* rename */
644 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
645 {
646         /* Find backend */
647         struct dsdb_partition *backend, *backend2;
648         
649         struct partition_private_data *data = talloc_get_type(module->private_data, 
650                                                               struct partition_private_data);
651
652         /* Skip the lot if 'data' isn't here yet (initialisation) */
653         if (!data) {
654                 return LDB_ERR_OPERATIONS_ERROR;
655         }
656
657         backend = find_partition(data, req->op.rename.olddn, req);
658         backend2 = find_partition(data, req->op.rename.newdn, req);
659
660         if ((backend && !backend2) || (!backend && backend2)) {
661                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
662         }
663
664         if (backend != backend2) {
665                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
666                                        "Cannot rename from %s in %s to %s in %s: %s",
667                                        ldb_dn_get_linearized(req->op.rename.olddn),
668                                        ldb_dn_get_linearized(backend->ctrl->dn),
669                                        ldb_dn_get_linearized(req->op.rename.newdn),
670                                        ldb_dn_get_linearized(backend2->ctrl->dn),
671                                        ldb_strerror(LDB_ERR_AFFECTS_MULTIPLE_DSAS));
672                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
673         }
674
675         return partition_replicate(module, req, req->op.rename.olddn);
676 }
677
678 /* start a transaction */
679 static int partition_start_trans(struct ldb_module *module)
680 {
681         int i, ret;
682         struct partition_private_data *data = talloc_get_type(module->private_data, 
683                                                               struct partition_private_data);
684         /* Look at base DN */
685         /* Figure out which partition it is under */
686         /* Skip the lot if 'data' isn't here yet (initialization) */
687         ret = ldb_next_start_trans(module);
688         if (ret != LDB_SUCCESS) {
689                 return ret;
690         }
691
692         for (i=0; data && data->partitions && data->partitions[i]; i++) {
693                 struct ldb_module *next = data->partitions[i]->module;
694                 PARTITION_FIND_OP(next, start_transaction);
695
696                 ret = next->ops->start_transaction(next);
697                 if (ret != LDB_SUCCESS) {
698                         /* Back it out, if it fails on one */
699                         for (i--; i >= 0; i--) {
700                                 next = data->partitions[i]->module;
701                                 PARTITION_FIND_OP(next, del_transaction);
702
703                                 next->ops->del_transaction(next);
704                         }
705                         ldb_next_del_trans(module);
706                         return ret;
707                 }
708         }
709         return LDB_SUCCESS;
710 }
711
712 /* prepare for a commit */
713 static int partition_prepare_commit(struct ldb_module *module)
714 {
715         int i;
716         struct partition_private_data *data = talloc_get_type(module->private_data, 
717                                                               struct partition_private_data);
718
719         for (i=0; data && data->partitions && data->partitions[i]; i++) {
720                 struct ldb_module *next_prepare = data->partitions[i]->module;
721                 int ret;
722
723                 PARTITION_FIND_OP_NOERROR(next_prepare, prepare_commit);
724                 if (next_prepare == NULL) {
725                         continue;
726                 }
727
728                 ret = next_prepare->ops->prepare_commit(next_prepare);
729                 if (ret != LDB_SUCCESS) {
730                         return ret;
731                 }
732         }
733
734         return ldb_next_prepare_commit(module);
735 }
736
737
738 /* end a transaction */
739 static int partition_end_trans(struct ldb_module *module)
740 {
741         int i;
742         struct partition_private_data *data = talloc_get_type(module->private_data, 
743                                                               struct partition_private_data);
744         for (i=0; data && data->partitions && data->partitions[i]; i++) {
745                 struct ldb_module *next_end = data->partitions[i]->module;
746                 int ret;
747
748                 PARTITION_FIND_OP(next_end, end_transaction);
749
750                 ret = next_end->ops->end_transaction(next_end);
751                 if (ret != LDB_SUCCESS) {
752                         return ret;
753                 }
754         }
755
756         return ldb_next_end_trans(module);
757 }
758
759 /* delete a transaction */
760 static int partition_del_trans(struct ldb_module *module)
761 {
762         int i, ret, final_ret = LDB_SUCCESS;
763         struct partition_private_data *data = talloc_get_type(module->private_data, 
764                                                               struct partition_private_data);
765         for (i=0; data && data->partitions && data->partitions[i]; i++) {
766                 struct ldb_module *next = data->partitions[i]->module;
767                 PARTITION_FIND_OP(next, del_transaction);
768
769                 ret = next->ops->del_transaction(next);
770                 if (ret != LDB_SUCCESS) {
771                         final_ret = ret;
772                 }
773         }       
774
775         ret = ldb_next_del_trans(module);
776         if (ret != LDB_SUCCESS) {
777                 final_ret = ret;
778         }
779         return final_ret;
780 }
781
782
783 /* FIXME: This function is still semi-async */
784 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
785 {
786         int i, ret;
787         uint64_t seq_number = 0;
788         uint64_t timestamp_sequence = 0;
789         uint64_t timestamp = 0;
790         struct partition_private_data *data = talloc_get_type(module->private_data, 
791                                                               struct partition_private_data);
792         struct ldb_seqnum_request *seq;
793         struct ldb_seqnum_result *seqr;
794         struct ldb_request *treq;
795         struct ldb_seqnum_request *tseq;
796         struct ldb_seqnum_result *tseqr;
797         struct ldb_extended *ext;
798         struct ldb_result *res;
799         struct dsdb_partition *p;
800
801         p = find_partition(data, NULL, req);
802         if (p != NULL) {
803                 /* the caller specified what partition they want the
804                  * sequence number operation on - just pass it on
805                  */
806                 return ldb_next_request(p->module, req);                
807         }
808
809         seq = talloc_get_type(req->op.extended.data, struct ldb_seqnum_request);
810
811         switch (seq->type) {
812         case LDB_SEQ_NEXT:
813         case LDB_SEQ_HIGHEST_SEQ:
814                 res = talloc_zero(req, struct ldb_result);
815                 if (res == NULL) {
816                         return LDB_ERR_OPERATIONS_ERROR;
817                 }
818                 tseq = talloc_zero(res, struct ldb_seqnum_request);
819                 if (tseq == NULL) {
820                         talloc_free(res);
821                         return LDB_ERR_OPERATIONS_ERROR;
822                 }
823                 tseq->type = seq->type;
824
825                 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
826                                              LDB_EXTENDED_SEQUENCE_NUMBER,
827                                              tseq,
828                                              NULL,
829                                              res,
830                                              ldb_extended_default_callback,
831                                              NULL);
832                 if (ret != LDB_SUCCESS) {
833                         talloc_free(res);
834                         return ret;
835                 }
836
837                 ret = ldb_next_request(module, treq);
838                 if (ret != LDB_SUCCESS) {
839                         talloc_free(res);
840                         return ret;
841                 }
842                 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
843                 if (ret != LDB_SUCCESS) {
844                         talloc_free(res);
845                         return ret;
846                 }
847
848                 seqr = talloc_get_type(res->extended->data,
849                                         struct ldb_seqnum_result);
850                 if (seqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
851                         timestamp_sequence = seqr->seq_num;
852                 } else {
853                         seq_number += seqr->seq_num;
854                 }
855                 talloc_free(res);
856
857                 /* Skip the lot if 'data' isn't here yet (initialisation) */
858                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
859
860                         res = talloc_zero(req, struct ldb_result);
861                         if (res == NULL) {
862                                 return LDB_ERR_OPERATIONS_ERROR;
863                         }
864                         tseq = talloc_zero(res, struct ldb_seqnum_request);
865                         if (tseq == NULL) {
866                                 talloc_free(res);
867                                 return LDB_ERR_OPERATIONS_ERROR;
868                         }
869                         tseq->type = seq->type;
870
871                         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
872                                                      LDB_EXTENDED_SEQUENCE_NUMBER,
873                                                      tseq,
874                                                      NULL,
875                                                      res,
876                                                      ldb_extended_default_callback,
877                                                      NULL);
878                         if (ret != LDB_SUCCESS) {
879                                 talloc_free(res);
880                                 return ret;
881                         }
882
883                         if (!ldb_request_get_control(treq, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
884                                 ret = ldb_request_add_control(treq,
885                                                               DSDB_CONTROL_CURRENT_PARTITION_OID,
886                                                               false, data->partitions[i]->ctrl);
887                                 if (ret != LDB_SUCCESS) {
888                                         talloc_free(res);
889                                         return ret;
890                                 }
891                         }
892
893                         ret = partition_request(data->partitions[i]->module, treq);
894                         if (ret != LDB_SUCCESS) {
895                                 talloc_free(res);
896                                 return ret;
897                         }
898                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
899                         if (ret != LDB_SUCCESS) {
900                                 talloc_free(res);
901                                 return ret;
902                         }
903                         tseqr = talloc_get_type(res->extended->data,
904                                                 struct ldb_seqnum_result);
905                         if (tseqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
906                                 timestamp_sequence = MAX(timestamp_sequence,
907                                                          tseqr->seq_num);
908                         } else {
909                                 seq_number += tseqr->seq_num;
910                         }
911                         talloc_free(res);
912                 }
913                 /* fall through */
914         case LDB_SEQ_HIGHEST_TIMESTAMP:
915
916                 res = talloc_zero(req, struct ldb_result);
917                 if (res == NULL) {
918                         return LDB_ERR_OPERATIONS_ERROR;
919                 }
920
921                 tseq = talloc_zero(res, struct ldb_seqnum_request);
922                 if (tseq == NULL) {
923                         talloc_free(res);
924                         return LDB_ERR_OPERATIONS_ERROR;
925                 }
926                 tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
927
928                 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
929                                              LDB_EXTENDED_SEQUENCE_NUMBER,
930                                              tseq,
931                                              NULL,
932                                              res,
933                                              ldb_extended_default_callback,
934                                              NULL);
935                 if (ret != LDB_SUCCESS) {
936                         talloc_free(res);
937                         return ret;
938                 }
939
940                 ret = ldb_next_request(module, treq);
941                 if (ret != LDB_SUCCESS) {
942                         talloc_free(res);
943                         return ret;
944                 }
945                 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
946                 if (ret != LDB_SUCCESS) {
947                         talloc_free(res);
948                         return ret;
949                 }
950
951                 tseqr = talloc_get_type(res->extended->data,
952                                            struct ldb_seqnum_result);
953                 timestamp = tseqr->seq_num;
954
955                 talloc_free(res);
956
957                 /* Skip the lot if 'data' isn't here yet (initialisation) */
958                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
959
960                         res = talloc_zero(req, struct ldb_result);
961                         if (res == NULL) {
962                                 return LDB_ERR_OPERATIONS_ERROR;
963                         }
964
965                         tseq = talloc_zero(res, struct ldb_seqnum_request);
966                         if (tseq == NULL) {
967                                 talloc_free(res);
968                                 return LDB_ERR_OPERATIONS_ERROR;
969                         }
970                         tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
971
972                         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
973                                                      LDB_EXTENDED_SEQUENCE_NUMBER,
974                                                      tseq,
975                                                      NULL,
976                                                      res,
977                                                      ldb_extended_default_callback,
978                                                      NULL);
979                         if (ret != LDB_SUCCESS) {
980                                 talloc_free(res);
981                                 return ret;
982                         }
983
984                         if (!ldb_request_get_control(treq, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
985                                 ret = ldb_request_add_control(treq,
986                                                               DSDB_CONTROL_CURRENT_PARTITION_OID,
987                                                               false, data->partitions[i]->ctrl);
988                                 if (ret != LDB_SUCCESS) {
989                                         talloc_free(res);
990                                         return ret;
991                                 }
992                         }
993
994                         ret = partition_request(data->partitions[i]->module, treq);
995                         if (ret != LDB_SUCCESS) {
996                                 talloc_free(res);
997                                 return ret;
998                         }
999                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
1000                         if (ret != LDB_SUCCESS) {
1001                                 talloc_free(res);
1002                                 return ret;
1003                         }
1004
1005                         tseqr = talloc_get_type(res->extended->data,
1006                                                   struct ldb_seqnum_result);
1007                         timestamp = MAX(timestamp, tseqr->seq_num);
1008
1009                         talloc_free(res);
1010                 }
1011
1012                 break;
1013         }
1014
1015         ext = talloc_zero(req, struct ldb_extended);
1016         if (!ext) {
1017                 return LDB_ERR_OPERATIONS_ERROR;
1018         }
1019         seqr = talloc_zero(ext, struct ldb_seqnum_result);
1020         if (seqr == NULL) {
1021                 talloc_free(ext);
1022                 return LDB_ERR_OPERATIONS_ERROR;
1023         }
1024         ext->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1025         ext->data = seqr;
1026
1027         switch (seq->type) {
1028         case LDB_SEQ_NEXT:
1029         case LDB_SEQ_HIGHEST_SEQ:
1030
1031                 /* Has someone above set a timebase sequence? */
1032                 if (timestamp_sequence) {
1033                         seqr->seq_num = (((unsigned long long)timestamp << 24) | (seq_number & 0xFFFFFF));
1034                 } else {
1035                         seqr->seq_num = seq_number;
1036                 }
1037
1038                 if (timestamp_sequence > seqr->seq_num) {
1039                         seqr->seq_num = timestamp_sequence;
1040                         seqr->flags |= LDB_SEQ_TIMESTAMP_SEQUENCE;
1041                 }
1042
1043                 seqr->flags |= LDB_SEQ_GLOBAL_SEQUENCE;
1044                 break;
1045         case LDB_SEQ_HIGHEST_TIMESTAMP:
1046                 seqr->seq_num = timestamp;
1047                 break;
1048         }
1049
1050         if (seq->type == LDB_SEQ_NEXT) {
1051                 seqr->seq_num++;
1052         }
1053
1054         /* send request done */
1055         return ldb_module_done(req, NULL, ext, LDB_SUCCESS);
1056 }
1057
1058 static int partition_extended_schema_update_now(struct ldb_module *module, struct ldb_request *req)
1059 {
1060         struct dsdb_partition *partition;
1061         struct partition_private_data *data;
1062         struct ldb_dn *schema_dn;
1063         struct partition_context *ac;
1064         int ret;
1065
1066         schema_dn = talloc_get_type(req->op.extended.data, struct ldb_dn);
1067         if (!schema_dn) {
1068                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_FATAL, "partition_extended: invalid extended data\n");
1069                 return LDB_ERR_PROTOCOL_ERROR;
1070         }
1071
1072         data = talloc_get_type(module->private_data, struct partition_private_data);
1073         if (!data) {
1074                 return LDB_ERR_OPERATIONS_ERROR;
1075         }
1076         
1077         partition = find_partition( data, schema_dn, req);
1078         if (!partition) {
1079                 return ldb_next_request(module, req);
1080         }
1081
1082         ac = partition_init_ctx(module, req);
1083         if (!ac) {
1084                 return LDB_ERR_OPERATIONS_ERROR;
1085         }
1086
1087         /* we need to add a control but we never touch the original request */
1088         ret = partition_prep_request(ac, partition);
1089         if (ret != LDB_SUCCESS) {
1090                 return ret;
1091         }
1092
1093         /* fire the first one */
1094         ret = partition_call_first(ac);
1095
1096         if (ret != LDB_SUCCESS){
1097                 return ret;
1098         }
1099
1100         return ldb_request_done(req, ret);
1101 }
1102
1103
1104 /* extended */
1105 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
1106 {
1107         struct partition_private_data *data;
1108         struct partition_context *ac;
1109
1110         data = talloc_get_type(module->private_data, struct partition_private_data);
1111         if (!data || !data->partitions) {
1112                 return ldb_next_request(module, req);
1113         }
1114
1115         if (strcmp(req->op.extended.oid, LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1116                 return partition_sequence_number(module, req);
1117         }
1118
1119         /* forward schemaUpdateNow operation to schema_fsmo module*/
1120         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_SCHEMA_UPDATE_NOW_OID) == 0) {
1121                 return partition_extended_schema_update_now( module, req );
1122         }       
1123
1124         /* 
1125          * as the extended operation has no dn
1126          * we need to send it to all partitions
1127          */
1128
1129         ac = partition_init_ctx(module, req);
1130         if (!ac) {
1131                 return LDB_ERR_OPERATIONS_ERROR;
1132         }
1133
1134         return partition_send_all(module, ac, req);
1135 }
1136
1137 static int partition_sort_compare(const void *v1, const void *v2)
1138 {
1139         const struct dsdb_partition *p1;
1140         const struct dsdb_partition *p2;
1141
1142         p1 = *((struct dsdb_partition * const*)v1);
1143         p2 = *((struct dsdb_partition * const*)v2);
1144
1145         return ldb_dn_compare(p1->ctrl->dn, p2->ctrl->dn);
1146 }
1147
1148 static int partition_init(struct ldb_module *module)
1149 {
1150         int ret, i;
1151         TALLOC_CTX *mem_ctx = talloc_new(module);
1152         const char *attrs[] = { "partition", "replicateEntries", "modules", NULL };
1153         struct ldb_result *res;
1154         struct ldb_message *msg;
1155         struct ldb_message_element *partition_attributes;
1156         struct ldb_message_element *replicate_attributes;
1157         struct ldb_message_element *modules_attributes;
1158
1159         struct partition_private_data *data;
1160
1161         if (!mem_ctx) {
1162                 return LDB_ERR_OPERATIONS_ERROR;
1163         }
1164
1165         data = talloc(mem_ctx, struct partition_private_data);
1166         if (data == NULL) {
1167                 return LDB_ERR_OPERATIONS_ERROR;
1168         }
1169
1170         ret = ldb_search(ldb_module_get_ctx(module), mem_ctx, &res,
1171                          ldb_dn_new(mem_ctx, ldb_module_get_ctx(module), "@PARTITION"),
1172                          LDB_SCOPE_BASE, attrs, NULL);
1173         if (ret != LDB_SUCCESS) {
1174                 talloc_free(mem_ctx);
1175                 return ret;
1176         }
1177         if (res->count == 0) {
1178                 talloc_free(mem_ctx);
1179                 return ldb_next_init(module);
1180         }
1181
1182         if (res->count > 1) {
1183                 talloc_free(mem_ctx);
1184                 return LDB_ERR_CONSTRAINT_VIOLATION;
1185         }
1186
1187         msg = res->msgs[0];
1188
1189         partition_attributes = ldb_msg_find_element(msg, "partition");
1190         if (!partition_attributes) {
1191                 ldb_set_errstring(ldb_module_get_ctx(module), "partition_init: no partitions specified");
1192                 talloc_free(mem_ctx);
1193                 return LDB_ERR_CONSTRAINT_VIOLATION;
1194         }
1195         data->partitions = talloc_array(data, struct dsdb_partition *, partition_attributes->num_values + 1);
1196         if (!data->partitions) {
1197                 talloc_free(mem_ctx);
1198                 return LDB_ERR_OPERATIONS_ERROR;
1199         }
1200         for (i=0; i < partition_attributes->num_values; i++) {
1201                 char *base = talloc_strdup(data->partitions, (char *)partition_attributes->values[i].data);
1202                 char *p = strchr(base, ':');
1203                 const char *backend;
1204
1205                 if (!p) {
1206                         ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1207                                                 "partition_init: "
1208                                                 "invalid form for partition record (missing ':'): %s", base);
1209                         talloc_free(mem_ctx);
1210                         return LDB_ERR_CONSTRAINT_VIOLATION;
1211                 }
1212                 p[0] = '\0';
1213                 p++;
1214                 if (!p[0]) {
1215                         ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1216                                                 "partition_init: "
1217                                                 "invalid form for partition record (missing backend database): %s", base);
1218                         talloc_free(mem_ctx);
1219                         return LDB_ERR_CONSTRAINT_VIOLATION;
1220                 }
1221                 data->partitions[i] = talloc(data->partitions, struct dsdb_partition);
1222                 if (!data->partitions[i]) {
1223                         talloc_free(mem_ctx);
1224                         return LDB_ERR_OPERATIONS_ERROR;
1225                 }
1226                 data->partitions[i]->ctrl = talloc(data->partitions[i], struct dsdb_control_current_partition);
1227                 if (!data->partitions[i]->ctrl) {
1228                         talloc_free(mem_ctx);
1229                         return LDB_ERR_OPERATIONS_ERROR;
1230                 }
1231                 data->partitions[i]->ctrl->version = DSDB_CONTROL_CURRENT_PARTITION_VERSION;
1232                 data->partitions[i]->ctrl->dn = ldb_dn_new(data->partitions[i], ldb_module_get_ctx(module), base);
1233                 if (!data->partitions[i]->ctrl->dn) {
1234                         ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1235                                                 "partition_init: invalid DN in partition record: %s", base);
1236                         talloc_free(mem_ctx);
1237                         return LDB_ERR_CONSTRAINT_VIOLATION;
1238                 }
1239
1240                 backend = samdb_relative_path(ldb_module_get_ctx(module), 
1241                                                                    data->partitions[i], 
1242                                                                    p);
1243                 if (!backend) {
1244                         ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1245                                                 "partition_init: unable to determine an relative path for partition: %s", base);
1246                         talloc_free(mem_ctx);                   
1247                 }
1248                 ret = ldb_connect_backend(ldb_module_get_ctx(module), backend, NULL, &data->partitions[i]->module);
1249                 if (ret != LDB_SUCCESS) {
1250                         talloc_free(mem_ctx);
1251                         return ret;
1252                 }
1253         }
1254         data->partitions[i] = NULL;
1255
1256         /* sort these into order, most to least specific */
1257         qsort(data->partitions, partition_attributes->num_values,
1258               sizeof(*data->partitions), partition_sort_compare);
1259
1260         for (i=0; data->partitions[i]; i++) {
1261                 struct ldb_request *req;
1262                 req = talloc_zero(mem_ctx, struct ldb_request);
1263                 if (req == NULL) {
1264                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_ERROR, "partition: Out of memory!\n");
1265                         talloc_free(mem_ctx);
1266                         return LDB_ERR_OPERATIONS_ERROR;
1267                 }
1268                 
1269                 req->operation = LDB_REQ_REGISTER_PARTITION;
1270                 req->op.reg_partition.dn = data->partitions[i]->ctrl->dn;
1271                 req->callback = ldb_op_default_callback;
1272
1273                 ldb_set_timeout(ldb_module_get_ctx(module), req, 0);
1274
1275                 req->handle = ldb_handle_new(req, ldb_module_get_ctx(module));
1276                 if (req->handle == NULL) {
1277                         return LDB_ERR_OPERATIONS_ERROR;
1278                 }
1279                 
1280                 ret = ldb_request(ldb_module_get_ctx(module), req);
1281                 if (ret == LDB_SUCCESS) {
1282                         ret = ldb_wait(req->handle, LDB_WAIT_ALL);
1283                 }
1284                 if (ret != LDB_SUCCESS) {
1285                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_ERROR, "partition: Unable to register partition with rootdse!\n");
1286                         talloc_free(mem_ctx);
1287                         return LDB_ERR_OTHER;
1288                 }
1289                 talloc_free(req);
1290         }
1291
1292         replicate_attributes = ldb_msg_find_element(msg, "replicateEntries");
1293         if (!replicate_attributes) {
1294                 data->replicate = NULL;
1295         } else {
1296                 data->replicate = talloc_array(data, struct ldb_dn *, replicate_attributes->num_values + 1);
1297                 if (!data->replicate) {
1298                         talloc_free(mem_ctx);
1299                         return LDB_ERR_OPERATIONS_ERROR;
1300                 }
1301
1302                 for (i=0; i < replicate_attributes->num_values; i++) {
1303                         data->replicate[i] = ldb_dn_from_ldb_val(data->replicate, ldb_module_get_ctx(module), &replicate_attributes->values[i]);
1304                         if (!ldb_dn_validate(data->replicate[i])) {
1305                                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1306                                                         "partition_init: "
1307                                                         "invalid DN in partition replicate record: %s", 
1308                                                         replicate_attributes->values[i].data);
1309                                 talloc_free(mem_ctx);
1310                                 return LDB_ERR_CONSTRAINT_VIOLATION;
1311                         }
1312                 }
1313                 data->replicate[i] = NULL;
1314         }
1315
1316         /* Make the private data available to any searches the modules may trigger in initialisation */
1317         module->private_data = data;
1318         talloc_steal(module, data);
1319         
1320         modules_attributes = ldb_msg_find_element(msg, "modules");
1321         if (modules_attributes) {
1322                 for (i=0; i < modules_attributes->num_values; i++) {
1323                         struct ldb_dn *base_dn;
1324                         int partition_idx;
1325                         struct dsdb_partition *partition = NULL;
1326                         const char **modules = NULL;
1327
1328                         char *base = talloc_strdup(data->partitions, (char *)modules_attributes->values[i].data);
1329                         char *p = strchr(base, ':');
1330                         if (!p) {
1331                                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1332                                                         "partition_init: "
1333                                                         "invalid form for partition module record (missing ':'): %s", base);
1334                                 talloc_free(mem_ctx);
1335                                 return LDB_ERR_CONSTRAINT_VIOLATION;
1336                         }
1337                         p[0] = '\0';
1338                         p++;
1339                         if (!p[0]) {
1340                                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1341                                                         "partition_init: "
1342                                                         "invalid form for partition module record (missing backend database): %s", base);
1343                                 talloc_free(mem_ctx);
1344                                 return LDB_ERR_CONSTRAINT_VIOLATION;
1345                         }
1346
1347                         modules = ldb_modules_list_from_string(ldb_module_get_ctx(module), mem_ctx,
1348                                                                p);
1349                         
1350                         base_dn = ldb_dn_new(mem_ctx, ldb_module_get_ctx(module), base);
1351                         if (!ldb_dn_validate(base_dn)) {
1352                                 talloc_free(mem_ctx);
1353                                 return LDB_ERR_OPERATIONS_ERROR;
1354                         }
1355                         
1356                         for (partition_idx = 0; data->partitions[partition_idx]; partition_idx++) {
1357                                 if (ldb_dn_compare(data->partitions[partition_idx]->ctrl->dn, base_dn) == 0) {
1358                                         partition = data->partitions[partition_idx];
1359                                         break;
1360                                 }
1361                         }
1362                         
1363                         if (!partition) {
1364                                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1365                                                         "partition_init: "
1366                                                         "invalid form for partition module record (no such partition): %s", base);
1367                                 talloc_free(mem_ctx);
1368                                 return LDB_ERR_CONSTRAINT_VIOLATION;
1369                         }
1370                         
1371                         ret = ldb_load_modules_list(ldb_module_get_ctx(module), modules, partition->module, &partition->module);
1372                         if (ret != LDB_SUCCESS) {
1373                                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1374                                                        "partition_init: "
1375                                                        "loading backend for %s failed: %s", 
1376                                                        base, ldb_errstring(ldb_module_get_ctx(module)));
1377                                 talloc_free(mem_ctx);
1378                                 return ret;
1379                         }
1380                         ret = ldb_init_module_chain(ldb_module_get_ctx(module), partition->module);
1381                         if (ret != LDB_SUCCESS) {
1382                                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1383                                                        "partition_init: "
1384                                                        "initialising backend for %s failed: %s", 
1385                                                        base, ldb_errstring(ldb_module_get_ctx(module)));
1386                                 talloc_free(mem_ctx);
1387                                 return ret;
1388                         }
1389                 }
1390         }
1391
1392         ret = ldb_mod_register_control(module, LDB_CONTROL_DOMAIN_SCOPE_OID);
1393         if (ret != LDB_SUCCESS) {
1394                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_ERROR,
1395                         "partition: Unable to register control with rootdse!\n");
1396                 return ret;
1397         }
1398
1399         ret = ldb_mod_register_control(module, LDB_CONTROL_SEARCH_OPTIONS_OID);
1400         if (ret != LDB_SUCCESS) {
1401                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_ERROR,
1402                         "partition: Unable to register control with rootdse!\n");
1403                 return ret;
1404         }
1405
1406         talloc_free(mem_ctx);
1407         return ldb_next_init(module);
1408 }
1409
1410 _PUBLIC_ const struct ldb_module_ops ldb_partition_module_ops = {
1411         .name              = "partition",
1412         .init_context      = partition_init,
1413         .search            = partition_search,
1414         .add               = partition_add,
1415         .modify            = partition_modify,
1416         .del               = partition_delete,
1417         .rename            = partition_rename,
1418         .extended          = partition_extended,
1419         .start_transaction = partition_start_trans,
1420         .prepare_commit    = partition_prepare_commit,
1421         .end_transaction   = partition_end_trans,
1422         .del_transaction   = partition_del_trans,
1423 };