Merge branch 'master' into wspp-schema
[nivanova/samba-autobuild/.git] / source4 / dsdb / samdb / ldb_modules / partition.c
1 /* 
2    Partitions ldb module
3
4    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
6
7    * NOTICE: this module is NOT released under the GNU LGPL license as
8    * other ldb code. This module is release under the GNU GPL v3 or
9    * later license.
10
11    This program is free software; you can redistribute it and/or modify
12    it under the terms of the GNU General Public License as published by
13    the Free Software Foundation; either version 3 of the License, or
14    (at your option) any later version.
15    
16    This program is distributed in the hope that it will be useful,
17    but WITHOUT ANY WARRANTY; without even the implied warranty of
18    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19    GNU General Public License for more details.
20    
21    You should have received a copy of the GNU General Public License
22    along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 */
24
25 /*
26  *  Name: ldb
27  *
28  *  Component: ldb partitions module
29  *
30  *  Description: Implement LDAP partitions
31  *
32  *  Author: Andrew Bartlett
33  *  Author: Stefan Metzmacher
34  */
35
36 #include "includes.h"
37 #include "ldb_private.h"
38 #include "dsdb/samdb/samdb.h"
39
40 struct partition_private_data {
41         struct dsdb_control_current_partition **partitions;
42         struct ldb_dn **replicate;
43 };
44
45 struct part_request {
46         struct ldb_module *module;
47         struct ldb_request *req;
48 };
49
50 struct partition_context {
51         struct ldb_module *module;
52         struct ldb_request *req;
53         bool got_success;
54
55         struct part_request *part_req;
56         int num_requests;
57         int finished_requests;
58 };
59
60 static struct partition_context *partition_init_ctx(struct ldb_module *module, struct ldb_request *req)
61 {
62         struct partition_context *ac;
63
64         ac = talloc_zero(req, struct partition_context);
65         if (ac == NULL) {
66                 ldb_set_errstring(module->ldb, "Out of Memory");
67                 return NULL;
68         }
69
70         ac->module = module;
71         ac->req = req;
72
73         return ac;
74 }
75
76 #define PARTITION_FIND_OP_NOERROR(module, op) do { \
77         while (module && module->ops->op == NULL) module = module->next; \
78 } while (0)
79
80 #define PARTITION_FIND_OP(module, op) do { \
81         PARTITION_FIND_OP_NOERROR(module, op); \
82         if (module == NULL) { \
83                 ldb_asprintf_errstring(module->ldb, \
84                         "Unable to find backend operation for " #op ); \
85                 return LDB_ERR_OPERATIONS_ERROR; \
86         } \
87 } while (0)
88
89 /*
90  *    helper functions to call the next module in chain
91  *    */
92
93 static int partition_request(struct ldb_module *module, struct ldb_request *request)
94 {
95         int ret;
96         switch (request->operation) {
97         case LDB_SEARCH:
98                 PARTITION_FIND_OP(module, search);
99                 ret = module->ops->search(module, request);
100                 break;
101         case LDB_ADD:
102                 PARTITION_FIND_OP(module, add);
103                 ret = module->ops->add(module, request);
104                 break;
105         case LDB_MODIFY:
106                 PARTITION_FIND_OP(module, modify);
107                 ret = module->ops->modify(module, request);
108                 break;
109         case LDB_DELETE:
110                 PARTITION_FIND_OP(module, del);
111                 ret = module->ops->del(module, request);
112                 break;
113         case LDB_RENAME:
114                 PARTITION_FIND_OP(module, rename);
115                 ret = module->ops->rename(module, request);
116                 break;
117         case LDB_EXTENDED:
118                 PARTITION_FIND_OP(module, extended);
119                 ret = module->ops->extended(module, request);
120                 break;
121         default:
122                 PARTITION_FIND_OP(module, request);
123                 ret = module->ops->request(module, request);
124                 break;
125         }
126         if (ret == LDB_SUCCESS) {
127                 return ret;
128         }
129         if (!ldb_errstring(module->ldb)) {
130                 /* Set a default error string, to place the blame somewhere */
131                 ldb_asprintf_errstring(module->ldb,
132                                         "error in module %s: %s (%d)",
133                                         module->ops->name,
134                                         ldb_strerror(ret), ret);
135         }
136         return ret;
137 }
138
139 static struct dsdb_control_current_partition *find_partition(struct partition_private_data *data,
140                                                              struct ldb_dn *dn)
141 {
142         int i;
143
144         /* Look at base DN */
145         /* Figure out which partition it is under */
146         /* Skip the lot if 'data' isn't here yet (initialistion) */
147         for (i=0; data && data->partitions && data->partitions[i]; i++) {
148                 if (ldb_dn_compare_base(data->partitions[i]->dn, dn) == 0) {
149                         return data->partitions[i];
150                 }
151         }
152
153         return NULL;
154 };
155
156 /**
157  * fire the caller's callback for every entry, but only send 'done' once.
158  */
159 static int partition_req_callback(struct ldb_request *req,
160                                   struct ldb_reply *ares)
161 {
162         struct partition_context *ac;
163         struct ldb_module *module;
164         struct ldb_request *nreq;
165         int ret;
166
167         ac = talloc_get_type(req->context, struct partition_context);
168
169         if (!ares) {
170                 return ldb_module_done(ac->req, NULL, NULL,
171                                         LDB_ERR_OPERATIONS_ERROR);
172         }
173
174         if (ares->error != LDB_SUCCESS && !ac->got_success) {
175                 return ldb_module_done(ac->req, ares->controls,
176                                         ares->response, ares->error);
177         }
178
179         switch (ares->type) {
180         case LDB_REPLY_REFERRAL:
181                 /* ignore referrals for now */
182                 break;
183
184         case LDB_REPLY_ENTRY:
185                 if (ac->req->operation != LDB_SEARCH) {
186                         ldb_set_errstring(ac->module->ldb,
187                                 "partition_req_callback:"
188                                 " Unsupported reply type for this request");
189                         return ldb_module_done(ac->req, NULL, NULL,
190                                                 LDB_ERR_OPERATIONS_ERROR);
191                 }
192                 return ldb_module_send_entry(ac->req, ares->message, ares->controls);
193
194         case LDB_REPLY_DONE:
195                 if (ares->error == LDB_SUCCESS) {
196                         ac->got_success = true;
197                 }
198                 if (ac->req->operation == LDB_EXTENDED) {
199                         /* FIXME: check for ares->response, replmd does not fill it ! */
200                         if (ares->response) {
201                                 if (strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID) != 0) {
202                                         ldb_set_errstring(ac->module->ldb,
203                                                           "partition_req_callback:"
204                                                           " Unknown extended reply, "
205                                                           "only supports START_TLS");
206                                         talloc_free(ares);
207                                         return ldb_module_done(ac->req, NULL, NULL,
208                                                                 LDB_ERR_OPERATIONS_ERROR);
209                                 }
210                         }
211                 }
212
213                 ac->finished_requests++;
214                 if (ac->finished_requests == ac->num_requests) {
215                         /* this was the last one, call callback */
216                         return ldb_module_done(ac->req, ares->controls,
217                                                ares->response, 
218                                                ac->got_success?LDB_SUCCESS:ares->error);
219                 }
220
221                 /* not the last, now call the next one */
222                 module = ac->part_req[ac->finished_requests].module;
223                 nreq = ac->part_req[ac->finished_requests].req;
224
225                 ret = partition_request(module, nreq);
226                 if (ret != LDB_SUCCESS) {
227                         talloc_free(ares);
228                         return ldb_module_done(ac->req, NULL, NULL, ret);
229                 }
230
231                 break;
232         }
233
234         talloc_free(ares);
235         return LDB_SUCCESS;
236 }
237
238 static int partition_prep_request(struct partition_context *ac,
239                                   struct dsdb_control_current_partition *partition)
240 {
241         int ret;
242         struct ldb_request *req;
243
244         ac->part_req = talloc_realloc(ac, ac->part_req,
245                                         struct part_request,
246                                         ac->num_requests + 1);
247         if (ac->part_req == NULL) {
248                 ldb_oom(ac->module->ldb);
249                 return LDB_ERR_OPERATIONS_ERROR;
250         }
251
252         switch (ac->req->operation) {
253         case LDB_SEARCH:
254                 ret = ldb_build_search_req_ex(&req, ac->module->ldb,
255                                         ac->part_req,
256                                         ac->req->op.search.base,
257                                         ac->req->op.search.scope,
258                                         ac->req->op.search.tree,
259                                         ac->req->op.search.attrs,
260                                         ac->req->controls,
261                                         ac, partition_req_callback,
262                                         ac->req);
263                 break;
264         case LDB_ADD:
265                 ret = ldb_build_add_req(&req, ac->module->ldb, ac->part_req,
266                                         ac->req->op.add.message,
267                                         ac->req->controls,
268                                         ac, partition_req_callback,
269                                         ac->req);
270                 break;
271         case LDB_MODIFY:
272                 ret = ldb_build_mod_req(&req, ac->module->ldb, ac->part_req,
273                                         ac->req->op.mod.message,
274                                         ac->req->controls,
275                                         ac, partition_req_callback,
276                                         ac->req);
277                 break;
278         case LDB_DELETE:
279                 ret = ldb_build_del_req(&req, ac->module->ldb, ac->part_req,
280                                         ac->req->op.del.dn,
281                                         ac->req->controls,
282                                         ac, partition_req_callback,
283                                         ac->req);
284                 break;
285         case LDB_RENAME:
286                 ret = ldb_build_rename_req(&req, ac->module->ldb, ac->part_req,
287                                         ac->req->op.rename.olddn,
288                                         ac->req->op.rename.newdn,
289                                         ac->req->controls,
290                                         ac, partition_req_callback,
291                                         ac->req);
292                 break;
293         case LDB_EXTENDED:
294                 ret = ldb_build_extended_req(&req, ac->module->ldb,
295                                         ac->part_req,
296                                         ac->req->op.extended.oid,
297                                         ac->req->op.extended.data,
298                                         ac->req->controls,
299                                         ac, partition_req_callback,
300                                         ac->req);
301                 break;
302         default:
303                 ldb_set_errstring(ac->module->ldb,
304                                   "Unsupported request type!");
305                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
306         }
307
308         if (ret != LDB_SUCCESS) {
309                 return ret;
310         }
311
312         ac->part_req[ac->num_requests].req = req;
313
314         if (ac->req->controls) {
315                 req->controls = talloc_memdup(req, ac->req->controls,
316                                         talloc_get_size(ac->req->controls));
317                 if (req->controls == NULL) {
318                         ldb_oom(ac->module->ldb);
319                         return LDB_ERR_OPERATIONS_ERROR;
320                 }
321         }
322
323         if (partition) {
324                 ac->part_req[ac->num_requests].module = partition->module;
325
326                 ret = ldb_request_add_control(req,
327                                         DSDB_CONTROL_CURRENT_PARTITION_OID,
328                                         false, partition);
329                 if (ret != LDB_SUCCESS) {
330                         return ret;
331                 }
332
333                 if (req->operation == LDB_SEARCH) {
334                         /* If the search is for 'more' than this partition,
335                          * then change the basedn, so a remote LDAP server
336                          * doesn't object */
337                         if (ldb_dn_compare_base(partition->dn,
338                                                 req->op.search.base) != 0) {
339                                 req->op.search.base = partition->dn;
340                         }
341                 }
342
343         } else {
344                 /* make sure you put the NEXT module here, or
345                  * partition_request() will simply loop forever on itself */
346                 ac->part_req[ac->num_requests].module = ac->module->next;
347         }
348
349         ac->num_requests++;
350
351         return LDB_SUCCESS;
352 }
353
354 static int partition_call_first(struct partition_context *ac)
355 {
356         return partition_request(ac->part_req[0].module, ac->part_req[0].req);
357 }
358
359 /**
360  * Send a request down to all the partitions
361  */
362 static int partition_send_all(struct ldb_module *module, 
363                               struct partition_context *ac, 
364                               struct ldb_request *req) 
365 {
366         int i;
367         struct partition_private_data *data = talloc_get_type(module->private_data, 
368                                                               struct partition_private_data);
369         int ret = partition_prep_request(ac, NULL);
370         if (ret != LDB_SUCCESS) {
371                 return ret;
372         }
373         for (i=0; data && data->partitions && data->partitions[i]; i++) {
374                 ret = partition_prep_request(ac, data->partitions[i]);
375                 if (ret != LDB_SUCCESS) {
376                         return ret;
377                 }
378         }
379
380         /* fire the first one */
381         return partition_call_first(ac);
382 }
383
384 /**
385  * Figure out which backend a request needs to be aimed at.  Some
386  * requests must be replicated to all backends
387  */
388 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn) 
389 {
390         struct partition_context *ac;
391         unsigned i;
392         int ret;
393         struct dsdb_control_current_partition *partition;
394         struct partition_private_data *data = talloc_get_type(module->private_data, 
395                                                               struct partition_private_data);
396         if (!data || !data->partitions) {
397                 return ldb_next_request(module, req);
398         }
399         
400         if (req->operation != LDB_SEARCH) {
401                 /* Is this a special DN, we need to replicate to every backend? */
402                 for (i=0; data->replicate && data->replicate[i]; i++) {
403                         if (ldb_dn_compare(data->replicate[i], 
404                                            dn) == 0) {
405                                 
406                                 ac = partition_init_ctx(module, req);
407                                 if (!ac) {
408                                         return LDB_ERR_OPERATIONS_ERROR;
409                                 }
410                                 
411                                 return partition_send_all(module, ac, req);
412                         }
413                 }
414         }
415
416         /* Otherwise, we need to find the partition to fire it to */
417
418         /* Find partition */
419         partition = find_partition(data, dn);
420         if (!partition) {
421                 /*
422                  * if we haven't found a matching partition
423                  * pass the request to the main ldb
424                  *
425                  * TODO: we should maybe return an error here
426                  *       if it's not a special dn
427                  */
428
429                 return ldb_next_request(module, req);
430         }
431
432         ac = partition_init_ctx(module, req);
433         if (!ac) {
434                 return LDB_ERR_OPERATIONS_ERROR;
435         }
436
437         /* we need to add a control but we never touch the original request */
438         ret = partition_prep_request(ac, partition);
439         if (ret != LDB_SUCCESS) {
440                 return ret;
441         }
442
443         /* fire the first one */
444         return partition_call_first(ac);
445 }
446
447 /* search */
448 static int partition_search(struct ldb_module *module, struct ldb_request *req)
449 {
450         struct ldb_control **saved_controls;
451         
452         /* Find backend */
453         struct partition_private_data *data = talloc_get_type(module->private_data, 
454                                                               struct partition_private_data);
455         /* issue request */
456
457         /* (later) consider if we should be searching multiple
458          * partitions (for 'invisible' partition behaviour */
459
460         struct ldb_control *search_control = ldb_request_get_control(req, LDB_CONTROL_SEARCH_OPTIONS_OID);
461         struct ldb_control *domain_scope_control = ldb_request_get_control(req, LDB_CONTROL_DOMAIN_SCOPE_OID);
462         
463         struct ldb_search_options_control *search_options = NULL;
464         if (search_control) {
465                 search_options = talloc_get_type(search_control->data, struct ldb_search_options_control);
466         }
467
468         /* Remove the domain_scope control, so we don't confuse a backend server */
469         if (domain_scope_control && !save_controls(domain_scope_control, req, &saved_controls)) {
470                 ldb_oom(module->ldb);
471                 return LDB_ERR_OPERATIONS_ERROR;
472         }
473
474         /*
475          * for now pass down the LDB_CONTROL_SEARCH_OPTIONS_OID control
476          * down as uncritical to make windows 2008 dcpromo happy.
477          */
478         if (search_control) {
479                 search_control->critical = 0;
480         }
481
482         /* TODO:
483            Generate referrals (look for a partition under this DN) if we don't have the above control specified
484         */
485         
486         if (search_options && (search_options->search_options & LDB_SEARCH_OPTION_PHANTOM_ROOT)) {
487                 int ret, i;
488                 struct partition_context *ac;
489                 if ((search_options->search_options & ~LDB_SEARCH_OPTION_PHANTOM_ROOT) == 0) {
490                         /* We have processed this flag, so we are done with this control now */
491
492                         /* Remove search control, so we don't confuse a backend server */
493                         if (search_control && !save_controls(search_control, req, &saved_controls)) {
494                                 ldb_oom(module->ldb);
495                                 return LDB_ERR_OPERATIONS_ERROR;
496                         }
497                 }
498                 ac = partition_init_ctx(module, req);
499                 if (!ac) {
500                         return LDB_ERR_OPERATIONS_ERROR;
501                 }
502
503                 /* Search from the base DN */
504                 if (!req->op.search.base || ldb_dn_is_null(req->op.search.base)) {
505                         return partition_send_all(module, ac, req);
506                 }
507                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
508                         bool match = false, stop = false;
509                         /* Find all partitions under the search base 
510                            
511                            we match if:
512
513                               1) the DN we are looking for exactly matches the partition
514                              or
515                               2) the DN we are looking for is a parent of the partition and it isn't
516                                  a scope base search
517                              or
518                               3) the DN we are looking for is a child of the partition
519                          */
520                         if (ldb_dn_compare(data->partitions[i]->dn, req->op.search.base) == 0) {
521                                 match = true;
522                                 if (req->op.search.scope == LDB_SCOPE_BASE) {
523                                         stop = true;
524                                 }
525                         }
526                         if (!match && 
527                             (ldb_dn_compare_base(req->op.search.base, data->partitions[i]->dn) == 0 &&
528                              req->op.search.scope != LDB_SCOPE_BASE)) {
529                                 match = true;
530                         }
531                         if (!match &&
532                             ldb_dn_compare_base(data->partitions[i]->dn, req->op.search.base) == 0) {
533                                 match = true;
534                                 stop = true; /* note that this relies on partition ordering */
535                         }
536                         if (match) {
537                                 ret = partition_prep_request(ac, data->partitions[i]);
538                                 if (ret != LDB_SUCCESS) {
539                                         return ret;
540                                 }
541                         }
542                         if (stop) break;
543                 }
544
545                 /* Perhaps we didn't match any partitions.  Try the main partition, only */
546                 if (ac->num_requests == 0) {
547                         talloc_free(ac);
548                         return ldb_next_request(module, req);
549                 }
550
551                 /* fire the first one */
552                 return partition_call_first(ac);
553
554         } else {
555                 /* Handle this like all other requests */
556                 if (search_control && (search_options->search_options & ~LDB_SEARCH_OPTION_PHANTOM_ROOT) == 0) {
557                         /* We have processed this flag, so we are done with this control now */
558
559                         /* Remove search control, so we don't confuse a backend server */
560                         if (search_control && !save_controls(search_control, req, &saved_controls)) {
561                                 ldb_oom(module->ldb);
562                                 return LDB_ERR_OPERATIONS_ERROR;
563                         }
564                 }
565
566                 return partition_replicate(module, req, req->op.search.base);
567         }
568 }
569
570 /* add */
571 static int partition_add(struct ldb_module *module, struct ldb_request *req)
572 {
573         return partition_replicate(module, req, req->op.add.message->dn);
574 }
575
576 /* modify */
577 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
578 {
579         return partition_replicate(module, req, req->op.mod.message->dn);
580 }
581
582 /* delete */
583 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
584 {
585         return partition_replicate(module, req, req->op.del.dn);
586 }
587
588 /* rename */
589 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
590 {
591         /* Find backend */
592         struct dsdb_control_current_partition *backend, *backend2;
593         
594         struct partition_private_data *data = talloc_get_type(module->private_data, 
595                                                               struct partition_private_data);
596
597         /* Skip the lot if 'data' isn't here yet (initialization) */
598         if (!data) {
599                 return LDB_ERR_OPERATIONS_ERROR;
600         }
601
602         backend = find_partition(data, req->op.rename.olddn);
603         backend2 = find_partition(data, req->op.rename.newdn);
604
605         if ((backend && !backend2) || (!backend && backend2)) {
606                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
607         }
608
609         if (backend != backend2) {
610                 ldb_asprintf_errstring(module->ldb, 
611                                        "Cannot rename from %s in %s to %s in %s: %s",
612                                        ldb_dn_get_linearized(req->op.rename.olddn),
613                                        ldb_dn_get_linearized(backend->dn),
614                                        ldb_dn_get_linearized(req->op.rename.newdn),
615                                        ldb_dn_get_linearized(backend2->dn),
616                                        ldb_strerror(LDB_ERR_AFFECTS_MULTIPLE_DSAS));
617                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
618         }
619
620         return partition_replicate(module, req, req->op.rename.olddn);
621 }
622
623 /* start a transaction */
624 static int partition_start_trans(struct ldb_module *module)
625 {
626         int i, ret;
627         struct partition_private_data *data = talloc_get_type(module->private_data, 
628                                                               struct partition_private_data);
629         /* Look at base DN */
630         /* Figure out which partition it is under */
631         /* Skip the lot if 'data' isn't here yet (initialization) */
632         ret = ldb_next_start_trans(module);
633         if (ret != LDB_SUCCESS) {
634                 return ret;
635         }
636
637         for (i=0; data && data->partitions && data->partitions[i]; i++) {
638                 struct ldb_module *next = data->partitions[i]->module;
639                 PARTITION_FIND_OP(next, start_transaction);
640
641                 ret = next->ops->start_transaction(next);
642                 if (ret != LDB_SUCCESS) {
643                         /* Back it out, if it fails on one */
644                         for (i--; i >= 0; i--) {
645                                 next = data->partitions[i]->module;
646                                 PARTITION_FIND_OP(next, del_transaction);
647
648                                 next->ops->del_transaction(next);
649                         }
650                         ldb_next_del_trans(module);
651                         return ret;
652                 }
653         }
654         return LDB_SUCCESS;
655 }
656
657 /* end a transaction */
658 static int partition_end_trans(struct ldb_module *module)
659 {
660         int i, ret, final_ret;
661         struct partition_private_data *data = talloc_get_type(module->private_data, 
662                                                               struct partition_private_data);
663         ret = ldb_next_end_trans(module);
664         if (ret != LDB_SUCCESS) {
665                 return ret;
666         }
667
668         /* if the backend has a prepare_commit op then use that, to ensure
669            that all partitions are committed safely together */
670         for (i=0; data && data->partitions && data->partitions[i]; i++) {
671                 struct ldb_module *next_end = data->partitions[i]->module;
672                 struct ldb_module *next_prepare = data->partitions[i]->module;
673                 struct ldb_module *next_del = data->partitions[i]->module;
674
675                 PARTITION_FIND_OP_NOERROR(next_prepare, prepare_commit);
676                 if (next_prepare == NULL) {
677                         continue;
678                 }
679
680                 PARTITION_FIND_OP(next_end, end_transaction);
681                 PARTITION_FIND_OP(next_del, del_transaction);
682
683                 if (next_end != next_prepare || next_del != next_end) {
684                         ldb_asprintf_errstring(module->ldb, "ERROR: Mismatch between prepare and commit ops in ldb module");
685                         return LDB_ERR_OPERATIONS_ERROR;
686                 }
687                 
688                 ret = next_prepare->ops->prepare_commit(next_prepare);
689                 if (ret != LDB_SUCCESS) {
690                         /* if one fails, cancel all but this one */
691                         int j;
692                         for (j=0; data->partitions[j]; j++) {
693                                 if (j == i) continue;
694                                 next_del = data->partitions[j]->module;
695                                 PARTITION_FIND_OP(next_del, del_transaction);
696                                 next_del->ops->del_transaction(next_del);
697                         }
698                         ldb_next_del_trans(module);
699                         return ret;
700                 }
701         }
702
703         /* Look at base DN */
704         /* Figure out which partition it is under */
705         /* Skip the lot if 'data' isn't here yet (initialistion) */
706         final_ret = LDB_SUCCESS;
707
708         for (i=0; data && data->partitions && data->partitions[i]; i++) {
709                 struct ldb_module *next = data->partitions[i]->module;
710                 PARTITION_FIND_OP(next, end_transaction);
711
712                 ret = next->ops->end_transaction(next);
713                 if (ret != LDB_SUCCESS) {
714                         /* this should only be happening if we had a serious 
715                            OS or hardware error */
716                         ldb_asprintf_errstring(module->ldb, "ERROR: partition commit error");
717                         final_ret = ret;
718                 }
719         }
720
721         return final_ret;
722 }
723
724 /* delete a transaction */
725 static int partition_del_trans(struct ldb_module *module)
726 {
727         int i, ret, ret2 = LDB_SUCCESS;
728         struct partition_private_data *data = talloc_get_type(module->private_data, 
729                                                               struct partition_private_data);
730         ret = ldb_next_del_trans(module);
731         if (ret != LDB_SUCCESS) {
732                 ret2 = ret;
733         }
734
735         /* Look at base DN */
736         /* Figure out which partition it is under */
737         /* Skip the lot if 'data' isn't here yet (initialistion) */
738         for (i=0; data && data->partitions && data->partitions[i]; i++) {
739                 struct ldb_module *next = data->partitions[i]->module;
740                 PARTITION_FIND_OP(next, del_transaction);
741
742                 ret = next->ops->del_transaction(next);
743                 if (ret != LDB_SUCCESS) {
744                         ret2 = ret;
745                 }
746         }
747         return ret2;
748 }
749
750
751 /* FIXME: This function is still semi-async */
752 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
753 {
754         int i, ret;
755         uint64_t seq_number = 0;
756         uint64_t timestamp_sequence = 0;
757         uint64_t timestamp = 0;
758         struct partition_private_data *data = talloc_get_type(module->private_data, 
759                                                               struct partition_private_data);
760         struct ldb_seqnum_request *seq;
761         struct ldb_seqnum_result *seqr;
762         struct ldb_request *treq;
763         struct ldb_seqnum_request *tseq;
764         struct ldb_seqnum_result *tseqr;
765         struct ldb_extended *ext;
766         struct ldb_result *res;
767
768         seq = talloc_get_type(req->op.extended.data, struct ldb_seqnum_request);
769
770         switch (seq->type) {
771         case LDB_SEQ_NEXT:
772         case LDB_SEQ_HIGHEST_SEQ:
773                 res = talloc_zero(req, struct ldb_result);
774                 if (res == NULL) {
775                         return LDB_ERR_OPERATIONS_ERROR;
776                 }
777                 tseq = talloc_zero(res, struct ldb_seqnum_request);
778                 if (tseq == NULL) {
779                         talloc_free(res);
780                         return LDB_ERR_OPERATIONS_ERROR;
781                 }
782                 tseq->type = seq->type;
783
784                 ret = ldb_build_extended_req(&treq, module->ldb, res,
785                                              LDB_EXTENDED_SEQUENCE_NUMBER,
786                                              tseq,
787                                              NULL,
788                                              res,
789                                              ldb_extended_default_callback,
790                                              NULL);
791                 ret = ldb_next_request(module, treq);
792                 if (ret == LDB_SUCCESS) {
793                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
794                 }
795                 if (ret != LDB_SUCCESS) {
796                         talloc_free(res);
797                         return ret;
798                 }
799                 seqr = talloc_get_type(res->extended->data,
800                                         struct ldb_seqnum_result);
801                 if (seqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
802                         timestamp_sequence = seqr->seq_num;
803                 } else {
804                         seq_number += seqr->seq_num;
805                 }
806                 talloc_free(res);
807
808                 /* Skip the lot if 'data' isn't here yet (initialistion) */
809                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
810
811                         res = talloc_zero(req, struct ldb_result);
812                         if (res == NULL) {
813                                 return LDB_ERR_OPERATIONS_ERROR;
814                         }
815                         tseq = talloc_zero(res, struct ldb_seqnum_request);
816                         if (tseq == NULL) {
817                                 talloc_free(res);
818                                 return LDB_ERR_OPERATIONS_ERROR;
819                         }
820                         tseq->type = seq->type;
821
822                         ret = ldb_build_extended_req(&treq, module->ldb, res,
823                                                      LDB_EXTENDED_SEQUENCE_NUMBER,
824                                                      tseq,
825                                                      NULL,
826                                                      res,
827                                                      ldb_extended_default_callback,
828                                                      NULL);
829                         if (ret != LDB_SUCCESS) {
830                                 talloc_free(res);
831                                 return ret;
832                         }
833
834                         ret = ldb_request_add_control(treq,
835                                                       DSDB_CONTROL_CURRENT_PARTITION_OID,
836                                                       false, data->partitions[i]);
837                         if (ret != LDB_SUCCESS) {
838                                 talloc_free(res);
839                                 return ret;
840                         }
841
842                         ret = partition_request(data->partitions[i]->module, treq);
843                         if (ret != LDB_SUCCESS) {
844                                 talloc_free(res);
845                                 return ret;
846                         }
847                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
848                         if (ret != LDB_SUCCESS) {
849                                 talloc_free(res);
850                                 return ret;
851                         }
852                         tseqr = talloc_get_type(res->extended->data,
853                                                 struct ldb_seqnum_result);
854                         if (tseqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
855                                 timestamp_sequence = MAX(timestamp_sequence,
856                                                          tseqr->seq_num);
857                         } else {
858                                 seq_number += tseqr->seq_num;
859                         }
860                         talloc_free(res);
861                 }
862                 /* fall through */
863         case LDB_SEQ_HIGHEST_TIMESTAMP:
864
865                 res = talloc_zero(req, struct ldb_result);
866                 if (res == NULL) {
867                         return LDB_ERR_OPERATIONS_ERROR;
868                 }
869
870                 tseq = talloc_zero(res, struct ldb_seqnum_request);
871                 if (tseq == NULL) {
872                         talloc_free(res);
873                         return LDB_ERR_OPERATIONS_ERROR;
874                 }
875                 tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
876
877                 ret = ldb_build_extended_req(&treq, module->ldb, res,
878                                              LDB_EXTENDED_SEQUENCE_NUMBER,
879                                              tseq,
880                                              NULL,
881                                              res,
882                                              ldb_extended_default_callback,
883                                              NULL);
884                 if (ret != LDB_SUCCESS) {
885                         talloc_free(res);
886                         return ret;
887                 }
888
889                 ret = ldb_next_request(module, treq);
890                 if (ret != LDB_SUCCESS) {
891                         talloc_free(res);
892                         return ret;
893                 }
894                 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
895                 if (ret != LDB_SUCCESS) {
896                         talloc_free(res);
897                         return ret;
898                 }
899
900                 tseqr = talloc_get_type(res->extended->data,
901                                            struct ldb_seqnum_result);
902                 timestamp = tseqr->seq_num;
903
904                 talloc_free(res);
905
906                 /* Skip the lot if 'data' isn't here yet (initialistion) */
907                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
908
909                         res = talloc_zero(req, struct ldb_result);
910                         if (res == NULL) {
911                                 return LDB_ERR_OPERATIONS_ERROR;
912                         }
913
914                         tseq = talloc_zero(res, struct ldb_seqnum_request);
915                         if (tseq == NULL) {
916                                 talloc_free(res);
917                                 return LDB_ERR_OPERATIONS_ERROR;
918                         }
919                         tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
920
921                         ret = ldb_build_extended_req(&treq, module->ldb, res,
922                                                      LDB_EXTENDED_SEQUENCE_NUMBER,
923                                                      tseq,
924                                                      NULL,
925                                                      res,
926                                                      ldb_extended_default_callback,
927                                                      NULL);
928                         if (ret != LDB_SUCCESS) {
929                                 talloc_free(res);
930                                 return ret;
931                         }
932
933                         ret = ldb_request_add_control(treq,
934                                                       DSDB_CONTROL_CURRENT_PARTITION_OID,
935                                                       false, data->partitions[i]);
936                         if (ret != LDB_SUCCESS) {
937                                 talloc_free(res);
938                                 return ret;
939                         }
940
941                         ret = partition_request(data->partitions[i]->module, treq);
942                         if (ret != LDB_SUCCESS) {
943                                 talloc_free(res);
944                                 return ret;
945                         }
946                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
947                         if (ret != LDB_SUCCESS) {
948                                 talloc_free(res);
949                                 return ret;
950                         }
951
952                         tseqr = talloc_get_type(res->extended->data,
953                                                   struct ldb_seqnum_result);
954                         timestamp = MAX(timestamp, tseqr->seq_num);
955
956                         talloc_free(res);
957                 }
958
959                 break;
960         }
961
962         ext = talloc_zero(req, struct ldb_extended);
963         if (!ext) {
964                 return LDB_ERR_OPERATIONS_ERROR;
965         }
966         seqr = talloc_zero(ext, struct ldb_seqnum_result);
967         if (seqr == NULL) {
968                 talloc_free(ext);
969                 return LDB_ERR_OPERATIONS_ERROR;
970         }
971         ext->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
972         ext->data = seqr;
973
974         switch (seq->type) {
975         case LDB_SEQ_NEXT:
976         case LDB_SEQ_HIGHEST_SEQ:
977
978                 /* Has someone above set a timebase sequence? */
979                 if (timestamp_sequence) {
980                         seqr->seq_num = (((unsigned long long)timestamp << 24) | (seq_number & 0xFFFFFF));
981                 } else {
982                         seqr->seq_num = seq_number;
983                 }
984
985                 if (timestamp_sequence > seqr->seq_num) {
986                         seqr->seq_num = timestamp_sequence;
987                         seqr->flags |= LDB_SEQ_TIMESTAMP_SEQUENCE;
988                 }
989
990                 seqr->flags |= LDB_SEQ_GLOBAL_SEQUENCE;
991                 break;
992         case LDB_SEQ_HIGHEST_TIMESTAMP:
993                 seqr->seq_num = timestamp;
994                 break;
995         }
996
997         if (seq->type == LDB_SEQ_NEXT) {
998                 seqr->seq_num++;
999         }
1000
1001         /* send request done */
1002         return ldb_module_done(req, NULL, ext, LDB_SUCCESS);
1003 }
1004
1005 static int partition_extended_replicated_objects(struct ldb_module *module, struct ldb_request *req)
1006 {
1007         struct dsdb_extended_replicated_objects *ext;
1008
1009         ext = talloc_get_type(req->op.extended.data, struct dsdb_extended_replicated_objects);
1010         if (!ext) {
1011                 ldb_debug(module->ldb, LDB_DEBUG_FATAL, "partition_extended_replicated_objects: invalid extended data\n");
1012                 return LDB_ERR_PROTOCOL_ERROR;
1013         }
1014
1015         if (ext->version != DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION) {
1016                 ldb_debug(module->ldb, LDB_DEBUG_FATAL, "partition_extended_replicated_objects: extended data invalid version [%u != %u]\n",
1017                           ext->version, DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION);
1018                 return LDB_ERR_PROTOCOL_ERROR;
1019         }
1020
1021         return partition_replicate(module, req, ext->partition_dn);
1022 }
1023
1024 static int partition_extended_schema_update_now(struct ldb_module *module, struct ldb_request *req)
1025 {
1026         struct dsdb_control_current_partition *partition;
1027         struct partition_private_data *data;
1028         struct ldb_dn *schema_dn;
1029         struct partition_context *ac;
1030         int ret;
1031
1032         schema_dn = talloc_get_type(req->op.extended.data, struct ldb_dn);
1033         if (!schema_dn) {
1034                 ldb_debug(module->ldb, LDB_DEBUG_FATAL, "partition_extended: invalid extended data\n");
1035                 return LDB_ERR_PROTOCOL_ERROR;
1036         }
1037
1038         data = talloc_get_type(module->private_data, struct partition_private_data);
1039         if (!data) {
1040                 return LDB_ERR_OPERATIONS_ERROR;
1041         }
1042         
1043         partition = find_partition( data, schema_dn );
1044         if (!partition) {
1045                 return ldb_next_request(module, req);
1046         }
1047
1048         ac = partition_init_ctx(module, req);
1049         if (!ac) {
1050                 return LDB_ERR_OPERATIONS_ERROR;
1051         }
1052
1053         /* we need to add a control but we never touch the original request */
1054         ret = partition_prep_request(ac, partition);
1055         if (ret != LDB_SUCCESS) {
1056                 return ret;
1057         }
1058
1059         /* fire the first one */
1060         return partition_call_first(ac);
1061 }
1062
1063
1064 /* extended */
1065 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
1066 {
1067         struct partition_private_data *data;
1068         struct partition_context *ac;
1069
1070         data = talloc_get_type(module->private_data, struct partition_private_data);
1071         if (!data || !data->partitions) {
1072                 return ldb_next_request(module, req);
1073         }
1074
1075         if (strcmp(req->op.extended.oid, LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1076                 return partition_sequence_number(module, req);
1077         }
1078
1079         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_REPLICATED_OBJECTS_OID) == 0) {
1080                 return partition_extended_replicated_objects(module, req);
1081         }
1082
1083         /* forward schemaUpdateNow operation to schema_fsmo module*/
1084         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_SCHEMA_UPDATE_NOW_OID) == 0) {
1085                 return partition_extended_schema_update_now( module, req );
1086         }       
1087
1088         /* 
1089          * as the extended operation has no dn
1090          * we need to send it to all partitions
1091          */
1092
1093         ac = partition_init_ctx(module, req);
1094         if (!ac) {
1095                 return LDB_ERR_OPERATIONS_ERROR;
1096         }
1097
1098         return partition_send_all(module, ac, req);
1099 }
1100
1101 static int partition_sort_compare(const void *v1, const void *v2)
1102 {
1103         const struct dsdb_control_current_partition *p1;
1104         const struct dsdb_control_current_partition *p2;
1105
1106         p1 = *((struct dsdb_control_current_partition * const*)v1);
1107         p2 = *((struct dsdb_control_current_partition * const*)v2);
1108
1109         return ldb_dn_compare(p1->dn, p2->dn);
1110 }
1111
1112 static int partition_init(struct ldb_module *module)
1113 {
1114         int ret, i;
1115         TALLOC_CTX *mem_ctx = talloc_new(module);
1116         const char *attrs[] = { "partition", "replicateEntries", "modules", NULL };
1117         struct ldb_result *res;
1118         struct ldb_message *msg;
1119         struct ldb_message_element *partition_attributes;
1120         struct ldb_message_element *replicate_attributes;
1121         struct ldb_message_element *modules_attributes;
1122
1123         struct partition_private_data *data;
1124
1125         if (!mem_ctx) {
1126                 return LDB_ERR_OPERATIONS_ERROR;
1127         }
1128
1129         data = talloc(mem_ctx, struct partition_private_data);
1130         if (data == NULL) {
1131                 return LDB_ERR_OPERATIONS_ERROR;
1132         }
1133
1134         ret = ldb_search(module->ldb, mem_ctx, &res,
1135                          ldb_dn_new(mem_ctx, module->ldb, "@PARTITION"),
1136                          LDB_SCOPE_BASE, attrs, NULL);
1137         if (ret != LDB_SUCCESS) {
1138                 talloc_free(mem_ctx);
1139                 return ret;
1140         }
1141         if (res->count == 0) {
1142                 talloc_free(mem_ctx);
1143                 return ldb_next_init(module);
1144         }
1145
1146         if (res->count > 1) {
1147                 talloc_free(mem_ctx);
1148                 return LDB_ERR_CONSTRAINT_VIOLATION;
1149         }
1150
1151         msg = res->msgs[0];
1152
1153         partition_attributes = ldb_msg_find_element(msg, "partition");
1154         if (!partition_attributes) {
1155                 ldb_set_errstring(module->ldb, "partition_init: no partitions specified");
1156                 talloc_free(mem_ctx);
1157                 return LDB_ERR_CONSTRAINT_VIOLATION;
1158         }
1159         data->partitions = talloc_array(data, struct dsdb_control_current_partition *, partition_attributes->num_values + 1);
1160         if (!data->partitions) {
1161                 talloc_free(mem_ctx);
1162                 return LDB_ERR_OPERATIONS_ERROR;
1163         }
1164         for (i=0; i < partition_attributes->num_values; i++) {
1165                 char *base = talloc_strdup(data->partitions, (char *)partition_attributes->values[i].data);
1166                 char *p = strchr(base, ':');
1167                 if (!p) {
1168                         ldb_asprintf_errstring(module->ldb, 
1169                                                 "partition_init: "
1170                                                 "invalid form for partition record (missing ':'): %s", base);
1171                         talloc_free(mem_ctx);
1172                         return LDB_ERR_CONSTRAINT_VIOLATION;
1173                 }
1174                 p[0] = '\0';
1175                 p++;
1176                 if (!p[0]) {
1177                         ldb_asprintf_errstring(module->ldb, 
1178                                                 "partition_init: "
1179                                                 "invalid form for partition record (missing backend database): %s", base);
1180                         talloc_free(mem_ctx);
1181                         return LDB_ERR_CONSTRAINT_VIOLATION;
1182                 }
1183                 data->partitions[i] = talloc(data->partitions, struct dsdb_control_current_partition);
1184                 if (!data->partitions[i]) {
1185                         talloc_free(mem_ctx);
1186                         return LDB_ERR_OPERATIONS_ERROR;
1187                 }
1188                 data->partitions[i]->version = DSDB_CONTROL_CURRENT_PARTITION_VERSION;
1189
1190                 data->partitions[i]->dn = ldb_dn_new(data->partitions[i], module->ldb, base);
1191                 if (!data->partitions[i]->dn) {
1192                         ldb_asprintf_errstring(module->ldb, 
1193                                                 "partition_init: invalid DN in partition record: %s", base);
1194                         talloc_free(mem_ctx);
1195                         return LDB_ERR_CONSTRAINT_VIOLATION;
1196                 }
1197
1198                 data->partitions[i]->backend = samdb_relative_path(module->ldb, 
1199                                                                    data->partitions[i], 
1200                                                                    p);
1201                 if (!data->partitions[i]->backend) {
1202                         ldb_asprintf_errstring(module->ldb, 
1203                                                 "partition_init: unable to determine an relative path for partition: %s", base);
1204                         talloc_free(mem_ctx);                   
1205                 }
1206                 ret = ldb_connect_backend(module->ldb, data->partitions[i]->backend, NULL, &data->partitions[i]->module);
1207                 if (ret != LDB_SUCCESS) {
1208                         talloc_free(mem_ctx);
1209                         return ret;
1210                 }
1211         }
1212         data->partitions[i] = NULL;
1213
1214         /* sort these into order, most to least specific */
1215         qsort(data->partitions, partition_attributes->num_values,
1216               sizeof(*data->partitions), partition_sort_compare);
1217
1218         for (i=0; data->partitions[i]; i++) {
1219                 struct ldb_request *req;
1220                 req = talloc_zero(mem_ctx, struct ldb_request);
1221                 if (req == NULL) {
1222                         ldb_debug(module->ldb, LDB_DEBUG_ERROR, "partition: Out of memory!\n");
1223                         talloc_free(mem_ctx);
1224                         return LDB_ERR_OPERATIONS_ERROR;
1225                 }
1226                 
1227                 req->operation = LDB_REQ_REGISTER_PARTITION;
1228                 req->op.reg_partition.dn = data->partitions[i]->dn;
1229                 req->callback = ldb_op_default_callback;
1230
1231                 ldb_set_timeout(module->ldb, req, 0);
1232
1233                 req->handle = ldb_handle_new(req, module->ldb);
1234                 if (req->handle == NULL) {
1235                         return LDB_ERR_OPERATIONS_ERROR;
1236                 }
1237                 
1238                 ret = ldb_request(module->ldb, req);
1239                 if (ret == LDB_SUCCESS) {
1240                         ret = ldb_wait(req->handle, LDB_WAIT_ALL);
1241                 }
1242                 if (ret != LDB_SUCCESS) {
1243                         ldb_debug(module->ldb, LDB_DEBUG_ERROR, "partition: Unable to register partition with rootdse!\n");
1244                         talloc_free(mem_ctx);
1245                         return LDB_ERR_OTHER;
1246                 }
1247                 talloc_free(req);
1248         }
1249
1250         replicate_attributes = ldb_msg_find_element(msg, "replicateEntries");
1251         if (!replicate_attributes) {
1252                 data->replicate = NULL;
1253         } else {
1254                 data->replicate = talloc_array(data, struct ldb_dn *, replicate_attributes->num_values + 1);
1255                 if (!data->replicate) {
1256                         talloc_free(mem_ctx);
1257                         return LDB_ERR_OPERATIONS_ERROR;
1258                 }
1259
1260                 for (i=0; i < replicate_attributes->num_values; i++) {
1261                         data->replicate[i] = ldb_dn_from_ldb_val(data->replicate, module->ldb, &replicate_attributes->values[i]);
1262                         if (!ldb_dn_validate(data->replicate[i])) {
1263                                 ldb_asprintf_errstring(module->ldb, 
1264                                                         "partition_init: "
1265                                                         "invalid DN in partition replicate record: %s", 
1266                                                         replicate_attributes->values[i].data);
1267                                 talloc_free(mem_ctx);
1268                                 return LDB_ERR_CONSTRAINT_VIOLATION;
1269                         }
1270                 }
1271                 data->replicate[i] = NULL;
1272         }
1273
1274         /* Make the private data available to any searches the modules may trigger in initialisation */
1275         module->private_data = data;
1276         talloc_steal(module, data);
1277         
1278         modules_attributes = ldb_msg_find_element(msg, "modules");
1279         if (modules_attributes) {
1280                 for (i=0; i < modules_attributes->num_values; i++) {
1281                         struct ldb_dn *base_dn;
1282                         int partition_idx;
1283                         struct dsdb_control_current_partition *partition = NULL;
1284                         const char **modules = NULL;
1285
1286                         char *base = talloc_strdup(data->partitions, (char *)modules_attributes->values[i].data);
1287                         char *p = strchr(base, ':');
1288                         if (!p) {
1289                                 ldb_asprintf_errstring(module->ldb, 
1290                                                         "partition_init: "
1291                                                         "invalid form for partition module record (missing ':'): %s", base);
1292                                 talloc_free(mem_ctx);
1293                                 return LDB_ERR_CONSTRAINT_VIOLATION;
1294                         }
1295                         p[0] = '\0';
1296                         p++;
1297                         if (!p[0]) {
1298                                 ldb_asprintf_errstring(module->ldb, 
1299                                                         "partition_init: "
1300                                                         "invalid form for partition module record (missing backend database): %s", base);
1301                                 talloc_free(mem_ctx);
1302                                 return LDB_ERR_CONSTRAINT_VIOLATION;
1303                         }
1304
1305                         modules = ldb_modules_list_from_string(module->ldb, mem_ctx,
1306                                                                p);
1307                         
1308                         base_dn = ldb_dn_new(mem_ctx, module->ldb, base);
1309                         if (!ldb_dn_validate(base_dn)) {
1310                                 talloc_free(mem_ctx);
1311                                 return LDB_ERR_OPERATIONS_ERROR;
1312                         }
1313                         
1314                         for (partition_idx = 0; data->partitions[partition_idx]; partition_idx++) {
1315                                 if (ldb_dn_compare(data->partitions[partition_idx]->dn, base_dn) == 0) {
1316                                         partition = data->partitions[partition_idx];
1317                                         break;
1318                                 }
1319                         }
1320                         
1321                         if (!partition) {
1322                                 ldb_asprintf_errstring(module->ldb, 
1323                                                         "partition_init: "
1324                                                         "invalid form for partition module record (no such partition): %s", base);
1325                                 talloc_free(mem_ctx);
1326                                 return LDB_ERR_CONSTRAINT_VIOLATION;
1327                         }
1328                         
1329                         ret = ldb_load_modules_list(module->ldb, modules, partition->module, &partition->module);
1330                         if (ret != LDB_SUCCESS) {
1331                                 ldb_asprintf_errstring(module->ldb, 
1332                                                        "partition_init: "
1333                                                        "loading backend for %s failed: %s", 
1334                                                        base, ldb_errstring(module->ldb));
1335                                 talloc_free(mem_ctx);
1336                                 return ret;
1337                         }
1338                         ret = ldb_init_module_chain(module->ldb, partition->module);
1339                         if (ret != LDB_SUCCESS) {
1340                                 ldb_asprintf_errstring(module->ldb, 
1341                                                        "partition_init: "
1342                                                        "initialising backend for %s failed: %s", 
1343                                                        base, ldb_errstring(module->ldb));
1344                                 talloc_free(mem_ctx);
1345                                 return ret;
1346                         }
1347                 }
1348         }
1349
1350         ret = ldb_mod_register_control(module, LDB_CONTROL_DOMAIN_SCOPE_OID);
1351         if (ret != LDB_SUCCESS) {
1352                 ldb_debug(module->ldb, LDB_DEBUG_ERROR,
1353                         "partition: Unable to register control with rootdse!\n");
1354                 return LDB_ERR_OPERATIONS_ERROR;
1355         }
1356
1357         ret = ldb_mod_register_control(module, LDB_CONTROL_SEARCH_OPTIONS_OID);
1358         if (ret != LDB_SUCCESS) {
1359                 ldb_debug(module->ldb, LDB_DEBUG_ERROR,
1360                         "partition: Unable to register control with rootdse!\n");
1361                 return LDB_ERR_OPERATIONS_ERROR;
1362         }
1363
1364         talloc_free(mem_ctx);
1365         return ldb_next_init(module);
1366 }
1367
1368 _PUBLIC_ const struct ldb_module_ops ldb_partition_module_ops = {
1369         .name              = "partition",
1370         .init_context      = partition_init,
1371         .search            = partition_search,
1372         .add               = partition_add,
1373         .modify            = partition_modify,
1374         .del               = partition_delete,
1375         .rename            = partition_rename,
1376         .extended          = partition_extended,
1377         .start_transaction = partition_start_trans,
1378         .end_transaction   = partition_end_trans,
1379         .del_transaction   = partition_del_trans,
1380 };