r19531: Make struct ldb_dn opaque and local to ldb_dn.c
[kamenim/samba.git] / source4 / dsdb / samdb / ldb_modules / partition.c
1 /* 
2    Partitions ldb module
3
4    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5
6    * NOTICE: this module is NOT released under the GNU LGPL license as
7    * other ldb code. This module is release under the GNU GPL v2 or
8    * later license.
9
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2 of the License, or
13    (at your option) any later version.
14    
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19    
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 */
24
25 /*
26  *  Name: ldb
27  *
28  *  Component: ldb partitions module
29  *
30  *  Description: Implement LDAP partitions
31  *
32  *  Author: Andrew Bartlett
33  */
34
35 #include "includes.h"
36 #include "ldb/include/includes.h"
37
38 struct partition {
39         struct ldb_module *module;
40         const char *backend;
41         struct ldb_dn *dn;
42 };
43 struct partition_private_data {
44         struct partition **partitions;
45         struct ldb_dn **replicate;
46 };
47
48 struct partition_context {
49         struct ldb_module *module;
50         struct ldb_request *orig_req;
51
52         struct ldb_request **down_req;
53         int num_requests;
54         int finished_requests;
55 };
56
57 static struct ldb_handle *partition_init_handle(struct ldb_request *req, struct ldb_module *module)
58 {
59         struct partition_context *ac;
60         struct ldb_handle *h;
61
62         h = talloc_zero(req, struct ldb_handle);
63         if (h == NULL) {
64                 ldb_set_errstring(module->ldb, "Out of Memory");
65                 return NULL;
66         }
67
68         h->module = module;
69
70         ac = talloc_zero(h, struct partition_context);
71         if (ac == NULL) {
72                 ldb_set_errstring(module->ldb, "Out of Memory");
73                 talloc_free(h);
74                 return NULL;
75         }
76
77         h->private_data = (void *)ac;
78
79         ac->module = module;
80         ac->orig_req = req;
81
82         return h;
83 }
84
85 struct ldb_module *make_module_for_next_request(TALLOC_CTX *mem_ctx, 
86                                                 struct ldb_context *ldb,
87                                                 struct ldb_module *module) 
88 {
89         struct ldb_module *current;
90         static const struct ldb_module_ops ops; /* zero */
91         current = talloc_zero(mem_ctx, struct ldb_module);
92         if (current == NULL) {
93                 return module;
94         }
95         
96         current->ldb = ldb;
97         current->ops = &ops;
98         current->prev = NULL;
99         current->next = module;
100         return current;
101 }
102
103 struct ldb_module *find_backend(struct ldb_module *module, struct ldb_request *req, const struct ldb_dn *dn)
104 {
105         int i;
106         struct partition_private_data *data = talloc_get_type(module->private_data, 
107                                                               struct partition_private_data);
108         /* Look at base DN */
109         /* Figure out which partition it is under */
110         /* Skip the lot if 'data' isn't here yet (initialistion) */
111         for (i=0; data && data->partitions && data->partitions[i]; i++) {
112                 if (ldb_dn_compare_base(module->ldb, 
113                                         data->partitions[i]->dn, 
114                                         dn) == 0) {
115                         return make_module_for_next_request(req, module->ldb, data->partitions[i]->module);
116                 }
117         }
118
119         return module;
120 };
121
122
123 /*
124   fire the caller's callback for every entry, but only send 'done' once.
125 */
126 static int partition_search_callback(struct ldb_context *ldb, void *context, struct ldb_reply *ares)
127 {
128         struct partition_context *ac;
129
130         if (!context || !ares) {
131                 ldb_set_errstring(ldb, "partition_search_callback: NULL Context or Result in 'search' callback");
132                 goto error;
133         }
134
135         ac = talloc_get_type(context, struct partition_context);
136
137         if (ares->type == LDB_REPLY_ENTRY) {
138                 return ac->orig_req->callback(ldb, ac->orig_req->context, ares);
139         } else {
140                 ac->finished_requests++;
141                 if (ac->finished_requests == ac->num_requests) {
142                         return ac->orig_req->callback(ldb, ac->orig_req->context, ares);
143                 } else {
144                         talloc_free(ares);
145                         return LDB_SUCCESS;
146                 }
147         }
148 error:
149         talloc_free(ares);
150         return LDB_ERR_OPERATIONS_ERROR;
151 }
152
153 /*
154   only fire the 'last' callback, and only for START-TLS for now 
155 */
156 static int partition_other_callback(struct ldb_context *ldb, void *context, struct ldb_reply *ares)
157 {
158         struct partition_context *ac;
159
160         if (!context) {
161                 ldb_set_errstring(ldb, "partition_other_callback: NULL Context in 'other' callback");
162                 goto error;
163         }
164
165         ac = talloc_get_type(context, struct partition_context);
166
167         if (!ac->orig_req->callback) {
168                 talloc_free(ares);
169                 return LDB_SUCCESS;
170         }
171
172         if (!ares 
173             || (ares->type == LDB_REPLY_EXTENDED 
174                 && strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID))) {
175                 ac->finished_requests++;
176                 if (ac->finished_requests == ac->num_requests) {
177                         return ac->orig_req->callback(ldb, ac->orig_req->context, ares);
178                 }
179                 talloc_free(ares);
180                 return LDB_SUCCESS;
181         }
182         ldb_set_errstring(ldb, "partition_other_callback: Unknown reply type, only supports START_TLS");
183 error:
184         talloc_free(ares);
185         return LDB_ERR_OPERATIONS_ERROR;
186 }
187
188
189 static int partition_send_request(struct partition_context *ac, struct ldb_module *partition, 
190                                   struct ldb_dn *partition_base_dn)
191 {
192         int ret;
193         struct ldb_module *next = make_module_for_next_request(ac->module, ac->module->ldb, partition);
194         struct ldb_request *req;
195         ac->down_req = talloc_realloc(ac, ac->down_req, 
196                                         struct ldb_request *, ac->num_requests + 1);
197         if (!ac->down_req) {
198                 ldb_set_errstring(ac->module->ldb, "Out of Memory");
199                 return LDB_ERR_OPERATIONS_ERROR;
200         }
201         req = ac->down_req[ac->num_requests] = talloc(ac, struct ldb_request);
202         if (req == NULL) {
203                 ldb_set_errstring(ac->module->ldb, "Out of Memory");
204                 return LDB_ERR_OPERATIONS_ERROR;
205         }
206         
207         *ac->down_req[ac->num_requests] = *ac->orig_req; /* copy the request */
208
209         if (req->operation == LDB_SEARCH) {
210                 /* If the search is for 'more' than this partition,
211                  * then change the basedn, so a remote LDAP server
212                  * doesn't object */
213                 if (ldb_dn_compare_base(ac->module->ldb, 
214                                         partition_base_dn, req->op.search.base) != 0) {
215                         req->op.search.base = partition_base_dn;
216                 }
217                 req->callback = partition_search_callback;
218                 req->context = ac;
219         } else {
220                 req->callback = partition_other_callback;
221                 req->context = ac;
222         }
223
224         /* Spray off search requests to all backends */
225         ret = ldb_next_request(next, req); 
226         if (ret != LDB_SUCCESS) {
227                 return ret;
228         }
229         
230         ac->num_requests++;
231         return LDB_SUCCESS;
232 }
233
234 /* Send a request down to all the partitions */
235 static int partition_send_all(struct ldb_module *module, 
236                               struct partition_context *ac, struct ldb_request *req) 
237 {
238         int i;
239         struct partition_private_data *data = talloc_get_type(module->private_data, 
240                                                               struct partition_private_data);
241         int ret = partition_send_request(ac, module->next, NULL);
242         if (ret != LDB_SUCCESS) {
243                 return ret;
244         }
245         for (i=0; data && data->partitions && data->partitions[i]; i++) {
246                 ret = partition_send_request(ac, data->partitions[i]->module, data->partitions[i]->dn);
247                 if (ret != LDB_SUCCESS) {
248                         return ret;
249                 }
250         }
251         return LDB_SUCCESS;
252 }
253
254 /* Figure out which backend a request needs to be aimed at.  Some
255  * requests must be replicated to all backends */
256 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, const struct ldb_dn *dn) 
257 {
258         int i;
259         struct ldb_module *backend;
260         struct partition_private_data *data = talloc_get_type(module->private_data, 
261                                                               struct partition_private_data);
262         
263         /* Is this a special DN, we need to replicate to every backend? */
264         for (i=0; data->replicate && data->replicate[i]; i++) {
265                 if (ldb_dn_compare(module->ldb, 
266                                    data->replicate[i], 
267                                    dn) == 0) {
268                         struct ldb_handle *h;
269                         struct partition_context *ac;
270                         
271                         h = partition_init_handle(req, module);
272                         if (!h) {
273                                 return LDB_ERR_OPERATIONS_ERROR;
274                         }
275                         /* return our own handle to deal with this call */
276                         req->handle = h;
277                         
278                         ac = talloc_get_type(h->private_data, struct partition_context);
279                         
280                         return partition_send_all(module, ac, req);
281                 }
282         }
283
284         /* Otherwise, we need to find the backend to fire it to */
285
286         /* Find backend */
287         backend = find_backend(module, req, dn);
288         
289         /* issue request */
290         return ldb_next_request(backend, req);
291         
292 }
293
294 /* search */
295 static int partition_search(struct ldb_module *module, struct ldb_request *req)
296 {
297         /* Find backend */
298         struct partition_private_data *data = talloc_get_type(module->private_data, 
299                                                               struct partition_private_data);
300         /* issue request */
301
302         /* (later) consider if we should be searching multiple
303          * partitions (for 'invisible' partition behaviour */
304         if (ldb_get_opaque(module->ldb, "global_catalog")) {
305                 int ret, i;
306                 struct ldb_handle *h;
307                 struct partition_context *ac;
308                 
309                 h = partition_init_handle(req, module);
310                 if (!h) {
311                         return LDB_ERR_OPERATIONS_ERROR;
312                 }
313                 /* return our own handle to deal with this call */
314                 req->handle = h;
315                 
316                 ac = talloc_get_type(h->private_data, struct partition_context);
317                 
318                 /* Search from the base DN */
319                 if (!req->op.search.base || (ldb_dn_get_comp_num(req->op.search.base) == 0)) {
320                         return partition_send_all(module, ac, req);
321                 }
322                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
323                         /* Find all partitions under the search base */
324                         if (ldb_dn_compare_base(module->ldb, 
325                                                 req->op.search.base,
326                                                 data->partitions[i]->dn) == 0) {
327                                 ret = partition_send_request(ac, data->partitions[i]->module, data->partitions[i]->dn);
328                                 if (ret != LDB_SUCCESS) {
329                                         return ret;
330                                 }
331                         }
332                 }
333
334                 /* Perhaps we didn't match any partitions.  Try the main partition, only */
335                 if (ac->num_requests == 0) {
336                         talloc_free(h);
337                         return ldb_next_request(module, req);
338                 }
339                 
340                 return LDB_SUCCESS;
341         } else {
342                 struct ldb_module *backend = find_backend(module, req, req->op.search.base);
343         
344                 return ldb_next_request(backend, req);
345         }
346 }
347
348 /* add */
349 static int partition_add(struct ldb_module *module, struct ldb_request *req)
350 {
351         return partition_replicate(module, req, req->op.add.message->dn);
352 }
353
354 /* modify */
355 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
356 {
357         return partition_replicate(module, req, req->op.mod.message->dn);
358 }
359
360 /* delete */
361 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
362 {
363         return partition_replicate(module, req, req->op.del.dn);
364 }
365
366 /* rename */
367 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
368 {
369         /* Find backend */
370         struct ldb_module *backend = find_backend(module, req, req->op.rename.olddn);
371         struct ldb_module *backend2 = find_backend(module, req, req->op.rename.newdn);
372
373         if (backend->next != backend2->next) {
374                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
375         }
376
377         return partition_replicate(module, req, req->op.rename.olddn);
378 }
379
380 /* start a transaction */
381 static int partition_start_trans(struct ldb_module *module)
382 {
383         int i, ret;
384         struct partition_private_data *data = talloc_get_type(module->private_data, 
385                                                               struct partition_private_data);
386         /* Look at base DN */
387         /* Figure out which partition it is under */
388         /* Skip the lot if 'data' isn't here yet (initialistion) */
389         ret = ldb_next_start_trans(module);
390         if (ret != LDB_SUCCESS) {
391                 return ret;
392         }
393
394         for (i=0; data && data->partitions && data->partitions[i]; i++) {
395                 struct ldb_module *next = make_module_for_next_request(module, module->ldb, data->partitions[i]->module);
396
397                 ret = ldb_next_start_trans(next);
398                 talloc_free(next);
399                 if (ret != LDB_SUCCESS) {
400                         /* Back it out, if it fails on one */
401                         for (i--; i >= 0; i--) {
402                                 next = make_module_for_next_request(module, module->ldb, data->partitions[i]->module);
403                                 ldb_next_del_trans(next);
404                                 talloc_free(next);
405                         }
406                         return ret;
407                 }
408         }
409         return LDB_SUCCESS;
410 }
411
412 /* end a transaction */
413 static int partition_end_trans(struct ldb_module *module)
414 {
415         int i, ret, ret2 = LDB_SUCCESS;
416         struct partition_private_data *data = talloc_get_type(module->private_data, 
417                                                               struct partition_private_data);
418         ret = ldb_next_end_trans(module);
419         if (ret != LDB_SUCCESS) {
420                 return ret;
421         }
422
423         /* Look at base DN */
424         /* Figure out which partition it is under */
425         /* Skip the lot if 'data' isn't here yet (initialistion) */
426         for (i=0; data && data->partitions && data->partitions[i]; i++) {
427                 struct ldb_module *next = make_module_for_next_request(module, module->ldb, data->partitions[i]->module);
428                 
429                 ret = ldb_next_end_trans(next);
430                 talloc_free(next);
431                 if (ret != LDB_SUCCESS) {
432                         ret2 = ret;
433                 }
434         }
435
436         if (ret != LDB_SUCCESS) {
437                 /* Back it out, if it fails on one */
438                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
439                         struct ldb_module *next = make_module_for_next_request(module, module->ldb, data->partitions[i]->module);
440                         ldb_next_del_trans(next);
441                         talloc_free(next);
442                 }
443         }
444         return ret;
445 }
446
447 /* delete a transaction */
448 static int partition_del_trans(struct ldb_module *module)
449 {
450         int i, ret, ret2 = LDB_SUCCESS;
451         struct partition_private_data *data = talloc_get_type(module->private_data, 
452                                                               struct partition_private_data);
453         ret = ldb_next_del_trans(module);
454         if (ret != LDB_SUCCESS) {
455                 ret2 = ret;
456         }
457
458         /* Look at base DN */
459         /* Figure out which partition it is under */
460         /* Skip the lot if 'data' isn't here yet (initialistion) */
461         for (i=0; data && data->partitions && data->partitions[i]; i++) {
462                 struct ldb_module *next = make_module_for_next_request(module, module->ldb, data->partitions[i]->module);
463                 
464                 ret = ldb_next_del_trans(next);
465                 talloc_free(next);
466                 if (ret != LDB_SUCCESS) {
467                         ret2 = ret;
468                 }
469         }
470         return ret2;
471 }
472
473 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
474 {
475         int i, ret;
476         uint64_t seq_number = 0;
477         uint64_t timestamp_sequence = 0;
478         uint64_t timestamp = 0;
479         struct partition_private_data *data = talloc_get_type(module->private_data, 
480                                                               struct partition_private_data);
481
482         switch (req->op.seq_num.type) {
483         case LDB_SEQ_NEXT:
484         case LDB_SEQ_HIGHEST_SEQ:
485                 ret = ldb_next_request(module, req);
486                 if (ret != LDB_SUCCESS) {
487                         return ret;
488                 }
489                 if (req->op.seq_num.flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
490                         timestamp_sequence = req->op.seq_num.seq_num;
491                 } else {
492                         seq_number = seq_number + req->op.seq_num.seq_num;
493                 }
494
495                 /* Look at base DN */
496                 /* Figure out which partition it is under */
497                 /* Skip the lot if 'data' isn't here yet (initialistion) */
498                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
499                         struct ldb_module *next = make_module_for_next_request(req, module->ldb, data->partitions[i]->module);
500                         
501                         ret = ldb_next_request(next, req);
502                         talloc_free(next);
503                         if (ret != LDB_SUCCESS) {
504                                 return ret;
505                         }
506                         if (req->op.seq_num.flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
507                                 timestamp_sequence = MAX(timestamp_sequence, req->op.seq_num.seq_num);
508                         } else {
509                                 seq_number = seq_number + req->op.seq_num.seq_num;
510                         }
511                 }
512                 /* fall though */
513         case LDB_SEQ_HIGHEST_TIMESTAMP:
514         {
515                 struct ldb_request *date_req = talloc(req, struct ldb_request);
516                 if (!date_req) {
517                         return LDB_ERR_OPERATIONS_ERROR;
518                 }
519                 *date_req = *req;
520                 date_req->op.seq_num.flags = LDB_SEQ_HIGHEST_TIMESTAMP;
521
522                 ret = ldb_next_request(module, date_req);
523                 if (ret != LDB_SUCCESS) {
524                         return ret;
525                 }
526                 timestamp = date_req->op.seq_num.seq_num;
527                 
528                 /* Look at base DN */
529                 /* Figure out which partition it is under */
530                 /* Skip the lot if 'data' isn't here yet (initialistion) */
531                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
532                         struct ldb_module *next = make_module_for_next_request(req, module->ldb, data->partitions[i]->module);
533                         
534                         ret = ldb_next_request(next, date_req);
535                         talloc_free(next);
536                         if (ret != LDB_SUCCESS) {
537                                 return ret;
538                         }
539                         timestamp = MAX(timestamp, date_req->op.seq_num.seq_num);
540                 }
541                 break;
542         }
543         }
544
545         switch (req->op.seq_num.flags) {
546         case LDB_SEQ_NEXT:
547         case LDB_SEQ_HIGHEST_SEQ:
548                 
549                 req->op.seq_num.flags = 0;
550
551                 /* Has someone above set a timebase sequence? */
552                 if (timestamp_sequence) {
553                         req->op.seq_num.seq_num = (((unsigned long long)timestamp << 24) | (seq_number & 0xFFFFFF));
554                 } else {
555                         req->op.seq_num.seq_num = seq_number;
556                 }
557
558                 if (timestamp_sequence > req->op.seq_num.seq_num) {
559                         req->op.seq_num.seq_num = timestamp_sequence;
560                         req->op.seq_num.flags |= LDB_SEQ_TIMESTAMP_SEQUENCE;
561                 }
562
563                 req->op.seq_num.flags |= LDB_SEQ_GLOBAL_SEQUENCE;
564                 break;
565         case LDB_SEQ_HIGHEST_TIMESTAMP:
566                 req->op.seq_num.seq_num = timestamp;
567                 break;
568         }
569
570         switch (req->op.seq_num.flags) {
571         case LDB_SEQ_NEXT:
572                 req->op.seq_num.seq_num++;
573         }
574         return LDB_SUCCESS;
575 }
576
577 static int sort_compare(void *void1,
578                         void *void2, void *opaque)
579 {
580         struct ldb_context *ldb = talloc_get_type(opaque, struct ldb_context);
581         struct partition **pp1 = void1;
582         struct partition **pp2 = void2;
583         struct partition *partition1 = talloc_get_type(*pp1, struct partition);
584         struct partition *partition2 = talloc_get_type(*pp2, struct partition);
585
586         return ldb_dn_compare(ldb, partition1->dn, partition2->dn);
587 }
588
589 static int partition_init(struct ldb_module *module)
590 {
591         int ret, i;
592         TALLOC_CTX *mem_ctx = talloc_new(module);
593         static const char *attrs[] = { "partition", "replicateEntries", "modules", NULL };
594         struct ldb_result *res;
595         struct ldb_message *msg;
596         struct ldb_message_element *partition_attributes;
597         struct ldb_message_element *replicate_attributes;
598         struct ldb_message_element *modules_attributes;
599
600         struct partition_private_data *data;
601
602         if (!mem_ctx) {
603                 return LDB_ERR_OPERATIONS_ERROR;
604         }
605
606         data = talloc(mem_ctx, struct partition_private_data);
607         if (data == NULL) {
608                 return LDB_ERR_OPERATIONS_ERROR;
609         }
610
611         ret = ldb_search(module->ldb, ldb_dn_explode(mem_ctx, "@PARTITION"),
612                          LDB_SCOPE_BASE,
613                          NULL, attrs,
614                          &res);
615         if (ret != LDB_SUCCESS) {
616                 talloc_free(mem_ctx);
617                 return ret;
618         }
619         talloc_steal(mem_ctx, res);
620         if (res->count == 0) {
621                 talloc_free(mem_ctx);
622                 return ldb_next_init(module);
623         }
624
625         if (res->count > 1) {
626                 talloc_free(mem_ctx);
627                 return LDB_ERR_CONSTRAINT_VIOLATION;
628         }
629
630         msg = res->msgs[0];
631
632         partition_attributes = ldb_msg_find_element(msg, "partition");
633         if (!partition_attributes) {
634                 ldb_set_errstring(module->ldb, "partition_init: no partitions specified");
635                 talloc_free(mem_ctx);
636                 return LDB_ERR_CONSTRAINT_VIOLATION;
637         }
638         data->partitions = talloc_array(data, struct partition *, partition_attributes->num_values + 1);
639         if (!data->partitions) {
640                 talloc_free(mem_ctx);
641                 return LDB_ERR_OPERATIONS_ERROR;
642         }
643         for (i=0; i < partition_attributes->num_values; i++) {
644                 char *base = talloc_strdup(data->partitions, (char *)partition_attributes->values[i].data);
645                 char *p = strchr(base, ':');
646                 if (!p) {
647                         ldb_asprintf_errstring(module->ldb, 
648                                                 "partition_init: "
649                                                 "invalid form for partition record (missing ':'): %s", base);
650                         talloc_free(mem_ctx);
651                         return LDB_ERR_CONSTRAINT_VIOLATION;
652                 }
653                 p[0] = '\0';
654                 p++;
655                 if (!p[0]) {
656                         ldb_asprintf_errstring(module->ldb, 
657                                                 "partition_init: "
658                                                 "invalid form for partition record (missing backend database): %s", base);
659                         talloc_free(mem_ctx);
660                         return LDB_ERR_CONSTRAINT_VIOLATION;
661                 }
662                 data->partitions[i] = talloc(data->partitions, struct partition);
663                 if (!data->partitions[i]) {
664                         talloc_free(mem_ctx);
665                         return LDB_ERR_OPERATIONS_ERROR;
666                 }
667
668                 data->partitions[i]->dn = ldb_dn_explode(data->partitions[i], base);
669                 if (!data->partitions[i]->dn) {
670                         ldb_asprintf_errstring(module->ldb, 
671                                                 "partition_init: invalid DN in partition record: %s", base);
672                         talloc_free(mem_ctx);
673                         return LDB_ERR_CONSTRAINT_VIOLATION;
674                 }
675
676                 data->partitions[i]->backend = private_path(data->partitions[i], p);
677                 ret = ldb_connect_backend(module->ldb, data->partitions[i]->backend, NULL, &data->partitions[i]->module);
678                 if (ret != LDB_SUCCESS) {
679                         talloc_free(mem_ctx);
680                         return ret;
681                 }
682         }
683         data->partitions[i] = NULL;
684
685         /* sort these into order, most to least specific */
686         ldb_qsort(data->partitions, partition_attributes->num_values, sizeof(*data->partitions), 
687                   module->ldb, sort_compare);
688
689         for (i=0; data->partitions[i]; i++) {
690                 struct ldb_request *req;
691                 req = talloc_zero(mem_ctx, struct ldb_request);
692                 if (req == NULL) {
693                         ldb_debug(module->ldb, LDB_DEBUG_ERROR, "partition: Out of memory!\n");
694                         talloc_free(mem_ctx);
695                         return LDB_ERR_OPERATIONS_ERROR;
696                 }
697                 
698                 req->operation = LDB_REQ_REGISTER_PARTITION;
699                 req->op.reg_partition.dn = data->partitions[i]->dn;
700                 
701                 ret = ldb_request(module->ldb, req);
702                 if (ret != LDB_SUCCESS) {
703                         ldb_debug(module->ldb, LDB_DEBUG_ERROR, "partition: Unable to register partition with rootdse!\n");
704                         talloc_free(mem_ctx);
705                         return LDB_ERR_OTHER;
706                 }
707                 talloc_free(req);
708         }
709
710         replicate_attributes = ldb_msg_find_element(msg, "replicateEntries");
711         if (!replicate_attributes) {
712                 data->replicate = NULL;
713         } else {
714                 data->replicate = talloc_array(data, struct ldb_dn *, replicate_attributes->num_values + 1);
715                 if (!data->replicate) {
716                         talloc_free(mem_ctx);
717                         return LDB_ERR_OPERATIONS_ERROR;
718                 }
719                 
720                 for (i=0; i < replicate_attributes->num_values; i++) {
721                         data->replicate[i] = ldb_dn_explode(data->replicate, (const char *)replicate_attributes->values[i].data);
722                         if (!data->replicate[i]) {
723                                 ldb_asprintf_errstring(module->ldb, 
724                                                         "partition_init: "
725                                                         "invalid DN in partition replicate record: %s", 
726                                                         replicate_attributes->values[i].data);
727                                 talloc_free(mem_ctx);
728                                 return LDB_ERR_CONSTRAINT_VIOLATION;
729                         }
730                 }
731                 data->replicate[i] = NULL;
732         }
733
734         /* Make the private data available to any searches the modules may trigger in initialisation */
735         module->private_data = data;
736         talloc_steal(module, data);
737         
738         modules_attributes = ldb_msg_find_element(msg, "modules");
739         if (modules_attributes) {
740                 for (i=0; i < modules_attributes->num_values; i++) {
741                         struct ldb_dn *base_dn;
742                         int partition_idx;
743                         struct partition *partition = NULL;
744                         const char **modules = NULL;
745
746                         char *base = talloc_strdup(data->partitions, (char *)modules_attributes->values[i].data);
747                         char *p = strchr(base, ':');
748                         if (!p) {
749                                 ldb_asprintf_errstring(module->ldb, 
750                                                         "partition_init: "
751                                                         "invalid form for partition module record (missing ':'): %s", base);
752                                 talloc_free(mem_ctx);
753                                 return LDB_ERR_CONSTRAINT_VIOLATION;
754                         }
755                         p[0] = '\0';
756                         p++;
757                         if (!p[0]) {
758                                 ldb_asprintf_errstring(module->ldb, 
759                                                         "partition_init: "
760                                                         "invalid form for partition module record (missing backend database): %s", base);
761                                 talloc_free(mem_ctx);
762                                 return LDB_ERR_CONSTRAINT_VIOLATION;
763                         }
764
765                         modules = ldb_modules_list_from_string(module->ldb, mem_ctx,
766                                                                p);
767                         
768                         base_dn = ldb_dn_explode(mem_ctx, base);
769                         if (!base_dn) {
770                                 talloc_free(mem_ctx);
771                                 return LDB_ERR_OPERATIONS_ERROR;
772                         }
773                         
774                         for (partition_idx = 0; data->partitions[partition_idx]; partition_idx++) {
775                                 if (ldb_dn_compare(module->ldb, data->partitions[partition_idx]->dn, 
776                                                    base_dn) == 0) {
777                                         partition = data->partitions[partition_idx];
778                                         break;
779                                 }
780                         }
781                         
782                         if (!partition) {
783                                 ldb_asprintf_errstring(module->ldb, 
784                                                         "partition_init: "
785                                                         "invalid form for partition module record (no such partition): %s", base);
786                                 talloc_free(mem_ctx);
787                                 return LDB_ERR_CONSTRAINT_VIOLATION;
788                         }
789                         
790                         ret = ldb_load_modules_list(module->ldb, modules, partition->module, &partition->module);
791                         if (ret != LDB_SUCCESS) {
792                                 ldb_asprintf_errstring(module->ldb, 
793                                                        "partition_init: "
794                                                        "loading backend for %s failed: %s", 
795                                                        base, ldb_errstring(module->ldb));
796                                 talloc_free(mem_ctx);
797                                 return ret;
798                         }
799                         ret = ldb_init_module_chain(module->ldb, partition->module);
800                         if (ret != LDB_SUCCESS) {
801                                 ldb_asprintf_errstring(module->ldb, 
802                                                        "partition_init: "
803                                                        "initialising backend for %s failed: %s", 
804                                                        base, ldb_errstring(module->ldb));
805                                 talloc_free(mem_ctx);
806                                 return ret;
807                         }
808                 }
809         }
810
811         talloc_free(mem_ctx);
812         return ldb_next_init(module);
813 }
814
815 static int partition_wait_none(struct ldb_handle *handle) {
816         struct partition_context *ac;
817         int ret;
818         int i;
819     
820         if (!handle || !handle->private_data) {
821                 return LDB_ERR_OPERATIONS_ERROR;
822         }
823
824         if (handle->state == LDB_ASYNC_DONE) {
825                 return handle->status;
826         }
827
828         handle->state = LDB_ASYNC_PENDING;
829         handle->status = LDB_SUCCESS;
830
831         ac = talloc_get_type(handle->private_data, struct partition_context);
832
833         for (i=0; i < ac->num_requests; i++) {
834                 ret = ldb_wait(ac->down_req[i]->handle, LDB_WAIT_NONE);
835                 
836                 if (ret != LDB_SUCCESS) {
837                         handle->status = ret;
838                         goto done;
839                 }
840                 if (ac->down_req[i]->handle->status != LDB_SUCCESS) {
841                         handle->status = ac->down_req[i]->handle->status;
842                         goto done;
843                 }
844                 
845                 if (ac->down_req[i]->handle->state != LDB_ASYNC_DONE) {
846                         return LDB_SUCCESS;
847                 }
848         }
849
850         ret = LDB_SUCCESS;
851
852 done:
853         handle->state = LDB_ASYNC_DONE;
854         return ret;
855 }
856
857
858 static int partition_wait_all(struct ldb_handle *handle) {
859
860         int ret;
861
862         while (handle->state != LDB_ASYNC_DONE) {
863                 ret = partition_wait_none(handle);
864                 if (ret != LDB_SUCCESS) {
865                         return ret;
866                 }
867         }
868
869         return handle->status;
870 }
871
872 static int partition_wait(struct ldb_handle *handle, enum ldb_wait_type type)
873 {
874         if (type == LDB_WAIT_ALL) {
875                 return partition_wait_all(handle);
876         } else {
877                 return partition_wait_none(handle);
878         }
879 }
880
881 static const struct ldb_module_ops partition_ops = {
882         .name              = "partition",
883         .init_context      = partition_init,
884         .search            = partition_search,
885         .add               = partition_add,
886         .modify            = partition_modify,
887         .del               = partition_delete,
888         .rename            = partition_rename,
889         .start_transaction = partition_start_trans,
890         .end_transaction   = partition_end_trans,
891         .del_transaction   = partition_del_trans,
892         .sequence_number   = partition_sequence_number,
893         .wait              = partition_wait
894 };
895
896 int ldb_partition_init(void)
897 {
898         return ldb_register_module(&partition_ops);
899 }