greatly simplify the transaction processing in the partition module
[nivanova/samba-autobuild/.git] / source4 / dsdb / samdb / ldb_modules / partition.c
1 /* 
2    Partitions ldb module
3
4    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
6
7    * NOTICE: this module is NOT released under the GNU LGPL license as
8    * other ldb code. This module is released under the GNU GPL v3 or
9    * later license.
10
11    This program is free software; you can redistribute it and/or modify
12    it under the terms of the GNU General Public License as published by
13    the Free Software Foundation; either version 3 of the License, or
14    (at your option) any later version.
15    
16    This program is distributed in the hope that it will be useful,
17    but WITHOUT ANY WARRANTY; without even the implied warranty of
18    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19    GNU General Public License for more details.
20    
21    You should have received a copy of the GNU General Public License
22    along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 */
24
25 /*
26  *  Name: ldb
27  *
28  *  Component: ldb partitions module
29  *
30  *  Description: Implement LDAP partitions
31  *
32  *  Author: Andrew Bartlett
33  *  Author: Stefan Metzmacher
34  */
35
36 #include "includes.h"
37 #include "lib/ldb/include/ldb.h"
38 #include "lib/ldb/include/ldb_errors.h"
39 #include "lib/ldb/include/ldb_module.h"
40 #include "lib/ldb/include/ldb_private.h"
41 #include "dsdb/samdb/samdb.h"
42
/*
 * One configured partition: the module that requests for it are handed
 * to, and the control describing it (ctrl->dn is the partition base DN
 * that request DNs are compared against).
 */
struct dsdb_partition {
	/* module used as the dispatch target for this partition */
	struct ldb_module *module;
	/* attached to forwarded requests as DSDB_CONTROL_CURRENT_PARTITION_OID */
	struct dsdb_control_current_partition *ctrl;
};
47
/*
 * Per-module private state.  Both arrays are walked until a NULL
 * element is found, and either pointer may itself be NULL.
 */
struct partition_private_data {
	/* NULL-terminated list of known partitions */
	struct dsdb_partition **partitions;
	/* NULL-terminated list of DNs whose non-search requests go to every partition */
	struct ldb_dn **replicate;
};
52
/* One prepared sub-request together with the module it must be sent to. */
struct part_request {
	/* module at which the search for the operation handler starts */
	struct ldb_module *module;
	/* copy of the caller's request built for this target */
	struct ldb_request *req;
};
57
/*
 * State shared by all sub-requests spawned from one caller request.
 * The sub-requests in part_req[] are fired one after another; the
 * caller only sees a single 'done'.
 */
struct partition_context {
	/* the partition module instance handling the request */
	struct ldb_module *module;
	/* the caller's original request */
	struct ldb_request *req;
	/* set once any sub-request has completed with LDB_SUCCESS */
	bool got_success;

	/* array of prepared sub-requests, num_requests entries long */
	struct part_request *part_req;
	int num_requests;
	/* number of sub-requests that have reported 'done' so far */
	int finished_requests;
};
67
68 static struct partition_context *partition_init_ctx(struct ldb_module *module, struct ldb_request *req)
69 {
70         struct partition_context *ac;
71
72         ac = talloc_zero(req, struct partition_context);
73         if (ac == NULL) {
74                 ldb_set_errstring(ldb_module_get_ctx(module), "Out of Memory");
75                 return NULL;
76         }
77
78         ac->module = module;
79         ac->req = req;
80
81         return ac;
82 }
83
/*
 * Advance 'module' (which must be an assignable pointer variable) along
 * its ->next chain until a module implementing 'op' is found.  Leaves
 * 'module' NULL when no module in the chain provides 'op'.
 */
#define PARTITION_FIND_OP_NOERROR(module, op) do { \
	while ((module) != NULL && (module)->ops->op == NULL) { \
		(module) = (module)->next; \
	} \
} while (0)
87
/*
 * Like PARTITION_FIND_OP_NOERROR, but if no module in the chain
 * implements 'op' an error string is set and the *enclosing function*
 * returns LDB_ERR_OPERATIONS_ERROR.
 *
 * 'module' is NULL by the time the failure is detected, so the ldb
 * context must be taken from the module the search started at; passing
 * the NULL pointer to ldb_module_get_ctx() would crash.  If the search
 * started from NULL there is no context to report against and only the
 * error code is returned.
 */
