r20809: rename struct partition into struct dsdb_control_current_partition
[metze/samba/wip.git] / source4 / dsdb / samdb / ldb_modules / partition.c
1 /* 
2    Partitions ldb module
3
4    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
6
7    * NOTICE: this module is NOT released under the GNU LGPL license as
8    * other ldb code. This module is release under the GNU GPL v2 or
9    * later license.
10
11    This program is free software; you can redistribute it and/or modify
12    it under the terms of the GNU General Public License as published by
13    the Free Software Foundation; either version 2 of the License, or
14    (at your option) any later version.
15    
16    This program is distributed in the hope that it will be useful,
17    but WITHOUT ANY WARRANTY; without even the implied warranty of
18    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19    GNU General Public License for more details.
20    
21    You should have received a copy of the GNU General Public License
22    along with this program; if not, write to the Free Software
23    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24 */
25
26 /*
27  *  Name: ldb
28  *
29  *  Component: ldb partitions module
30  *
31  *  Description: Implement LDAP partitions
32  *
33  *  Author: Andrew Bartlett
34  *  Author: Stefan Metzmacher
35  */
36
37 #include "includes.h"
38 #include "ldb/include/includes.h"
39 #include "dsdb/samdb/samdb.h"
40
41 struct dsdb_control_current_partition {
42         struct ldb_module *module;
43         const char *backend;
44         struct ldb_dn *dn;
45 };
46 struct partition_private_data {
47         struct dsdb_control_current_partition **partitions;
48         struct ldb_dn **replicate;
49 };
50
51 struct partition_context {
52         struct ldb_module *module;
53         struct ldb_handle *handle;
54         struct ldb_request *orig_req;
55
56         struct ldb_request **down_req;
57         int num_requests;
58         int finished_requests;
59 };
60
61 static struct partition_context *partition_init_handle(struct ldb_request *req, struct ldb_module *module)
62 {
63         struct partition_context *ac;
64         struct ldb_handle *h;
65
66         h = talloc_zero(req, struct ldb_handle);
67         if (h == NULL) {
68                 ldb_set_errstring(module->ldb, "Out of Memory");
69                 return NULL;
70         }
71
72         h->module = module;
73
74         ac = talloc_zero(h, struct partition_context);
75         if (ac == NULL) {
76                 ldb_set_errstring(module->ldb, "Out of Memory");
77                 talloc_free(h);
78                 return NULL;
79         }
80
81         h->private_data = ac;
82
83         ac->module = module;
84         ac->handle = h;
85         ac->orig_req = req;
86
87         req->handle = h;
88
89         return ac;
90 }
91
92 struct ldb_module *make_module_for_next_request(TALLOC_CTX *mem_ctx, 
93                                                 struct ldb_context *ldb,
94                                                 struct ldb_module *module)
95 {
96         struct ldb_module *current;
97         static const struct ldb_module_ops ops; /* zero */
98         current = talloc_zero(mem_ctx, struct ldb_module);
99         if (current == NULL) {
100                 return module;
101         }
102         
103         current->ldb = ldb;
104         current->ops = &ops;
105         current->prev = NULL;
106         current->next = module;
107         return current;
108 }
109
110 struct ldb_module *find_backend(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn)
111 {
112         int i;
113         struct partition_private_data *data = talloc_get_type(module->private_data, 
114                                                               struct partition_private_data);
115         /* Look at base DN */
116         /* Figure out which partition it is under */
117         /* Skip the lot if 'data' isn't here yet (initialistion) */
118         for (i=0; data && data->partitions && data->partitions[i]; i++) {
119                 if (ldb_dn_compare_base(data->partitions[i]->dn, dn) == 0) {
120                         return make_module_for_next_request(req, module->ldb, data->partitions[i]->module);
121                 }
122         }
123
124         return module;
125 };
126
127
128 /*
129   fire the caller's callback for every entry, but only send 'done' once.
130 */
131 static int partition_search_callback(struct ldb_context *ldb, void *context, struct ldb_reply *ares)
132 {
133         struct partition_context *ac;
134
135         if (!context || !ares) {
136                 ldb_set_errstring(ldb, "partition_search_callback: NULL Context or Result in 'search' callback");
137                 goto error;
138         }
139
140         ac = talloc_get_type(context, struct partition_context);
141
142         if (ares->type == LDB_REPLY_ENTRY) {
143                 return ac->orig_req->callback(ldb, ac->orig_req->context, ares);
144         } else {
145                 ac->finished_requests++;
146                 if (ac->finished_requests == ac->num_requests) {
147                         return ac->orig_req->callback(ldb, ac->orig_req->context, ares);
148                 } else {
149                         talloc_free(ares);
150                         return LDB_SUCCESS;
151                 }
152         }
153 error:
154         talloc_free(ares);
155         return LDB_ERR_OPERATIONS_ERROR;
156 }
157
158 /*
159   only fire the 'last' callback, and only for START-TLS for now 
160 */
161 static int partition_other_callback(struct ldb_context *ldb, void *context, struct ldb_reply *ares)
162 {
163         struct partition_context *ac;
164
165         if (!context) {
166                 ldb_set_errstring(ldb, "partition_other_callback: NULL Context in 'other' callback");
167                 goto error;
168         }
169
170         ac = talloc_get_type(context, struct partition_context);
171
172         if (!ac->orig_req->callback) {
173                 talloc_free(ares);
174                 return LDB_SUCCESS;
175         }
176
177         if (!ares 
178             || (ares->type == LDB_REPLY_EXTENDED 
179                 && strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID))) {
180                 ac->finished_requests++;
181                 if (ac->finished_requests == ac->num_requests) {
182                         return ac->orig_req->callback(ldb, ac->orig_req->context, ares);
183                 }
184                 talloc_free(ares);
185                 return LDB_SUCCESS;
186         }
187         ldb_set_errstring(ldb, "partition_other_callback: Unknown reply type, only supports START_TLS");
188 error:
189         talloc_free(ares);
190         return LDB_ERR_OPERATIONS_ERROR;
191 }
192
193
194 static int partition_send_request(struct partition_context *ac, struct ldb_module *partition, 
195                                   struct ldb_dn *partition_base_dn)
196 {
197         int ret;
198         struct ldb_module *next = make_module_for_next_request(ac->module, ac->module->ldb, partition);
199         struct ldb_request *req;
200         ac->down_req = talloc_realloc(ac, ac->down_req, 
201                                         struct ldb_request *, ac->num_requests + 1);
202         if (!ac->down_req) {
203                 ldb_set_errstring(ac->module->ldb, "Out of Memory");
204                 return LDB_ERR_OPERATIONS_ERROR;
205         }
206         req = ac->down_req[ac->num_requests] = talloc(ac, struct ldb_request);
207         if (req == NULL) {
208                 ldb_set_errstring(ac->module->ldb, "Out of Memory");
209                 return LDB_ERR_OPERATIONS_ERROR;
210         }
211         
212         *ac->down_req[ac->num_requests] = *ac->orig_req; /* copy the request */
213
214         if (req->operation == LDB_SEARCH) {
215                 /* If the search is for 'more' than this partition,
216                  * then change the basedn, so a remote LDAP server
217                  * doesn't object */
218                 if (ldb_dn_compare_base(partition_base_dn, req->op.search.base) != 0) {
219                         req->op.search.base = partition_base_dn;
220                 }
221                 req->callback = partition_search_callback;
222                 req->context = ac;
223         } else {
224                 req->callback = partition_other_callback;
225                 req->context = ac;
226         }
227
228         /* Spray off search requests to all backends */
229         ret = ldb_next_request(next, req); 
230         if (ret != LDB_SUCCESS) {
231                 return ret;
232         }
233         
234         ac->num_requests++;
235         return LDB_SUCCESS;
236 }
237
238 /* Send a request down to all the partitions */
239 static int partition_send_all(struct ldb_module *module, 
240                               struct partition_context *ac, struct ldb_request *req) 
241 {
242         int i;
243         struct partition_private_data *data = talloc_get_type(module->private_data, 
244                                                               struct partition_private_data);
245         int ret = partition_send_request(ac, module->next, NULL);
246         if (ret != LDB_SUCCESS) {
247                 return ret;
248         }
249         for (i=0; data && data->partitions && data->partitions[i]; i++) {
250                 ret = partition_send_request(ac, data->partitions[i]->module, data->partitions[i]->dn);
251                 if (ret != LDB_SUCCESS) {
252                         return ret;
253                 }
254         }
255         return LDB_SUCCESS;
256 }
257
258 /* Figure out which backend a request needs to be aimed at.  Some
259  * requests must be replicated to all backends */
260 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn) 
261 {
262         int i;
263         struct ldb_module *backend;
264         struct partition_private_data *data = talloc_get_type(module->private_data, 
265                                                               struct partition_private_data);
266         
267         /* Is this a special DN, we need to replicate to every backend? */
268         for (i=0; data->replicate && data->replicate[i]; i++) {
269                 if (ldb_dn_compare(data->replicate[i], 
270                                    dn) == 0) {
271                         struct partition_context *ac;
272                         
273                         ac = partition_init_handle(req, module);
274                         if (!ac) {
275                                 return LDB_ERR_OPERATIONS_ERROR;
276                         }
277                         
278                         return partition_send_all(module, ac, req);
279                 }
280         }
281
282         /* Otherwise, we need to find the backend to fire it to */
283
284         /* Find backend */
285         backend = find_backend(module, req, dn);
286         
287         /* issue request */
288         return ldb_next_request(backend, req);
289         
290 }
291
292 /* search */
293 static int partition_search(struct ldb_module *module, struct ldb_request *req)
294 {
295         /* Find backend */
296         struct partition_private_data *data = talloc_get_type(module->private_data, 
297                                                               struct partition_private_data);
298         /* issue request */
299
300         /* (later) consider if we should be searching multiple
301          * partitions (for 'invisible' partition behaviour */
302         if (ldb_get_opaque(module->ldb, "global_catalog")) {
303                 int ret, i;
304                 struct partition_context *ac;
305                 
306                 ac = partition_init_handle(req, module);
307                 if (!ac) {
308                         return LDB_ERR_OPERATIONS_ERROR;
309                 }
310
311                 /* Search from the base DN */
312                 if (!req->op.search.base || ldb_dn_is_null(req->op.search.base)) {
313                         return partition_send_all(module, ac, req);
314                 }
315                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
316                         /* Find all partitions under the search base */
317                         if (ldb_dn_compare_base(req->op.search.base, data->partitions[i]->dn) == 0) {
318                                 ret = partition_send_request(ac, data->partitions[i]->module, data->partitions[i]->dn);
319                                 if (ret != LDB_SUCCESS) {
320                                         return ret;
321                                 }
322                         }
323                 }
324
325                 /* Perhaps we didn't match any partitions.  Try the main partition, only */
326                 if (ac->num_requests == 0) {
327                         talloc_free(ac);
328                         return ldb_next_request(module, req);
329                 }
330                 
331                 return LDB_SUCCESS;
332         } else {
333                 struct ldb_module *backend = find_backend(module, req, req->op.search.base);
334         
335                 return ldb_next_request(backend, req);
336         }
337 }
338
339 /* add */
340 static int partition_add(struct ldb_module *module, struct ldb_request *req)
341 {
342         return partition_replicate(module, req, req->op.add.message->dn);
343 }
344
345 /* modify */
346 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
347 {
348         return partition_replicate(module, req, req->op.mod.message->dn);
349 }
350
351 /* delete */
352 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
353 {
354         return partition_replicate(module, req, req->op.del.dn);
355 }
356
357 /* rename */
358 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
359 {
360         /* Find backend */
361         struct ldb_module *backend = find_backend(module, req, req->op.rename.olddn);
362         struct ldb_module *backend2 = find_backend(module, req, req->op.rename.newdn);
363
364         if (backend->next != backend2->next) {
365                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
366         }
367
368         return partition_replicate(module, req, req->op.rename.olddn);
369 }
370
371 /* start a transaction */
372 static int partition_start_trans(struct ldb_module *module)
373 {
374         int i, ret;
375         struct partition_private_data *data = talloc_get_type(module->private_data, 
376                                                               struct partition_private_data);
377         /* Look at base DN */
378         /* Figure out which partition it is under */
379         /* Skip the lot if 'data' isn't here yet (initialistion) */
380         ret = ldb_next_start_trans(module);
381         if (ret != LDB_SUCCESS) {
382                 return ret;
383         }
384
385         for (i=0; data && data->partitions && data->partitions[i]; i++) {
386                 struct ldb_module *next = make_module_for_next_request(module, module->ldb, data->partitions[i]->module);
387
388                 ret = ldb_next_start_trans(next);
389                 talloc_free(next);
390                 if (ret != LDB_SUCCESS) {
391                         /* Back it out, if it fails on one */
392                         for (i--; i >= 0; i--) {
393                                 next = make_module_for_next_request(module, module->ldb, data->partitions[i]->module);
394                                 ldb_next_del_trans(next);
395                                 talloc_free(next);
396                         }
397                         return ret;
398                 }
399         }
400         return LDB_SUCCESS;
401 }
402
403 /* end a transaction */
404 static int partition_end_trans(struct ldb_module *module)
405 {
406         int i, ret, ret2 = LDB_SUCCESS;
407         struct partition_private_data *data = talloc_get_type(module->private_data, 
408                                                               struct partition_private_data);
409         ret = ldb_next_end_trans(module);
410         if (ret != LDB_SUCCESS) {
411                 return ret;
412         }
413
414         /* Look at base DN */
415         /* Figure out which partition it is under */
416         /* Skip the lot if 'data' isn't here yet (initialistion) */
417         for (i=0; data && data->partitions && data->partitions[i]; i++) {
418                 struct ldb_module *next = make_module_for_next_request(module, module->ldb, data->partitions[i]->module);
419                 
420                 ret = ldb_next_end_trans(next);
421                 talloc_free(next);
422                 if (ret != LDB_SUCCESS) {
423                         ret2 = ret;
424                 }
425         }
426
427         if (ret != LDB_SUCCESS) {
428                 /* Back it out, if it fails on one */
429                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
430                         struct ldb_module *next = make_module_for_next_request(module, module->ldb, data->partitions[i]->module);
431                         ldb_next_del_trans(next);
432                         talloc_free(next);
433                 }
434         }
435         return ret;
436 }
437
438 /* delete a transaction */
439 static int partition_del_trans(struct ldb_module *module)
440 {
441         int i, ret, ret2 = LDB_SUCCESS;
442         struct partition_private_data *data = talloc_get_type(module->private_data, 
443                                                               struct partition_private_data);
444         ret = ldb_next_del_trans(module);
445         if (ret != LDB_SUCCESS) {
446                 ret2 = ret;
447         }
448
449         /* Look at base DN */
450         /* Figure out which partition it is under */
451         /* Skip the lot if 'data' isn't here yet (initialistion) */
452         for (i=0; data && data->partitions && data->partitions[i]; i++) {
453                 struct ldb_module *next = make_module_for_next_request(module, module->ldb, data->partitions[i]->module);
454                 
455                 ret = ldb_next_del_trans(next);
456                 talloc_free(next);
457                 if (ret != LDB_SUCCESS) {
458                         ret2 = ret;
459                 }
460         }
461         return ret2;
462 }
463
464 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
465 {
466         int i, ret;
467         uint64_t seq_number = 0;
468         uint64_t timestamp_sequence = 0;
469         uint64_t timestamp = 0;
470         struct partition_private_data *data = talloc_get_type(module->private_data, 
471                                                               struct partition_private_data);
472
473         switch (req->op.seq_num.type) {
474         case LDB_SEQ_NEXT:
475         case LDB_SEQ_HIGHEST_SEQ:
476                 ret = ldb_next_request(module, req);
477                 if (ret != LDB_SUCCESS) {
478                         return ret;
479                 }
480                 if (req->op.seq_num.flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
481                         timestamp_sequence = req->op.seq_num.seq_num;
482                 } else {
483                         seq_number = seq_number + req->op.seq_num.seq_num;
484                 }
485
486                 /* Look at base DN */
487                 /* Figure out which partition it is under */
488                 /* Skip the lot if 'data' isn't here yet (initialistion) */
489                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
490                         struct ldb_module *next = make_module_for_next_request(req, module->ldb, data->partitions[i]->module);
491                         
492                         ret = ldb_next_request(next, req);
493                         talloc_free(next);
494                         if (ret != LDB_SUCCESS) {
495                                 return ret;
496                         }
497                         if (req->op.seq_num.flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
498                                 timestamp_sequence = MAX(timestamp_sequence, req->op.seq_num.seq_num);
499                         } else {
500                                 seq_number = seq_number + req->op.seq_num.seq_num;
501                         }
502                 }
503                 /* fall though */
504         case LDB_SEQ_HIGHEST_TIMESTAMP:
505         {
506                 struct ldb_request *date_req = talloc(req, struct ldb_request);
507                 if (!date_req) {
508                         return LDB_ERR_OPERATIONS_ERROR;
509                 }
510                 *date_req = *req;
511                 date_req->op.seq_num.flags = LDB_SEQ_HIGHEST_TIMESTAMP;
512
513                 ret = ldb_next_request(module, date_req);
514                 if (ret != LDB_SUCCESS) {
515                         return ret;
516                 }
517                 timestamp = date_req->op.seq_num.seq_num;
518                 
519                 /* Look at base DN */
520                 /* Figure out which partition it is under */
521                 /* Skip the lot if 'data' isn't here yet (initialistion) */
522                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
523                         struct ldb_module *next = make_module_for_next_request(req, module->ldb, data->partitions[i]->module);
524                         
525                         ret = ldb_next_request(next, date_req);
526                         talloc_free(next);
527                         if (ret != LDB_SUCCESS) {
528                                 return ret;
529                         }
530                         timestamp = MAX(timestamp, date_req->op.seq_num.seq_num);
531                 }
532                 break;
533         }
534         }
535
536         switch (req->op.seq_num.flags) {
537         case LDB_SEQ_NEXT:
538         case LDB_SEQ_HIGHEST_SEQ:
539                 
540                 req->op.seq_num.flags = 0;
541
542                 /* Has someone above set a timebase sequence? */
543                 if (timestamp_sequence) {
544                         req->op.seq_num.seq_num = (((unsigned long long)timestamp << 24) | (seq_number & 0xFFFFFF));
545                 } else {
546                         req->op.seq_num.seq_num = seq_number;
547                 }
548
549                 if (timestamp_sequence > req->op.seq_num.seq_num) {
550                         req->op.seq_num.seq_num = timestamp_sequence;
551                         req->op.seq_num.flags |= LDB_SEQ_TIMESTAMP_SEQUENCE;
552                 }
553
554                 req->op.seq_num.flags |= LDB_SEQ_GLOBAL_SEQUENCE;
555                 break;
556         case LDB_SEQ_HIGHEST_TIMESTAMP:
557                 req->op.seq_num.seq_num = timestamp;
558                 break;
559         }
560
561         switch (req->op.seq_num.flags) {
562         case LDB_SEQ_NEXT:
563                 req->op.seq_num.seq_num++;
564         }
565         return LDB_SUCCESS;
566 }
567
568 static int partition_extended_replicated_objects(struct ldb_module *module, struct ldb_request *req)
569 {
570         struct dsdb_extended_replicated_objects *ext;
571
572         ext = talloc_get_type(req->op.extended.data, struct dsdb_extended_replicated_objects);
573         if (!ext) {
574                 ldb_debug(module->ldb, LDB_DEBUG_FATAL, "partition_extended_replicated_objects: invalid extended data\n");
575                 return LDB_ERR_PROTOCOL_ERROR;
576         }
577
578         if (ext->version != DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION) {
579                 ldb_debug(module->ldb, LDB_DEBUG_FATAL, "partition_extended_replicated_objects: extended data invalid version [%u != %u]\n",
580                           ext->version, DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION);
581                 return LDB_ERR_PROTOCOL_ERROR;
582         }
583
584         return partition_replicate(module, req, ext->partition_dn);
585 }
586
587 /* extended */
588 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
589 {
590         struct partition_context *ac;
591
592         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_REPLICATED_OBJECTS_OID) == 0) {
593                 return partition_extended_replicated_objects(module, req);
594         }
595
596         /* 
597          * as the extended operation has no dn
598          * we need to send it to all partitions
599          */
600
601         ac = partition_init_handle(req, module);
602         if (!ac) {
603                 return LDB_ERR_OPERATIONS_ERROR;
604         }
605                         
606         return partition_send_all(module, ac, req);
607 }
608
609 static int sort_compare(void *void1,
610                         void *void2, void *opaque)
611 {
612         struct dsdb_control_current_partition **pp1 = void1;
613         struct dsdb_control_current_partition **pp2 = void2;
614         struct dsdb_control_current_partition *partition1 = talloc_get_type(*pp1,
615                                                             struct dsdb_control_current_partition);
616         struct dsdb_control_current_partition *partition2 = talloc_get_type(*pp2,
617                                                             struct dsdb_control_current_partition);
618
619         return ldb_dn_compare(partition1->dn, partition2->dn);
620 }
621
622 static int partition_init(struct ldb_module *module)
623 {
624         int ret, i;
625         TALLOC_CTX *mem_ctx = talloc_new(module);
626         static const char *attrs[] = { "partition", "replicateEntries", "modules", NULL };
627         struct ldb_result *res;
628         struct ldb_message *msg;
629         struct ldb_message_element *partition_attributes;
630         struct ldb_message_element *replicate_attributes;
631         struct ldb_message_element *modules_attributes;
632
633         struct partition_private_data *data;
634
635         if (!mem_ctx) {
636                 return LDB_ERR_OPERATIONS_ERROR;
637         }
638
639         data = talloc(mem_ctx, struct partition_private_data);
640         if (data == NULL) {
641                 return LDB_ERR_OPERATIONS_ERROR;
642         }
643
644         ret = ldb_search(module->ldb, ldb_dn_new(mem_ctx, module->ldb, "@PARTITION"),
645                          LDB_SCOPE_BASE,
646                          NULL, attrs,
647                          &res);
648         if (ret != LDB_SUCCESS) {
649                 talloc_free(mem_ctx);
650                 return ret;
651         }
652         talloc_steal(mem_ctx, res);
653         if (res->count == 0) {
654                 talloc_free(mem_ctx);
655                 return ldb_next_init(module);
656         }
657
658         if (res->count > 1) {
659                 talloc_free(mem_ctx);
660                 return LDB_ERR_CONSTRAINT_VIOLATION;
661         }
662
663         msg = res->msgs[0];
664
665         partition_attributes = ldb_msg_find_element(msg, "partition");
666         if (!partition_attributes) {
667                 ldb_set_errstring(module->ldb, "partition_init: no partitions specified");
668                 talloc_free(mem_ctx);
669                 return LDB_ERR_CONSTRAINT_VIOLATION;
670         }
671         data->partitions = talloc_array(data, struct dsdb_control_current_partition *, partition_attributes->num_values + 1);
672         if (!data->partitions) {
673                 talloc_free(mem_ctx);
674                 return LDB_ERR_OPERATIONS_ERROR;
675         }
676         for (i=0; i < partition_attributes->num_values; i++) {
677                 char *base = talloc_strdup(data->partitions, (char *)partition_attributes->values[i].data);
678                 char *p = strchr(base, ':');
679                 if (!p) {
680                         ldb_asprintf_errstring(module->ldb, 
681                                                 "partition_init: "
682                                                 "invalid form for partition record (missing ':'): %s", base);
683                         talloc_free(mem_ctx);
684                         return LDB_ERR_CONSTRAINT_VIOLATION;
685                 }
686                 p[0] = '\0';
687                 p++;
688                 if (!p[0]) {
689                         ldb_asprintf_errstring(module->ldb, 
690                                                 "partition_init: "
691                                                 "invalid form for partition record (missing backend database): %s", base);
692                         talloc_free(mem_ctx);
693                         return LDB_ERR_CONSTRAINT_VIOLATION;
694                 }
695                 data->partitions[i] = talloc(data->partitions, struct dsdb_control_current_partition);
696                 if (!data->partitions[i]) {
697                         talloc_free(mem_ctx);
698                         return LDB_ERR_OPERATIONS_ERROR;
699                 }
700
701                 data->partitions[i]->dn = ldb_dn_new(data->partitions[i], module->ldb, base);
702                 if (!data->partitions[i]->dn) {
703                         ldb_asprintf_errstring(module->ldb, 
704                                                 "partition_init: invalid DN in partition record: %s", base);
705                         talloc_free(mem_ctx);
706                         return LDB_ERR_CONSTRAINT_VIOLATION;
707                 }
708
709                 data->partitions[i]->backend = private_path(data->partitions[i], p);
710                 ret = ldb_connect_backend(module->ldb, data->partitions[i]->backend, NULL, &data->partitions[i]->module);
711                 if (ret != LDB_SUCCESS) {
712                         talloc_free(mem_ctx);
713                         return ret;
714                 }
715         }
716         data->partitions[i] = NULL;
717
718         /* sort these into order, most to least specific */
719         ldb_qsort(data->partitions, partition_attributes->num_values, sizeof(*data->partitions), 
720                   module->ldb, sort_compare);
721
722         for (i=0; data->partitions[i]; i++) {
723                 struct ldb_request *req;
724                 req = talloc_zero(mem_ctx, struct ldb_request);
725                 if (req == NULL) {
726                         ldb_debug(module->ldb, LDB_DEBUG_ERROR, "partition: Out of memory!\n");
727                         talloc_free(mem_ctx);
728                         return LDB_ERR_OPERATIONS_ERROR;
729                 }
730                 
731                 req->operation = LDB_REQ_REGISTER_PARTITION;
732                 req->op.reg_partition.dn = data->partitions[i]->dn;
733                 
734                 ret = ldb_request(module->ldb, req);
735                 if (ret != LDB_SUCCESS) {
736                         ldb_debug(module->ldb, LDB_DEBUG_ERROR, "partition: Unable to register partition with rootdse!\n");
737                         talloc_free(mem_ctx);
738                         return LDB_ERR_OTHER;
739                 }
740                 talloc_free(req);
741         }
742
743         replicate_attributes = ldb_msg_find_element(msg, "replicateEntries");
744         if (!replicate_attributes) {
745                 data->replicate = NULL;
746         } else {
747                 data->replicate = talloc_array(data, struct ldb_dn *, replicate_attributes->num_values + 1);
748                 if (!data->replicate) {
749                         talloc_free(mem_ctx);
750                         return LDB_ERR_OPERATIONS_ERROR;
751                 }
752                 
753                 for (i=0; i < replicate_attributes->num_values; i++) {
754                         data->replicate[i] = ldb_dn_new(data->replicate, module->ldb, (const char *)replicate_attributes->values[i].data);
755                         if (!ldb_dn_validate(data->replicate[i])) {
756                                 ldb_asprintf_errstring(module->ldb, 
757                                                         "partition_init: "
758                                                         "invalid DN in partition replicate record: %s", 
759                                                         replicate_attributes->values[i].data);
760                                 talloc_free(mem_ctx);
761                                 return LDB_ERR_CONSTRAINT_VIOLATION;
762                         }
763                 }
764                 data->replicate[i] = NULL;
765         }
766
767         /* Make the private data available to any searches the modules may trigger in initialisation */
768         module->private_data = data;
769         talloc_steal(module, data);
770         
771         modules_attributes = ldb_msg_find_element(msg, "modules");
772         if (modules_attributes) {
773                 for (i=0; i < modules_attributes->num_values; i++) {
774                         struct ldb_dn *base_dn;
775                         int partition_idx;
776                         struct dsdb_control_current_partition *partition = NULL;
777                         const char **modules = NULL;
778
779                         char *base = talloc_strdup(data->partitions, (char *)modules_attributes->values[i].data);
780                         char *p = strchr(base, ':');
781                         if (!p) {
782                                 ldb_asprintf_errstring(module->ldb, 
783                                                         "partition_init: "
784                                                         "invalid form for partition module record (missing ':'): %s", base);
785                                 talloc_free(mem_ctx);
786                                 return LDB_ERR_CONSTRAINT_VIOLATION;
787                         }
788                         p[0] = '\0';
789                         p++;
790                         if (!p[0]) {
791                                 ldb_asprintf_errstring(module->ldb, 
792                                                         "partition_init: "
793                                                         "invalid form for partition module record (missing backend database): %s", base);
794                                 talloc_free(mem_ctx);
795                                 return LDB_ERR_CONSTRAINT_VIOLATION;
796                         }
797
798                         modules = ldb_modules_list_from_string(module->ldb, mem_ctx,
799                                                                p);
800                         
801                         base_dn = ldb_dn_new(mem_ctx, module->ldb, base);
802                         if (!ldb_dn_validate(base_dn)) {
803                                 talloc_free(mem_ctx);
804                                 return LDB_ERR_OPERATIONS_ERROR;
805                         }
806                         
807                         for (partition_idx = 0; data->partitions[partition_idx]; partition_idx++) {
808                                 if (ldb_dn_compare(data->partitions[partition_idx]->dn, base_dn) == 0) {
809                                         partition = data->partitions[partition_idx];
810                                         break;
811                                 }
812                         }
813                         
814                         if (!partition) {
815                                 ldb_asprintf_errstring(module->ldb, 
816                                                         "partition_init: "
817                                                         "invalid form for partition module record (no such partition): %s", base);
818                                 talloc_free(mem_ctx);
819                                 return LDB_ERR_CONSTRAINT_VIOLATION;
820                         }
821                         
822                         ret = ldb_load_modules_list(module->ldb, modules, partition->module, &partition->module);
823                         if (ret != LDB_SUCCESS) {
824                                 ldb_asprintf_errstring(module->ldb, 
825                                                        "partition_init: "
826                                                        "loading backend for %s failed: %s", 
827                                                        base, ldb_errstring(module->ldb));
828                                 talloc_free(mem_ctx);
829                                 return ret;
830                         }
831                         ret = ldb_init_module_chain(module->ldb, partition->module);
832                         if (ret != LDB_SUCCESS) {
833                                 ldb_asprintf_errstring(module->ldb, 
834                                                        "partition_init: "
835                                                        "initialising backend for %s failed: %s", 
836                                                        base, ldb_errstring(module->ldb));
837                                 talloc_free(mem_ctx);
838                                 return ret;
839                         }
840                 }
841         }
842
843         talloc_free(mem_ctx);
844         return ldb_next_init(module);
845 }
846
847 static int partition_wait_none(struct ldb_handle *handle) {
848         struct partition_context *ac;
849         int ret;
850         int i;
851     
852         if (!handle || !handle->private_data) {
853                 return LDB_ERR_OPERATIONS_ERROR;
854         }
855
856         if (handle->state == LDB_ASYNC_DONE) {
857                 return handle->status;
858         }
859
860         handle->state = LDB_ASYNC_PENDING;
861         handle->status = LDB_SUCCESS;
862
863         ac = talloc_get_type(handle->private_data, struct partition_context);
864
865         for (i=0; i < ac->num_requests; i++) {
866                 ret = ldb_wait(ac->down_req[i]->handle, LDB_WAIT_NONE);
867                 
868                 if (ret != LDB_SUCCESS) {
869                         handle->status = ret;
870                         goto done;
871                 }
872                 if (ac->down_req[i]->handle->status != LDB_SUCCESS) {
873                         handle->status = ac->down_req[i]->handle->status;
874                         goto done;
875                 }
876                 
877                 if (ac->down_req[i]->handle->state != LDB_ASYNC_DONE) {
878                         return LDB_SUCCESS;
879                 }
880         }
881
882         ret = LDB_SUCCESS;
883
884 done:
885         handle->state = LDB_ASYNC_DONE;
886         return ret;
887 }
888
889
890 static int partition_wait_all(struct ldb_handle *handle) {
891
892         int ret;
893
894         while (handle->state != LDB_ASYNC_DONE) {
895                 ret = partition_wait_none(handle);
896                 if (ret != LDB_SUCCESS) {
897                         return ret;
898                 }
899         }
900
901         return handle->status;
902 }
903
904 static int partition_wait(struct ldb_handle *handle, enum ldb_wait_type type)
905 {
906         if (type == LDB_WAIT_ALL) {
907                 return partition_wait_all(handle);
908         } else {
909                 return partition_wait_none(handle);
910         }
911 }
912
913 static const struct ldb_module_ops partition_ops = {
914         .name              = "partition",
915         .init_context      = partition_init,
916         .search            = partition_search,
917         .add               = partition_add,
918         .modify            = partition_modify,
919         .del               = partition_delete,
920         .rename            = partition_rename,
921         .extended          = partition_extended,
922         .sequence_number   = partition_sequence_number,
923         .start_transaction = partition_start_trans,
924         .end_transaction   = partition_end_trans,
925         .del_transaction   = partition_del_trans,
926         .wait              = partition_wait
927 };
928
929 int ldb_partition_init(void)
930 {
931         return ldb_register_module(&partition_ops);
932 }