dsdb: ensure we take out a read lock during the dsdb_init
[garming/samba-autobuild/.git] / source4 / dsdb / samdb / ldb_modules / partition.c
1 /* 
2    Partitions ldb module
3
4    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 /*
22  *  Name: ldb
23  *
24  *  Component: ldb partitions module
25  *
26  *  Description: Implement LDAP partitions
27  *
28  *  Author: Andrew Bartlett
29  *  Author: Stefan Metzmacher
30  */
31
32 #include "dsdb/samdb/ldb_modules/partition.h"
33
34 struct part_request {
35         struct ldb_module *module;
36         struct ldb_request *req;
37 };
38
39 struct partition_context {
40         struct ldb_module *module;
41         struct ldb_request *req;
42
43         struct part_request *part_req;
44         unsigned int num_requests;
45         unsigned int finished_requests;
46
47         const char **referrals;
48 };
49
50 static struct partition_context *partition_init_ctx(struct ldb_module *module, struct ldb_request *req)
51 {
52         struct partition_context *ac;
53
54         ac = talloc_zero(req, struct partition_context);
55         if (ac == NULL) {
56                 ldb_set_errstring(ldb_module_get_ctx(module), "Out of Memory");
57                 return NULL;
58         }
59
60         ac->module = module;
61         ac->req = req;
62
63         return ac;
64 }
65
66 /*
67  * helper functions to call the next module in chain
68  */
69 int partition_request(struct ldb_module *module, struct ldb_request *request)
70 {
71         if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) { \
72                 const struct dsdb_control_current_partition *partition = NULL;
73                 struct ldb_control *partition_ctrl = ldb_request_get_control(request, DSDB_CONTROL_CURRENT_PARTITION_OID);
74                 if (partition_ctrl) {
75                         partition = talloc_get_type(partition_ctrl->data,
76                                                     struct dsdb_control_current_partition);
77                 }
78
79                 if (partition != NULL) {
80                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_request() -> %s",
81                                   ldb_dn_get_linearized(partition->dn));                        
82                 } else {
83                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_request() -> (metadata partition)");
84                 }
85         }
86
87         return ldb_next_request(module, request);
88 }
89
90 static struct dsdb_partition *find_partition(struct partition_private_data *data,
91                                              struct ldb_dn *dn,
92                                              struct ldb_request *req)
93 {
94         unsigned int i;
95         struct ldb_control *partition_ctrl;
96
97         /* see if the request has the partition DN specified in a
98          * control. The repl_meta_data module can specify this to
99          * ensure that replication happens to the right partition
100          */
101         partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
102         if (partition_ctrl) {
103                 const struct dsdb_control_current_partition *partition;
104                 partition = talloc_get_type(partition_ctrl->data,
105                                             struct dsdb_control_current_partition);
106                 if (partition != NULL) {
107                         dn = partition->dn;
108                 }
109         }
110
111         if (dn == NULL) {
112                 return NULL;
113         }
114
115         /* Look at base DN */
116         /* Figure out which partition it is under */
117         /* Skip the lot if 'data' isn't here yet (initialisation) */
118         for (i=0; data && data->partitions && data->partitions[i]; i++) {
119                 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, dn) == 0) {
120                         return data->partitions[i];
121                 }
122         }
123
124         return NULL;
125 }
126
127 /**
128  * fire the caller's callback for every entry, but only send 'done' once.
129  */
130 static int partition_req_callback(struct ldb_request *req,
131                                   struct ldb_reply *ares)
132 {
133         struct partition_context *ac;
134         struct ldb_module *module;
135         struct ldb_request *nreq;
136         int ret;
137         struct ldb_control *partition_ctrl;
138
139         ac = talloc_get_type(req->context, struct partition_context);
140
141         if (!ares) {
142                 return ldb_module_done(ac->req, NULL, NULL,
143                                         LDB_ERR_OPERATIONS_ERROR);
144         }
145
146         partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
147         if (partition_ctrl && (ac->num_requests == 1 || ares->type == LDB_REPLY_ENTRY)) {
148                 /* If we didn't fan this request out to mulitple partitions,
149                  * or this is an individual search result, we can
150                  * deterministically tell the caller what partition this was
151                  * written to (repl_meta_data likes to know) */
152                 ret = ldb_reply_add_control(ares,
153                                             DSDB_CONTROL_CURRENT_PARTITION_OID,
154                                             false, partition_ctrl->data);
155                 if (ret != LDB_SUCCESS) {
156                         return ldb_module_done(ac->req, NULL, NULL,
157                                                ret);
158                 }
159         }
160
161         if (ares->error != LDB_SUCCESS) {
162                 return ldb_module_done(ac->req, ares->controls,
163                                         ares->response, ares->error);
164         }
165
166         switch (ares->type) {
167         case LDB_REPLY_REFERRAL:
168                 return ldb_module_send_referral(ac->req, ares->referral);
169
170         case LDB_REPLY_ENTRY:
171                 if (ac->req->operation != LDB_SEARCH) {
172                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
173                                 "partition_req_callback:"
174                                 " Unsupported reply type for this request");
175                         return ldb_module_done(ac->req, NULL, NULL,
176                                                 LDB_ERR_OPERATIONS_ERROR);
177                 }
178                 
179                 return ldb_module_send_entry(ac->req, ares->message, ares->controls);
180
181         case LDB_REPLY_DONE:
182                 if (ac->req->operation == LDB_EXTENDED) {
183                         /* FIXME: check for ares->response, replmd does not fill it ! */
184                         if (ares->response) {
185                                 if (strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID) != 0) {
186                                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
187                                                           "partition_req_callback:"
188                                                           " Unknown extended reply, "
189                                                           "only supports START_TLS");
190                                         talloc_free(ares);
191                                         return ldb_module_done(ac->req, NULL, NULL,
192                                                                 LDB_ERR_OPERATIONS_ERROR);
193                                 }
194                         }
195                 }
196
197                 ac->finished_requests++;
198                 if (ac->finished_requests == ac->num_requests) {
199                         /* Send back referrals if they do exist (search ops) */
200                         if (ac->referrals != NULL) {
201                                 const char **ref;
202                                 for (ref = ac->referrals; *ref != NULL; ++ref) {
203                                         ret = ldb_module_send_referral(ac->req,
204                                                                        talloc_strdup(ac->req, *ref));
205                                         if (ret != LDB_SUCCESS) {
206                                                 return ldb_module_done(ac->req, NULL, NULL,
207                                                                        ret);
208                                         }
209                                 }
210                         }
211
212                         /* this was the last one, call callback */
213                         return ldb_module_done(ac->req, ares->controls,
214                                                ares->response, 
215                                                ares->error);
216                 }
217
218                 /* not the last, now call the next one */
219                 module = ac->part_req[ac->finished_requests].module;
220                 nreq = ac->part_req[ac->finished_requests].req;
221
222                 ret = partition_request(module, nreq);
223                 if (ret != LDB_SUCCESS) {
224                         talloc_free(ares);
225                         return ldb_module_done(ac->req, NULL, NULL, ret);
226                 }
227
228                 break;
229         }
230
231         talloc_free(ares);
232         return LDB_SUCCESS;
233 }
234
235 static int partition_prep_request(struct partition_context *ac,
236                                   struct dsdb_partition *partition)
237 {
238         int ret;
239         struct ldb_request *req;
240         struct ldb_control *partition_ctrl = NULL;
241
242         ac->part_req = talloc_realloc(ac, ac->part_req,
243                                         struct part_request,
244                                         ac->num_requests + 1);
245         if (ac->part_req == NULL) {
246                 return ldb_oom(ldb_module_get_ctx(ac->module));
247         }
248
249         switch (ac->req->operation) {
250         case LDB_SEARCH:
251                 ret = ldb_build_search_req_ex(&req, ldb_module_get_ctx(ac->module),
252                                         ac->part_req,
253                                         ac->req->op.search.base,
254                                         ac->req->op.search.scope,
255                                         ac->req->op.search.tree,
256                                         ac->req->op.search.attrs,
257                                         ac->req->controls,
258                                         ac, partition_req_callback,
259                                         ac->req);
260                 LDB_REQ_SET_LOCATION(req);
261                 break;
262         case LDB_ADD:
263                 ret = ldb_build_add_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
264                                         ac->req->op.add.message,
265                                         ac->req->controls,
266                                         ac, partition_req_callback,
267                                         ac->req);
268                 LDB_REQ_SET_LOCATION(req);
269                 break;
270         case LDB_MODIFY:
271                 ret = ldb_build_mod_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
272                                         ac->req->op.mod.message,
273                                         ac->req->controls,
274                                         ac, partition_req_callback,
275                                         ac->req);
276                 LDB_REQ_SET_LOCATION(req);
277                 break;
278         case LDB_DELETE:
279                 ret = ldb_build_del_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
280                                         ac->req->op.del.dn,
281                                         ac->req->controls,
282                                         ac, partition_req_callback,
283                                         ac->req);
284                 LDB_REQ_SET_LOCATION(req);
285                 break;
286         case LDB_RENAME:
287                 ret = ldb_build_rename_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
288                                         ac->req->op.rename.olddn,
289                                         ac->req->op.rename.newdn,
290                                         ac->req->controls,
291                                         ac, partition_req_callback,
292                                         ac->req);
293                 LDB_REQ_SET_LOCATION(req);
294                 break;
295         case LDB_EXTENDED:
296                 ret = ldb_build_extended_req(&req, ldb_module_get_ctx(ac->module),
297                                         ac->part_req,
298                                         ac->req->op.extended.oid,
299                                         ac->req->op.extended.data,
300                                         ac->req->controls,
301                                         ac, partition_req_callback,
302                                         ac->req);
303                 LDB_REQ_SET_LOCATION(req);
304                 break;
305         default:
306                 ldb_set_errstring(ldb_module_get_ctx(ac->module),
307                                   "Unsupported request type!");
308                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
309         }
310
311         if (ret != LDB_SUCCESS) {
312                 return ret;
313         }
314
315         ac->part_req[ac->num_requests].req = req;
316
317         if (ac->req->controls) {
318                 /* Duplicate everything beside the current partition control */
319                 partition_ctrl = ldb_request_get_control(ac->req,
320                                                          DSDB_CONTROL_CURRENT_PARTITION_OID);
321                 if (!ldb_save_controls(partition_ctrl, req, NULL)) {
322                         return ldb_module_oom(ac->module);
323                 }
324         }
325
326         if (partition) {
327                 void *part_data = partition->ctrl;
328
329                 ac->part_req[ac->num_requests].module = partition->module;
330
331                 if (partition_ctrl != NULL) {
332                         if (partition_ctrl->data != NULL) {
333                                 part_data = partition_ctrl->data;
334                         }
335
336                         /*
337                          * If the provided current partition control is without
338                          * data then use the calculated one.
339                          */
340                         ret = ldb_request_add_control(req,
341                                                       DSDB_CONTROL_CURRENT_PARTITION_OID,
342                                                       false, part_data);
343                         if (ret != LDB_SUCCESS) {
344                                 return ret;
345                         }
346                 }
347
348                 if (req->operation == LDB_SEARCH) {
349                         /* If the search is for 'more' than this partition,
350                          * then change the basedn, so a remote LDAP server
351                          * doesn't object */
352                         if (ldb_dn_compare_base(partition->ctrl->dn,
353                                                 req->op.search.base) != 0) {
354                                 req->op.search.base = partition->ctrl->dn;
355                         }
356                 }
357
358         } else {
359                 /* make sure you put the module here, or
360                  * or ldb_next_request() will skip a module */
361                 ac->part_req[ac->num_requests].module = ac->module;
362         }
363
364         ac->num_requests++;
365
366         return LDB_SUCCESS;
367 }
368
369 static int partition_call_first(struct partition_context *ac)
370 {
371         return partition_request(ac->part_req[0].module, ac->part_req[0].req);
372 }
373
374 /**
375  * Send a request down to all the partitions (but not the sam.ldb file)
376  */
377 static int partition_send_all(struct ldb_module *module, 
378                               struct partition_context *ac, 
379                               struct ldb_request *req) 
380 {
381         unsigned int i;
382         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
383                                                               struct partition_private_data);
384         int ret;
385
386         for (i=0; data && data->partitions && data->partitions[i]; i++) {
387                 ret = partition_prep_request(ac, data->partitions[i]);
388                 if (ret != LDB_SUCCESS) {
389                         return ret;
390                 }
391         }
392
393         /* fire the first one */
394         return partition_call_first(ac);
395 }
396
397
398 /**
399  * send an operation to the top partition, then copy the resulting
400  * object to all other partitions
401  */
402 static int partition_copy_all(struct ldb_module *module,
403                               struct partition_context *ac,
404                               struct ldb_request *req,
405                               struct ldb_dn *dn)
406 {
407         unsigned int i;
408         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
409                                                               struct partition_private_data);
410         int ret, search_ret;
411         struct ldb_result *res;
412
413         /* do the request on the top level sam.ldb synchronously */
414         ret = ldb_next_request(module, req);
415         if (ret != LDB_SUCCESS) {
416                 return ret;
417         }
418         ret = ldb_wait(req->handle, LDB_WAIT_ALL);
419         if (ret != LDB_SUCCESS) {
420                 return ret;
421         }
422
423         /* now fetch the resulting object, and then copy it to all the
424          * other partitions. We need this approach to cope with the
425          * partitions getting out of sync. If for example the
426          * @ATTRIBUTES object exists on one partition but not the
427          * others then just doing each of the partitions in turn will
428          * lead to an error
429          */
430         search_ret = dsdb_module_search_dn(module, ac, &res, dn, NULL, DSDB_FLAG_NEXT_MODULE, req);
431         if (search_ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
432                 return search_ret;
433         }
434
435         /* now delete the object in the other partitions, if requried
436         */
437         if (search_ret == LDB_ERR_NO_SUCH_OBJECT) {
438                 for (i=0; data->partitions && data->partitions[i]; i++) {
439                         int pret;
440                         pret = dsdb_module_del(data->partitions[i]->module,
441                                                dn,
442                                                DSDB_FLAG_NEXT_MODULE,
443                                                req);
444                         if (pret != LDB_SUCCESS && pret != LDB_ERR_NO_SUCH_OBJECT) {
445                                 /* we should only get success or no
446                                    such object from the other partitions */
447                                 return pret;
448                         }
449                 }
450
451                 return ldb_module_done(req, NULL, NULL, LDB_SUCCESS);
452         }
453
454         /* now add/modify in the other partitions */
455         for (i=0; data->partitions && data->partitions[i]; i++) {
456                 struct ldb_message *modify_msg = NULL;
457                 int pret;
458                 unsigned int el_idx;
459
460                 pret = dsdb_module_add(data->partitions[i]->module,
461                                        res->msgs[0],
462                                        DSDB_FLAG_NEXT_MODULE,
463                                        req);
464                 if (pret == LDB_SUCCESS) {
465                         continue;
466                 }
467
468                 if (pret != LDB_ERR_ENTRY_ALREADY_EXISTS) {
469                         return pret;
470                 }
471
472                 modify_msg = ldb_msg_copy(req, res->msgs[0]);
473                 if (modify_msg == NULL) {
474                         return ldb_module_oom(module);
475                 }
476
477                 /*
478                  * mark all the message elements as
479                  * LDB_FLAG_MOD_REPLACE
480                  */
481                 for (el_idx=0;
482                      el_idx < modify_msg->num_elements;
483                      el_idx++) {
484                         modify_msg->elements[el_idx].flags
485                                 = LDB_FLAG_MOD_REPLACE;
486                 }
487
488                 if (req->operation == LDB_MODIFY) {
489                         const struct ldb_message *req_msg = req->op.mod.message;
490                         /*
491                          * mark elements to be removed, if there were
492                          * deleted entirely above we need to delete
493                          * them here too
494                          */
495                         for (el_idx=0; el_idx < req_msg->num_elements; el_idx++) {
496                                 if (req_msg->elements[el_idx].flags & LDB_FLAG_MOD_DELETE
497                                     || ((req_msg->elements[el_idx].flags & LDB_FLAG_MOD_REPLACE) &&
498                                         req_msg->elements[el_idx].num_values == 0)) {
499                                         if (ldb_msg_find_element(modify_msg,
500                                                                  req_msg->elements[el_idx].name) != NULL) {
501                                                 continue;
502                                         }
503                                         ldb_msg_add_empty(modify_msg,
504                                                           req_msg->elements[el_idx].name,
505                                                           LDB_FLAG_MOD_REPLACE,
506                                                           NULL);
507                                 }
508                         }
509                 }
510
511                 pret = dsdb_module_modify(data->partitions[i]->module,
512                                           modify_msg,
513                                           DSDB_FLAG_NEXT_MODULE,
514                                           req);
515
516                 if (pret != LDB_SUCCESS) {
517                         return pret;
518                 }
519         }
520
521         return ldb_module_done(req, NULL, NULL, LDB_SUCCESS);
522 }
523
524 /**
525  * Figure out which backend a request needs to be aimed at.  Some
526  * requests must be replicated to all backends
527  */
528 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn) 
529 {
530         struct partition_context *ac;
531         unsigned int i;
532         int ret;
533         struct dsdb_partition *partition;
534         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
535                                                               struct partition_private_data);
536
537         /* if we aren't initialised yet go further */
538         if (!data || !data->partitions) {
539                 return ldb_next_request(module, req);
540         }
541
542         if (ldb_dn_is_special(dn)) {
543                 /* Is this a special DN, we need to replicate to every backend? */
544                 for (i=0; data->replicate && data->replicate[i]; i++) {
545                         if (ldb_dn_compare(data->replicate[i], 
546                                            dn) == 0) {
547                                 
548                                 ac = partition_init_ctx(module, req);
549                                 if (!ac) {
550                                         return ldb_operr(ldb_module_get_ctx(module));
551                                 }
552                                 
553                                 return partition_copy_all(module, ac, req, dn);
554                         }
555                 }
556         }
557
558         /* Otherwise, we need to find the partition to fire it to */
559
560         /* Find partition */
561         partition = find_partition(data, dn, req);
562         if (!partition) {
563                 /*
564                  * if we haven't found a matching partition
565                  * pass the request to the main ldb
566                  *
567                  * TODO: we should maybe return an error here
568                  *       if it's not a special dn
569                  */
570
571                 return ldb_next_request(module, req);
572         }
573
574         ac = partition_init_ctx(module, req);
575         if (!ac) {
576                 return ldb_operr(ldb_module_get_ctx(module));
577         }
578
579         /* we need to add a control but we never touch the original request */
580         ret = partition_prep_request(ac, partition);
581         if (ret != LDB_SUCCESS) {
582                 return ret;
583         }
584
585         /* fire the first one */
586         return partition_call_first(ac);
587 }
588
589 /* search */
590 static int partition_search(struct ldb_module *module, struct ldb_request *req)
591 {
592         struct ldb_control **saved_controls;
593         /* Find backend */
594         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
595                                                               struct partition_private_data);
596         struct partition_context *ac;
597         struct ldb_context *ldb;
598         struct loadparm_context *lp_ctx;
599
600         struct ldb_control *search_control = ldb_request_get_control(req, LDB_CONTROL_SEARCH_OPTIONS_OID);
601         struct ldb_control *domain_scope_control = ldb_request_get_control(req, LDB_CONTROL_DOMAIN_SCOPE_OID);
602         struct ldb_control *no_gc_control = ldb_request_get_control(req, DSDB_CONTROL_NO_GLOBAL_CATALOG);
603         
604         struct ldb_search_options_control *search_options = NULL;
605         struct dsdb_partition *p;
606         unsigned int i, j;
607         int ret;
608         bool domain_scope = false, phantom_root = false;
609
610         p = find_partition(data, NULL, req);
611         if (p != NULL) {
612                 /* the caller specified what partition they want the
613                  * search - just pass it on
614                  */
615                 return ldb_next_request(p->module, req);
616         }
617
618         /* Get back the search options from the search control, and mark it as
619          * non-critical (to make backends and also dcpromo happy).
620          */
621         if (search_control) {
622                 search_options = talloc_get_type(search_control->data, struct ldb_search_options_control);
623                 search_control->critical = 0;
624
625         }
626
627         /* Remove the "domain_scope" control, so we don't confuse a backend
628          * server */
629         if (domain_scope_control && !ldb_save_controls(domain_scope_control, req, &saved_controls)) {
630                 return ldb_oom(ldb_module_get_ctx(module));
631         }
632
633         /* if we aren't initialised yet go further */
634         if (!data || !data->partitions) {
635                 return ldb_next_request(module, req);
636         }
637
638         /* Special DNs without specified partition should go further */
639         if (ldb_dn_is_special(req->op.search.base)) {
640                 return ldb_next_request(module, req);
641         }
642
643         /* Locate the options */
644         domain_scope = (search_options
645                 && (search_options->search_options & LDB_SEARCH_OPTION_DOMAIN_SCOPE))
646                 || domain_scope_control;
647         phantom_root = search_options
648                 && (search_options->search_options & LDB_SEARCH_OPTION_PHANTOM_ROOT);
649
650         /* Remove handled options from the search control flag */
651         if (search_options) {
652                 search_options->search_options = search_options->search_options
653                         & ~LDB_SEARCH_OPTION_DOMAIN_SCOPE
654                         & ~LDB_SEARCH_OPTION_PHANTOM_ROOT;
655         }
656
657         ac = partition_init_ctx(module, req);
658         if (!ac) {
659                 return ldb_operr(ldb_module_get_ctx(module));
660         }
661
662         ldb = ldb_module_get_ctx(ac->module);
663         lp_ctx = talloc_get_type(ldb_get_opaque(ldb, "loadparm"),
664                                                 struct loadparm_context);
665
666         /* Search from the base DN */
667         if (ldb_dn_is_null(req->op.search.base)) {
668                 if (!phantom_root) {
669                         return ldb_error(ldb, LDB_ERR_NO_SUCH_OBJECT, "empty base DN");
670                 }
671                 return partition_send_all(module, ac, req);
672         }
673
674         for (i=0; data->partitions[i]; i++) {
675                 bool match = false, stop = false;
676
677                 if (data->partitions[i]->partial_replica && no_gc_control != NULL) {
678                         if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn,
679                                                 req->op.search.base) == 0) {
680                                 /* base DN is in a partial replica
681                                    with the NO_GLOBAL_CATALOG
682                                    control. This partition is invisible */
683                                 /* DEBUG(0,("DENYING NON-GC OP: %s\n", ldb_module_call_chain(req, req))); */
684                                 continue;
685                         }
686                 }
687
688                 if (phantom_root) {
689                         /* Phantom root: Find all partitions under the
690                          * search base. We match if:
691                          *
692                          * 1) the DN we are looking for exactly matches a
693                          *    certain partition and always stop
694                          * 2) the DN we are looking for is a parent of certain
695                          *    partitions and it isn't a scope base search
696                          * 3) the DN we are looking for is a child of a certain
697                          *    partition and always stop
698                          *    - we don't need to go any further up in the
699                          *    hierarchy!
700                          */
701                         if (ldb_dn_compare(data->partitions[i]->ctrl->dn,
702                                            req->op.search.base) == 0) {
703                                 match = true;
704                                 stop = true;
705                         }
706                         if (!match &&
707                             (ldb_dn_compare_base(req->op.search.base,
708                                                  data->partitions[i]->ctrl->dn) == 0 &&
709                              req->op.search.scope != LDB_SCOPE_BASE)) {
710                                 match = true;
711                         }
712                         if (!match &&
713                             ldb_dn_compare_base(data->partitions[i]->ctrl->dn,
714                                                 req->op.search.base) == 0) {
715                                 match = true;
716                                 stop = true; /* note that this relies on partition ordering */
717                         }
718                 } else {
719                         /* Domain scope: Find all partitions under the search
720                          * base.
721                          *
722                          * We generate referral candidates if we haven't
723                          * specified the domain scope control, haven't a base
724                          * search* scope and the DN we are looking for is a real
725                          * predecessor of certain partitions. When a new
726                          * referral candidate is nearer to the DN than an
727                          * existing one delete the latter (we want to have only
728                          * the closest ones). When we checked this for all
729                          * candidates we have the final referrals.
730                          *
731                          * We match if the DN we are looking for is a child of
732                          * a certain partition or the partition
733                          * DN itself - we don't need to go any further
734                          * up in the hierarchy!
735                          */
736                         if ((!domain_scope) &&
737                             (req->op.search.scope != LDB_SCOPE_BASE) &&
738                             (ldb_dn_compare_base(req->op.search.base,
739                                                  data->partitions[i]->ctrl->dn) == 0) &&
740                             (ldb_dn_compare(req->op.search.base,
741                                             data->partitions[i]->ctrl->dn) != 0)) {
742                                 char *ref = talloc_asprintf(ac,
743                                                             "ldap://%s/%s%s",
744                                                             lpcfg_dnsdomain(lp_ctx),
745                                                             ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
746                                                             req->op.search.scope == LDB_SCOPE_ONELEVEL ? "??base" : "");
747
748                                 if (ref == NULL) {
749                                         return ldb_oom(ldb);
750                                 }
751
752                                 /* Initialise the referrals list */
753                                 if (ac->referrals == NULL) {
754                                         char **l = str_list_make_empty(ac);
755                                         ac->referrals = discard_const_p(const char *, l);
756                                         if (ac->referrals == NULL) {
757                                                 return ldb_oom(ldb);
758                                         }
759                                 }
760
761                                 /* Check if the new referral candidate is
762                                  * closer to the base DN than already
763                                  * saved ones and delete the latters */
764                                 j = 0;
765                                 while (ac->referrals[j] != NULL) {
766                                         if (strstr(ac->referrals[j],
767                                                    ldb_dn_get_linearized(data->partitions[i]->ctrl->dn)) != NULL) {
768                                                 str_list_remove(ac->referrals,
769                                                                 ac->referrals[j]);
770                                         } else {
771                                                 ++j;
772                                         }
773                                 }
774
775                                 /* Add our new candidate */
776                                 ac->referrals = str_list_add(ac->referrals, ref);
777
778                                 talloc_free(ref);
779
780                                 if (ac->referrals == NULL) {
781                                         return ldb_oom(ldb);
782                                 }
783                         }
784                         if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
785                                 match = true;
786                                 stop = true; /* note that this relies on partition ordering */
787                         }
788                 }
789
790                 if (match) {
791                         ret = partition_prep_request(ac, data->partitions[i]);
792                         if (ret != LDB_SUCCESS) {
793                                 return ret;
794                         }
795                 }
796
797                 if (stop) break;
798         }
799
800         /* Perhaps we didn't match any partitions. Try the main partition */
801         if (ac->num_requests == 0) {
802                 talloc_free(ac);
803                 return ldb_next_request(module, req);
804         }
805
806         /* fire the first one */
807         return partition_call_first(ac);
808 }
809
810 /* add */
811 static int partition_add(struct ldb_module *module, struct ldb_request *req)
812 {
813         return partition_replicate(module, req, req->op.add.message->dn);
814 }
815
816 /* modify */
817 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
818 {
819         return partition_replicate(module, req, req->op.mod.message->dn);
820 }
821
822 /* delete */
823 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
824 {
825         return partition_replicate(module, req, req->op.del.dn);
826 }
827
828 /* rename */
829 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
830 {
831         /* Find backend */
832         struct dsdb_partition *backend, *backend2;
833         
834         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
835                                                               struct partition_private_data);
836
837         /* Skip the lot if 'data' isn't here yet (initialisation) */
838         if (!data) {
839                 return ldb_operr(ldb_module_get_ctx(module));
840         }
841
842         backend = find_partition(data, req->op.rename.olddn, req);
843         backend2 = find_partition(data, req->op.rename.newdn, req);
844
845         if ((backend && !backend2) || (!backend && backend2)) {
846                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
847         }
848
849         if (backend != backend2) {
850                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
851                                        "Cannot rename from %s in %s to %s in %s: %s",
852                                        ldb_dn_get_linearized(req->op.rename.olddn),
853                                        ldb_dn_get_linearized(backend->ctrl->dn),
854                                        ldb_dn_get_linearized(req->op.rename.newdn),
855                                        ldb_dn_get_linearized(backend2->ctrl->dn),
856                                        ldb_strerror(LDB_ERR_AFFECTS_MULTIPLE_DSAS));
857                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
858         }
859
860         return partition_replicate(module, req, req->op.rename.olddn);
861 }
862
863 /* start a transaction */
864 int partition_start_trans(struct ldb_module *module)
865 {
866         int i;
867         int ret;
868         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
869                                                               struct partition_private_data);
870         /* Look at base DN */
871         /* Figure out which partition it is under */
872         /* Skip the lot if 'data' isn't here yet (initialization) */
873         if (ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING) {
874                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_start_trans() -> (metadata partition)");
875         }
876
877         /* This order must match that in prepare_commit() and read_lock() */
878         ret = ldb_next_start_trans(module);
879         if (ret != LDB_SUCCESS) {
880                 return ret;
881         }
882
883         ret = partition_reload_if_required(module, data, NULL);
884         if (ret != LDB_SUCCESS) {
885                 ldb_next_del_trans(module);
886                 return ret;
887         }
888
889         for (i=0; data && data->partitions && data->partitions[i]; i++) {
890                 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
891                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_start_trans() -> %s",
892                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
893                 }
894                 ret = ldb_next_start_trans(data->partitions[i]->module);
895                 if (ret != LDB_SUCCESS) {
896                         /* Back it out, if it fails on one */
897                         for (i--; i >= 0; i--) {
898                                 ldb_next_del_trans(data->partitions[i]->module);
899                         }
900                         ldb_next_del_trans(module);
901                         partition_metadata_del_trans(module);
902                         return ret;
903                 }
904         }
905
906         /*
907          * Because in prepare_commit this must come last, to ensure
908          * lock ordering we have to do this last here also 
909          */
910         ret = partition_metadata_start_trans(module);
911         if (ret != LDB_SUCCESS) {
912                 /* Back it out, if it fails on one */
913                 for (i--; i >= 0; i--) {
914                         ldb_next_del_trans(data->partitions[i]->module);
915                 }
916                 ldb_next_del_trans(module);
917                 return ret;
918         }
919
920         data->in_transaction++;
921
922         return LDB_SUCCESS;
923 }
924
925 /* prepare for a commit */
926 int partition_prepare_commit(struct ldb_module *module)
927 {
928         unsigned int i;
929         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
930                                                               struct partition_private_data);
931         int ret;
932
933         ret = ldb_next_prepare_commit(module);
934         if (ret != LDB_SUCCESS) {
935                 return ret;
936         }
937
938         for (i=0; data && data->partitions && data->partitions[i]; i++) {
939                 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
940                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_prepare_commit() -> %s",
941                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
942                 }
943                 ret = ldb_next_prepare_commit(data->partitions[i]->module);
944                 if (ret != LDB_SUCCESS) {
945                         ldb_asprintf_errstring(ldb_module_get_ctx(module), "prepare_commit error on %s: %s",
946                                                ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
947                                                ldb_errstring(ldb_module_get_ctx(module)));
948                         return ret;
949                 }
950         }
951
952         if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
953                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_prepare_commit() -> (metadata partition)");
954         }
955
956         /* metadata prepare commit must come last, as other partitions could modify
957          * the database inside the prepare commit method of a module */
958         return partition_metadata_prepare_commit(module);
959 }
960
961
962 /* end a transaction */
963 int partition_end_trans(struct ldb_module *module)
964 {
965         int ret, ret2;
966         unsigned int i;
967         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
968                                                               struct partition_private_data);
969
970         ret = LDB_SUCCESS;
971
972         if (data->in_transaction == 0) {
973                 DEBUG(0,("partition end transaction mismatch\n"));
974                 ret = LDB_ERR_OPERATIONS_ERROR;
975         } else {
976                 data->in_transaction--;
977         }
978
979
980         for (i=0; data && data->partitions && data->partitions[i]; i++) {
981                 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
982                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_end_trans() -> %s",
983                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
984                 }
985                 ret2 = ldb_next_end_trans(data->partitions[i]->module);
986                 if (ret2 != LDB_SUCCESS) {
987                         ldb_asprintf_errstring(ldb_module_get_ctx(module), "end_trans error on %s: %s",
988                                                ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
989                                                ldb_errstring(ldb_module_get_ctx(module)));
990                         ret = ret2;
991                 }
992         }
993
994         if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
995                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_end_trans() -> (metadata partition)");
996         }
997         ret2 = ldb_next_end_trans(module);
998         if (ret2 != LDB_SUCCESS) {
999                 ret = ret2;
1000         }
1001
1002         ret2 = partition_metadata_end_trans(module);
1003         if (ret2 != LDB_SUCCESS) {
1004                 ret = ret2;
1005         }
1006
1007         return ret;
1008 }
1009
1010 /* delete a transaction */
1011 int partition_del_trans(struct ldb_module *module)
1012 {
1013         int ret, final_ret = LDB_SUCCESS;
1014         unsigned int i;
1015         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1016                                                               struct partition_private_data);
1017
1018         for (i=0; data && data->partitions && data->partitions[i]; i++) {
1019                 if (ldb_module_flags(ldb_module_get_ctx(module)) &
1020                     LDB_FLG_ENABLE_TRACING) {
1021                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_del_trans() -> %s",
1022                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
1023                 }
1024                 ret = ldb_next_del_trans(data->partitions[i]->module);
1025                 if (ret != LDB_SUCCESS) {
1026                         ldb_asprintf_errstring(ldb_module_get_ctx(module), "del_trans error on %s: %s",
1027                                                ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
1028                                                ldb_errstring(ldb_module_get_ctx(module)));
1029                         final_ret = ret;
1030                 }
1031         }       
1032
1033         if (data->in_transaction == 0) {
1034                 DEBUG(0,("partition del transaction mismatch\n"));
1035                 return ldb_operr(ldb_module_get_ctx(module));
1036         }
1037         data->in_transaction--;
1038
1039         if (ldb_module_flags(ldb_module_get_ctx(module)) &
1040             LDB_FLG_ENABLE_TRACING) {
1041                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_del_trans() -> (metadata partition)");
1042         }
1043         ret = ldb_next_del_trans(module);
1044         if (ret != LDB_SUCCESS) {
1045                 final_ret = ret;
1046         }
1047
1048         ret = partition_metadata_del_trans(module);
1049         if (ret != LDB_SUCCESS) {
1050                 final_ret = ret;
1051         }
1052
1053         return final_ret;
1054 }
1055
1056 int partition_primary_sequence_number(struct ldb_module *module, TALLOC_CTX *mem_ctx, 
1057                                       uint64_t *seq_number,
1058                                       struct ldb_request *parent)
1059 {
1060         int ret;
1061         struct ldb_result *res;
1062         struct ldb_seqnum_request *tseq;
1063         struct ldb_seqnum_result *seqr;
1064
1065         tseq = talloc_zero(mem_ctx, struct ldb_seqnum_request);
1066         if (tseq == NULL) {
1067                 return ldb_oom(ldb_module_get_ctx(module));
1068         }
1069         tseq->type = LDB_SEQ_HIGHEST_SEQ;
1070         
1071         ret = dsdb_module_extended(module, tseq, &res,
1072                                    LDB_EXTENDED_SEQUENCE_NUMBER,
1073                                    tseq,
1074                                    DSDB_FLAG_NEXT_MODULE,
1075                                    parent);
1076         if (ret != LDB_SUCCESS) {
1077                 talloc_free(tseq);
1078                 return ret;
1079         }
1080         
1081         seqr = talloc_get_type_abort(res->extended->data,
1082                                      struct ldb_seqnum_result);
1083         if (seqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
1084                 talloc_free(res);
1085                 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
1086                         "Primary backend in partition module returned a timestamp based seq");
1087         }
1088
1089         *seq_number = seqr->seq_num;
1090         talloc_free(tseq);
1091         return LDB_SUCCESS;
1092 }
1093
1094
1095 /*
1096  * Older version of sequence number as sum of sequence numbers for each partition
1097  */
1098 int partition_sequence_number_from_partitions(struct ldb_module *module,
1099                                               uint64_t *seqr)
1100 {
1101         int ret;
1102         unsigned int i;
1103         uint64_t seq_number = 0;
1104         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1105                                                               struct partition_private_data);
1106
1107         ret = partition_primary_sequence_number(module, data, &seq_number, NULL);
1108         if (ret != LDB_SUCCESS) {
1109                 return ret;
1110         }
1111         
1112         /* Skip the lot if 'data' isn't here yet (initialisation) */
1113         for (i=0; data && data->partitions && data->partitions[i]; i++) {
1114                 struct ldb_seqnum_request *tseq;
1115                 struct ldb_seqnum_result *tseqr;
1116                 struct ldb_request *treq;
1117                 struct ldb_result *res = talloc_zero(data, struct ldb_result);
1118                 if (res == NULL) {
1119                         return ldb_oom(ldb_module_get_ctx(module));
1120                 }
1121                 tseq = talloc_zero(res, struct ldb_seqnum_request);
1122                 if (tseq == NULL) {
1123                         talloc_free(res);
1124                         return ldb_oom(ldb_module_get_ctx(module));
1125                 }
1126                 tseq->type = LDB_SEQ_HIGHEST_SEQ;
1127                 
1128                 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
1129                                              LDB_EXTENDED_SEQUENCE_NUMBER,
1130                                              tseq,
1131                                              NULL,
1132                                              res,
1133                                              ldb_extended_default_callback,
1134                                              NULL);
1135                 LDB_REQ_SET_LOCATION(treq);
1136                 if (ret != LDB_SUCCESS) {
1137                         talloc_free(res);
1138                         return ret;
1139                 }
1140                 
1141                 ret = partition_request(data->partitions[i]->module, treq);
1142                 if (ret != LDB_SUCCESS) {
1143                         talloc_free(res);
1144                         return ret;
1145                 }
1146                 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
1147                 if (ret != LDB_SUCCESS) {
1148                         talloc_free(res);
1149                         return ret;
1150                 }
1151                 tseqr = talloc_get_type(res->extended->data,
1152                                         struct ldb_seqnum_result);
1153                 seq_number += tseqr->seq_num;
1154                 talloc_free(res);
1155         }
1156
1157         *seqr = seq_number;
1158         return LDB_SUCCESS;
1159 }
1160
1161
1162 /*
1163  * Newer version of sequence number using metadata tdb
1164  */
1165 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
1166 {
1167         struct ldb_extended *ext;
1168         struct ldb_seqnum_request *seq;
1169         struct ldb_seqnum_result *seqr;
1170         uint64_t seq_number;
1171         int ret;
1172
1173         seq = talloc_get_type_abort(req->op.extended.data, struct ldb_seqnum_request);
1174         switch (seq->type) {
1175         case LDB_SEQ_NEXT:
1176                 ret = partition_metadata_sequence_number_increment(module, &seq_number);
1177                 if (ret != LDB_SUCCESS) {
1178                         return ret;
1179                 }
1180                 break;
1181
1182         case LDB_SEQ_HIGHEST_SEQ:
1183                 ret = partition_metadata_sequence_number(module, &seq_number);
1184                 if (ret != LDB_SUCCESS) {
1185                         return ret;
1186                 }
1187                 break;
1188
1189         case LDB_SEQ_HIGHEST_TIMESTAMP:
1190                 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
1191                                         "LDB_SEQ_HIGHEST_TIMESTAMP not supported");
1192         }
1193
1194         ext = talloc_zero(req, struct ldb_extended);
1195         if (!ext) {
1196                 return ldb_module_oom(module);
1197         }
1198         seqr = talloc_zero(ext, struct ldb_seqnum_result);
1199         if (seqr == NULL) {
1200                 talloc_free(ext);
1201                 return ldb_module_oom(module);
1202         }
1203         ext->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1204         ext->data = seqr;
1205
1206         seqr->seq_num = seq_number;
1207         seqr->flags |= LDB_SEQ_GLOBAL_SEQUENCE;
1208
1209         /* send request done */
1210         return ldb_module_done(req, NULL, ext, LDB_SUCCESS);
1211 }
1212
1213 /* lock all the backends */
1214 int partition_read_lock(struct ldb_module *module)
1215 {
1216         int i;
1217         int ret;
1218         int ret2;
1219         struct ldb_context *ldb = ldb_module_get_ctx(module);
1220         struct partition_private_data *data = \
1221                 talloc_get_type(ldb_module_get_private(module),
1222                                 struct partition_private_data);
1223
1224         if (ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING) {
1225                 ldb_debug(ldb, LDB_DEBUG_TRACE,
1226                           "partition_read_lock() -> (metadata partition)");
1227         }
1228
1229         /*
1230          * It is important to only do this for LOCK because:
1231          * - we don't want to unlock what we did not lock
1232          *
1233          * - we don't want to make a new lock on the sam.ldb
1234          *   (triggered inside this routine due to the seq num check)
1235          *   during an unlock phase as that will violate the lock
1236          *   ordering
1237          */
1238
1239         if (data == NULL) {
1240                 TALLOC_CTX *mem_ctx = talloc_new(module);
1241
1242                 data = talloc_zero(mem_ctx, struct partition_private_data);
1243                 if (data == NULL) {
1244                         talloc_free(mem_ctx);
1245                         return ldb_operr(ldb);
1246                 }
1247
1248                 /*
1249                  * When used from Samba4, this message is set by the
1250                  * samba4 module, as a fixed value not read from the
1251                  * DB.  This avoids listing modules in the DB
1252                  */
1253                 data->forced_module_msg = talloc_get_type(
1254                         ldb_get_opaque(ldb,
1255                                        DSDB_OPAQUE_PARTITION_MODULE_MSG_OPAQUE_NAME),
1256                         struct ldb_message);
1257
1258                 ldb_module_set_private(module, talloc_steal(module,
1259                                                             data));
1260                 talloc_free(mem_ctx);
1261         }
1262
1263         /*
1264          * This will lock the metadata partition (sam.ldb) and
1265          * will also call event loops, so we do it before we
1266          * get the whole db lock.
1267          */
1268         ret = partition_reload_if_required(module, data, NULL);
1269         if (ret != LDB_SUCCESS) {
1270                 return ret;
1271         }
1272
1273         /*
1274          * This order must match that in prepare_commit(), start with
1275          * the metadata partition (sam.ldb) lock
1276          */
1277         ret = ldb_next_read_lock(module);
1278         if (ret != LDB_SUCCESS) {
1279                 ldb_debug_set(ldb,
1280                               LDB_DEBUG_FATAL,
1281                               "Failed to lock db: %s / %s for metadata partition",
1282                               ldb_errstring(ldb),
1283                               ldb_strerror(ret));
1284
1285                 return ret;
1286         }
1287
1288         /*
1289          * The metadata partition (sam.ldb) lock is not
1290          * enough to block another process in prepare_commit(),
1291          * because prepare_commit() is a no-op, if nothing
1292          * was changed in the specific backend.
1293          *
1294          * That means the following per partition locks are required.
1295          */
1296         for (i=0; data && data->partitions && data->partitions[i]; i++) {
1297                 if ((module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING)) {
1298                         ldb_debug(ldb, LDB_DEBUG_TRACE,
1299                                   "partition_read_lock() -> %s",
1300                                   ldb_dn_get_linearized(
1301                                           data->partitions[i]->ctrl->dn));
1302                 }
1303                 ret = ldb_next_read_lock(data->partitions[i]->module);
1304                 if (ret == LDB_SUCCESS) {
1305                         continue;
1306                 }
1307
1308                 ldb_debug_set(ldb,
1309                               LDB_DEBUG_FATAL,
1310                               "Failed to lock db: %s / %s for %s",
1311                               ldb_errstring(ldb),
1312                               ldb_strerror(ret),
1313                               ldb_dn_get_linearized(
1314                                       data->partitions[i]->ctrl->dn));
1315
1316                 /* Back it out, if it fails on one */
1317                 for (i--; i >= 0; i--) {
1318                         ret2 = ldb_next_read_unlock(data->partitions[i]->module);
1319                         if (ret2 != LDB_SUCCESS) {
1320                                 ldb_debug(ldb,
1321                                           LDB_DEBUG_FATAL,
1322                                           "Failed to unlock db: %s / %s",
1323                                           ldb_errstring(ldb),
1324                                           ldb_strerror(ret2));
1325                         }
1326                 }
1327                 ret2 = ldb_next_read_unlock(module);
1328                 if (ret2 != LDB_SUCCESS) {
1329                         ldb_debug(ldb,
1330                                   LDB_DEBUG_FATAL,
1331                                   "Failed to unlock db: %s / %s",
1332                                   ldb_errstring(ldb),
1333                                   ldb_strerror(ret2));
1334                 }
1335                 return ret;
1336         }
1337
1338         return LDB_SUCCESS;
1339 }
1340
1341 /* unlock all the backends */
1342 int partition_read_unlock(struct ldb_module *module)
1343 {
1344         int i;
1345         int ret = LDB_SUCCESS;
1346         int ret2;
1347         struct ldb_context *ldb = ldb_module_get_ctx(module);
1348         struct partition_private_data *data = \
1349                 talloc_get_type(ldb_module_get_private(module),
1350                                 struct partition_private_data);
1351
1352         /*
1353          * This order must be similar to partition_{end,del}_trans()
1354          * the metadata partition (sam.ldb) unlock must be at the end.
1355          */
1356
1357         for (i=0; data && data->partitions && data->partitions[i]; i++) {
1358                 if ((module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING)) {
1359                         ldb_debug(ldb, LDB_DEBUG_TRACE,
1360                                   "partition_read_unlock() -> %s",
1361                                   ldb_dn_get_linearized(
1362                                           data->partitions[i]->ctrl->dn));
1363                 }
1364                 ret2 = ldb_next_read_unlock(data->partitions[i]->module);
1365                 if (ret2 != LDB_SUCCESS) {
1366                         ldb_debug_set(ldb,
1367                                       LDB_DEBUG_FATAL,
1368                                       "Failed to lock db: %s / %s for %s",
1369                                       ldb_errstring(ldb),
1370                                       ldb_strerror(ret),
1371                                       ldb_dn_get_linearized(
1372                                               data->partitions[i]->ctrl->dn));
1373
1374                         /*
1375                          * Don't overwrite the original failure code
1376                          * if there was one
1377                          */
1378                         if (ret == LDB_SUCCESS) {
1379                                 ret = ret2;
1380                         }
1381                 }
1382         }
1383
1384         if (ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING) {
1385                 ldb_debug(ldb, LDB_DEBUG_TRACE,
1386                           "partition_read_unlock() -> (metadata partition)");
1387         }
1388
1389         ret2 = ldb_next_read_unlock(module);
1390         if (ret2 != LDB_SUCCESS) {
1391                 ldb_debug_set(ldb,
1392                               LDB_DEBUG_FATAL,
1393                               "Failed to unlock db: %s / %s for metadata partition",
1394                               ldb_errstring(ldb),
1395                               ldb_strerror(ret2));
1396
1397                 /*
1398                  * Don't overwrite the original failure code
1399                  * if there was one
1400                  */
1401                 if (ret == LDB_SUCCESS) {
1402                         ret = ret2;
1403                 }
1404         }
1405
1406         return ret;
1407 }
1408
1409 /* extended */
1410 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
1411 {
1412         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1413                                                               struct partition_private_data);
1414         struct partition_context *ac;
1415         int ret;
1416
1417         /* if we aren't initialised yet go further */
1418         if (!data) {
1419                 return ldb_next_request(module, req);
1420         }
1421
1422         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_SCHEMA_UPDATE_NOW_OID) == 0) {
1423                 /* Update the metadata.tdb to increment the schema version if needed*/
1424                 DEBUG(10, ("Incrementing the sequence_number after schema_update_now\n"));
1425                 ret = partition_metadata_inc_schema_sequence(module);
1426                 return ldb_module_done(req, NULL, NULL, ret);
1427         }
1428         
1429         if (strcmp(req->op.extended.oid, LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1430                 return partition_sequence_number(module, req);
1431         }
1432
1433         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_CREATE_PARTITION_OID) == 0) {
1434                 return partition_create(module, req);
1435         }
1436
1437         /* 
1438          * as the extended operation has no dn
1439          * we need to send it to all partitions
1440          */
1441
1442         ac = partition_init_ctx(module, req);
1443         if (!ac) {
1444                 return ldb_operr(ldb_module_get_ctx(module));
1445         }
1446
1447         return partition_send_all(module, ac, req);
1448 }
1449
1450 static const struct ldb_module_ops ldb_partition_module_ops = {
1451         .name              = "partition",
1452         .init_context      = partition_init,
1453         .search            = partition_search,
1454         .add               = partition_add,
1455         .modify            = partition_modify,
1456         .del               = partition_delete,
1457         .rename            = partition_rename,
1458         .extended          = partition_extended,
1459         .start_transaction = partition_start_trans,
1460         .prepare_commit    = partition_prepare_commit,
1461         .end_transaction   = partition_end_trans,
1462         .del_transaction   = partition_del_trans,
1463         .read_lock         = partition_read_lock,
1464         .read_unlock       = partition_read_unlock
1465 };
1466
1467 int ldb_partition_module_init(const char *version)
1468 {
1469         LDB_MODULE_CHECK_VERSION(version);
1470         return ldb_register_module(&ldb_partition_module_ops);
1471 }