Merge branch 'master' of /home/tridge/samba/git/combined
[garming/samba-autobuild/.git] / source4 / dsdb / samdb / ldb_modules / partition.c
1 /* 
2    Partitions ldb module
3
4    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 /*
22  *  Name: ldb
23  *
24  *  Component: ldb partitions module
25  *
26  *  Description: Implement LDAP partitions
27  *
28  *  Author: Andrew Bartlett
29  *  Author: Stefan Metzmacher
30  */
31
32 #include "includes.h"
33 #include "lib/ldb/include/ldb.h"
34 #include "lib/ldb/include/ldb_errors.h"
35 #include "lib/ldb/include/ldb_module.h"
36 #include "lib/ldb/include/ldb_private.h"
37 #include "dsdb/samdb/samdb.h"
38
/*
 * One configured partition.
 */
struct dsdb_partition {
	/* module that requests aimed at this partition are handed to */
	struct ldb_module *module;

	/*
	 * current-partition control data; ctrl->dn is the DN that request
	 * DNs are compared against, and ctrl itself is attached to the
	 * requests sent to this partition
	 */
	struct dsdb_control_current_partition *ctrl;
};
43
/*
 * Private data of the partition module.
 */
struct partition_private_data {
	/* NULL-terminated array of the known partitions */
	struct dsdb_partition **partitions;

	/*
	 * NULL-terminated array of DNs; a non-search request on exactly
	 * one of these DNs is sent to every partition
	 */
	struct ldb_dn **replicate;
};
48
/*
 * One per-backend copy of a request, plus where to send it.
 */
struct part_request {
	/* module the request is passed to */
	struct ldb_module *module;

	/* the copied request itself */
	struct ldb_request *req;
};
53
/*
 * State kept while one incoming request is fanned out to one or more
 * backends.
 */
struct partition_context {
	/* the partition module instance handling the request */
	struct ldb_module *module;

	/* the original request as received from the caller */
	struct ldb_request *req;

	/* set once any sub-request has completed with LDB_SUCCESS */
	bool got_success;

	/* array of num_requests prepared sub-requests */
	struct part_request *part_req;
	int num_requests;

	/*
	 * number of sub-requests that have completed; also the index of
	 * the next sub-request to fire
	 */
	int finished_requests;
};
63
64 static struct partition_context *partition_init_ctx(struct ldb_module *module, struct ldb_request *req)
65 {
66         struct partition_context *ac;
67
68         ac = talloc_zero(req, struct partition_context);
69         if (ac == NULL) {
70                 ldb_set_errstring(ldb_module_get_ctx(module), "Out of Memory");
71                 return NULL;
72         }
73
74         ac->module = module;
75         ac->req = req;
76
77         return ac;
78 }
79
/*
 * Advance 'module' along its ->next chain until it points at a module
 * whose ops table implements 'op', or becomes NULL if none does.
 *
 * 'module' must be a modifiable lvalue; it is evaluated several times.
 */
#define PARTITION_FIND_OP_NOERROR(module, op) do { \
	while ((module) != NULL && (module)->ops->op == NULL) { \
		(module) = (module)->next; \
	} \
} while (0)
83
/*
 * Like PARTITION_FIND_OP_NOERROR(), but when no module in the chain
 * implements 'op' an error string is set and the *calling function*
 * returns LDB_ERR_OPERATIONS_ERROR.
 *
 * The ldb context is captured before the walk: on the error path
 * 'module' is NULL and must not be handed to ldb_module_get_ctx().
 * If 'module' was already NULL on entry no context is available, so
 * only the error code is returned.
 */