#define PARTITION_FIND_OP(module, op) do { \
	struct ldb_module *find_op_start_ = (module); \
	PARTITION_FIND_OP_NOERROR(module, op); \
	if (module == NULL) { \
		if (find_op_start_ != NULL) { \
			ldb_asprintf_errstring(ldb_module_get_ctx(find_op_start_), \
				"Unable to find backend operation for " #op ); \
		} \
		return LDB_ERR_OPERATIONS_ERROR; \
	} \
} while (0)
96
97 /*
98  *    helper functions to call the next module in chain
99  *    */
100
101 static int partition_request(struct ldb_module *module, struct ldb_request *request)
102 {
103         int ret;
104         switch (request->operation) {
105         case LDB_SEARCH:
106                 PARTITION_FIND_OP(module, search);
107                 ret = module->ops->search(module, request);
108                 break;
109         case LDB_ADD:
110                 PARTITION_FIND_OP(module, add);
111                 ret = module->ops->add(module, request);
112                 break;
113         case LDB_MODIFY:
114                 PARTITION_FIND_OP(module, modify);
115                 ret = module->ops->modify(module, request);
116                 break;
117         case LDB_DELETE:
118                 PARTITION_FIND_OP(module, del);
119                 ret = module->ops->del(module, request);
120                 break;
121         case LDB_RENAME:
122                 PARTITION_FIND_OP(module, rename);
123                 ret = module->ops->rename(module, request);
124                 break;
125         case LDB_EXTENDED:
126                 PARTITION_FIND_OP(module, extended);
127                 ret = module->ops->extended(module, request);
128                 break;
129         default:
130                 PARTITION_FIND_OP(module, request);
131                 ret = module->ops->request(module, request);
132                 break;
133         }
134         if (ret == LDB_SUCCESS) {
135                 return ret;
136         }
137         if (!ldb_errstring(ldb_module_get_ctx(module))) {
138                 /* Set a default error string, to place the blame somewhere */
139                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
140                                         "error in module %s: %s (%d)",
141                                         module->ops->name,
142                                         ldb_strerror(ret), ret);
143         }
144         return ret;
145 }
146
147 static struct dsdb_partition *find_partition(struct partition_private_data *data,
148                                              struct ldb_dn *dn,
149                                              struct ldb_request *req)
150 {
151         int i;
152         struct ldb_control *partition_ctrl;
153
154         /* see if the request has the partition DN specified in a
155          * control. The repl_meta_data module can specify this to
156          * ensure that replication happens to the right partition
157          */
158         partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
159         if (partition_ctrl) {
160                 const struct dsdb_control_current_partition *partition;
161                 partition = talloc_get_type(partition_ctrl->data,
162                                             struct dsdb_control_current_partition);
163                 if (partition != NULL) {
164                         dn = partition->dn;
165                 }
166         }
167
168         /* Look at base DN */
169         /* Figure out which partition it is under */
170         /* Skip the lot if 'data' isn't here yet (initialisation) */
171         for (i=0; data && data->partitions && data->partitions[i]; i++) {
172                 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, dn) == 0) {
173                         return data->partitions[i];
174                 }
175         }
176
177         return NULL;
178 }
179
180 /**
181  * fire the caller's callback for every entry, but only send 'done' once.
182  */
183 static int partition_req_callback(struct ldb_request *req,
184                                   struct ldb_reply *ares)
185 {
186         struct partition_context *ac;
187         struct ldb_module *module;
188         struct ldb_request *nreq;
189         int ret;
190
191         ac = talloc_get_type(req->context, struct partition_context);
192
193         if (!ares) {
194                 return ldb_module_done(ac->req, NULL, NULL,
195                                         LDB_ERR_OPERATIONS_ERROR);
196         }
197
198         if (ares->error != LDB_SUCCESS && !ac->got_success) {
199                 return ldb_module_done(ac->req, ares->controls,
200                                         ares->response, ares->error);
201         }
202
203         switch (ares->type) {
204         case LDB_REPLY_REFERRAL:
205                 /* ignore referrals for now */
206                 break;
207
208         case LDB_REPLY_ENTRY:
209                 if (ac->req->operation != LDB_SEARCH) {
210                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
211                                 "partition_req_callback:"
212                                 " Unsupported reply type for this request");
213                         return ldb_module_done(ac->req, NULL, NULL,
214                                                 LDB_ERR_OPERATIONS_ERROR);
215                 }
216                 return ldb_module_send_entry(ac->req, ares->message, ares->controls);
217
218         case LDB_REPLY_DONE:
219                 if (ares->error == LDB_SUCCESS) {
220                         ac->got_success = true;
221                 }
222                 if (ac->req->operation == LDB_EXTENDED) {
223                         /* FIXME: check for ares->response, replmd does not fill it ! */
224                         if (ares->response) {
225                                 if (strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID) != 0) {
226                                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
227                                                           "partition_req_callback:"
228                                                           " Unknown extended reply, "
229                                                           "only supports START_TLS");
230                                         talloc_free(ares);
231                                         return ldb_module_done(ac->req, NULL, NULL,
232                                                                 LDB_ERR_OPERATIONS_ERROR);
233                                 }
234                         }
235                 }
236
237                 ac->finished_requests++;
238                 if (ac->finished_requests == ac->num_requests) {
239                         /* this was the last one, call callback */
240                         return ldb_module_done(ac->req, ares->controls,
241                                                ares->response, 
242                                                ac->got_success?LDB_SUCCESS:ares->error);
243                 }
244
245                 /* not the last, now call the next one */
246                 module = ac->part_req[ac->finished_requests].module;
247                 nreq = ac->part_req[ac->finished_requests].req;
248
249                 ret = partition_request(module, nreq);
250                 if (ret != LDB_SUCCESS) {
251                         talloc_free(ares);
252                         return ldb_module_done(ac->req, NULL, NULL, ret);
253                 }
254
255                 break;
256         }
257
258         talloc_free(ares);
259         return LDB_SUCCESS;
260 }
261
262 static int partition_prep_request(struct partition_context *ac,
263                                   struct dsdb_partition *partition)
264 {
265         int ret;
266         struct ldb_request *req;
267
268         ac->part_req = talloc_realloc(ac, ac->part_req,
269                                         struct part_request,
270                                         ac->num_requests + 1);
271         if (ac->part_req == NULL) {
272                 ldb_oom(ldb_module_get_ctx(ac->module));
273                 return LDB_ERR_OPERATIONS_ERROR;
274         }
275
276         switch (ac->req->operation) {
277         case LDB_SEARCH:
278                 ret = ldb_build_search_req_ex(&req, ldb_module_get_ctx(ac->module),
279                                         ac->part_req,
280                                         ac->req->op.search.base,
281                                         ac->req->op.search.scope,
282                                         ac->req->op.search.tree,
283                                         ac->req->op.search.attrs,
284                                         ac->req->controls,
285                                         ac, partition_req_callback,
286                                         ac->req);
287                 break;
288         case LDB_ADD:
289                 ret = ldb_build_add_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
290                                         ac->req->op.add.message,
291                                         ac->req->controls,
292                                         ac, partition_req_callback,
293                                         ac->req);
294                 break;
295         case LDB_MODIFY:
296                 ret = ldb_build_mod_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
297                                         ac->req->op.mod.message,
298                                         ac->req->controls,
299                                         ac, partition_req_callback,
300                                         ac->req);
301                 break;
302         case LDB_DELETE:
303                 ret = ldb_build_del_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
304                                         ac->req->op.del.dn,
305                                         ac->req->controls,
306                                         ac, partition_req_callback,
307                                         ac->req);
308                 break;
309         case LDB_RENAME:
310                 ret = ldb_build_rename_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
311                                         ac->req->op.rename.olddn,
312                                         ac->req->op.rename.newdn,
313                                         ac->req->controls,
314                                         ac, partition_req_callback,
315                                         ac->req);
316                 break;
317         case LDB_EXTENDED:
318                 ret = ldb_build_extended_req(&req, ldb_module_get_ctx(ac->module),
319                                         ac->part_req,
320                                         ac->req->op.extended.oid,
321                                         ac->req->op.extended.data,
322                                         ac->req->controls,
323                                         ac, partition_req_callback,
324                                         ac->req);
325                 break;
326         default:
327                 ldb_set_errstring(ldb_module_get_ctx(ac->module),
328                                   "Unsupported request type!");
329                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
330         }
331
332         if (ret != LDB_SUCCESS) {
333                 return ret;
334         }
335
336         ac->part_req[ac->num_requests].req = req;
337
338         if (ac->req->controls) {
339                 req->controls = talloc_memdup(req, ac->req->controls,
340                                         talloc_get_size(ac->req->controls));
341                 if (req->controls == NULL) {
342                         ldb_oom(ldb_module_get_ctx(ac->module));
343                         return LDB_ERR_OPERATIONS_ERROR;
344                 }
345         }
346
347         if (partition) {
348                 ac->part_req[ac->num_requests].module = partition->module;
349
350                 if (!ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
351                         ret = ldb_request_add_control(req,
352                                                       DSDB_CONTROL_CURRENT_PARTITION_OID,
353                                                       false, partition->ctrl);
354                         if (ret != LDB_SUCCESS) {
355                                 return ret;
356                         }
357                 }
358
359                 if (req->operation == LDB_SEARCH) {
360                         /* If the search is for 'more' than this partition,
361                          * then change the basedn, so a remote LDAP server
362                          * doesn't object */
363                         if (ldb_dn_compare_base(partition->ctrl->dn,
364                                                 req->op.search.base) != 0) {
365                                 req->op.search.base = partition->ctrl->dn;
366                         }
367                 }
368
369         } else {
370                 /* make sure you put the NEXT module here, or
371                  * partition_request() will simply loop forever on itself */
372                 ac->part_req[ac->num_requests].module = ac->module->next;
373         }
374
375         ac->num_requests++;
376
377         return LDB_SUCCESS;
378 }
379
380 static int partition_call_first(struct partition_context *ac)
381 {
382         return partition_request(ac->part_req[0].module, ac->part_req[0].req);
383 }
384
385 /**
386  * Send a request down to all the partitions
387  */
388 static int partition_send_all(struct ldb_module *module, 
389                               struct partition_context *ac, 
390                               struct ldb_request *req) 
391 {
392         int i;
393         struct partition_private_data *data = talloc_get_type(module->private_data, 
394                                                               struct partition_private_data);
395         int ret = partition_prep_request(ac, NULL);
396         if (ret != LDB_SUCCESS) {
397                 return ret;
398         }
399         for (i=0; data && data->partitions && data->partitions[i]; i++) {
400                 ret = partition_prep_request(ac, data->partitions[i]);
401                 if (ret != LDB_SUCCESS) {
402                         return ret;
403                 }
404         }
405
406         /* fire the first one */
407         return partition_call_first(ac);
408 }
409
410 /**
411  * Figure out which backend a request needs to be aimed at.  Some
412  * requests must be replicated to all backends
413  */
414 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn) 
415 {
416         struct partition_context *ac;
417         unsigned i;
418         int ret;
419         struct dsdb_partition *partition;
420         struct partition_private_data *data = talloc_get_type(module->private_data, 
421                                                               struct partition_private_data);
422         if (!data || !data->partitions) {
423                 return ldb_next_request(module, req);
424         }
425         
426         if (req->operation != LDB_SEARCH) {
427                 /* Is this a special DN, we need to replicate to every backend? */
428                 for (i=0; data->replicate && data->replicate[i]; i++) {
429                         if (ldb_dn_compare(data->replicate[i], 
430                                            dn) == 0) {
431                                 
432                                 ac = partition_init_ctx(module, req);
433                                 if (!ac) {
434                                         return LDB_ERR_OPERATIONS_ERROR;
435                                 }
436                                 
437                                 return partition_send_all(module, ac, req);
438                         }
439                 }
440         }
441
442         /* Otherwise, we need to find the partition to fire it to */
443
444         /* Find partition */
445         partition = find_partition(data, dn, req);
446         if (!partition) {
447                 /*
448                  * if we haven't found a matching partition
449                  * pass the request to the main ldb
450                  *
451                  * TODO: we should maybe return an error here
452                  *       if it's not a special dn
453                  */
454
455                 return ldb_next_request(module, req);
456         }
457
458         ac = partition_init_ctx(module, req);
459         if (!ac) {
460                 return LDB_ERR_OPERATIONS_ERROR;
461         }
462
463         /* we need to add a control but we never touch the original request */
464         ret = partition_prep_request(ac, partition);
465         if (ret != LDB_SUCCESS) {
466                 return ret;
467         }
468
469         /* fire the first one */
470         return partition_call_first(ac);
471 }
472
473 /* search */
474 static int partition_search(struct ldb_module *module, struct ldb_request *req)
475 {
476         struct ldb_control **saved_controls;
477         
478         /* Find backend */
479         struct partition_private_data *data = talloc_get_type(module->private_data, 
480                                                               struct partition_private_data);
481         /* issue request */
482
483         /* (later) consider if we should be searching multiple
484          * partitions (for 'invisible' partition behaviour */
485
486         struct ldb_control *search_control = ldb_request_get_control(req, LDB_CONTROL_SEARCH_OPTIONS_OID);
487         struct ldb_control *domain_scope_control = ldb_request_get_control(req, LDB_CONTROL_DOMAIN_SCOPE_OID);
488         
489         struct ldb_search_options_control *search_options = NULL;
490         if (search_control) {
491                 search_options = talloc_get_type(search_control->data, struct ldb_search_options_control);
492         }
493
494         /* Remove the domain_scope control, so we don't confuse a backend server */
495         if (domain_scope_control && !save_controls(domain_scope_control, req, &saved_controls)) {
496                 ldb_oom(ldb_module_get_ctx(module));
497                 return LDB_ERR_OPERATIONS_ERROR;
498         }
499
500         /*
501          * for now pass down the LDB_CONTROL_SEARCH_OPTIONS_OID control
502          * down as uncritical to make windows 2008 dcpromo happy.
503          */
504         if (search_control) {
505                 search_control->critical = 0;
506         }
507
508         /* TODO:
509            Generate referrals (look for a partition under this DN) if we don't have the above control specified
510         */
511         
512         if (search_options && (search_options->search_options & LDB_SEARCH_OPTION_PHANTOM_ROOT)) {
513                 int ret, i;
514                 struct partition_context *ac;
515                 if ((search_options->search_options & ~LDB_SEARCH_OPTION_PHANTOM_ROOT) == 0) {
516                         /* We have processed this flag, so we are done with this control now */
517
518                         /* Remove search control, so we don't confuse a backend server */
519                         if (search_control && !save_controls(search_control, req, &saved_controls)) {
520                                 ldb_oom(ldb_module_get_ctx(module));
521                                 return LDB_ERR_OPERATIONS_ERROR;
522                         }
523                 }
524                 ac = partition_init_ctx(module, req);
525                 if (!ac) {
526                         return LDB_ERR_OPERATIONS_ERROR;
527                 }
528
529                 /* Search from the base DN */
530                 if (!req->op.search.base || ldb_dn_is_null(req->op.search.base)) {
531                         return partition_send_all(module, ac, req);
532                 }
533                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
534                         bool match = false, stop = false;
535                         /* Find all partitions under the search base 
536                            
537                            we match if:
538
539                               1) the DN we are looking for exactly matches the partition
540                              or
541                               2) the DN we are looking for is a parent of the partition and it isn't
542                                  a scope base search
543                              or
544                               3) the DN we are looking for is a child of the partition
545                          */
546                         if (ldb_dn_compare(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
547                                 match = true;
548                                 if (req->op.search.scope == LDB_SCOPE_BASE) {
549                                         stop = true;
550                                 }
551                         }
552                         if (!match && 
553                             (ldb_dn_compare_base(req->op.search.base, data->partitions[i]->ctrl->dn) == 0 &&
554                              req->op.search.scope != LDB_SCOPE_BASE)) {
555                                 match = true;
556                         }
557                         if (!match &&
558                             ldb_dn_compare_base(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
559                                 match = true;
560                                 stop = true; /* note that this relies on partition ordering */
561                         }
562                         if (match) {
563                                 ret = partition_prep_request(ac, data->partitions[i]);
564                                 if (ret != LDB_SUCCESS) {
565                                         return ret;
566                                 }
567                         }
568                         if (stop) break;
569                 }
570
571                 /* Perhaps we didn't match any partitions.  Try the main partition, only */
572                 if (ac->num_requests == 0) {
573                         talloc_free(ac);
574                         return ldb_next_request(module, req);
575                 }
576
577                 /* fire the first one */
578                 return partition_call_first(ac);
579
580         } else {
581                 /* Handle this like all other requests */
582                 if (search_control && (search_options->search_options & ~LDB_SEARCH_OPTION_PHANTOM_ROOT) == 0) {
583                         /* We have processed this flag, so we are done with this control now */
584
585                         /* Remove search control, so we don't confuse a backend server */
586                         if (search_control && !save_controls(search_control, req, &saved_controls)) {
587                                 ldb_oom(ldb_module_get_ctx(module));
588                                 return LDB_ERR_OPERATIONS_ERROR;
589                         }
590                 }
591
592                 return partition_replicate(module, req, req->op.search.base);
593         }
594 }
595
596 /* add */
597 static int partition_add(struct ldb_module *module, struct ldb_request *req)
598 {
599         return partition_replicate(module, req, req->op.add.message->dn);
600 }
601
602 /* modify */
603 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
604 {
605         return partition_replicate(module, req, req->op.mod.message->dn);
606 }
607
608 /* delete */
609 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
610 {
611         return partition_replicate(module, req, req->op.del.dn);
612 }
613
614 /* rename */
615 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
616 {
617         /* Find backend */
618         struct dsdb_partition *backend, *backend2;
619         
620         struct partition_private_data *data = talloc_get_type(module->private_data, 
621                                                               struct partition_private_data);
622
623         /* Skip the lot if 'data' isn't here yet (initialisation) */
624         if (!data) {
625                 return LDB_ERR_OPERATIONS_ERROR;
626         }
627
628         backend = find_partition(data, req->op.rename.olddn, req);
629         backend2 = find_partition(data, req->op.rename.newdn, req);
630
631         if ((backend && !backend2) || (!backend && backend2)) {
632                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
633         }
634
635         if (backend != backend2) {
636                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
637                                        "Cannot rename from %s in %s to %s in %s: %s",
638                                        ldb_dn_get_linearized(req->op.rename.olddn),
639                                        ldb_dn_get_linearized(backend->ctrl->dn),
640                                        ldb_dn_get_linearized(req->op.rename.newdn),
641                                        ldb_dn_get_linearized(backend2->ctrl->dn),
642                                        ldb_strerror(LDB_ERR_AFFECTS_MULTIPLE_DSAS));
643                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
644         }
645
646         return partition_replicate(module, req, req->op.rename.olddn);
647 }
648
649 /* start a transaction */
650 static int partition_start_trans(struct ldb_module *module)
651 {
652         int i, ret;
653         struct partition_private_data *data = talloc_get_type(module->private_data, 
654                                                               struct partition_private_data);
655         /* Look at base DN */
656         /* Figure out which partition it is under */
657         /* Skip the lot if 'data' isn't here yet (initialization) */
658         ret = ldb_next_start_trans(module);
659         if (ret != LDB_SUCCESS) {
660                 return ret;
661         }
662
663         for (i=0; data && data->partitions && data->partitions[i]; i++) {
664                 struct ldb_module *next = data->partitions[i]->module;
665                 PARTITION_FIND_OP(next, start_transaction);
666
667                 ret = next->ops->start_transaction(next);
668                 if (ret != LDB_SUCCESS) {
669                         /* Back it out, if it fails on one */
670                         for (i--; i >= 0; i--) {
671                                 next = data->partitions[i]->module;
672                                 PARTITION_FIND_OP(next, del_transaction);
673
674                                 next->ops->del_transaction(next);
675                         }
676                         ldb_next_del_trans(module);
677                         return ret;
678                 }
679         }
680         return LDB_SUCCESS;
681 }
682
683 /* prepare for a commit */
684 static int partition_prepare_commit(struct ldb_module *module)
685 {
686         int i;
687         struct partition_private_data *data = talloc_get_type(module->private_data, 
688                                                               struct partition_private_data);
689
690         for (i=0; data && data->partitions && data->partitions[i]; i++) {
691                 struct ldb_module *next_prepare = data->partitions[i]->module;
692                 int ret;
693
694                 PARTITION_FIND_OP_NOERROR(next_prepare, prepare_commit);
695                 if (next_prepare == NULL) {
696                         continue;
697                 }
698
699                 ret = next_prepare->ops->prepare_commit(next_prepare);
700                 if (ret != LDB_SUCCESS) {
701                         return ret;
702                 }
703         }
704
705         return ldb_next_prepare_commit(module);
706 }
707
708
709 /* end a transaction */
710 static int partition_end_trans(struct ldb_module *module)
711 {
712         int i;
713         struct partition_private_data *data = talloc_get_type(module->private_data, 
714                                                               struct partition_private_data);
715         for (i=0; data && data->partitions && data->partitions[i]; i++) {
716                 struct ldb_module *next_end = data->partitions[i]->module;
717                 int ret;
718
719                 PARTITION_FIND_OP(next_end, end_transaction);
720
721                 ret = next_end->ops->end_transaction(next_end);
722                 if (ret != LDB_SUCCESS) {
723                         return ret;
724                 }
725         }
726
727         return ldb_next_end_trans(module);
728 }
729
730 /* delete a transaction */
731 static int partition_del_trans(struct ldb_module *module)
732 {
733         int i, ret, final_ret = LDB_SUCCESS;
734         struct partition_private_data *data = talloc_get_type(module->private_data, 
735                                                               struct partition_private_data);
736         for (i=0; data && data->partitions && data->partitions[i]; i++) {
737                 struct ldb_module *next = data->partitions[i]->module;
738                 PARTITION_FIND_OP(next, del_transaction);
739
740                 ret = next->ops->del_transaction(next);
741                 if (ret != LDB_SUCCESS) {
742                         final_ret = ret;
743                 }
744         }       
745
746         ret = ldb_next_del_trans(module);
747         if (ret != LDB_SUCCESS) {
748                 final_ret = ret;
749         }
750         return final_ret;
751 }
752
753
754 /* FIXME: This function is still semi-async */
755 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
756 {
757         int i, ret;
758         uint64_t seq_number = 0;
759         uint64_t timestamp_sequence = 0;
760         uint64_t timestamp = 0;
761         struct partition_private_data *data = talloc_get_type(module->private_data, 
762                                                               struct partition_private_data);
763         struct ldb_seqnum_request *seq;
764         struct ldb_seqnum_result *seqr;
765         struct ldb_request *treq;
766         struct ldb_seqnum_request *tseq;
767         struct ldb_seqnum_result *tseqr;
768         struct ldb_extended *ext;
769         struct ldb_result *res;
770
771         seq = talloc_get_type(req->op.extended.data, struct ldb_seqnum_request);
772
773         switch (seq->type) {
774         case LDB_SEQ_NEXT:
775         case LDB_SEQ_HIGHEST_SEQ:
776                 res = talloc_zero(req, struct ldb_result);
777                 if (res == NULL) {
778                         return LDB_ERR_OPERATIONS_ERROR;
779                 }
780                 tseq = talloc_zero(res, struct ldb_seqnum_request);
781                 if (tseq == NULL) {
782                         talloc_free(res);
783                         return LDB_ERR_OPERATIONS_ERROR;
784                 }
785                 tseq->type = seq->type;
786
787                 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
788                                              LDB_EXTENDED_SEQUENCE_NUMBER,
789                                              tseq,
790                                              NULL,
791                                              res,
792                                              ldb_extended_default_callback,
793                                              NULL);
794                 ret = ldb_next_request(module, treq);
795                 if (ret == LDB_SUCCESS) {
796                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
797                 }
798                 if (ret != LDB_SUCCESS) {
799                         talloc_free(res);
800                         return ret;
801                 }
802                 seqr = talloc_get_type(res->extended->data,
803                                         struct ldb_seqnum_result);
804                 if (seqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
805                         timestamp_sequence = seqr->seq_num;
806                 } else {
807                         seq_number += seqr->seq_num;
808                 }
809                 talloc_free(res);
810
811                 /* Skip the lot if 'data' isn't here yet (initialisation) */
812                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
813
814                         res = talloc_zero(req, struct ldb_result);
815                         if (res == NULL) {
816                                 return LDB_ERR_OPERATIONS_ERROR;
817                         }
818                         tseq = talloc_zero(res, struct ldb_seqnum_request);
819                         if (tseq == NULL) {
820                                 talloc_free(res);
821                                 return LDB_ERR_OPERATIONS_ERROR;
822                         }
823                         tseq->type = seq->type;
824
825                         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
826                                                      LDB_EXTENDED_SEQUENCE_NUMBER,
827                                                      tseq,
828                                                      NULL,
829                                                      res,
830                                                      ldb_extended_default_callback,
831                                                      NULL);
832                         if (ret != LDB_SUCCESS) {
833                                 talloc_free(res);
834                                 return ret;
835                         }
836
837                         if (!ldb_request_get_control(treq, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
838                                 ret = ldb_request_add_control(treq,
839                                                               DSDB_CONTROL_CURRENT_PARTITION_OID,
840                                                               false, data->partitions[i]->ctrl);
841                                 if (ret != LDB_SUCCESS) {
842                                         talloc_free(res);
843                                         return ret;
844                                 }
845                         }
846
847                         ret = partition_request(data->partitions[i]->module, treq);
848                         if (ret != LDB_SUCCESS) {
849                                 talloc_free(res);
850                                 return ret;
851                         }
852                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
853                         if (ret != LDB_SUCCESS) {
854                                 talloc_free(res);
855                                 return ret;
856                         }
857                         tseqr = talloc_get_type(res->extended->data,
858                                                 struct ldb_seqnum_result);
859                         if (tseqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
860                                 timestamp_sequence = MAX(timestamp_sequence,
861                                                          tseqr->seq_num);
862                         } else {
863                                 seq_number += tseqr->seq_num;
864                         }
865                         talloc_free(res);
866                 }
867                 /* fall through */
868         case LDB_SEQ_HIGHEST_TIMESTAMP:
869
870                 res = talloc_zero(req, struct ldb_result);
871                 if (res == NULL) {
872                         return LDB_ERR_OPERATIONS_ERROR;
873                 }
874
875                 tseq = talloc_zero(res, struct ldb_seqnum_request);
876                 if (tseq == NULL) {
877                         talloc_free(res);
878                         return LDB_ERR_OPERATIONS_ERROR;
879                 }
880                 tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
881
882                 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
883                                              LDB_EXTENDED_SEQUENCE_NUMBER,
884                                              tseq,
885                                              NULL,
886                                              res,
887                                              ldb_extended_default_callback,
888                                              NULL);
889                 if (ret != LDB_SUCCESS) {
890                         talloc_free(res);
891                         return ret;
892                 }
893
894                 ret = ldb_next_request(module, treq);
895                 if (ret != LDB_SUCCESS) {
896                         talloc_free(res);
897                         return ret;
898                 }
899                 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
900                 if (ret != LDB_SUCCESS) {
901                         talloc_free(res);
902                         return ret;
903                 }
904
905                 tseqr = talloc_get_type(res->extended->data,
906                                            struct ldb_seqnum_result);
907                 timestamp = tseqr->seq_num;
908
909                 talloc_free(res);
910
911                 /* Skip the lot if 'data' isn't here yet (initialisation) */
912                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
913
914                         res = talloc_zero(req, struct ldb_result);
915                         if (res == NULL) {
916                                 return LDB_ERR_OPERATIONS_ERROR;
917                         }
918
919                         tseq = talloc_zero(res, struct ldb_seqnum_request);
920                         if (tseq == NULL) {
921                                 talloc_free(res);
922                                 return LDB_ERR_OPERATIONS_ERROR;
923                         }
924                         tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
925
926                         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
927                                                      LDB_EXTENDED_SEQUENCE_NUMBER,
928                                                      tseq,
929                                                      NULL,
930                                                      res,
931                                                      ldb_extended_default_callback,
932                                                      NULL);
933                         if (ret != LDB_SUCCESS) {
934                                 talloc_free(res);
935                                 return ret;
936                         }
937
938                         if (!ldb_request_get_control(treq, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
939                                 ret = ldb_request_add_control(treq,
940                                                               DSDB_CONTROL_CURRENT_PARTITION_OID,
941                                                               false, data->partitions[i]->ctrl);
942                                 if (ret != LDB_SUCCESS) {
943                                         talloc_free(res);
944                                         return ret;
945                                 }
946                         }
947
948                         ret = partition_request(data->partitions[i]->module, treq);
949                         if (ret != LDB_SUCCESS) {
950                                 talloc_free(res);
951                                 return ret;
952                         }
953                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
954                         if (ret != LDB_SUCCESS) {
955                                 talloc_free(res);
956                                 return ret;
957                         }
958
959                         tseqr = talloc_get_type(res->extended->data,
960                                                   struct ldb_seqnum_result);
961                         timestamp = MAX(timestamp, tseqr->seq_num);
962
963                         talloc_free(res);
964                 }
965
966                 break;
967         }
968
969         ext = talloc_zero(req, struct ldb_extended);
970         if (!ext) {
971                 return LDB_ERR_OPERATIONS_ERROR;
972         }
973         seqr = talloc_zero(ext, struct ldb_seqnum_result);
974         if (seqr == NULL) {
975                 talloc_free(ext);
976                 return LDB_ERR_OPERATIONS_ERROR;
977         }
978         ext->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
979         ext->data = seqr;
980
981         switch (seq->type) {
982         case LDB_SEQ_NEXT:
983         case LDB_SEQ_HIGHEST_SEQ:
984
985                 /* Has someone above set a timebase sequence? */
986                 if (timestamp_sequence) {
987                         seqr->seq_num = (((unsigned long long)timestamp << 24) | (seq_number & 0xFFFFFF));
988                 } else {
989                         seqr->seq_num = seq_number;
990                 }
991
992                 if (timestamp_sequence > seqr->seq_num) {
993                         seqr->seq_num = timestamp_sequence;
994                         seqr->flags |= LDB_SEQ_TIMESTAMP_SEQUENCE;
995                 }
996
997                 seqr->flags |= LDB_SEQ_GLOBAL_SEQUENCE;
998                 break;
999         case LDB_SEQ_HIGHEST_TIMESTAMP:
1000                 seqr->seq_num = timestamp;
1001                 break;
1002         }
1003
1004         if (seq->type == LDB_SEQ_NEXT) {
1005                 seqr->seq_num++;
1006         }
1007
1008         /* send request done */
1009         return ldb_module_done(req, NULL, ext, LDB_SUCCESS);
1010 }
1011
1012 static int partition_extended_schema_update_now(struct ldb_module *module, struct ldb_request *req)
1013 {
1014         struct dsdb_partition *partition;
1015         struct partition_private_data *data;
1016         struct ldb_dn *schema_dn;
1017         struct partition_context *ac;
1018         int ret;
1019
1020         schema_dn = talloc_get_type(req->op.extended.data, struct ldb_dn);
1021         if (!schema_dn) {
1022                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_FATAL, "partition_extended: invalid extended data\n");
1023                 return LDB_ERR_PROTOCOL_ERROR;
1024         }
1025
1026         data = talloc_get_type(module->private_data, struct partition_private_data);
1027         if (!data) {
1028                 return LDB_ERR_OPERATIONS_ERROR;
1029         }
1030         
1031         partition = find_partition( data, schema_dn, req);
1032         if (!partition) {
1033                 return ldb_next_request(module, req);
1034         }
1035
1036         ac = partition_init_ctx(module, req);
1037         if (!ac) {
1038                 return LDB_ERR_OPERATIONS_ERROR;
1039         }
1040
1041         /* we need to add a control but we never touch the original request */
1042         ret = partition_prep_request(ac, partition);
1043         if (ret != LDB_SUCCESS) {
1044                 return ret;
1045         }
1046
1047         /* fire the first one */
1048         ret =  partition_call_first(ac);
1049
1050         if (ret != LDB_SUCCESS){
1051                 return ret;
1052         }
1053
1054         return ldb_request_done(req, ret);
1055 }
1056
1057
1058 /* extended */
1059 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
1060 {
1061         struct partition_private_data *data;
1062         struct partition_context *ac;
1063
1064         data = talloc_get_type(module->private_data, struct partition_private_data);
1065         if (!data || !data->partitions) {
1066                 return ldb_next_request(module, req);
1067         }
1068
1069         if (strcmp(req->op.extended.oid, LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1070                 return partition_sequence_number(module, req);
1071         }
1072
1073         /* forward schemaUpdateNow operation to schema_fsmo module*/
1074         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_SCHEMA_UPDATE_NOW_OID) == 0) {
1075                 return partition_extended_schema_update_now( module, req );
1076         }       
1077
1078         /* 
1079          * as the extended operation has no dn
1080          * we need to send it to all partitions
1081          */
1082
1083         ac = partition_init_ctx(module, req);
1084         if (!ac) {
1085                 return LDB_ERR_OPERATIONS_ERROR;
1086         }
1087
1088         return partition_send_all(module, ac, req);
1089 }
1090
1091 static int partition_sort_compare(const void *v1, const void *v2)
1092 {
1093         const struct dsdb_partition *p1;
1094         const struct dsdb_partition *p2;
1095
1096         p1 = *((struct dsdb_partition * const*)v1);
1097         p2 = *((struct dsdb_partition * const*)v2);
1098
1099         return ldb_dn_compare(p1->ctrl->dn, p2->ctrl->dn);
1100 }
1101
1102 static int partition_init(struct ldb_module *module)
1103 {
1104         int ret, i;
1105         TALLOC_CTX *mem_ctx = talloc_new(module);
1106         const char *attrs[] = { "partition", "replicateEntries", "modules", NULL };
1107         struct ldb_result *res;
1108         struct ldb_message *msg;
1109         struct ldb_message_element *partition_attributes;
1110         struct ldb_message_element *replicate_attributes;
1111         struct ldb_message_element *modules_attributes;
1112
1113         struct partition_private_data *data;
1114
1115         if (!mem_ctx) {
1116                 return LDB_ERR_OPERATIONS_ERROR;
1117         }
1118
1119         data = talloc(mem_ctx, struct partition_private_data);
1120         if (data == NULL) {
1121                 return LDB_ERR_OPERATIONS_ERROR;
1122         }
1123
1124         ret = ldb_search(ldb_module_get_ctx(module), mem_ctx, &res,
1125                          ldb_dn_new(mem_ctx, ldb_module_get_ctx(module), "@PARTITION"),
1126                          LDB_SCOPE_BASE, attrs, NULL);
1127         if (ret != LDB_SUCCESS) {
1128                 talloc_free(mem_ctx);
1129                 return ret;
1130         }
1131         if (res->count == 0) {
1132                 talloc_free(mem_ctx);
1133                 return ldb_next_init(module);
1134         }
1135
1136         if (res->count > 1) {
1137                 talloc_free(mem_ctx);
1138                 return LDB_ERR_CONSTRAINT_VIOLATION;
1139         }
1140
1141         msg = res->msgs[0];
1142
1143         partition_attributes = ldb_msg_find_element(msg, "partition");
1144         if (!partition_attributes) {
1145                 ldb_set_errstring(ldb_module_get_ctx(module), "partition_init: no partitions specified");
1146                 talloc_free(mem_ctx);
1147                 return LDB_ERR_CONSTRAINT_VIOLATION;
1148         }
1149         data->partitions = talloc_array(data, struct dsdb_partition *, partition_attributes->num_values + 1);
1150         if (!data->partitions) {
1151                 talloc_free(mem_ctx);
1152                 return LDB_ERR_OPERATIONS_ERROR;
1153         }
1154         for (i=0; i < partition_attributes->num_values; i++) {
1155                 char *base = talloc_strdup(data->partitions, (char *)partition_attributes->values[i].data);
1156                 char *p = strchr(base, ':');
1157                 const char *backend;
1158
1159                 if (!p) {
1160                         ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1161                                                 "partition_init: "
1162                                                 "invalid form for partition record (missing ':'): %s", base);
1163                         talloc_free(mem_ctx);
1164                         return LDB_ERR_CONSTRAINT_VIOLATION;
1165                 }
1166                 p[0] = '\0';
1167                 p++;
1168                 if (!p[0]) {
1169                         ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1170                                                 "partition_init: "
1171                                                 "invalid form for partition record (missing backend database): %s", base);
1172                         talloc_free(mem_ctx);
1173                         return LDB_ERR_CONSTRAINT_VIOLATION;
1174                 }
1175                 data->partitions[i] = talloc(data->partitions, struct dsdb_partition);
1176                 if (!data->partitions[i]) {
1177                         talloc_free(mem_ctx);
1178                         return LDB_ERR_OPERATIONS_ERROR;
1179                 }
1180                 data->partitions[i]->ctrl = talloc(data->partitions[i], struct dsdb_control_current_partition);
1181                 if (!data->partitions[i]->ctrl) {
1182                         talloc_free(mem_ctx);
1183                         return LDB_ERR_OPERATIONS_ERROR;
1184                 }
1185                 data->partitions[i]->ctrl->version = DSDB_CONTROL_CURRENT_PARTITION_VERSION;
1186                 data->partitions[i]->ctrl->dn = ldb_dn_new(data->partitions[i], ldb_module_get_ctx(module), base);
1187                 if (!data->partitions[i]->ctrl->dn) {
1188                         ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1189                                                 "partition_init: invalid DN in partition record: %s", base);
1190                         talloc_free(mem_ctx);
1191                         return LDB_ERR_CONSTRAINT_VIOLATION;
1192                 }
1193
1194                 backend = samdb_relative_path(ldb_module_get_ctx(module), 
1195                                                                    data->partitions[i], 
1196                                                                    p);
1197                 if (!backend) {
1198                         ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1199                                                 "partition_init: unable to determine an relative path for partition: %s", base);
1200                         talloc_free(mem_ctx);                   
1201                 }
1202                 ret = ldb_connect_backend(ldb_module_get_ctx(module), backend, NULL, &data->partitions[i]->module);
1203                 if (ret != LDB_SUCCESS) {
1204                         talloc_free(mem_ctx);
1205                         return ret;
1206                 }
1207         }
1208         data->partitions[i] = NULL;
1209
1210         /* sort these into order, most to least specific */
1211         qsort(data->partitions, partition_attributes->num_values,
1212               sizeof(*data->partitions), partition_sort_compare);
1213
1214         for (i=0; data->partitions[i]; i++) {
1215                 struct ldb_request *req;
1216                 req = talloc_zero(mem_ctx, struct ldb_request);
1217                 if (req == NULL) {
1218                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_ERROR, "partition: Out of memory!\n");
1219                         talloc_free(mem_ctx);
1220                         return LDB_ERR_OPERATIONS_ERROR;
1221                 }
1222                 
1223                 req->operation = LDB_REQ_REGISTER_PARTITION;
1224                 req->op.reg_partition.dn = data->partitions[i]->ctrl->dn;
1225                 req->callback = ldb_op_default_callback;
1226
1227                 ldb_set_timeout(ldb_module_get_ctx(module), req, 0);
1228
1229                 req->handle = ldb_handle_new(req, ldb_module_get_ctx(module));
1230                 if (req->handle == NULL) {
1231                         return LDB_ERR_OPERATIONS_ERROR;
1232                 }
1233                 
1234                 ret = ldb_request(ldb_module_get_ctx(module), req);
1235                 if (ret == LDB_SUCCESS) {
1236                         ret = ldb_wait(req->handle, LDB_WAIT_ALL);
1237                 }
1238                 if (ret != LDB_SUCCESS) {
1239                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_ERROR, "partition: Unable to register partition with rootdse!\n");
1240                         talloc_free(mem_ctx);
1241                         return LDB_ERR_OTHER;
1242                 }
1243                 talloc_free(req);
1244         }
1245
1246         replicate_attributes = ldb_msg_find_element(msg, "replicateEntries");
1247         if (!replicate_attributes) {
1248                 data->replicate = NULL;
1249         } else {
1250                 data->replicate = talloc_array(data, struct ldb_dn *, replicate_attributes->num_values + 1);
1251                 if (!data->replicate) {
1252                         talloc_free(mem_ctx);
1253                         return LDB_ERR_OPERATIONS_ERROR;
1254                 }
1255
1256                 for (i=0; i < replicate_attributes->num_values; i++) {
1257                         data->replicate[i] = ldb_dn_from_ldb_val(data->replicate, ldb_module_get_ctx(module), &replicate_attributes->values[i]);
1258                         if (!ldb_dn_validate(data->replicate[i])) {
1259                                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1260                                                         "partition_init: "
1261                                                         "invalid DN in partition replicate record: %s", 
1262                                                         replicate_attributes->values[i].data);
1263                                 talloc_free(mem_ctx);
1264                                 return LDB_ERR_CONSTRAINT_VIOLATION;
1265                         }
1266                 }
1267                 data->replicate[i] = NULL;
1268         }
1269
1270         /* Make the private data available to any searches the modules may trigger in initialisation */
1271         module->private_data = data;
1272         talloc_steal(module, data);
1273         
1274         modules_attributes = ldb_msg_find_element(msg, "modules");
1275         if (modules_attributes) {
1276                 for (i=0; i < modules_attributes->num_values; i++) {
1277                         struct ldb_dn *base_dn;
1278                         int partition_idx;
1279                         struct dsdb_partition *partition = NULL;
1280                         const char **modules = NULL;
1281
1282                         char *base = talloc_strdup(data->partitions, (char *)modules_attributes->values[i].data);
1283                         char *p = strchr(base, ':');
1284                         if (!p) {
1285                                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1286                                                         "partition_init: "
1287                                                         "invalid form for partition module record (missing ':'): %s", base);
1288                                 talloc_free(mem_ctx);
1289                                 return LDB_ERR_CONSTRAINT_VIOLATION;
1290                         }
1291                         p[0] = '\0';
1292                         p++;
1293                         if (!p[0]) {
1294                                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1295                                                         "partition_init: "
1296                                                         "invalid form for partition module record (missing backend database): %s", base);
1297                                 talloc_free(mem_ctx);
1298                                 return LDB_ERR_CONSTRAINT_VIOLATION;
1299                         }
1300
1301                         modules = ldb_modules_list_from_string(ldb_module_get_ctx(module), mem_ctx,
1302                                                                p);
1303                         
1304                         base_dn = ldb_dn_new(mem_ctx, ldb_module_get_ctx(module), base);
1305                         if (!ldb_dn_validate(base_dn)) {
1306                                 talloc_free(mem_ctx);
1307                                 return LDB_ERR_OPERATIONS_ERROR;
1308                         }
1309                         
1310                         for (partition_idx = 0; data->partitions[partition_idx]; partition_idx++) {
1311                                 if (ldb_dn_compare(data->partitions[partition_idx]->ctrl->dn, base_dn) == 0) {
1312                                         partition = data->partitions[partition_idx];
1313                                         break;
1314                                 }
1315                         }
1316                         
1317                         if (!partition) {
1318                                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1319                                                         "partition_init: "
1320                                                         "invalid form for partition module record (no such partition): %s", base);
1321                                 talloc_free(mem_ctx);
1322                                 return LDB_ERR_CONSTRAINT_VIOLATION;
1323                         }
1324                         
1325                         ret = ldb_load_modules_list(ldb_module_get_ctx(module), modules, partition->module, &partition->module);
1326                         if (ret != LDB_SUCCESS) {
1327                                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1328                                                        "partition_init: "
1329                                                        "loading backend for %s failed: %s", 
1330                                                        base, ldb_errstring(ldb_module_get_ctx(module)));
1331                                 talloc_free(mem_ctx);
1332                                 return ret;
1333                         }
1334                         ret = ldb_init_module_chain(ldb_module_get_ctx(module), partition->module);
1335                         if (ret != LDB_SUCCESS) {
1336                                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1337                                                        "partition_init: "
1338                                                        "initialising backend for %s failed: %s", 
1339                                                        base, ldb_errstring(ldb_module_get_ctx(module)));
1340                                 talloc_free(mem_ctx);
1341                                 return ret;
1342                         }
1343                 }
1344         }
1345
1346         ret = ldb_mod_register_control(module, LDB_CONTROL_DOMAIN_SCOPE_OID);
1347         if (ret != LDB_SUCCESS) {
1348                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_ERROR,
1349                         "partition: Unable to register control with rootdse!\n");
1350                 return LDB_ERR_OPERATIONS_ERROR;
1351         }
1352
1353         ret = ldb_mod_register_control(module, LDB_CONTROL_SEARCH_OPTIONS_OID);
1354         if (ret != LDB_SUCCESS) {
1355                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_ERROR,
1356                         "partition: Unable to register control with rootdse!\n");
1357                 return LDB_ERR_OPERATIONS_ERROR;
1358         }
1359
1360         talloc_free(mem_ctx);
1361         return ldb_next_init(module);
1362 }
1363
1364 _PUBLIC_ const struct ldb_module_ops ldb_partition_module_ops = {
1365         .name              = "partition",
1366         .init_context      = partition_init,
1367         .search            = partition_search,
1368         .add               = partition_add,
1369         .modify            = partition_modify,
1370         .del               = partition_delete,
1371         .rename            = partition_rename,
1372         .extended          = partition_extended,
1373         .start_transaction = partition_start_trans,
1374         .prepare_commit    = partition_prepare_commit,
1375         .end_transaction   = partition_end_trans,
1376         .del_transaction   = partition_del_trans,
1377 };