dsdb: Fix typos
[samba.git] / source4 / dsdb / samdb / ldb_modules / partition.c
1 /* 
2    Partitions ldb module
3
4    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 /*
22  *  Name: ldb
23  *
24  *  Component: ldb partitions module
25  *
26  *  Description: Implement LDAP partitions
27  *
28  *  Author: Andrew Bartlett
29  *  Author: Stefan Metzmacher
30  */
31
32 #include "dsdb/samdb/ldb_modules/partition.h"
33
34 struct part_request {
35         struct ldb_module *module;
36         struct ldb_request *req;
37 };
38
39 struct partition_context {
40         struct ldb_module *module;
41         struct ldb_request *req;
42
43         struct part_request *part_req;
44         unsigned int num_requests;
45         unsigned int finished_requests;
46
47         const char **referrals;
48 };
49
50 static struct partition_context *partition_init_ctx(struct ldb_module *module, struct ldb_request *req)
51 {
52         struct partition_context *ac;
53
54         ac = talloc_zero(req, struct partition_context);
55         if (ac == NULL) {
56                 ldb_set_errstring(ldb_module_get_ctx(module), "Out of Memory");
57                 return NULL;
58         }
59
60         ac->module = module;
61         ac->req = req;
62
63         return ac;
64 }
65
66 /*
67  * helper functions to call the next module in chain
68  */
69 int partition_request(struct ldb_module *module, struct ldb_request *request)
70 {
71         if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) { \
72                 const struct dsdb_control_current_partition *partition = NULL;
73                 struct ldb_control *partition_ctrl = ldb_request_get_control(request, DSDB_CONTROL_CURRENT_PARTITION_OID);
74                 if (partition_ctrl) {
75                         partition = talloc_get_type(partition_ctrl->data,
76                                                     struct dsdb_control_current_partition);
77                 }
78
79                 if (partition != NULL) {
80                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_request() -> %s",
81                                   ldb_dn_get_linearized(partition->dn));                        
82                 } else {
83                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_request() -> (metadata partition)");
84                 }
85         }
86
87         return ldb_next_request(module, request);
88 }
89
90 static struct dsdb_partition *find_partition(struct partition_private_data *data,
91                                              struct ldb_dn *dn,
92                                              struct ldb_request *req)
93 {
94         unsigned int i;
95         struct ldb_control *partition_ctrl;
96
97         /* see if the request has the partition DN specified in a
98          * control. The repl_meta_data module can specify this to
99          * ensure that replication happens to the right partition
100          */
101         partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
102         if (partition_ctrl) {
103                 const struct dsdb_control_current_partition *partition;
104                 partition = talloc_get_type(partition_ctrl->data,
105                                             struct dsdb_control_current_partition);
106                 if (partition != NULL) {
107                         dn = partition->dn;
108                 }
109         }
110
111         if (dn == NULL) {
112                 return NULL;
113         }
114
115         /* Look at base DN */
116         /* Figure out which partition it is under */
117         /* Skip the lot if 'data' isn't here yet (initialisation) */
118         for (i=0; data && data->partitions && data->partitions[i]; i++) {
119                 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, dn) == 0) {
120                         return data->partitions[i];
121                 }
122         }
123
124         return NULL;
125 }
126
127 /**
128  * fire the caller's callback for every entry, but only send 'done' once.
129  */
130 static int partition_req_callback(struct ldb_request *req,
131                                   struct ldb_reply *ares)
132 {
133         struct partition_context *ac;
134         struct ldb_module *module;
135         struct ldb_request *nreq;
136         int ret;
137         struct ldb_control *partition_ctrl;
138
139         ac = talloc_get_type(req->context, struct partition_context);
140
141         if (!ares) {
142                 return ldb_module_done(ac->req, NULL, NULL,
143                                         LDB_ERR_OPERATIONS_ERROR);
144         }
145
146         partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
147         if (partition_ctrl && (ac->num_requests == 1 || ares->type == LDB_REPLY_ENTRY)) {
148                 /* If we didn't fan this request out to mulitple partitions,
149                  * or this is an individual search result, we can
150                  * deterministically tell the caller what partition this was
151                  * written to (repl_meta_data likes to know) */
152                 ret = ldb_reply_add_control(ares,
153                                             DSDB_CONTROL_CURRENT_PARTITION_OID,
154                                             false, partition_ctrl->data);
155                 if (ret != LDB_SUCCESS) {
156                         return ldb_module_done(ac->req, NULL, NULL,
157                                                ret);
158                 }
159         }
160
161         if (ares->error != LDB_SUCCESS) {
162                 return ldb_module_done(ac->req, ares->controls,
163                                         ares->response, ares->error);
164         }
165
166         switch (ares->type) {
167         case LDB_REPLY_REFERRAL:
168                 return ldb_module_send_referral(ac->req, ares->referral);
169
170         case LDB_REPLY_ENTRY:
171                 if (ac->req->operation != LDB_SEARCH) {
172                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
173                                 "partition_req_callback:"
174                                 " Unsupported reply type for this request");
175                         return ldb_module_done(ac->req, NULL, NULL,
176                                                 LDB_ERR_OPERATIONS_ERROR);
177                 }
178                 
179                 return ldb_module_send_entry(ac->req, ares->message, ares->controls);
180
181         case LDB_REPLY_DONE:
182                 if (ac->req->operation == LDB_EXTENDED) {
183                         /* FIXME: check for ares->response, replmd does not fill it ! */
184                         if (ares->response) {
185                                 if (strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID) != 0) {
186                                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
187                                                           "partition_req_callback:"
188                                                           " Unknown extended reply, "
189                                                           "only supports START_TLS");
190                                         talloc_free(ares);
191                                         return ldb_module_done(ac->req, NULL, NULL,
192                                                                 LDB_ERR_OPERATIONS_ERROR);
193                                 }
194                         }
195                 }
196
197                 ac->finished_requests++;
198                 if (ac->finished_requests == ac->num_requests) {
199                         /* Send back referrals if they do exist (search ops) */
200                         if (ac->referrals != NULL) {
201                                 const char **ref;
202                                 for (ref = ac->referrals; *ref != NULL; ++ref) {
203                                         ret = ldb_module_send_referral(ac->req,
204                                                                        talloc_strdup(ac->req, *ref));
205                                         if (ret != LDB_SUCCESS) {
206                                                 return ldb_module_done(ac->req, NULL, NULL,
207                                                                        ret);
208                                         }
209                                 }
210                         }
211
212                         /* this was the last one, call callback */
213                         return ldb_module_done(ac->req, ares->controls,
214                                                ares->response, 
215                                                ares->error);
216                 }
217
218                 /* not the last, now call the next one */
219                 module = ac->part_req[ac->finished_requests].module;
220                 nreq = ac->part_req[ac->finished_requests].req;
221
222                 ret = partition_request(module, nreq);
223                 if (ret != LDB_SUCCESS) {
224                         talloc_free(ares);
225                         return ldb_module_done(ac->req, NULL, NULL, ret);
226                 }
227
228                 break;
229         }
230
231         talloc_free(ares);
232         return LDB_SUCCESS;
233 }
234
235 static int partition_prep_request(struct partition_context *ac,
236                                   struct dsdb_partition *partition)
237 {
238         int ret;
239         struct ldb_request *req;
240         struct ldb_control *partition_ctrl = NULL;
241
242         ac->part_req = talloc_realloc(ac, ac->part_req,
243                                         struct part_request,
244                                         ac->num_requests + 1);
245         if (ac->part_req == NULL) {
246                 return ldb_oom(ldb_module_get_ctx(ac->module));
247         }
248
249         switch (ac->req->operation) {
250         case LDB_SEARCH:
251                 ret = ldb_build_search_req_ex(&req, ldb_module_get_ctx(ac->module),
252                                         ac->part_req,
253                                         ac->req->op.search.base,
254                                         ac->req->op.search.scope,
255                                         ac->req->op.search.tree,
256                                         ac->req->op.search.attrs,
257                                         ac->req->controls,
258                                         ac, partition_req_callback,
259                                         ac->req);
260                 LDB_REQ_SET_LOCATION(req);
261                 break;
262         case LDB_ADD:
263                 ret = ldb_build_add_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
264                                         ac->req->op.add.message,
265                                         ac->req->controls,
266                                         ac, partition_req_callback,
267                                         ac->req);
268                 LDB_REQ_SET_LOCATION(req);
269                 break;
270         case LDB_MODIFY:
271                 ret = ldb_build_mod_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
272                                         ac->req->op.mod.message,
273                                         ac->req->controls,
274                                         ac, partition_req_callback,
275                                         ac->req);
276                 LDB_REQ_SET_LOCATION(req);
277                 break;
278         case LDB_DELETE:
279                 ret = ldb_build_del_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
280                                         ac->req->op.del.dn,
281                                         ac->req->controls,
282                                         ac, partition_req_callback,
283                                         ac->req);
284                 LDB_REQ_SET_LOCATION(req);
285                 break;
286         case LDB_RENAME:
287                 ret = ldb_build_rename_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
288                                         ac->req->op.rename.olddn,
289                                         ac->req->op.rename.newdn,
290                                         ac->req->controls,
291                                         ac, partition_req_callback,
292                                         ac->req);
293                 LDB_REQ_SET_LOCATION(req);
294                 break;
295         case LDB_EXTENDED:
296                 ret = ldb_build_extended_req(&req, ldb_module_get_ctx(ac->module),
297                                         ac->part_req,
298                                         ac->req->op.extended.oid,
299                                         ac->req->op.extended.data,
300                                         ac->req->controls,
301                                         ac, partition_req_callback,
302                                         ac->req);
303                 LDB_REQ_SET_LOCATION(req);
304                 break;
305         default:
306                 ldb_set_errstring(ldb_module_get_ctx(ac->module),
307                                   "Unsupported request type!");
308                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
309         }
310
311         if (ret != LDB_SUCCESS) {
312                 return ret;
313         }
314
315         ac->part_req[ac->num_requests].req = req;
316
317         if (ac->req->controls) {
318                 /* Duplicate everything beside the current partition control */
319                 partition_ctrl = ldb_request_get_control(ac->req,
320                                                          DSDB_CONTROL_CURRENT_PARTITION_OID);
321                 if (!ldb_save_controls(partition_ctrl, req, NULL)) {
322                         return ldb_module_oom(ac->module);
323                 }
324         }
325
326         if (partition) {
327                 void *part_data = partition->ctrl;
328
329                 ac->part_req[ac->num_requests].module = partition->module;
330
331                 if (partition_ctrl != NULL) {
332                         if (partition_ctrl->data != NULL) {
333                                 part_data = partition_ctrl->data;
334                         }
335
336                         /*
337                          * If the provided current partition control is without
338                          * data then use the calculated one.
339                          */
340                         ret = ldb_request_add_control(req,
341                                                       DSDB_CONTROL_CURRENT_PARTITION_OID,
342                                                       false, part_data);
343                         if (ret != LDB_SUCCESS) {
344                                 return ret;
345                         }
346                 }
347
348                 if (req->operation == LDB_SEARCH) {
349                         /* If the search is for 'more' than this partition,
350                          * then change the basedn, so a remote LDAP server
351                          * doesn't object */
352                         if (ldb_dn_compare_base(partition->ctrl->dn,
353                                                 req->op.search.base) != 0) {
354                                 req->op.search.base = partition->ctrl->dn;
355                         }
356                 }
357
358         } else {
359                 /* make sure you put the module here, or
360                  * or ldb_next_request() will skip a module */
361                 ac->part_req[ac->num_requests].module = ac->module;
362         }
363
364         ac->num_requests++;
365
366         return LDB_SUCCESS;
367 }
368
369 static int partition_call_first(struct partition_context *ac)
370 {
371         return partition_request(ac->part_req[0].module, ac->part_req[0].req);
372 }
373
374 /**
375  * Send a request down to all the partitions (but not the sam.ldb file)
376  */
377 static int partition_send_all(struct ldb_module *module, 
378                               struct partition_context *ac, 
379                               struct ldb_request *req) 
380 {
381         unsigned int i;
382         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
383                                                               struct partition_private_data);
384         int ret;
385
386         for (i=0; data && data->partitions && data->partitions[i]; i++) {
387                 ret = partition_prep_request(ac, data->partitions[i]);
388                 if (ret != LDB_SUCCESS) {
389                         return ret;
390                 }
391         }
392
393         /* fire the first one */
394         return partition_call_first(ac);
395 }
396
397 struct partition_copy_context {
398         struct ldb_module *module;
399         struct partition_context *partition_context;
400         struct ldb_request *request;
401         struct ldb_dn *dn;
402 };
403
404 /*
405  * A special DN has been updated in the primary partition. Now propagate those
406  * changes to the remaining partitions.
407  *
408  * Note: that the operations are asynchronous and this function is called
409  *       from partition_copy_all_callback_handler in response to an async
410  *       callback.
411  */
412 static int partition_copy_all_callback_action(
413         struct ldb_module *module,
414         struct partition_context *ac,
415         struct ldb_request *req,
416         struct ldb_dn *dn)
417
418 {
419
420         unsigned int i;
421         struct partition_private_data *data =
422                 talloc_get_type(
423                         ldb_module_get_private(module),
424                         struct partition_private_data);
425         int search_ret;
426         struct ldb_result *res;
427         /* now fetch the resulting object, and then copy it to all the
428          * other partitions. We need this approach to cope with the
429          * partitions getting out of sync. If for example the
430          * @ATTRIBUTES object exists on one partition but not the
431          * others then just doing each of the partitions in turn will
432          * lead to an error
433          */
434         search_ret = dsdb_module_search_dn(module, ac, &res, dn, NULL, DSDB_FLAG_NEXT_MODULE, req);
435         if (search_ret != LDB_SUCCESS) {
436                 return search_ret;
437         }
438
439         /* now delete the object in the other partitions, if requried
440         */
441         if (search_ret == LDB_ERR_NO_SUCH_OBJECT) {
442                 for (i=0; data->partitions && data->partitions[i]; i++) {
443                         int pret;
444                         pret = dsdb_module_del(data->partitions[i]->module,
445                                                dn,
446                                                DSDB_FLAG_NEXT_MODULE,
447                                                req);
448                         if (pret != LDB_SUCCESS && pret != LDB_ERR_NO_SUCH_OBJECT) {
449                                 /* we should only get success or no
450                                    such object from the other partitions */
451                                 return pret;
452                         }
453                 }
454
455                 return ldb_module_done(req, NULL, NULL, LDB_SUCCESS);
456         }
457
458         /* now add/modify in the other partitions */
459         for (i=0; data->partitions && data->partitions[i]; i++) {
460                 struct ldb_message *modify_msg = NULL;
461                 int pret;
462                 unsigned int el_idx;
463
464                 pret = dsdb_module_add(data->partitions[i]->module,
465                                        res->msgs[0],
466                                        DSDB_FLAG_NEXT_MODULE,
467                                        req);
468                 if (pret == LDB_SUCCESS) {
469                         continue;
470                 }
471
472                 if (pret != LDB_ERR_ENTRY_ALREADY_EXISTS) {
473                         return pret;
474                 }
475
476                 modify_msg = ldb_msg_copy(req, res->msgs[0]);
477                 if (modify_msg == NULL) {
478                         return ldb_module_oom(module);
479                 }
480
481                 /*
482                  * mark all the message elements as
483                  * LDB_FLAG_MOD_REPLACE
484                  */
485                 for (el_idx=0;
486                      el_idx < modify_msg->num_elements;
487                      el_idx++) {
488                         modify_msg->elements[el_idx].flags
489                                 = LDB_FLAG_MOD_REPLACE;
490                 }
491
492                 if (req->operation == LDB_MODIFY) {
493                         const struct ldb_message *req_msg = req->op.mod.message;
494                         /*
495                          * mark elements to be removed, if there were
496                          * deleted entirely above we need to delete
497                          * them here too
498                          */
499                         for (el_idx=0; el_idx < req_msg->num_elements; el_idx++) {
500                                 if (req_msg->elements[el_idx].flags & LDB_FLAG_MOD_DELETE
501                                     || ((req_msg->elements[el_idx].flags & LDB_FLAG_MOD_REPLACE) &&
502                                         req_msg->elements[el_idx].num_values == 0)) {
503                                         if (ldb_msg_find_element(modify_msg,
504                                                                  req_msg->elements[el_idx].name) != NULL) {
505                                                 continue;
506                                         }
507                                         ldb_msg_add_empty(modify_msg,
508                                                           req_msg->elements[el_idx].name,
509                                                           LDB_FLAG_MOD_REPLACE,
510                                                           NULL);
511                                 }
512                         }
513                 }
514
515                 pret = dsdb_module_modify(data->partitions[i]->module,
516                                           modify_msg,
517                                           DSDB_FLAG_NEXT_MODULE,
518                                           req);
519
520                 if (pret != LDB_SUCCESS) {
521                         return pret;
522                 }
523         }
524
525         return ldb_module_done(req, NULL, NULL, LDB_SUCCESS);
526 }
527
528
529 /*
530  * @brief call back function for the ldb operations on special DN's.
531  *
532  * As the LDB operations are async, and we wish to use the result
533  * the operations, a callback needs to be registered to process the results
534  * of the LDB operations.
535  *
536  * @param req the ldb request
537  * @param res the result of the operation
538  *
539  * @return the LDB_STATUS
540  */
541 static int partition_copy_all_callback_handler(
542         struct ldb_request *req,
543         struct ldb_reply *ares)
544 {
545         struct partition_copy_context *ac = NULL;
546         int error = ares->error;
547
548         ac = talloc_get_type(
549                 req->context,
550                 struct partition_copy_context);
551
552         if (!ares) {
553                 return ldb_module_done(
554                         ac->request,
555                         NULL,
556                         NULL,
557                         LDB_ERR_OPERATIONS_ERROR);
558         }
559
560         /* pass on to the callback */
561         switch (ares->type) {
562         case LDB_REPLY_ENTRY:
563                 return ldb_module_send_entry(
564                         ac->request,
565                         ares->message,
566                         ares->controls);
567
568         case LDB_REPLY_REFERRAL:
569                 return ldb_module_send_referral(
570                         ac->request,
571                         ares->referral);
572
573         case LDB_REPLY_DONE:
574                 error = ares->error;
575                 if (error == LDB_SUCCESS) {
576                         error = partition_copy_all_callback_action(
577                                 ac->module,
578                                 ac->partition_context,
579                                 ac->request,
580                                 ac->dn);
581                 }
582                 return ldb_module_done(
583                         ac->request,
584                         ares->controls,
585                         ares->response,
586                         error);
587
588         default:
589                 /* Can't happen */
590                 return LDB_ERR_OPERATIONS_ERROR;
591         }
592 }
593
594 /**
595  * send an operation to the top partition, then copy the resulting
596  * object to all other partitions.
597  */
598 static int partition_copy_all(
599         struct ldb_module *module,
600         struct partition_context *partition_context,
601         struct ldb_request *req,
602         struct ldb_dn *dn)
603 {
604         struct ldb_request *new_req = NULL;
605         struct ldb_context *ldb = NULL;
606         struct partition_copy_context *context = NULL;
607
608         int ret;
609
610         ldb = ldb_module_get_ctx(module);
611
612         context = talloc_zero(req, struct partition_copy_context);
613         if (context == NULL) {
614                 return ldb_oom(ldb);
615         }
616         context->module = module;
617         context->request = req;
618         context->dn = dn;
619         context->partition_context = partition_context;
620
621         switch (req->operation) {
622         case LDB_ADD:
623                 ret = ldb_build_add_req(
624                         &new_req,
625                         ldb,
626                         req,
627                         req->op.add.message,
628                         req->controls,
629                         context,
630                         partition_copy_all_callback_handler,
631                         req);
632                 break;
633         case LDB_MODIFY:
634                 ret = ldb_build_mod_req(
635                         &new_req,
636                         ldb,
637                         req,
638                         req->op.mod.message,
639                         req->controls,
640                         context,
641                         partition_copy_all_callback_handler,
642                         req);
643                 break;
644         case LDB_DELETE:
645                 ret = ldb_build_del_req(
646                         &new_req,
647                         ldb,
648                         req,
649                         req->op.del.dn,
650                         req->controls,
651                         context,
652                         partition_copy_all_callback_handler,
653                         req);
654                 break;
655         case LDB_RENAME:
656                 ret = ldb_build_rename_req(
657                         &new_req,
658                         ldb,
659                         req,
660                         req->op.rename.olddn,
661                         req->op.rename.newdn,
662                         req->controls,
663                         context,
664                         partition_copy_all_callback_handler,
665                         req);
666                 break;
667         default:
668                 /*
669                  * Shouldn't happen.
670                  */
671                 ldb_debug(
672                         ldb,
673                         LDB_DEBUG_ERROR,
674                         "Unexpected operation type (%d)\n", req->operation);
675                 ret = LDB_ERR_OPERATIONS_ERROR;
676                 break;
677         }
678         if (ret != LDB_SUCCESS) {
679                 return ret;
680         }
681         return ldb_next_request(module, new_req);
682 }
683 /**
684  * Figure out which backend a request needs to be aimed at.  Some
685  * requests must be replicated to all backends
686  */
687 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn) 
688 {
689         struct partition_context *ac;
690         unsigned int i;
691         int ret;
692         struct dsdb_partition *partition;
693         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
694                                                               struct partition_private_data);
695
696         /* if we aren't initialised yet go further */
697         if (!data || !data->partitions) {
698                 return ldb_next_request(module, req);
699         }
700
701         if (ldb_dn_is_special(dn)) {
702                 /* Is this a special DN, we need to replicate to every backend? */
703                 for (i=0; data->replicate && data->replicate[i]; i++) {
704                         if (ldb_dn_compare(data->replicate[i], 
705                                            dn) == 0) {
706                                 
707                                 ac = partition_init_ctx(module, req);
708                                 if (!ac) {
709                                         return ldb_operr(ldb_module_get_ctx(module));
710                                 }
711                                 
712                                 return partition_copy_all(module, ac, req, dn);
713                         }
714                 }
715         }
716
717         /* Otherwise, we need to find the partition to fire it to */
718
719         /* Find partition */
720         partition = find_partition(data, dn, req);
721         if (!partition) {
722                 /*
723                  * if we haven't found a matching partition
724                  * pass the request to the main ldb
725                  *
726                  * TODO: we should maybe return an error here
727                  *       if it's not a special dn
728                  */
729
730                 return ldb_next_request(module, req);
731         }
732
733         ac = partition_init_ctx(module, req);
734         if (!ac) {
735                 return ldb_operr(ldb_module_get_ctx(module));
736         }
737
738         /* we need to add a control but we never touch the original request */
739         ret = partition_prep_request(ac, partition);
740         if (ret != LDB_SUCCESS) {
741                 return ret;
742         }
743
744         /* fire the first one */
745         return partition_call_first(ac);
746 }
747
748 /* search */
749 static int partition_search(struct ldb_module *module, struct ldb_request *req)
750 {
751         struct ldb_control **saved_controls;
752         /* Find backend */
753         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
754                                                               struct partition_private_data);
755         struct partition_context *ac;
756         struct ldb_context *ldb;
757         struct loadparm_context *lp_ctx;
758
759         struct ldb_control *search_control = ldb_request_get_control(req, LDB_CONTROL_SEARCH_OPTIONS_OID);
760         struct ldb_control *domain_scope_control = ldb_request_get_control(req, LDB_CONTROL_DOMAIN_SCOPE_OID);
761         struct ldb_control *no_gc_control = ldb_request_get_control(req, DSDB_CONTROL_NO_GLOBAL_CATALOG);
762         
763         struct ldb_search_options_control *search_options = NULL;
764         struct dsdb_partition *p;
765         unsigned int i, j;
766         int ret;
767         bool domain_scope = false, phantom_root = false;
768
769         p = find_partition(data, NULL, req);
770         if (p != NULL) {
771                 /* the caller specified what partition they want the
772                  * search - just pass it on
773                  */
774                 return ldb_next_request(p->module, req);
775         }
776
777         /* Get back the search options from the search control, and mark it as
778          * non-critical (to make backends and also dcpromo happy).
779          */
780         if (search_control) {
781                 search_options = talloc_get_type(search_control->data, struct ldb_search_options_control);
782                 search_control->critical = 0;
783
784         }
785
786         /* Remove the "domain_scope" control, so we don't confuse a backend
787          * server */
788         if (domain_scope_control && !ldb_save_controls(domain_scope_control, req, &saved_controls)) {
789                 return ldb_oom(ldb_module_get_ctx(module));
790         }
791
792         /* if we aren't initialised yet go further */
793         if (!data || !data->partitions) {
794                 return ldb_next_request(module, req);
795         }
796
797         /* Special DNs without specified partition should go further */
798         if (ldb_dn_is_special(req->op.search.base)) {
799                 return ldb_next_request(module, req);
800         }
801
802         /* Locate the options */
803         domain_scope = (search_options
804                 && (search_options->search_options & LDB_SEARCH_OPTION_DOMAIN_SCOPE))
805                 || domain_scope_control;
806         phantom_root = search_options
807                 && (search_options->search_options & LDB_SEARCH_OPTION_PHANTOM_ROOT);
808
809         /* Remove handled options from the search control flag */
810         if (search_options) {
811                 search_options->search_options = search_options->search_options
812                         & ~LDB_SEARCH_OPTION_DOMAIN_SCOPE
813                         & ~LDB_SEARCH_OPTION_PHANTOM_ROOT;
814         }
815
816         ac = partition_init_ctx(module, req);
817         if (!ac) {
818                 return ldb_operr(ldb_module_get_ctx(module));
819         }
820
821         ldb = ldb_module_get_ctx(ac->module);
822         lp_ctx = talloc_get_type(ldb_get_opaque(ldb, "loadparm"),
823                                                 struct loadparm_context);
824
825         /* Search from the base DN */
826         if (ldb_dn_is_null(req->op.search.base)) {
827                 if (!phantom_root) {
828                         return ldb_error(ldb, LDB_ERR_NO_SUCH_OBJECT, "empty base DN");
829                 }
830                 return partition_send_all(module, ac, req);
831         }
832
833         for (i=0; data->partitions[i]; i++) {
834                 bool match = false, stop = false;
835
836                 if (data->partitions[i]->partial_replica && no_gc_control != NULL) {
837                         if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn,
838                                                 req->op.search.base) == 0) {
839                                 /* base DN is in a partial replica
840                                    with the NO_GLOBAL_CATALOG
841                                    control. This partition is invisible */
842                                 /* DEBUG(0,("DENYING NON-GC OP: %s\n", ldb_module_call_chain(req, req))); */
843                                 continue;
844                         }
845                 }
846
847                 if (phantom_root) {
848                         /* Phantom root: Find all partitions under the
849                          * search base. We match if:
850                          *
851                          * 1) the DN we are looking for exactly matches a
852                          *    certain partition and always stop
853                          * 2) the DN we are looking for is a parent of certain
854                          *    partitions and it isn't a scope base search
855                          * 3) the DN we are looking for is a child of a certain
856                          *    partition and always stop
857                          *    - we don't need to go any further up in the
858                          *    hierarchy!
859                          */
860                         if (ldb_dn_compare(data->partitions[i]->ctrl->dn,
861                                            req->op.search.base) == 0) {
862                                 match = true;
863                                 stop = true;
864                         }
865                         if (!match &&
866                             (ldb_dn_compare_base(req->op.search.base,
867                                                  data->partitions[i]->ctrl->dn) == 0 &&
868                              req->op.search.scope != LDB_SCOPE_BASE)) {
869                                 match = true;
870                         }
871                         if (!match &&
872                             ldb_dn_compare_base(data->partitions[i]->ctrl->dn,
873                                                 req->op.search.base) == 0) {
874                                 match = true;
875                                 stop = true; /* note that this relies on partition ordering */
876                         }
877                 } else {
878                         /* Domain scope: Find all partitions under the search
879                          * base.
880                          *
881                          * We generate referral candidates if we haven't
882                          * specified the domain scope control, haven't a base
883                          * search* scope and the DN we are looking for is a real
884                          * predecessor of certain partitions. When a new
885                          * referral candidate is nearer to the DN than an
886                          * existing one delete the latter (we want to have only
887                          * the closest ones). When we checked this for all
888                          * candidates we have the final referrals.
889                          *
890                          * We match if the DN we are looking for is a child of
891                          * a certain partition or the partition
892                          * DN itself - we don't need to go any further
893                          * up in the hierarchy!
894                          */
895                         if ((!domain_scope) &&
896                             (req->op.search.scope != LDB_SCOPE_BASE) &&
897                             (ldb_dn_compare_base(req->op.search.base,
898                                                  data->partitions[i]->ctrl->dn) == 0) &&
899                             (ldb_dn_compare(req->op.search.base,
900                                             data->partitions[i]->ctrl->dn) != 0)) {
901                                 char *ref = talloc_asprintf(ac,
902                                                             "ldap://%s/%s%s",
903                                                             lpcfg_dnsdomain(lp_ctx),
904                                                             ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
905                                                             req->op.search.scope == LDB_SCOPE_ONELEVEL ? "??base" : "");
906
907                                 if (ref == NULL) {
908                                         return ldb_oom(ldb);
909                                 }
910
911                                 /* Initialise the referrals list */
912                                 if (ac->referrals == NULL) {
913                                         char **l = str_list_make_empty(ac);
914                                         ac->referrals = discard_const_p(const char *, l);
915                                         if (ac->referrals == NULL) {
916                                                 return ldb_oom(ldb);
917                                         }
918                                 }
919
920                                 /* Check if the new referral candidate is
921                                  * closer to the base DN than already
922                                  * saved ones and delete the latters */
923                                 j = 0;
924                                 while (ac->referrals[j] != NULL) {
925                                         if (strstr(ac->referrals[j],
926                                                    ldb_dn_get_linearized(data->partitions[i]->ctrl->dn)) != NULL) {
927                                                 str_list_remove(ac->referrals,
928                                                                 ac->referrals[j]);
929                                         } else {
930                                                 ++j;
931                                         }
932                                 }
933
934                                 /* Add our new candidate */
935                                 ac->referrals = str_list_add(ac->referrals, ref);
936
937                                 talloc_free(ref);
938
939                                 if (ac->referrals == NULL) {
940                                         return ldb_oom(ldb);
941                                 }
942                         }
943                         if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
944                                 match = true;
945                                 stop = true; /* note that this relies on partition ordering */
946                         }
947                 }
948
949                 if (match) {
950                         ret = partition_prep_request(ac, data->partitions[i]);
951                         if (ret != LDB_SUCCESS) {
952                                 return ret;
953                         }
954                 }
955
956                 if (stop) break;
957         }
958
959         /* Perhaps we didn't match any partitions. Try the main partition */
960         if (ac->num_requests == 0) {
961                 talloc_free(ac);
962                 return ldb_next_request(module, req);
963         }
964
965         /* fire the first one */
966         return partition_call_first(ac);
967 }
968
969 /* add */
970 static int partition_add(struct ldb_module *module, struct ldb_request *req)
971 {
972         return partition_replicate(module, req, req->op.add.message->dn);
973 }
974
975 /* modify */
976 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
977 {
978         return partition_replicate(module, req, req->op.mod.message->dn);
979 }
980
981 /* delete */
982 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
983 {
984         return partition_replicate(module, req, req->op.del.dn);
985 }
986
987 /* rename */
988 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
989 {
990         /* Find backend */
991         struct dsdb_partition *backend, *backend2;
992         
993         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
994                                                               struct partition_private_data);
995
996         /* Skip the lot if 'data' isn't here yet (initialisation) */
997         if (!data) {
998                 return ldb_operr(ldb_module_get_ctx(module));
999         }
1000
1001         backend = find_partition(data, req->op.rename.olddn, req);
1002         backend2 = find_partition(data, req->op.rename.newdn, req);
1003
1004         if ((backend && !backend2) || (!backend && backend2)) {
1005                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
1006         }
1007
1008         if (backend != backend2) {
1009                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1010                                        "Cannot rename from %s in %s to %s in %s: %s",
1011                                        ldb_dn_get_linearized(req->op.rename.olddn),
1012                                        ldb_dn_get_linearized(backend->ctrl->dn),
1013                                        ldb_dn_get_linearized(req->op.rename.newdn),
1014                                        ldb_dn_get_linearized(backend2->ctrl->dn),
1015                                        ldb_strerror(LDB_ERR_AFFECTS_MULTIPLE_DSAS));
1016                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
1017         }
1018
1019         return partition_replicate(module, req, req->op.rename.olddn);
1020 }
1021
1022 /* start a transaction */
1023 int partition_start_trans(struct ldb_module *module)
1024 {
1025         int i;
1026         int ret;
1027         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1028                                                               struct partition_private_data);
1029         /* Look at base DN */
1030         /* Figure out which partition it is under */
1031         /* Skip the lot if 'data' isn't here yet (initialization) */
1032         if (ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING) {
1033                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_start_trans() -> (metadata partition)");
1034         }
1035
1036         /* This order must match that in prepare_commit() and read_lock() */
1037         ret = ldb_next_start_trans(module);
1038         if (ret != LDB_SUCCESS) {
1039                 return ret;
1040         }
1041
1042         ret = partition_reload_if_required(module, data, NULL);
1043         if (ret != LDB_SUCCESS) {
1044                 ldb_next_del_trans(module);
1045                 return ret;
1046         }
1047
1048         for (i=0; data && data->partitions && data->partitions[i]; i++) {
1049                 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
1050                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_start_trans() -> %s",
1051                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
1052                 }
1053                 ret = ldb_next_start_trans(data->partitions[i]->module);
1054                 if (ret != LDB_SUCCESS) {
1055                         /* Back it out, if it fails on one */
1056                         for (i--; i >= 0; i--) {
1057                                 ldb_next_del_trans(data->partitions[i]->module);
1058                         }
1059                         ldb_next_del_trans(module);
1060                         partition_metadata_del_trans(module);
1061                         return ret;
1062                 }
1063         }
1064
1065         /*
1066          * Because in prepare_commit this must come last, to ensure
1067          * lock ordering we have to do this last here also 
1068          */
1069         ret = partition_metadata_start_trans(module);
1070         if (ret != LDB_SUCCESS) {
1071                 /* Back it out, if it fails on one */
1072                 for (i--; i >= 0; i--) {
1073                         ldb_next_del_trans(data->partitions[i]->module);
1074                 }
1075                 ldb_next_del_trans(module);
1076                 return ret;
1077         }
1078
1079         data->in_transaction++;
1080
1081         return LDB_SUCCESS;
1082 }
1083
1084 /* prepare for a commit */
1085 int partition_prepare_commit(struct ldb_module *module)
1086 {
1087         unsigned int i;
1088         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1089                                                               struct partition_private_data);
1090         int ret;
1091
1092         ret = ldb_next_prepare_commit(module);
1093         if (ret != LDB_SUCCESS) {
1094                 return ret;
1095         }
1096
1097         for (i=0; data && data->partitions && data->partitions[i]; i++) {
1098                 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
1099                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_prepare_commit() -> %s",
1100                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
1101                 }
1102                 ret = ldb_next_prepare_commit(data->partitions[i]->module);
1103                 if (ret != LDB_SUCCESS) {
1104                         ldb_asprintf_errstring(ldb_module_get_ctx(module), "prepare_commit error on %s: %s",
1105                                                ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
1106                                                ldb_errstring(ldb_module_get_ctx(module)));
1107                         return ret;
1108                 }
1109         }
1110
1111         if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
1112                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_prepare_commit() -> (metadata partition)");
1113         }
1114
1115         /* metadata prepare commit must come last, as other partitions could modify
1116          * the database inside the prepare commit method of a module */
1117         return partition_metadata_prepare_commit(module);
1118 }
1119
1120
1121 /* end a transaction */
1122 int partition_end_trans(struct ldb_module *module)
1123 {
1124         int ret, ret2;
1125         unsigned int i;
1126         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1127                                                               struct partition_private_data);
1128
1129         ret = LDB_SUCCESS;
1130
1131         if (data->in_transaction == 0) {
1132                 DEBUG(0,("partition end transaction mismatch\n"));
1133                 ret = LDB_ERR_OPERATIONS_ERROR;
1134         } else {
1135                 data->in_transaction--;
1136         }
1137
1138
1139         for (i=0; data && data->partitions && data->partitions[i]; i++) {
1140                 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
1141                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_end_trans() -> %s",
1142                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
1143                 }
1144                 ret2 = ldb_next_end_trans(data->partitions[i]->module);
1145                 if (ret2 != LDB_SUCCESS) {
1146                         ldb_asprintf_errstring(ldb_module_get_ctx(module), "end_trans error on %s: %s",
1147                                                ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
1148                                                ldb_errstring(ldb_module_get_ctx(module)));
1149                         ret = ret2;
1150                 }
1151         }
1152
1153         if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
1154                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_end_trans() -> (metadata partition)");
1155         }
1156         ret2 = ldb_next_end_trans(module);
1157         if (ret2 != LDB_SUCCESS) {
1158                 ret = ret2;
1159         }
1160
1161         ret2 = partition_metadata_end_trans(module);
1162         if (ret2 != LDB_SUCCESS) {
1163                 ret = ret2;
1164         }
1165
1166         return ret;
1167 }
1168
1169 /* delete a transaction */
1170 int partition_del_trans(struct ldb_module *module)
1171 {
1172         int ret, final_ret = LDB_SUCCESS;
1173         unsigned int i;
1174         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1175                                                               struct partition_private_data);
1176
1177         for (i=0; data && data->partitions && data->partitions[i]; i++) {
1178                 if (ldb_module_flags(ldb_module_get_ctx(module)) &
1179                     LDB_FLG_ENABLE_TRACING) {
1180                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_del_trans() -> %s",
1181                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
1182                 }
1183                 ret = ldb_next_del_trans(data->partitions[i]->module);
1184                 if (ret != LDB_SUCCESS) {
1185                         ldb_asprintf_errstring(ldb_module_get_ctx(module), "del_trans error on %s: %s",
1186                                                ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
1187                                                ldb_errstring(ldb_module_get_ctx(module)));
1188                         final_ret = ret;
1189                 }
1190         }       
1191
1192         if (data->in_transaction == 0) {
1193                 DEBUG(0,("partition del transaction mismatch\n"));
1194                 return ldb_operr(ldb_module_get_ctx(module));
1195         }
1196         data->in_transaction--;
1197
1198         if (ldb_module_flags(ldb_module_get_ctx(module)) &
1199             LDB_FLG_ENABLE_TRACING) {
1200                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_del_trans() -> (metadata partition)");
1201         }
1202         ret = ldb_next_del_trans(module);
1203         if (ret != LDB_SUCCESS) {
1204                 final_ret = ret;
1205         }
1206
1207         ret = partition_metadata_del_trans(module);
1208         if (ret != LDB_SUCCESS) {
1209                 final_ret = ret;
1210         }
1211
1212         return final_ret;
1213 }
1214
1215 int partition_primary_sequence_number(struct ldb_module *module, TALLOC_CTX *mem_ctx, 
1216                                       uint64_t *seq_number,
1217                                       struct ldb_request *parent)
1218 {
1219         int ret;
1220         struct ldb_result *res;
1221         struct ldb_seqnum_request *tseq;
1222         struct ldb_seqnum_result *seqr;
1223
1224         tseq = talloc_zero(mem_ctx, struct ldb_seqnum_request);
1225         if (tseq == NULL) {
1226                 return ldb_oom(ldb_module_get_ctx(module));
1227         }
1228         tseq->type = LDB_SEQ_HIGHEST_SEQ;
1229         
1230         ret = dsdb_module_extended(module, tseq, &res,
1231                                    LDB_EXTENDED_SEQUENCE_NUMBER,
1232                                    tseq,
1233                                    DSDB_FLAG_NEXT_MODULE,
1234                                    parent);
1235         if (ret != LDB_SUCCESS) {
1236                 talloc_free(tseq);
1237                 return ret;
1238         }
1239         
1240         seqr = talloc_get_type_abort(res->extended->data,
1241                                      struct ldb_seqnum_result);
1242         if (seqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
1243                 talloc_free(res);
1244                 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
1245                         "Primary backend in partition module returned a timestamp based seq");
1246         }
1247
1248         *seq_number = seqr->seq_num;
1249         talloc_free(tseq);
1250         return LDB_SUCCESS;
1251 }
1252
1253
1254 /*
1255  * Older version of sequence number as sum of sequence numbers for each partition
1256  */
1257 int partition_sequence_number_from_partitions(struct ldb_module *module,
1258                                               uint64_t *seqr)
1259 {
1260         int ret;
1261         unsigned int i;
1262         uint64_t seq_number = 0;
1263         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1264                                                               struct partition_private_data);
1265
1266         ret = partition_primary_sequence_number(module, data, &seq_number, NULL);
1267         if (ret != LDB_SUCCESS) {
1268                 return ret;
1269         }
1270         
1271         /* Skip the lot if 'data' isn't here yet (initialisation) */
1272         for (i=0; data && data->partitions && data->partitions[i]; i++) {
1273                 struct ldb_seqnum_request *tseq;
1274                 struct ldb_seqnum_result *tseqr;
1275                 struct ldb_request *treq;
1276                 struct ldb_result *res = talloc_zero(data, struct ldb_result);
1277                 if (res == NULL) {
1278                         return ldb_oom(ldb_module_get_ctx(module));
1279                 }
1280                 tseq = talloc_zero(res, struct ldb_seqnum_request);
1281                 if (tseq == NULL) {
1282                         talloc_free(res);
1283                         return ldb_oom(ldb_module_get_ctx(module));
1284                 }
1285                 tseq->type = LDB_SEQ_HIGHEST_SEQ;
1286                 
1287                 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
1288                                              LDB_EXTENDED_SEQUENCE_NUMBER,
1289                                              tseq,
1290                                              NULL,
1291                                              res,
1292                                              ldb_extended_default_callback,
1293                                              NULL);
1294                 LDB_REQ_SET_LOCATION(treq);
1295                 if (ret != LDB_SUCCESS) {
1296                         talloc_free(res);
1297                         return ret;
1298                 }
1299                 
1300                 ret = partition_request(data->partitions[i]->module, treq);
1301                 if (ret != LDB_SUCCESS) {
1302                         talloc_free(res);
1303                         return ret;
1304                 }
1305                 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
1306                 if (ret != LDB_SUCCESS) {
1307                         talloc_free(res);
1308                         return ret;
1309                 }
1310                 tseqr = talloc_get_type(res->extended->data,
1311                                         struct ldb_seqnum_result);
1312                 seq_number += tseqr->seq_num;
1313                 talloc_free(res);
1314         }
1315
1316         *seqr = seq_number;
1317         return LDB_SUCCESS;
1318 }
1319
1320
1321 /*
1322  * Newer version of sequence number using metadata tdb
1323  */
1324 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
1325 {
1326         struct ldb_extended *ext;
1327         struct ldb_seqnum_request *seq;
1328         struct ldb_seqnum_result *seqr;
1329         uint64_t seq_number;
1330         int ret;
1331
1332         seq = talloc_get_type_abort(req->op.extended.data, struct ldb_seqnum_request);
1333         switch (seq->type) {
1334         case LDB_SEQ_NEXT:
1335                 ret = partition_metadata_sequence_number_increment(module, &seq_number);
1336                 if (ret != LDB_SUCCESS) {
1337                         return ret;
1338                 }
1339                 break;
1340
1341         case LDB_SEQ_HIGHEST_SEQ:
1342                 ret = partition_metadata_sequence_number(module, &seq_number);
1343                 if (ret != LDB_SUCCESS) {
1344                         return ret;
1345                 }
1346                 break;
1347
1348         case LDB_SEQ_HIGHEST_TIMESTAMP:
1349                 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
1350                                         "LDB_SEQ_HIGHEST_TIMESTAMP not supported");
1351         }
1352
1353         ext = talloc_zero(req, struct ldb_extended);
1354         if (!ext) {
1355                 return ldb_module_oom(module);
1356         }
1357         seqr = talloc_zero(ext, struct ldb_seqnum_result);
1358         if (seqr == NULL) {
1359                 talloc_free(ext);
1360                 return ldb_module_oom(module);
1361         }
1362         ext->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1363         ext->data = seqr;
1364
1365         seqr->seq_num = seq_number;
1366         seqr->flags |= LDB_SEQ_GLOBAL_SEQUENCE;
1367
1368         /* send request done */
1369         return ldb_module_done(req, NULL, ext, LDB_SUCCESS);
1370 }
1371
1372 /* lock all the backends */
1373 int partition_read_lock(struct ldb_module *module)
1374 {
1375         int i;
1376         int ret;
1377         int ret2;
1378         struct ldb_context *ldb = ldb_module_get_ctx(module);
1379         struct partition_private_data *data = \
1380                 talloc_get_type(ldb_module_get_private(module),
1381                                 struct partition_private_data);
1382
1383         if (ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING) {
1384                 ldb_debug(ldb, LDB_DEBUG_TRACE,
1385                           "partition_read_lock() -> (metadata partition)");
1386         }
1387
1388         /*
1389          * It is important to only do this for LOCK because:
1390          * - we don't want to unlock what we did not lock
1391          *
1392          * - we don't want to make a new lock on the sam.ldb
1393          *   (triggered inside this routine due to the seq num check)
1394          *   during an unlock phase as that will violate the lock
1395          *   ordering
1396          */
1397
1398         if (data == NULL) {
1399                 TALLOC_CTX *mem_ctx = talloc_new(module);
1400
1401                 data = talloc_zero(mem_ctx, struct partition_private_data);
1402                 if (data == NULL) {
1403                         talloc_free(mem_ctx);
1404                         return ldb_operr(ldb);
1405                 }
1406
1407                 /*
1408                  * When used from Samba4, this message is set by the
1409                  * samba4 module, as a fixed value not read from the
1410                  * DB.  This avoids listing modules in the DB
1411                  */
1412                 data->forced_module_msg = talloc_get_type(
1413                         ldb_get_opaque(ldb,
1414                                        DSDB_OPAQUE_PARTITION_MODULE_MSG_OPAQUE_NAME),
1415                         struct ldb_message);
1416
1417                 ldb_module_set_private(module, talloc_steal(module,
1418                                                             data));
1419                 talloc_free(mem_ctx);
1420         }
1421
1422         /*
1423          * This will lock the metadata partition (sam.ldb) and
1424          * will also call event loops, so we do it before we
1425          * get the whole db lock.
1426          */
1427         ret = partition_reload_if_required(module, data, NULL);
1428         if (ret != LDB_SUCCESS) {
1429                 return ret;
1430         }
1431
1432         /*
1433          * This order must match that in prepare_commit(), start with
1434          * the metadata partition (sam.ldb) lock
1435          */
1436         ret = ldb_next_read_lock(module);
1437         if (ret != LDB_SUCCESS) {
1438                 ldb_debug_set(ldb,
1439                               LDB_DEBUG_FATAL,
1440                               "Failed to lock db: %s / %s for metadata partition",
1441                               ldb_errstring(ldb),
1442                               ldb_strerror(ret));
1443
1444                 return ret;
1445         }
1446
1447         /*
1448          * The metadata partition (sam.ldb) lock is not
1449          * enough to block another process in prepare_commit(),
1450          * because prepare_commit() is a no-op, if nothing
1451          * was changed in the specific backend.
1452          *
1453          * That means the following per partition locks are required.
1454          */
1455         for (i=0; data && data->partitions && data->partitions[i]; i++) {
1456                 if ((module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING)) {
1457                         ldb_debug(ldb, LDB_DEBUG_TRACE,
1458                                   "partition_read_lock() -> %s",
1459                                   ldb_dn_get_linearized(
1460                                           data->partitions[i]->ctrl->dn));
1461                 }
1462                 ret = ldb_next_read_lock(data->partitions[i]->module);
1463                 if (ret == LDB_SUCCESS) {
1464                         continue;
1465                 }
1466
1467                 ldb_debug_set(ldb,
1468                               LDB_DEBUG_FATAL,
1469                               "Failed to lock db: %s / %s for %s",
1470                               ldb_errstring(ldb),
1471                               ldb_strerror(ret),
1472                               ldb_dn_get_linearized(
1473                                       data->partitions[i]->ctrl->dn));
1474
1475                 /* Back it out, if it fails on one */
1476                 for (i--; i >= 0; i--) {
1477                         ret2 = ldb_next_read_unlock(data->partitions[i]->module);
1478                         if (ret2 != LDB_SUCCESS) {
1479                                 ldb_debug(ldb,
1480                                           LDB_DEBUG_FATAL,
1481                                           "Failed to unlock db: %s / %s",
1482                                           ldb_errstring(ldb),
1483                                           ldb_strerror(ret2));
1484                         }
1485                 }
1486                 ret2 = ldb_next_read_unlock(module);
1487                 if (ret2 != LDB_SUCCESS) {
1488                         ldb_debug(ldb,
1489                                   LDB_DEBUG_FATAL,
1490                                   "Failed to unlock db: %s / %s",
1491                                   ldb_errstring(ldb),
1492                                   ldb_strerror(ret2));
1493                 }
1494                 return ret;
1495         }
1496
1497         return LDB_SUCCESS;
1498 }
1499
1500 /* unlock all the backends */
1501 int partition_read_unlock(struct ldb_module *module)
1502 {
1503         int i;
1504         int ret = LDB_SUCCESS;
1505         int ret2;
1506         struct ldb_context *ldb = ldb_module_get_ctx(module);
1507         struct partition_private_data *data = \
1508                 talloc_get_type(ldb_module_get_private(module),
1509                                 struct partition_private_data);
1510
1511         /*
1512          * This order must be similar to partition_{end,del}_trans()
1513          * the metadata partition (sam.ldb) unlock must be at the end.
1514          */
1515
1516         for (i=0; data && data->partitions && data->partitions[i]; i++) {
1517                 if ((module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING)) {
1518                         ldb_debug(ldb, LDB_DEBUG_TRACE,
1519                                   "partition_read_unlock() -> %s",
1520                                   ldb_dn_get_linearized(
1521                                           data->partitions[i]->ctrl->dn));
1522                 }
1523                 ret2 = ldb_next_read_unlock(data->partitions[i]->module);
1524                 if (ret2 != LDB_SUCCESS) {
1525                         ldb_debug_set(ldb,
1526                                       LDB_DEBUG_FATAL,
1527                                       "Failed to lock db: %s / %s for %s",
1528                                       ldb_errstring(ldb),
1529                                       ldb_strerror(ret),
1530                                       ldb_dn_get_linearized(
1531                                               data->partitions[i]->ctrl->dn));
1532
1533                         /*
1534                          * Don't overwrite the original failure code
1535                          * if there was one
1536                          */
1537                         if (ret == LDB_SUCCESS) {
1538                                 ret = ret2;
1539                         }
1540                 }
1541         }
1542
1543         if (ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING) {
1544                 ldb_debug(ldb, LDB_DEBUG_TRACE,
1545                           "partition_read_unlock() -> (metadata partition)");
1546         }
1547
1548         ret2 = ldb_next_read_unlock(module);
1549         if (ret2 != LDB_SUCCESS) {
1550                 ldb_debug_set(ldb,
1551                               LDB_DEBUG_FATAL,
1552                               "Failed to unlock db: %s / %s for metadata partition",
1553                               ldb_errstring(ldb),
1554                               ldb_strerror(ret2));
1555
1556                 /*
1557                  * Don't overwrite the original failure code
1558                  * if there was one
1559                  */
1560                 if (ret == LDB_SUCCESS) {
1561                         ret = ret2;
1562                 }
1563         }
1564
1565         return ret;
1566 }
1567
1568 /* extended */
1569 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
1570 {
1571         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1572                                                               struct partition_private_data);
1573         struct partition_context *ac;
1574         int ret;
1575
1576         /* if we aren't initialised yet go further */
1577         if (!data) {
1578                 return ldb_next_request(module, req);
1579         }
1580
1581         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_SCHEMA_UPDATE_NOW_OID) == 0) {
1582                 /* Update the metadata.tdb to increment the schema version if needed*/
1583                 DEBUG(10, ("Incrementing the sequence_number after schema_update_now\n"));
1584                 ret = partition_metadata_inc_schema_sequence(module);
1585                 return ldb_module_done(req, NULL, NULL, ret);
1586         }
1587         
1588         if (strcmp(req->op.extended.oid, LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1589                 return partition_sequence_number(module, req);
1590         }
1591
1592         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_CREATE_PARTITION_OID) == 0) {
1593                 return partition_create(module, req);
1594         }
1595
1596         /* 
1597          * as the extended operation has no dn
1598          * we need to send it to all partitions
1599          */
1600
1601         ac = partition_init_ctx(module, req);
1602         if (!ac) {
1603                 return ldb_operr(ldb_module_get_ctx(module));
1604         }
1605
1606         return partition_send_all(module, ac, req);
1607 }
1608
1609 static const struct ldb_module_ops ldb_partition_module_ops = {
1610         .name              = "partition",
1611         .init_context      = partition_init,
1612         .search            = partition_search,
1613         .add               = partition_add,
1614         .modify            = partition_modify,
1615         .del               = partition_delete,
1616         .rename            = partition_rename,
1617         .extended          = partition_extended,
1618         .start_transaction = partition_start_trans,
1619         .prepare_commit    = partition_prepare_commit,
1620         .end_transaction   = partition_end_trans,
1621         .del_transaction   = partition_del_trans,
1622         .read_lock         = partition_read_lock,
1623         .read_unlock       = partition_read_unlock
1624 };
1625
1626 int ldb_partition_module_init(const char *version)
1627 {
1628         LDB_MODULE_CHECK_VERSION(version);
1629         return ldb_register_module(&ldb_partition_module_ops);
1630 }