#define PARTITION_FIND_OP(module, op) do { \
	struct ldb_context *partition_find_op_ldb = \
		((module) != NULL) ? ldb_module_get_ctx(module) : NULL; \
	PARTITION_FIND_OP_NOERROR(module, op); \
	if ((module) == NULL) { \
		if (partition_find_op_ldb != NULL) { \
			ldb_asprintf_errstring(partition_find_op_ldb, \
				"Unable to find backend operation for " #op ); \
		} \
		return LDB_ERR_OPERATIONS_ERROR; \
	} \
} while (0)
92
93 /*
94  *    helper functions to call the next module in chain
95  *    */
96
97 static int partition_request(struct ldb_module *module, struct ldb_request *request)
98 {
99         int ret;
100         switch (request->operation) {
101         case LDB_SEARCH:
102                 PARTITION_FIND_OP(module, search);
103                 ret = module->ops->search(module, request);
104                 break;
105         case LDB_ADD:
106                 PARTITION_FIND_OP(module, add);
107                 ret = module->ops->add(module, request);
108                 break;
109         case LDB_MODIFY:
110                 PARTITION_FIND_OP(module, modify);
111                 ret = module->ops->modify(module, request);
112                 break;
113         case LDB_DELETE:
114                 PARTITION_FIND_OP(module, del);
115                 ret = module->ops->del(module, request);
116                 break;
117         case LDB_RENAME:
118                 PARTITION_FIND_OP(module, rename);
119                 ret = module->ops->rename(module, request);
120                 break;
121         case LDB_EXTENDED:
122                 PARTITION_FIND_OP(module, extended);
123                 ret = module->ops->extended(module, request);
124                 break;
125         default:
126                 PARTITION_FIND_OP(module, request);
127                 ret = module->ops->request(module, request);
128                 break;
129         }
130         if (ret == LDB_SUCCESS) {
131                 return ret;
132         }
133         if (!ldb_errstring(ldb_module_get_ctx(module))) {
134                 /* Set a default error string, to place the blame somewhere */
135                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
136                                         "error in module %s: %s (%d)",
137                                         module->ops->name,
138                                         ldb_strerror(ret), ret);
139         }
140         return ret;
141 }
142
143 static struct dsdb_partition *find_partition(struct partition_private_data *data,
144                                              struct ldb_dn *dn,
145                                              struct ldb_request *req)
146 {
147         int i;
148         struct ldb_control *partition_ctrl;
149
150         /* see if the request has the partition DN specified in a
151          * control. The repl_meta_data module can specify this to
152          * ensure that replication happens to the right partition
153          */
154         partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
155         if (partition_ctrl) {
156                 const struct dsdb_control_current_partition *partition;
157                 partition = talloc_get_type(partition_ctrl->data,
158                                             struct dsdb_control_current_partition);
159                 if (partition != NULL) {
160                         dn = partition->dn;
161                 }
162         }
163
164         if (dn == NULL) {
165                 return NULL;
166         }
167
168         /* Look at base DN */
169         /* Figure out which partition it is under */
170         /* Skip the lot if 'data' isn't here yet (initialisation) */
171         for (i=0; data && data->partitions && data->partitions[i]; i++) {
172                 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, dn) == 0) {
173                         return data->partitions[i];
174                 }
175         }
176
177         return NULL;
178 }
179
180 /**
181  * fire the caller's callback for every entry, but only send 'done' once.
182  */
183 static int partition_req_callback(struct ldb_request *req,
184                                   struct ldb_reply *ares)
185 {
186         struct partition_context *ac;
187         struct ldb_module *module;
188         struct ldb_request *nreq;
189         int ret;
190
191         ac = talloc_get_type(req->context, struct partition_context);
192
193         if (!ares) {
194                 return ldb_module_done(ac->req, NULL, NULL,
195                                         LDB_ERR_OPERATIONS_ERROR);
196         }
197
198         if (ares->error != LDB_SUCCESS && !ac->got_success) {
199                 return ldb_module_done(ac->req, ares->controls,
200                                         ares->response, ares->error);
201         }
202
203         switch (ares->type) {
204         case LDB_REPLY_REFERRAL:
205                 /* ignore referrals for now */
206                 break;
207
208         case LDB_REPLY_ENTRY:
209                 if (ac->req->operation != LDB_SEARCH) {
210                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
211                                 "partition_req_callback:"
212                                 " Unsupported reply type for this request");
213                         return ldb_module_done(ac->req, NULL, NULL,
214                                                 LDB_ERR_OPERATIONS_ERROR);
215                 }
216                 return ldb_module_send_entry(ac->req, ares->message, ares->controls);
217
218         case LDB_REPLY_DONE:
219                 if (ares->error == LDB_SUCCESS) {
220                         ac->got_success = true;
221                 }
222                 if (ac->req->operation == LDB_EXTENDED) {
223                         /* FIXME: check for ares->response, replmd does not fill it ! */
224                         if (ares->response) {
225                                 if (strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID) != 0) {
226                                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
227                                                           "partition_req_callback:"
228                                                           " Unknown extended reply, "
229                                                           "only supports START_TLS");
230                                         talloc_free(ares);
231                                         return ldb_module_done(ac->req, NULL, NULL,
232                                                                 LDB_ERR_OPERATIONS_ERROR);
233                                 }
234                         }
235                 }
236
237                 ac->finished_requests++;
238                 if (ac->finished_requests == ac->num_requests) {
239                         /* this was the last one, call callback */
240                         return ldb_module_done(ac->req, ares->controls,
241                                                ares->response, 
242                                                ac->got_success?LDB_SUCCESS:ares->error);
243                 }
244
245                 /* not the last, now call the next one */
246                 module = ac->part_req[ac->finished_requests].module;
247                 nreq = ac->part_req[ac->finished_requests].req;
248
249                 ret = partition_request(module, nreq);
250                 if (ret != LDB_SUCCESS) {
251                         talloc_free(ares);
252                         return ldb_module_done(ac->req, NULL, NULL, ret);
253                 }
254
255                 break;
256         }
257
258         talloc_free(ares);
259         return LDB_SUCCESS;
260 }
261
262 static int partition_prep_request(struct partition_context *ac,
263                                   struct dsdb_partition *partition)
264 {
265         int ret;
266         struct ldb_request *req;
267
268         ac->part_req = talloc_realloc(ac, ac->part_req,
269                                         struct part_request,
270                                         ac->num_requests + 1);
271         if (ac->part_req == NULL) {
272                 ldb_oom(ldb_module_get_ctx(ac->module));
273                 return LDB_ERR_OPERATIONS_ERROR;
274         }
275
276         switch (ac->req->operation) {
277         case LDB_SEARCH:
278                 ret = ldb_build_search_req_ex(&req, ldb_module_get_ctx(ac->module),
279                                         ac->part_req,
280                                         ac->req->op.search.base,
281                                         ac->req->op.search.scope,
282                                         ac->req->op.search.tree,
283                                         ac->req->op.search.attrs,
284                                         ac->req->controls,
285                                         ac, partition_req_callback,
286                                         ac->req);
287                 break;
288         case LDB_ADD:
289                 ret = ldb_build_add_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
290                                         ac->req->op.add.message,
291                                         ac->req->controls,
292                                         ac, partition_req_callback,
293                                         ac->req);
294                 break;
295         case LDB_MODIFY:
296                 ret = ldb_build_mod_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
297                                         ac->req->op.mod.message,
298                                         ac->req->controls,
299                                         ac, partition_req_callback,
300                                         ac->req);
301                 break;
302         case LDB_DELETE:
303                 ret = ldb_build_del_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
304                                         ac->req->op.del.dn,
305                                         ac->req->controls,
306                                         ac, partition_req_callback,
307                                         ac->req);
308                 break;
309         case LDB_RENAME:
310                 ret = ldb_build_rename_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
311                                         ac->req->op.rename.olddn,
312                                         ac->req->op.rename.newdn,
313                                         ac->req->controls,
314                                         ac, partition_req_callback,
315                                         ac->req);
316                 break;
317         case LDB_EXTENDED:
318                 ret = ldb_build_extended_req(&req, ldb_module_get_ctx(ac->module),
319                                         ac->part_req,
320                                         ac->req->op.extended.oid,
321                                         ac->req->op.extended.data,
322                                         ac->req->controls,
323                                         ac, partition_req_callback,
324                                         ac->req);
325                 break;
326         default:
327                 ldb_set_errstring(ldb_module_get_ctx(ac->module),
328                                   "Unsupported request type!");
329                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
330         }
331
332         if (ret != LDB_SUCCESS) {
333                 return ret;
334         }
335
336         ac->part_req[ac->num_requests].req = req;
337
338         if (ac->req->controls) {
339                 req->controls = talloc_memdup(req, ac->req->controls,
340                                         talloc_get_size(ac->req->controls));
341                 if (req->controls == NULL) {
342                         ldb_oom(ldb_module_get_ctx(ac->module));
343                         return LDB_ERR_OPERATIONS_ERROR;
344                 }
345         }
346
347         if (partition) {
348                 ac->part_req[ac->num_requests].module = partition->module;
349
350                 if (!ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
351                         ret = ldb_request_add_control(req,
352                                                       DSDB_CONTROL_CURRENT_PARTITION_OID,
353                                                       false, partition->ctrl);
354                         if (ret != LDB_SUCCESS) {
355                                 return ret;
356                         }
357                 }
358
359                 if (req->operation == LDB_SEARCH) {
360                         /* If the search is for 'more' than this partition,
361                          * then change the basedn, so a remote LDAP server
362                          * doesn't object */
363                         if (ldb_dn_compare_base(partition->ctrl->dn,
364                                                 req->op.search.base) != 0) {
365                                 req->op.search.base = partition->ctrl->dn;
366                         }
367                 }
368
369         } else {
370                 /* make sure you put the NEXT module here, or
371                  * partition_request() will simply loop forever on itself */
372                 ac->part_req[ac->num_requests].module = ac->module->next;
373         }
374
375         ac->num_requests++;
376
377         return LDB_SUCCESS;
378 }
379
380 static int partition_call_first(struct partition_context *ac)
381 {
382         return partition_request(ac->part_req[0].module, ac->part_req[0].req);
383 }
384
385 /**
386  * Send a request down to all the partitions
387  */
388 static int partition_send_all(struct ldb_module *module, 
389                               struct partition_context *ac, 
390                               struct ldb_request *req) 
391 {
392         int i;
393         struct partition_private_data *data = talloc_get_type(module->private_data, 
394                                                               struct partition_private_data);
395         int ret = partition_prep_request(ac, NULL);
396         if (ret != LDB_SUCCESS) {
397                 return ret;
398         }
399         for (i=0; data && data->partitions && data->partitions[i]; i++) {
400                 ret = partition_prep_request(ac, data->partitions[i]);
401                 if (ret != LDB_SUCCESS) {
402                         return ret;
403                 }
404         }
405
406         /* fire the first one */
407         return partition_call_first(ac);
408 }
409
410 /**
411  * Figure out which backend a request needs to be aimed at.  Some
412  * requests must be replicated to all backends
413  */
414 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn) 
415 {
416         struct partition_context *ac;
417         unsigned i;
418         int ret;
419         struct dsdb_partition *partition;
420         struct partition_private_data *data = talloc_get_type(module->private_data, 
421                                                               struct partition_private_data);
422         if (!data || !data->partitions) {
423                 return ldb_next_request(module, req);
424         }
425         
426         if (req->operation != LDB_SEARCH) {
427                 /* Is this a special DN, we need to replicate to every backend? */
428                 for (i=0; data->replicate && data->replicate[i]; i++) {
429                         if (ldb_dn_compare(data->replicate[i], 
430                                            dn) == 0) {
431                                 
432                                 ac = partition_init_ctx(module, req);
433                                 if (!ac) {
434                                         return LDB_ERR_OPERATIONS_ERROR;
435                                 }
436                                 
437                                 return partition_send_all(module, ac, req);
438                         }
439                 }
440         }
441
442         /* Otherwise, we need to find the partition to fire it to */
443
444         /* Find partition */
445         partition = find_partition(data, dn, req);
446         if (!partition) {
447                 /*
448                  * if we haven't found a matching partition
449                  * pass the request to the main ldb
450                  *
451                  * TODO: we should maybe return an error here
452                  *       if it's not a special dn
453                  */
454
455                 return ldb_next_request(module, req);
456         }
457
458         ac = partition_init_ctx(module, req);
459         if (!ac) {
460                 return LDB_ERR_OPERATIONS_ERROR;
461         }
462
463         /* we need to add a control but we never touch the original request */
464         ret = partition_prep_request(ac, partition);
465         if (ret != LDB_SUCCESS) {
466                 return ret;
467         }
468
469         /* fire the first one */
470         return partition_call_first(ac);
471 }
472
473 /* search */
474 static int partition_search(struct ldb_module *module, struct ldb_request *req)
475 {
476         struct ldb_control **saved_controls;
477         /* Find backend */
478         struct partition_private_data *data = talloc_get_type(module->private_data, 
479                                                               struct partition_private_data);
480
481         /* issue request */
482
483         /* (later) consider if we should be searching multiple
484          * partitions (for 'invisible' partition behaviour */
485
486         struct ldb_control *search_control = ldb_request_get_control(req, LDB_CONTROL_SEARCH_OPTIONS_OID);
487         struct ldb_control *domain_scope_control = ldb_request_get_control(req, LDB_CONTROL_DOMAIN_SCOPE_OID);
488         
489         struct ldb_search_options_control *search_options = NULL;
490         struct dsdb_partition *p;
491         
492         p = find_partition(data, NULL, req);
493         if (p != NULL) {
494                 /* the caller specified what partition they want the
495                  * search - just pass it on
496                  */
497                 return ldb_next_request(p->module, req);                
498         }
499
500
501         if (search_control) {
502                 search_options = talloc_get_type(search_control->data, struct ldb_search_options_control);
503         }
504
505         /* Remove the domain_scope control, so we don't confuse a backend server */
506         if (domain_scope_control && !save_controls(domain_scope_control, req, &saved_controls)) {
507                 ldb_oom(ldb_module_get_ctx(module));
508                 return LDB_ERR_OPERATIONS_ERROR;
509         }
510
511         /*
512          * for now pass down the LDB_CONTROL_SEARCH_OPTIONS_OID control
513          * down as uncritical to make windows 2008 dcpromo happy.
514          */
515         if (search_control) {
516                 search_control->critical = 0;
517         }
518
519         /* TODO:
520            Generate referrals (look for a partition under this DN) if we don't have the above control specified
521         */
522         
523         if (search_options && (search_options->search_options & LDB_SEARCH_OPTION_PHANTOM_ROOT)) {
524                 int ret, i;
525                 struct partition_context *ac;
526                 if ((search_options->search_options & ~LDB_SEARCH_OPTION_PHANTOM_ROOT) == 0) {
527                         /* We have processed this flag, so we are done with this control now */
528
529                         /* Remove search control, so we don't confuse a backend server */
530                         if (search_control && !save_controls(search_control, req, &saved_controls)) {
531                                 ldb_oom(ldb_module_get_ctx(module));
532                                 return LDB_ERR_OPERATIONS_ERROR;
533                         }
534                 }
535                 ac = partition_init_ctx(module, req);
536                 if (!ac) {
537                         return LDB_ERR_OPERATIONS_ERROR;
538                 }
539
540                 /* Search from the base DN */
541                 if (!req->op.search.base || ldb_dn_is_null(req->op.search.base)) {
542                         return partition_send_all(module, ac, req);
543                 }
544                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
545                         bool match = false, stop = false;
546                         /* Find all partitions under the search base 
547                            
548                            we match if:
549
550                               1) the DN we are looking for exactly matches the partition
551                              or
552                               2) the DN we are looking for is a parent of the partition and it isn't
553                                  a scope base search
554                              or
555                               3) the DN we are looking for is a child of the partition
556                          */
557                         if (ldb_dn_compare(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
558                                 match = true;
559                                 if (req->op.search.scope == LDB_SCOPE_BASE) {
560                                         stop = true;
561                                 }
562                         }
563                         if (!match && 
564                             (ldb_dn_compare_base(req->op.search.base, data->partitions[i]->ctrl->dn) == 0 &&
565                              req->op.search.scope != LDB_SCOPE_BASE)) {
566                                 match = true;
567                         }
568                         if (!match &&
569                             ldb_dn_compare_base(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
570                                 match = true;
571                                 stop = true; /* note that this relies on partition ordering */
572                         }
573                         if (match) {
574                                 ret = partition_prep_request(ac, data->partitions[i]);
575                                 if (ret != LDB_SUCCESS) {
576                                         return ret;
577                                 }
578                         }
579                         if (stop) break;
580                 }
581
582                 /* Perhaps we didn't match any partitions.  Try the main partition, only */
583                 if (ac->num_requests == 0) {
584                         talloc_free(ac);
585                         return ldb_next_request(module, req);
586                 }
587
588                 /* fire the first one */
589                 return partition_call_first(ac);
590
591         } else {
592                 /* Handle this like all other requests */
593                 if (search_control && (search_options->search_options & ~LDB_SEARCH_OPTION_PHANTOM_ROOT) == 0) {
594                         /* We have processed this flag, so we are done with this control now */
595
596                         /* Remove search control, so we don't confuse a backend server */
597                         if (search_control && !save_controls(search_control, req, &saved_controls)) {
598                                 ldb_oom(ldb_module_get_ctx(module));
599                                 return LDB_ERR_OPERATIONS_ERROR;
600                         }
601                 }
602
603                 return partition_replicate(module, req, req->op.search.base);
604         }
605 }
606
607 /* add */
608 static int partition_add(struct ldb_module *module, struct ldb_request *req)
609 {
610         return partition_replicate(module, req, req->op.add.message->dn);
611 }
612
613 /* modify */
614 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
615 {
616         return partition_replicate(module, req, req->op.mod.message->dn);
617 }
618
619 /* delete */
620 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
621 {
622         return partition_replicate(module, req, req->op.del.dn);
623 }
624
625 /* rename */
626 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
627 {
628         /* Find backend */
629         struct dsdb_partition *backend, *backend2;
630         
631         struct partition_private_data *data = talloc_get_type(module->private_data, 
632                                                               struct partition_private_data);
633
634         /* Skip the lot if 'data' isn't here yet (initialisation) */
635         if (!data) {
636                 return LDB_ERR_OPERATIONS_ERROR;
637         }
638
639         backend = find_partition(data, req->op.rename.olddn, req);
640         backend2 = find_partition(data, req->op.rename.newdn, req);
641
642         if ((backend && !backend2) || (!backend && backend2)) {
643                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
644         }
645
646         if (backend != backend2) {
647                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
648                                        "Cannot rename from %s in %s to %s in %s: %s",
649                                        ldb_dn_get_linearized(req->op.rename.olddn),
650                                        ldb_dn_get_linearized(backend->ctrl->dn),
651                                        ldb_dn_get_linearized(req->op.rename.newdn),
652                                        ldb_dn_get_linearized(backend2->ctrl->dn),
653                                        ldb_strerror(LDB_ERR_AFFECTS_MULTIPLE_DSAS));
654                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
655         }
656
657         return partition_replicate(module, req, req->op.rename.olddn);
658 }
659
660 /* start a transaction */
661 static int partition_start_trans(struct ldb_module *module)
662 {
663         int i, ret;
664         struct partition_private_data *data = talloc_get_type(module->private_data, 
665                                                               struct partition_private_data);
666         /* Look at base DN */
667         /* Figure out which partition it is under */
668         /* Skip the lot if 'data' isn't here yet (initialization) */
669         ret = ldb_next_start_trans(module);
670         if (ret != LDB_SUCCESS) {
671                 return ret;
672         }
673
674         for (i=0; data && data->partitions && data->partitions[i]; i++) {
675                 struct ldb_module *next = data->partitions[i]->module;
676                 PARTITION_FIND_OP(next, start_transaction);
677
678                 ret = next->ops->start_transaction(next);
679                 if (ret != LDB_SUCCESS) {
680                         /* Back it out, if it fails on one */
681                         for (i--; i >= 0; i--) {
682                                 next = data->partitions[i]->module;
683                                 PARTITION_FIND_OP(next, del_transaction);
684
685                                 next->ops->del_transaction(next);
686                         }
687                         ldb_next_del_trans(module);
688                         return ret;
689                 }
690         }
691         return LDB_SUCCESS;
692 }
693
694 /* prepare for a commit */
695 static int partition_prepare_commit(struct ldb_module *module)
696 {
697         int i;
698         struct partition_private_data *data = talloc_get_type(module->private_data, 
699                                                               struct partition_private_data);
700
701         for (i=0; data && data->partitions && data->partitions[i]; i++) {
702                 struct ldb_module *next_prepare = data->partitions[i]->module;
703                 int ret;
704
705                 PARTITION_FIND_OP_NOERROR(next_prepare, prepare_commit);
706                 if (next_prepare == NULL) {
707                         continue;
708                 }
709
710                 ret = next_prepare->ops->prepare_commit(next_prepare);
711                 if (ret != LDB_SUCCESS) {
712                         return ret;
713                 }
714         }
715
716         return ldb_next_prepare_commit(module);
717 }
718
719
720 /* end a transaction */
721 static int partition_end_trans(struct ldb_module *module)
722 {
723         int i;
724         struct partition_private_data *data = talloc_get_type(module->private_data, 
725                                                               struct partition_private_data);
726         for (i=0; data && data->partitions && data->partitions[i]; i++) {
727                 struct ldb_module *next_end = data->partitions[i]->module;
728                 int ret;
729
730                 PARTITION_FIND_OP(next_end, end_transaction);
731
732                 ret = next_end->ops->end_transaction(next_end);
733                 if (ret != LDB_SUCCESS) {
734                         return ret;
735                 }
736         }
737
738         return ldb_next_end_trans(module);
739 }
740
741 /* delete a transaction */
742 static int partition_del_trans(struct ldb_module *module)
743 {
744         int i, ret, final_ret = LDB_SUCCESS;
745         struct partition_private_data *data = talloc_get_type(module->private_data, 
746                                                               struct partition_private_data);
747         for (i=0; data && data->partitions && data->partitions[i]; i++) {
748                 struct ldb_module *next = data->partitions[i]->module;
749                 PARTITION_FIND_OP(next, del_transaction);
750
751                 ret = next->ops->del_transaction(next);
752                 if (ret != LDB_SUCCESS) {
753                         final_ret = ret;
754                 }
755         }       
756
757         ret = ldb_next_del_trans(module);
758         if (ret != LDB_SUCCESS) {
759                 final_ret = ret;
760         }
761         return final_ret;
762 }
763
764
765 /* FIXME: This function is still semi-async */
766 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
767 {
768         int i, ret;
769         uint64_t seq_number = 0;
770         uint64_t timestamp_sequence = 0;
771         uint64_t timestamp = 0;
772         struct partition_private_data *data = talloc_get_type(module->private_data, 
773                                                               struct partition_private_data);
774         struct ldb_seqnum_request *seq;
775         struct ldb_seqnum_result *seqr;
776         struct ldb_request *treq;
777         struct ldb_seqnum_request *tseq;
778         struct ldb_seqnum_result *tseqr;
779         struct ldb_extended *ext;
780         struct ldb_result *res;
781         struct dsdb_partition *p;
782
783         p = find_partition(data, NULL, req);
784         if (p != NULL) {
785                 /* the caller specified what partition they want the
786                  * sequence number operation on - just pass it on
787                  */
788                 return ldb_next_request(p->module, req);                
789         }
790
791         seq = talloc_get_type(req->op.extended.data, struct ldb_seqnum_request);
792
793         switch (seq->type) {
794         case LDB_SEQ_NEXT:
795         case LDB_SEQ_HIGHEST_SEQ:
796                 res = talloc_zero(req, struct ldb_result);
797                 if (res == NULL) {
798                         return LDB_ERR_OPERATIONS_ERROR;
799                 }
800                 tseq = talloc_zero(res, struct ldb_seqnum_request);
801                 if (tseq == NULL) {
802                         talloc_free(res);
803                         return LDB_ERR_OPERATIONS_ERROR;
804                 }
805                 tseq->type = seq->type;
806
807                 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
808                                              LDB_EXTENDED_SEQUENCE_NUMBER,
809                                              tseq,
810                                              NULL,
811                                              res,
812                                              ldb_extended_default_callback,
813                                              NULL);
814                 ret = ldb_next_request(module, treq);
815                 if (ret == LDB_SUCCESS) {
816                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
817                 }
818                 if (ret != LDB_SUCCESS) {
819                         talloc_free(res);
820                         return ret;
821                 }
822                 seqr = talloc_get_type(res->extended->data,
823                                         struct ldb_seqnum_result);
824                 if (seqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
825                         timestamp_sequence = seqr->seq_num;
826                 } else {
827                         seq_number += seqr->seq_num;
828                 }
829                 talloc_free(res);
830
831                 /* Skip the lot if 'data' isn't here yet (initialisation) */
832                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
833
834                         res = talloc_zero(req, struct ldb_result);
835                         if (res == NULL) {
836                                 return LDB_ERR_OPERATIONS_ERROR;
837                         }
838                         tseq = talloc_zero(res, struct ldb_seqnum_request);
839                         if (tseq == NULL) {
840                                 talloc_free(res);
841                                 return LDB_ERR_OPERATIONS_ERROR;
842                         }
843                         tseq->type = seq->type;
844
845                         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
846                                                      LDB_EXTENDED_SEQUENCE_NUMBER,
847                                                      tseq,
848                                                      NULL,
849                                                      res,
850                                                      ldb_extended_default_callback,
851                                                      NULL);
852                         if (ret != LDB_SUCCESS) {
853                                 talloc_free(res);
854                                 return ret;
855                         }
856
857                         if (!ldb_request_get_control(treq, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
858                                 ret = ldb_request_add_control(treq,
859                                                               DSDB_CONTROL_CURRENT_PARTITION_OID,
860                                                               false, data->partitions[i]->ctrl);
861                                 if (ret != LDB_SUCCESS) {
862                                         talloc_free(res);
863                                         return ret;
864                                 }
865                         }
866
867                         ret = partition_request(data->partitions[i]->module, treq);
868                         if (ret != LDB_SUCCESS) {
869                                 talloc_free(res);
870                                 return ret;
871                         }
872                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
873                         if (ret != LDB_SUCCESS) {
874                                 talloc_free(res);
875                                 return ret;
876                         }
877                         tseqr = talloc_get_type(res->extended->data,
878                                                 struct ldb_seqnum_result);
879                         if (tseqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
880                                 timestamp_sequence = MAX(timestamp_sequence,
881                                                          tseqr->seq_num);
882                         } else {
883                                 seq_number += tseqr->seq_num;
884                         }
885                         talloc_free(res);
886                 }
887                 /* fall through */
888         case LDB_SEQ_HIGHEST_TIMESTAMP:
889
890                 res = talloc_zero(req, struct ldb_result);
891                 if (res == NULL) {
892                         return LDB_ERR_OPERATIONS_ERROR;
893                 }
894
895                 tseq = talloc_zero(res, struct ldb_seqnum_request);
896                 if (tseq == NULL) {
897                         talloc_free(res);
898                         return LDB_ERR_OPERATIONS_ERROR;
899                 }
900                 tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
901
902                 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
903                                              LDB_EXTENDED_SEQUENCE_NUMBER,
904                                              tseq,
905                                              NULL,
906                                              res,
907                                              ldb_extended_default_callback,
908                                              NULL);
909                 if (ret != LDB_SUCCESS) {
910                         talloc_free(res);
911                         return ret;
912                 }
913
914                 ret = ldb_next_request(module, treq);
915                 if (ret != LDB_SUCCESS) {
916                         talloc_free(res);
917                         return ret;
918                 }
919                 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
920                 if (ret != LDB_SUCCESS) {
921                         talloc_free(res);
922                         return ret;
923                 }
924
925                 tseqr = talloc_get_type(res->extended->data,
926                                            struct ldb_seqnum_result);
927                 timestamp = tseqr->seq_num;
928
929                 talloc_free(res);
930
931                 /* Skip the lot if 'data' isn't here yet (initialisation) */
932                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
933
934                         res = talloc_zero(req, struct ldb_result);
935                         if (res == NULL) {
936                                 return LDB_ERR_OPERATIONS_ERROR;
937                         }
938
939                         tseq = talloc_zero(res, struct ldb_seqnum_request);
940                         if (tseq == NULL) {
941                                 talloc_free(res);
942                                 return LDB_ERR_OPERATIONS_ERROR;
943                         }
944                         tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
945
946                         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
947                                                      LDB_EXTENDED_SEQUENCE_NUMBER,
948                                                      tseq,
949                                                      NULL,
950                                                      res,
951                                                      ldb_extended_default_callback,
952                                                      NULL);
953                         if (ret != LDB_SUCCESS) {
954                                 talloc_free(res);
955                                 return ret;
956                         }
957
958                         if (!ldb_request_get_control(treq, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
959                                 ret = ldb_request_add_control(treq,
960                                                               DSDB_CONTROL_CURRENT_PARTITION_OID,
961                                                               false, data->partitions[i]->ctrl);
962                                 if (ret != LDB_SUCCESS) {
963                                         talloc_free(res);
964                                         return ret;
965                                 }
966                         }
967
968                         ret = partition_request(data->partitions[i]->module, treq);
969                         if (ret != LDB_SUCCESS) {
970                                 talloc_free(res);
971                                 return ret;
972                         }
973                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
974                         if (ret != LDB_SUCCESS) {
975                                 talloc_free(res);
976                                 return ret;
977                         }
978
979                         tseqr = talloc_get_type(res->extended->data,
980                                                   struct ldb_seqnum_result);
981                         timestamp = MAX(timestamp, tseqr->seq_num);
982
983                         talloc_free(res);
984                 }
985
986                 break;
987         }
988
989         ext = talloc_zero(req, struct ldb_extended);
990         if (!ext) {
991                 return LDB_ERR_OPERATIONS_ERROR;
992         }
993         seqr = talloc_zero(ext, struct ldb_seqnum_result);
994         if (seqr == NULL) {
995                 talloc_free(ext);
996                 return LDB_ERR_OPERATIONS_ERROR;
997         }
998         ext->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
999         ext->data = seqr;
1000
1001         switch (seq->type) {
1002         case LDB_SEQ_NEXT:
1003         case LDB_SEQ_HIGHEST_SEQ:
1004
1005                 /* Has someone above set a timebase sequence? */
1006                 if (timestamp_sequence) {
1007                         seqr->seq_num = (((unsigned long long)timestamp << 24) | (seq_number & 0xFFFFFF));
1008                 } else {
1009                         seqr->seq_num = seq_number;
1010                 }
1011
1012                 if (timestamp_sequence > seqr->seq_num) {
1013                         seqr->seq_num = timestamp_sequence;
1014                         seqr->flags |= LDB_SEQ_TIMESTAMP_SEQUENCE;
1015                 }
1016
1017                 seqr->flags |= LDB_SEQ_GLOBAL_SEQUENCE;
1018                 break;
1019         case LDB_SEQ_HIGHEST_TIMESTAMP:
1020                 seqr->seq_num = timestamp;
1021                 break;
1022         }
1023
1024         if (seq->type == LDB_SEQ_NEXT) {
1025                 seqr->seq_num++;
1026         }
1027
1028         /* send request done */
1029         return ldb_module_done(req, NULL, ext, LDB_SUCCESS);
1030 }
1031
1032 static int partition_extended_schema_update_now(struct ldb_module *module, struct ldb_request *req)
1033 {
1034         struct dsdb_partition *partition;
1035         struct partition_private_data *data;
1036         struct ldb_dn *schema_dn;
1037         struct partition_context *ac;
1038         int ret;
1039
1040         schema_dn = talloc_get_type(req->op.extended.data, struct ldb_dn);
1041         if (!schema_dn) {
1042                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_FATAL, "partition_extended: invalid extended data\n");
1043                 return LDB_ERR_PROTOCOL_ERROR;
1044         }
1045
1046         data = talloc_get_type(module->private_data, struct partition_private_data);
1047         if (!data) {
1048                 return LDB_ERR_OPERATIONS_ERROR;
1049         }
1050         
1051         partition = find_partition( data, schema_dn, req);
1052         if (!partition) {
1053                 return ldb_next_request(module, req);
1054         }
1055
1056         ac = partition_init_ctx(module, req);
1057         if (!ac) {
1058                 return LDB_ERR_OPERATIONS_ERROR;
1059         }
1060
1061         /* we need to add a control but we never touch the original request */
1062         ret = partition_prep_request(ac, partition);
1063         if (ret != LDB_SUCCESS) {
1064                 return ret;
1065         }
1066
1067         /* fire the first one */
1068         ret =  partition_call_first(ac);
1069
1070         if (ret != LDB_SUCCESS){
1071                 return ret;
1072         }
1073
1074         return ldb_request_done(req, ret);
1075 }
1076
1077
1078 /* extended */
1079 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
1080 {
1081         struct partition_private_data *data;
1082         struct partition_context *ac;
1083
1084         data = talloc_get_type(module->private_data, struct partition_private_data);
1085         if (!data || !data->partitions) {
1086                 return ldb_next_request(module, req);
1087         }
1088
1089         if (strcmp(req->op.extended.oid, LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1090                 return partition_sequence_number(module, req);
1091         }
1092
1093         /* forward schemaUpdateNow operation to schema_fsmo module*/
1094         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_SCHEMA_UPDATE_NOW_OID) == 0) {
1095                 return partition_extended_schema_update_now( module, req );
1096         }       
1097
1098         /* 
1099          * as the extended operation has no dn
1100          * we need to send it to all partitions
1101          */
1102
1103         ac = partition_init_ctx(module, req);
1104         if (!ac) {
1105                 return LDB_ERR_OPERATIONS_ERROR;
1106         }
1107
1108         return partition_send_all(module, ac, req);
1109 }
1110
1111 static int partition_sort_compare(const void *v1, const void *v2)
1112 {
1113         const struct dsdb_partition *p1;
1114         const struct dsdb_partition *p2;
1115
1116         p1 = *((struct dsdb_partition * const*)v1);
1117         p2 = *((struct dsdb_partition * const*)v2);
1118
1119         return ldb_dn_compare(p1->ctrl->dn, p2->ctrl->dn);
1120 }
1121
1122 static int partition_init(struct ldb_module *module)
1123 {
1124         int ret, i;
1125         TALLOC_CTX *mem_ctx = talloc_new(module);
1126         const char *attrs[] = { "partition", "replicateEntries", "modules", NULL };
1127         struct ldb_result *res;
1128         struct ldb_message *msg;
1129         struct ldb_message_element *partition_attributes;
1130         struct ldb_message_element *replicate_attributes;
1131         struct ldb_message_element *modules_attributes;
1132
1133         struct partition_private_data *data;
1134
1135         if (!mem_ctx) {
1136                 return LDB_ERR_OPERATIONS_ERROR;
1137         }
1138
1139         data = talloc(mem_ctx, struct partition_private_data);
1140         if (data == NULL) {
1141                 return LDB_ERR_OPERATIONS_ERROR;
1142         }
1143
1144         ret = ldb_search(ldb_module_get_ctx(module), mem_ctx, &res,
1145                          ldb_dn_new(mem_ctx, ldb_module_get_ctx(module), "@PARTITION"),
1146                          LDB_SCOPE_BASE, attrs, NULL);
1147         if (ret != LDB_SUCCESS) {
1148                 talloc_free(mem_ctx);
1149                 return ret;
1150         }
1151         if (res->count == 0) {
1152                 talloc_free(mem_ctx);
1153                 return ldb_next_init(module);
1154         }
1155
1156         if (res->count > 1) {
1157                 talloc_free(mem_ctx);
1158                 return LDB_ERR_CONSTRAINT_VIOLATION;
1159         }
1160
1161         msg = res->msgs[0];
1162
1163         partition_attributes = ldb_msg_find_element(msg, "partition");
1164         if (!partition_attributes) {
1165                 ldb_set_errstring(ldb_module_get_ctx(module), "partition_init: no partitions specified");
1166                 talloc_free(mem_ctx);
1167                 return LDB_ERR_CONSTRAINT_VIOLATION;
1168         }
1169         data->partitions = talloc_array(data, struct dsdb_partition *, partition_attributes->num_values + 1);
1170         if (!data->partitions) {
1171                 talloc_free(mem_ctx);
1172                 return LDB_ERR_OPERATIONS_ERROR;
1173         }
1174         for (i=0; i < partition_attributes->num_values; i++) {
1175                 char *base = talloc_strdup(data->partitions, (char *)partition_attributes->values[i].data);
1176                 char *p = strchr(base, ':');
1177                 const char *backend;
1178
1179                 if (!p) {
1180                         ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1181                                                 "partition_init: "
1182                                                 "invalid form for partition record (missing ':'): %s", base);
1183                         talloc_free(mem_ctx);
1184                         return LDB_ERR_CONSTRAINT_VIOLATION;
1185                 }
1186                 p[0] = '\0';
1187                 p++;
1188                 if (!p[0]) {
1189                         ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1190                                                 "partition_init: "
1191                                                 "invalid form for partition record (missing backend database): %s", base);
1192                         talloc_free(mem_ctx);
1193                         return LDB_ERR_CONSTRAINT_VIOLATION;
1194                 }
1195                 data->partitions[i] = talloc(data->partitions, struct dsdb_partition);
1196                 if (!data->partitions[i]) {
1197                         talloc_free(mem_ctx);
1198                         return LDB_ERR_OPERATIONS_ERROR;
1199                 }
1200                 data->partitions[i]->ctrl = talloc(data->partitions[i], struct dsdb_control_current_partition);
1201                 if (!data->partitions[i]->ctrl) {
1202                         talloc_free(mem_ctx);
1203                         return LDB_ERR_OPERATIONS_ERROR;
1204                 }
1205                 data->partitions[i]->ctrl->version = DSDB_CONTROL_CURRENT_PARTITION_VERSION;
1206                 data->partitions[i]->ctrl->dn = ldb_dn_new(data->partitions[i], ldb_module_get_ctx(module), base);
1207                 if (!data->partitions[i]->ctrl->dn) {
1208                         ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1209                                                 "partition_init: invalid DN in partition record: %s", base);
1210                         talloc_free(mem_ctx);
1211                         return LDB_ERR_CONSTRAINT_VIOLATION;
1212                 }
1213
1214                 backend = samdb_relative_path(ldb_module_get_ctx(module), 
1215                                                                    data->partitions[i], 
1216                                                                    p);
1217                 if (!backend) {
1218                         ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1219                                                 "partition_init: unable to determine an relative path for partition: %s", base);
1220                         talloc_free(mem_ctx);                   
1221                 }
1222                 ret = ldb_connect_backend(ldb_module_get_ctx(module), backend, NULL, &data->partitions[i]->module);
1223                 if (ret != LDB_SUCCESS) {
1224                         talloc_free(mem_ctx);
1225                         return ret;
1226                 }
1227         }
1228         data->partitions[i] = NULL;
1229
1230         /* sort these into order, most to least specific */
1231         qsort(data->partitions, partition_attributes->num_values,
1232               sizeof(*data->partitions), partition_sort_compare);
1233
1234         for (i=0; data->partitions[i]; i++) {
1235                 struct ldb_request *req;
1236                 req = talloc_zero(mem_ctx, struct ldb_request);
1237                 if (req == NULL) {
1238                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_ERROR, "partition: Out of memory!\n");
1239                         talloc_free(mem_ctx);
1240                         return LDB_ERR_OPERATIONS_ERROR;
1241                 }
1242                 
1243                 req->operation = LDB_REQ_REGISTER_PARTITION;
1244                 req->op.reg_partition.dn = data->partitions[i]->ctrl->dn;
1245                 req->callback = ldb_op_default_callback;
1246
1247                 ldb_set_timeout(ldb_module_get_ctx(module), req, 0);
1248
1249                 req->handle = ldb_handle_new(req, ldb_module_get_ctx(module));
1250                 if (req->handle == NULL) {
1251                         return LDB_ERR_OPERATIONS_ERROR;
1252                 }
1253                 
1254                 ret = ldb_request(ldb_module_get_ctx(module), req);
1255                 if (ret == LDB_SUCCESS) {
1256                         ret = ldb_wait(req->handle, LDB_WAIT_ALL);
1257                 }
1258                 if (ret != LDB_SUCCESS) {
1259                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_ERROR, "partition: Unable to register partition with rootdse!\n");
1260                         talloc_free(mem_ctx);
1261                         return LDB_ERR_OTHER;
1262                 }
1263                 talloc_free(req);
1264         }
1265
1266         replicate_attributes = ldb_msg_find_element(msg, "replicateEntries");
1267         if (!replicate_attributes) {
1268                 data->replicate = NULL;
1269         } else {
1270                 data->replicate = talloc_array(data, struct ldb_dn *, replicate_attributes->num_values + 1);
1271                 if (!data->replicate) {
1272                         talloc_free(mem_ctx);
1273                         return LDB_ERR_OPERATIONS_ERROR;
1274                 }
1275
1276                 for (i=0; i < replicate_attributes->num_values; i++) {
1277                         data->replicate[i] = ldb_dn_from_ldb_val(data->replicate, ldb_module_get_ctx(module), &replicate_attributes->values[i]);
1278                         if (!ldb_dn_validate(data->replicate[i])) {
1279                                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1280                                                         "partition_init: "
1281                                                         "invalid DN in partition replicate record: %s", 
1282                                                         replicate_attributes->values[i].data);
1283                                 talloc_free(mem_ctx);
1284                                 return LDB_ERR_CONSTRAINT_VIOLATION;
1285                         }
1286                 }
1287                 data->replicate[i] = NULL;
1288         }
1289
1290         /* Make the private data available to any searches the modules may trigger in initialisation */
1291         module->private_data = data;
1292         talloc_steal(module, data);
1293         
1294         modules_attributes = ldb_msg_find_element(msg, "modules");
1295         if (modules_attributes) {
1296                 for (i=0; i < modules_attributes->num_values; i++) {
1297                         struct ldb_dn *base_dn;
1298                         int partition_idx;
1299                         struct dsdb_partition *partition = NULL;
1300                         const char **modules = NULL;
1301
1302                         char *base = talloc_strdup(data->partitions, (char *)modules_attributes->values[i].data);
1303                         char *p = strchr(base, ':');
1304                         if (!p) {
1305                                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1306                                                         "partition_init: "
1307                                                         "invalid form for partition module record (missing ':'): %s", base);
1308                                 talloc_free(mem_ctx);
1309                                 return LDB_ERR_CONSTRAINT_VIOLATION;
1310                         }
1311                         p[0] = '\0';
1312                         p++;
1313                         if (!p[0]) {
1314                                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1315                                                         "partition_init: "
1316                                                         "invalid form for partition module record (missing backend database): %s", base);
1317                                 talloc_free(mem_ctx);
1318                                 return LDB_ERR_CONSTRAINT_VIOLATION;
1319                         }
1320
1321                         modules = ldb_modules_list_from_string(ldb_module_get_ctx(module), mem_ctx,
1322                                                                p);
1323                         
1324                         base_dn = ldb_dn_new(mem_ctx, ldb_module_get_ctx(module), base);
1325                         if (!ldb_dn_validate(base_dn)) {
1326                                 talloc_free(mem_ctx);
1327                                 return LDB_ERR_OPERATIONS_ERROR;
1328                         }
1329                         
1330                         for (partition_idx = 0; data->partitions[partition_idx]; partition_idx++) {
1331                                 if (ldb_dn_compare(data->partitions[partition_idx]->ctrl->dn, base_dn) == 0) {
1332                                         partition = data->partitions[partition_idx];
1333                                         break;
1334                                 }
1335                         }
1336                         
1337                         if (!partition) {
1338                                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1339                                                         "partition_init: "
1340                                                         "invalid form for partition module record (no such partition): %s", base);
1341                                 talloc_free(mem_ctx);
1342                                 return LDB_ERR_CONSTRAINT_VIOLATION;
1343                         }
1344                         
1345                         ret = ldb_load_modules_list(ldb_module_get_ctx(module), modules, partition->module, &partition->module);
1346                         if (ret != LDB_SUCCESS) {
1347                                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1348                                                        "partition_init: "
1349                                                        "loading backend for %s failed: %s", 
1350                                                        base, ldb_errstring(ldb_module_get_ctx(module)));
1351                                 talloc_free(mem_ctx);
1352                                 return ret;
1353                         }
1354                         ret = ldb_init_module_chain(ldb_module_get_ctx(module), partition->module);
1355                         if (ret != LDB_SUCCESS) {
1356                                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1357                                                        "partition_init: "
1358                                                        "initialising backend for %s failed: %s", 
1359                                                        base, ldb_errstring(ldb_module_get_ctx(module)));
1360                                 talloc_free(mem_ctx);
1361                                 return ret;
1362                         }
1363                 }
1364         }
1365
1366         ret = ldb_mod_register_control(module, LDB_CONTROL_DOMAIN_SCOPE_OID);
1367         if (ret != LDB_SUCCESS) {
1368                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_ERROR,
1369                         "partition: Unable to register control with rootdse!\n");
1370                 return LDB_ERR_OPERATIONS_ERROR;
1371         }
1372
1373         ret = ldb_mod_register_control(module, LDB_CONTROL_SEARCH_OPTIONS_OID);
1374         if (ret != LDB_SUCCESS) {
1375                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_ERROR,
1376                         "partition: Unable to register control with rootdse!\n");
1377                 return LDB_ERR_OPERATIONS_ERROR;
1378         }
1379
1380         talloc_free(mem_ctx);
1381         return ldb_next_init(module);
1382 }
1383
1384 _PUBLIC_ const struct ldb_module_ops ldb_partition_module_ops = {
1385         .name              = "partition",
1386         .init_context      = partition_init,
1387         .search            = partition_search,
1388         .add               = partition_add,
1389         .modify            = partition_modify,
1390         .del               = partition_delete,
1391         .rename            = partition_rename,
1392         .extended          = partition_extended,
1393         .start_transaction = partition_start_trans,
1394         .prepare_commit    = partition_prepare_commit,
1395         .end_transaction   = partition_end_trans,
1396         .del_transaction   = partition_del_trans,
1397 };